/*
   Unix SMB/CIFS implementation.

   Database interface wrapper around tdb/ctdb

   Copyright (C) Volker Lendecke 2005-2007
   Copyright (C) Stefan Metzmacher 2008

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "librpc/gen_ndr/ndr_messaging.h"

/*
 * Per-database state.
 *
 * Every database consists of a "master" tdb (mtdb) and a "local" copy
 * (ltdb). Reads are served from the local copy, modifications are done
 * on both copies inside a transaction. The master handle is only kept
 * open while syncing or while a transaction is running.
 */
struct db_tdb2_ctx {
	/* back pointer to the db_context that owns this structure */
	struct db_context *db;
	/* basename of the database, also used in change notifications */
	const char *name;

	/* handle and path of the master copy; mtdb is NULL while closed */
	struct tdb_wrap *mtdb;
	const char *mtdb_path;
	/* true while a tdb transaction is open on the master copy */
	bool master_transaction;

	/* the parameters the database was opened with */
	struct {
		int hash_size;
		int tdb_flags;
		int open_flags;
		mode_t mode;
	} open;

	/* handle and path of the local copy */
	struct tdb_wrap *ltdb;
	const char *ltdb_path;
	/* true while a tdb transaction is open on the local copy */
	bool local_transaction;

	/* nesting depth of db_tdb2_transaction_start() calls */
	int transaction;
	/* set when syncing the local copy from the master failed */
	bool out_of_sync;

	/* cached seqnum of the local tdb and the master seqnum that goes with it */
	uint32_t lseqnum;
	uint32_t mseqnum;
#define DB_TDB2_MASTER_SEQNUM_KEYSTR "DB_TDB2_MASTER_SEQNUM_KEYSTR"
	/* key under which the master seqnum is stored in the local copy */
	TDB_DATA mseqkey;

	/* limit and current usage of the buffer of queued change keys */
	uint32_t max_buffer_size;
	uint32_t current_buffer_size;

	/* the changes queued within the current transaction */
	struct dbwrap_tdb2_changes changes;
};

static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag);
static NTSTATUS db_tdb2_delete(struct db_record *rec);

static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key);
static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx);

static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
					  const char *name,
					  int hash_size, int tdb_flags,
					  int open_flags, mode_t mode,
					  const struct dbwrap_tdb2_changes *chgs);

static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
				    const struct dbwrap_tdb2_changes *changes);

static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
			       const struct dbwrap_tdb2_changes *changes);
static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum);
static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx);
static int db_tdb2_transaction_cancel(struct db_context *db);

static void db_tdb2_receive_changes(struct messaging_context *msg,
				    void *private_data,
				    uint32_t msg_type,
				    struct server_id server_id,
				    DATA_BLOB *data);

static struct messaging_context *global_tdb2_msg_ctx;
static bool global_tdb2_msg_ctx_initialized;

/*
 * Remember the messaging context used for change notifications.
 *
 * If 'server' is true and "dbwrap:use_tdb2" is enabled, the handler
 * for MSG_DBWRAP_TDB2_CHANGES is registered as well.
 */
void db_tdb2_setup_messaging(struct messaging_context *msg_ctx, bool server)
{
	global_tdb2_msg_ctx = msg_ctx;

	global_tdb2_msg_ctx_initialized = true;

	if (!server) {
		return;
	}

	if (!lp_parm_bool(-1, "dbwrap", "use_tdb2", false)) {
		return;
	}

	messaging_register(msg_ctx, NULL, MSG_DBWRAP_TDB2_CHANGES,
			   db_tdb2_receive_changes);
}

/*
 * Return the messaging context, creating a client-only one on first
 * use when db_tdb2_setup_messaging() was not called before.
 * The result may be NULL if messaging_init() failed.
 */
static struct messaging_context *db_tdb2_get_global_messaging_context(void)
{
	struct messaging_context *msg_ctx;

	if (global_tdb2_msg_ctx_initialized) {
		return global_tdb2_msg_ctx;
	}

	msg_ctx = messaging_init(NULL, procid_self(),
				 event_context_init(NULL));

	db_tdb2_setup_messaging(msg_ctx, false);

	return global_tdb2_msg_ctx;
}

struct tdb_fetch_locked_state {
	TALLOC_CTX *mem_ctx;
	struct db_record *result;
};

/*
 * tdb_parse_record() callback: allocate a db_record with the key and
 * the value copied into the same talloc chunk, right behind the
 * structure. On allocation failure state->result stays NULL.
 */
static int db_tdb2_fetchlock_parse(TDB_DATA key, TDB_DATA data,
				   void *private_data)
{
	struct tdb_fetch_locked_state *state =
		(struct tdb_fetch_locked_state *)private_data;

	state->result = (struct db_record *)talloc_size(
		state->mem_ctx,
		sizeof(struct db_record) + key.dsize + data.dsize);

	if (state->result == NULL) {
		return 0;
	}

	state->result->key.dsize = key.dsize;
	state->result->key.dptr = ((uint8 *)state->result)
		+ sizeof(struct db_record);
	memcpy(state->result->key.dptr, key.dptr, key.dsize);

	state->result->value.dsize = data.dsize;

	if (data.dsize > 0) {
		state->result->value.dptr = state->result->key.dptr+key.dsize;
		memcpy(state->result->value.dptr, data.dptr, data.dsize);
	}
	else {
		state->result->value.dptr = NULL;
	}

	return 0;
}

/*
 * Fetch a record from the master copy for modification.
 *
 * This is only allowed within a started transaction. If the key does
 * not exist a record with an empty value is returned.
 * Returns NULL on allocation failure.
 */
static struct db_record *db_tdb2_fetch_locked(struct db_context *db,
					      TALLOC_CTX *mem_ctx,
					      TDB_DATA key)
{
	struct db_tdb2_ctx *ctx = talloc_get_type_abort(db->private_data,
							struct db_tdb2_ctx);
	struct tdb_fetch_locked_state state;

	/* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
	if(DEBUGLEVEL >= 10) {
		char *keystr = hex_encode(NULL, (unsigned char*)key.dptr, key.dsize);
		DEBUG(10, (DEBUGLEVEL > 10
			   ? "Locking key %s\n" : "Locking key %.20s\n",
			   keystr));
		TALLOC_FREE(keystr);
	}

	/*
	 * we only support modifications within a
	 * started transaction.
	 */
	if (ctx->transaction == 0) {
		DEBUG(0, ("db_tdb2_fetch_locked[%s]: no transaction started\n",
			  ctx->name));
		smb_panic("no transaction");
		return NULL;
	}

	state.mem_ctx = mem_ctx;
	state.result = NULL;

	tdb_parse_record(ctx->mtdb->tdb, key, db_tdb2_fetchlock_parse, &state);

	if (state.result == NULL) {
		/* the key does not exist yet, create an empty record */
		db_tdb2_fetchlock_parse(key, tdb_null, &state);
	}

	if (state.result == NULL) {
		return NULL;
	}

	/*
	 * the record keeps the db context alive; without the reference
	 * store/delete would abort on a NULL private_data later on.
	 */
	state.result->private_data = talloc_reference(state.result, ctx);
	if (state.result->private_data == NULL) {
		TALLOC_FREE(state.result);
		return NULL;
	}
	state.result->store = db_tdb2_store;
	state.result->delete_rec = db_tdb2_delete;

	DEBUG(10, ("Allocated locked data 0x%p\n", state.result));

	return state.result;
}

struct tdb_fetch_state {
	TALLOC_CTX *mem_ctx;
	int result;
	TDB_DATA data;
};

/*
 * tdb_parse_record() callback: duplicate the value onto
 * state->mem_ctx, state->result is set to -1 on allocation failure.
 */
static int db_tdb2_fetch_parse(TDB_DATA key, TDB_DATA data,
			       void *private_data)
{
	struct tdb_fetch_state *state =
		(struct tdb_fetch_state *)private_data;

	state->data.dptr = (uint8 *)talloc_memdup(state->mem_ctx, data.dptr,
						  data.dsize);
	if (state->data.dptr == NULL) {
		state->result = -1;
		return 0;
	}

	state->data.dsize = data.dsize;
	return 0;
}

/*
 * If the local copy is marked as out of sync, try to resync it by
 * opening (and closing) the master copy.
 *
 * Nothing is done while the master is open or when 'kbuf' is the
 * key of the hidden master seqnum record.
 */
static void db_tdb2_resync_before_read(struct db_tdb2_ctx *db_ctx, TDB_DATA *kbuf)
{
	if (db_ctx->mtdb) {
		return;
	}

	if (!db_ctx->out_of_sync) {
		return;
	}

	/*
	 * this function operates on the local copy,
	 * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
	 */
	if (kbuf && (db_ctx->mseqkey.dsize == kbuf->dsize) &&
	    (memcmp(db_ctx->mseqkey.dptr, kbuf->dptr, kbuf->dsize) == 0)) {
		return;
	}

	DEBUG(0,("resync_before_read[%s/%s]\n",
		 db_ctx->mtdb_path, db_ctx->ltdb_path));

	/* opening the master syncs the local copy, see db_tdb2_open_master() */
	db_tdb2_open_master(db_ctx, false, NULL);
	db_tdb2_close_master(db_ctx);
}

/*
 * Fetch a record from the local copy.
 *
 * Returns 0 on success (with *pdata set to tdb_null if the parse
 * callback was not invoked) and -1 on failure; errno is set to EIO
 * when the local copy is out of sync.
 */
static int db_tdb2_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
			 TDB_DATA key, TDB_DATA *pdata)
{
	struct db_tdb2_ctx *ctx = talloc_get_type_abort(
		db->private_data, struct db_tdb2_ctx);

	struct tdb_fetch_state state;

	db_tdb2_resync_before_read(ctx, &key);

	if (ctx->out_of_sync) {
		DEBUG(0,("out of sync[%s] failing fetch\n",
			 ctx->ltdb_path));
		errno = EIO;
		return -1;
	}

	state.mem_ctx = mem_ctx;
	state.result = 0;
	state.data = tdb_null;

	tdb_parse_record(ctx->ltdb->tdb, key, db_tdb2_fetch_parse, &state);

	if (state.result == -1) {
		return -1;
	}

	*pdata = state.data;
	return 0;
}

/*
 * Store a record in the master copy and then in the local copy.
 *
 * If storing in the local copy fails we try to restore the old value
 * in the master copy. On success the key is queued for the change
 * notification.
 */
static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag)
{
	struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
							struct db_tdb2_ctx);
	int ret;

	/*
	 * the master copy is only open within a transaction, a record
	 * that outlived its transaction must not be stored anymore.
	 */
	if (ctx->mtdb == NULL) {
		DEBUG(0, ("db_tdb2_store[%s]: no transaction started\n",
			  ctx->name));
		return NT_STATUS_UNSUCCESSFUL;
	}

	/*
	 * This has a bug: We need to replace rec->value for correct
	 * operation, but right now brlock and locking don't use the value
	 * anymore after it was stored.
	 */

	/* first store it to the master copy */
	ret = tdb_store(ctx->mtdb->tdb, rec->key, data, flag);
	if (ret != 0) {
		return NT_STATUS_UNSUCCESSFUL;
	}

	/* then store it to the local copy */
	ret = tdb_store(ctx->ltdb->tdb, rec->key, data, flag);
	if (ret != 0) {
		/* try to restore the old value in the master copy */
		if (rec->value.dptr) {
			tdb_store(ctx->mtdb->tdb, rec->key,
				  rec->value, TDB_REPLACE);
		} else {
			tdb_delete(ctx->mtdb->tdb, rec->key);
		}
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	db_tdb2_queue_change(ctx, rec->key);

	return NT_STATUS_OK;
}

/*
 * Delete a record from the master copy and then from the local copy.
 *
 * If deleting from the local copy fails we try to restore the value
 * in the master copy. On success the key is queued for the change
 * notification.
 */
static NTSTATUS db_tdb2_delete(struct db_record *rec)
{
	struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
							struct db_tdb2_ctx);
	int ret;

	/* see db_tdb2_store(): the master is only open within a transaction */
	if (ctx->mtdb == NULL) {
		DEBUG(0, ("db_tdb2_delete[%s]: no transaction started\n",
			  ctx->name));
		return NT_STATUS_UNSUCCESSFUL;
	}

	ret = tdb_delete(ctx->mtdb->tdb, rec->key);
	if (ret != 0) {
		if (tdb_error(ctx->mtdb->tdb) == TDB_ERR_NOEXIST) {
			return NT_STATUS_NOT_FOUND;
		}

		return NT_STATUS_UNSUCCESSFUL;
	}

	ret = tdb_delete(ctx->ltdb->tdb, rec->key);
	if (ret != 0) {
		/* try to restore the value in the master copy */
		tdb_store(ctx->mtdb->tdb, rec->key,
			  rec->value, TDB_REPLACE);
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	db_tdb2_queue_change(ctx, rec->key);

	return NT_STATUS_OK;
}

struct db_tdb2_traverse_ctx {
	struct db_tdb2_ctx *db_ctx;
	int (*f)(struct db_record *rec, void *private_data);
	void *private_data;
	/*
	 * number of records hidden from the caller,
	 * only maintained by db_tdb2_traverse_read()
	 */
	int num_filtered;
};

/*
 * tdb_traverse() callback for db_tdb2_traverse(): hand a writable
 * record to the caller supplied function.
 */
static int db_tdb2_traverse_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
				 void *private_data)
{
	struct db_tdb2_traverse_ctx *ctx =
		(struct db_tdb2_traverse_ctx *)private_data;
	struct db_record rec;

	/* this function operates on the master copy */

	rec.key = kbuf;
	rec.value = dbuf;
	rec.store = db_tdb2_store;
	rec.delete_rec = db_tdb2_delete;
	rec.private_data = ctx->db_ctx;

	return ctx->f(&rec, ctx->private_data);
}

/*
 * Traverse the master copy with records that can be modified.
 * This is only allowed within a started transaction.
 */
static int db_tdb2_traverse(struct db_context *db,
			    int (*f)(struct db_record *rec, void *private_data),
			    void *private_data)
{
	struct db_tdb2_ctx *db_ctx =
		talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
	struct db_tdb2_traverse_ctx ctx;

	/*
	 * we only support modifications within a
	 * started transaction.
	 */
	if (db_ctx->transaction == 0) {
		DEBUG(0, ("db_tdb2_traverse[%s]: no transaction started\n",
			  db_ctx->name));
		smb_panic("no transaction");
		return -1;
	}

	/* here we traverse the master copy */
	ctx.db_ctx = db_ctx;
	ctx.f = f;
	ctx.private_data = private_data;
	ctx.num_filtered = 0;
	return tdb_traverse(db_ctx->mtdb->tdb, db_tdb2_traverse_func, &ctx);
}

/* store function of the records passed by db_tdb2_traverse_read() */
static NTSTATUS db_tdb2_store_deny(struct db_record *rec, TDB_DATA data, int flag)
{
	return NT_STATUS_MEDIA_WRITE_PROTECTED;
}

/* delete function of the records passed by db_tdb2_traverse_read() */
static NTSTATUS db_tdb2_delete_deny(struct db_record *rec)
{
	return NT_STATUS_MEDIA_WRITE_PROTECTED;
}

/*
 * tdb_traverse_read() callback for db_tdb2_traverse_read(): hand a
 * read-only record to the caller supplied function and skip the
 * hidden master seqnum record.
 */
static int db_tdb2_traverse_read_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
				      void *private_data)
{
	struct db_tdb2_traverse_ctx *ctx =
		(struct db_tdb2_traverse_ctx *)private_data;
	struct db_record rec;

	/*
	 * this function operates on the local copy,
	 * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
	 */
	if ((ctx->db_ctx->mseqkey.dsize == kbuf.dsize) &&
	    (memcmp(ctx->db_ctx->mseqkey.dptr, kbuf.dptr, kbuf.dsize) == 0)) {
		ctx->num_filtered++;
		return 0;
	}

	rec.key = kbuf;
	rec.value = dbuf;
	rec.store = db_tdb2_store_deny;
	rec.delete_rec = db_tdb2_delete_deny;
	rec.private_data = ctx->db_ctx;

	return ctx->f(&rec, ctx->private_data);
}

/*
 * Traverse the local copy with read-only records.
 *
 * Returns the result of tdb_traverse_read() without the records that
 * were hidden from the caller, or -1 (errno EIO) when the local copy
 * is out of sync.
 */
static int db_tdb2_traverse_read(struct db_context *db,
				 int (*f)(struct db_record *rec, void *private_data),
				 void *private_data)
{
	struct db_tdb2_ctx *db_ctx =
		talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
	struct db_tdb2_traverse_ctx ctx;
	int ret;

	db_tdb2_resync_before_read(db_ctx, NULL);

	if (db_ctx->out_of_sync) {
		DEBUG(0,("out of sync[%s] failing traverse_read\n",
			 db_ctx->ltdb_path));
		errno = EIO;
		return -1;
	}

	/* here we traverse the local copy */
	ctx.db_ctx = db_ctx;
	ctx.f = f;
	ctx.private_data = private_data;
	ctx.num_filtered = 0;
	ret = tdb_traverse_read(db_ctx->ltdb->tdb, db_tdb2_traverse_read_func, &ctx);
	if (ret > 0) {
		/*
		 * only subtract what was really filtered, the hidden
		 * record might not have been visited at all
		 */
		ret -= ctx.num_filtered;
	}

	return ret;
}

/*
 * Return the master seqnum that belongs to the current state of the
 * local copy. The value is cached as long as the seqnum of the local
 * tdb does not change.
 */
static int db_tdb2_get_seqnum(struct db_context *db)
{
	struct db_tdb2_ctx *db_ctx =
		talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
	uint32_t nlseq;
	uint32_t nmseq;
	bool ok;

	nlseq = tdb_get_seqnum(db_ctx->ltdb->tdb);

	if (nlseq == db_ctx->lseqnum) {
		return db_ctx->mseqnum;
	}

	ok = tdb_fetch_uint32_byblob(db_ctx->ltdb->tdb,
				     db_ctx->mseqkey,
				     &nmseq);
	if (!ok) {
		/* TODO: what should we do here? */
		return db_ctx->mseqnum;
	}

	db_ctx->lseqnum = nlseq;
	db_ctx->mseqnum = nmseq;

	return db_ctx->mseqnum;
}

/*
 * Start a (possibly nested) transaction.
 *
 * The outermost call opens the master copy, which syncs the local
 * copy, and starts a tdb transaction on both copies.
 */
static int db_tdb2_transaction_start(struct db_context *db)
{
	struct db_tdb2_ctx *db_ctx =
		talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
	int ret;

	if (db_ctx->transaction) {
		db_ctx->transaction++;
		return 0;
	}

	/*
	 * we need to open the master tdb in order to
	 * start a transaction on it
	 */
	ret = db_tdb2_open_master(db_ctx, true, NULL);
	if (ret != 0) {
		return ret;
	}

	ret = tdb_transaction_start(db_ctx->ltdb->tdb);
	if (ret != 0) {
		db_tdb2_close_master(db_ctx);
		return ret;
	}

	db_ctx->local_transaction = true;
	db_ctx->transaction = 1;

	return 0;
}

/*
 * Forget all queued changes and reset the buffer accounting that
 * belongs to them.
 */
static void db_tdb2_reset_changes(struct db_tdb2_ctx *db_ctx)
{
	TALLOC_FREE(db_ctx->changes.keys);
	ZERO_STRUCT(db_ctx->changes);
	db_ctx->current_buffer_size = 0;
}

/*
 * Remember that 'key' was changed within the current transaction.
 *
 * Each key is only queued once. If the keys do not fit into the
 * notify buffer anymore (or on allocation failure) the list of keys
 * is discarded; num_changes keeps counting, so a notification
 * without keys tells the receivers to reload the whole tdb.
 */
static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key)
{
	size_t size_needed = 4 + key.dsize;
	size_t size_new = db_ctx->current_buffer_size + size_needed;
	uint32_t i;
	DATA_BLOB *keys;

	db_ctx->changes.num_changes++;

	if (db_ctx->changes.num_changes > 1 &&
	    db_ctx->changes.keys == NULL) {
		/*
		 * this means we already overflowed
		 */
		return;
	}

	if (db_ctx->changes.num_changes == 1) {
		db_ctx->changes.old_seqnum = db_ctx->mseqnum;
	}

	for (i=0; i < db_ctx->changes.num_keys; i++) {
		int ret;

		if (key.dsize != db_ctx->changes.keys[i].length) {
			continue;
		}
		ret = memcmp(key.dptr, db_ctx->changes.keys[i].data, key.dsize);
		if (ret != 0) {
			continue;
		}

		/*
		 * the key is already in the list
		 * so we're done
		 */
		return;
	}

	if (db_ctx->max_buffer_size < size_new) {
		goto overflow;
	}

	keys = TALLOC_REALLOC_ARRAY(db_ctx, db_ctx->changes.keys,
				    DATA_BLOB,
				    db_ctx->changes.num_keys + 1);
	if (!keys) {
		goto overflow;
	}
	db_ctx->changes.keys = keys;

	keys[db_ctx->changes.num_keys].data = talloc_memdup(keys,
							     key.dptr,
							     key.dsize);
	if (!keys[db_ctx->changes.num_keys].data) {
		goto overflow;
	}
	keys[db_ctx->changes.num_keys].length = key.dsize;
	db_ctx->changes.num_keys++;
	db_ctx->current_buffer_size = size_new;

	return;

overflow:
	/*
	 * on overflow discard the buffer and let
	 * the others reload the whole tdb
	 */
	db_ctx->current_buffer_size = 0;
	db_ctx->changes.num_keys = 0;
	TALLOC_FREE(db_ctx->changes.keys);
	return;
}

/*
 * Send the queued changes to all other processes and reset the
 * queue, no matter if sending worked or not.
 */
static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx)
{
	enum ndr_err_code ndr_err;
	bool ok;
	DATA_BLOB blob;
	struct messaging_context *msg_ctx;
	int num_msgs = 0;
	struct server_id self = procid_self();

	msg_ctx = db_tdb2_get_global_messaging_context();

	db_ctx->changes.name = db_ctx->name;

	DEBUG(10,("%s[%s] size[%u/%u] changes[%u] keys[%u] seqnum[%u=>%u]\n",
		__FUNCTION__,
		db_ctx->changes.name,
		db_ctx->current_buffer_size,
		db_ctx->max_buffer_size,
		db_ctx->changes.num_changes,
		db_ctx->changes.num_keys,
		db_ctx->changes.old_seqnum,
		db_ctx->changes.new_seqnum));

	if (db_ctx->changes.num_changes == 0) {
		DEBUG(10,("db_tdb2_send_notify[%s]: no changes\n",
			db_ctx->changes.name));
		goto done;
	}

	if (!msg_ctx) {
		DEBUG(1,("db_tdb2_send_notify[%s]: skipped (no msg ctx)\n",
			db_ctx->changes.name));
		goto done;
	}

	ndr_err = ndr_push_struct_blob(
		&blob, talloc_tos(),
		&db_ctx->changes,
		(ndr_push_flags_fn_t)ndr_push_dbwrap_tdb2_changes);
	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
		DEBUG(0,("db_tdb2_send_notify[%s]: failed to push changes: %s\n",
			db_ctx->changes.name,
			nt_errstr(ndr_map_error2ntstatus(ndr_err))));
		goto done;
	}

	ok = message_send_all(msg_ctx, MSG_DBWRAP_TDB2_CHANGES,
			      blob.data, blob.length, &num_msgs);
	if (!ok) {
		DEBUG(0,("db_tdb2_send_notify[%s]: failed to send changes\n",
			db_ctx->changes.name));
		goto done;
	}

	DEBUG(10,("db_tdb2_send_notify[%s]: pid %s send %u messages\n",
		db_ctx->name, procid_str_static(&self), num_msgs));

done:
	/*
	 * the buffer accounting has to be reset together with the
	 * keys, otherwise it grows over all transactions until every
	 * change is treated as an overflow
	 */
	db_tdb2_reset_changes(db_ctx);
	return;
}

/*
 * Handler for MSG_DBWRAP_TDB2_CHANGES: sync the local copy of the
 * database named in the message by opening it with the received
 * changes. Messages sent by ourself are ignored.
 */
static void db_tdb2_receive_changes(struct messaging_context *msg,
				    void *private_data,
				    uint32_t msg_type,
				    struct server_id server_id,
				    DATA_BLOB *data)
{
	enum ndr_err_code ndr_err;
	struct dbwrap_tdb2_changes changes;
	struct db_context *db;
	struct server_id self;

	if (procid_is_me(&server_id)) {
		DEBUG(0,("db_tdb2_receive_changes: ignore selfpacket\n"));
		return;
	}

	self = procid_self();

	DEBUG(10,("db_tdb2_receive_changes: from %s to %s\n",
		procid_str(debug_ctx(), &server_id),
		procid_str(debug_ctx(), &self)));

	ndr_err = ndr_pull_struct_blob_all(
		data, talloc_tos(),
		&changes,
		(ndr_pull_flags_fn_t)ndr_pull_dbwrap_tdb2_changes);
	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
		DEBUG(0,("db_tdb2_receive_changes: failed to pull changes: %s\n",
			nt_errstr(ndr_map_error2ntstatus(ndr_err))));
		goto done;
	}

	if(DEBUGLEVEL >= 10) {
		NDR_PRINT_DEBUG(dbwrap_tdb2_changes, &changes);
	}

	/* open the db, this will sync it */
	db = db_open_tdb2_ex(talloc_tos(), changes.name, 0,
			     0, O_RDWR, 0600, &changes);
	TALLOC_FREE(db);
done:
	return;
}

/*
 * Commit a (possibly nested) transaction.
 *
 * The outermost call commits to the master copy first. Once that
 * worked the result is 0, even if committing to the local copy fails;
 * in that case the local copy is marked as out of sync. Finally the
 * other processes are notified about the changes.
 */
static int db_tdb2_transaction_commit(struct db_context *db)
{
	struct db_tdb2_ctx *db_ctx =
		talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
	int ret;
	uint32_t mseqnum;

	if (db_ctx->transaction == 0) {
		return -1;
	} else if (db_ctx->transaction > 1) {
		db_ctx->transaction--;
		return 0;
	}

	mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
	db_ctx->changes.new_seqnum = mseqnum;

	/* first commit to the master copy */
	ret = tdb_transaction_commit(db_ctx->mtdb->tdb);
	db_ctx->master_transaction = false;
	if (ret != 0) {
		int saved_errno = errno;
		db_tdb2_transaction_cancel(db);
		errno = saved_errno;
		return ret;
	}

	/*
	 * Note: as we've already commited the changes to the master copy
	 *       so we ignore errors in the following functions
	 */
	ret = db_tdb2_commit_local(db_ctx, mseqnum);
	if (ret == 0) {
		db_ctx->out_of_sync = false;
	} else {
		db_ctx->out_of_sync = true;
	}

	db_ctx->transaction = 0;

	db_tdb2_close_master(db_ctx);

	db_tdb2_send_notify(db_ctx);

	return 0;
}

/*
 * Cancel a (possibly nested) transaction.
 *
 * The outermost call cancels the transaction on both copies, closes
 * the master copy and drops the changes queued for notification.
 */
static int db_tdb2_transaction_cancel(struct db_context *db)
{
	struct db_tdb2_ctx *db_ctx =
		talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
	int saved_errno;
	int ret;

	if (db_ctx->transaction == 0) {
		return -1;
	}
	if (db_ctx->transaction > 1) {
		db_ctx->transaction--;
		return 0;
	}

	/* cancel the transaction and close the master copy */
	ret = db_tdb2_close_master(db_ctx);
	saved_errno = errno;

	/* now cancel on the local copy and ignore any error */
	tdb_transaction_cancel(db_ctx->ltdb->tdb);
	db_ctx->local_transaction = false;

	db_ctx->transaction = 0;

	/*
	 * the queued changes were rolled back, they must not
	 * be sent together with the next commit
	 */
	db_tdb2_reset_changes(db_ctx);

	errno = saved_errno;
	return ret;
}

/*
 * Open the master copy and sync the local copy from it.
 *
 * The local copy is opened (and created if needed) on first use, with
 * the hash size and, if fstat() works, the mode of the master copy.
 * If 'transaction' is true a tdb transaction is started on the master
 * copy before syncing. 'changes' is passed on to
 * db_tdb2_sync_from_master() and may be NULL.
 *
 * On failure the master copy is closed again and non-zero is returned.
 */
static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
			       const struct dbwrap_tdb2_changes *changes)
{
	int ret;

	db_ctx->mtdb = tdb_wrap_open(db_ctx,
				     db_ctx->mtdb_path,
				     db_ctx->open.hash_size,
				     db_ctx->open.tdb_flags|TDB_NOMMAP|TDB_SEQNUM,
				     db_ctx->open.open_flags,
				     db_ctx->open.mode);
	if (db_ctx->mtdb == NULL) {
		DEBUG(0, ("Could not open master tdb[%s]: %s\n",
			  db_ctx->mtdb_path,
			  strerror(errno)));
		return -1;
	}
	DEBUG(10,("open_master[%s]\n", db_ctx->mtdb_path));

	if (!db_ctx->ltdb) {
		struct stat st;

		if (fstat(tdb_fd(db_ctx->mtdb->tdb), &st) == 0) {
			db_ctx->open.mode = st.st_mode;
		}

		/* make sure the local one uses the same hash size as the master one */
		db_ctx->open.hash_size = tdb_hash_size(db_ctx->mtdb->tdb);

		db_ctx->ltdb = tdb_wrap_open(db_ctx,
					     db_ctx->ltdb_path,
					     db_ctx->open.hash_size,
					     db_ctx->open.tdb_flags|TDB_SEQNUM,
					     db_ctx->open.open_flags|O_CREAT,
					     db_ctx->open.mode);
		if (db_ctx->ltdb == NULL) {
			DEBUG(0, ("Could not open local tdb[%s]: %s\n",
				  db_ctx->ltdb_path,
				  strerror(errno)));
			TALLOC_FREE(db_ctx->mtdb);
			return -1;
		}
		DEBUG(10,("open_local[%s]\n", db_ctx->ltdb_path));
	}

	if (transaction) {
		ret = tdb_transaction_start(db_ctx->mtdb->tdb);
		if (ret != 0) {
			DEBUG(0,("open failed to start transaction[%s]\n",
				 db_ctx->mtdb_path));
			db_tdb2_close_master(db_ctx);
			return ret;
		}
		db_ctx->master_transaction = true;
	}

	ret = db_tdb2_sync_from_master(db_ctx, changes);
	if (ret != 0) {
		DEBUG(0,("open failed to sync from master[%s]\n",
			 db_ctx->ltdb_path));
		db_tdb2_close_master(db_ctx);
		return ret;
	}

	return 0;
}

/*
 * Store 'mseqnum' in the local copy and commit the running local
 * transaction. The local transaction is finished on return, no
 * matter if the commit worked or not.
 */
static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum)
{
	bool ok;
	int ret;

	/* first remember the master seqnum */
	db_ctx->mseqnum = mseqnum;

	/* now we try to store the master seqnum in the local tdb */
	ok = tdb_store_uint32_byblob(db_ctx->ltdb->tdb,
				     db_ctx->mseqkey,
				     db_ctx->mseqnum);
	if (!ok) {
		tdb_transaction_cancel(db_ctx->ltdb->tdb);
		db_ctx->local_transaction = false;
		DEBUG(0,("local failed[%s] store mseq[%u]\n",
			 db_ctx->ltdb_path, db_ctx->mseqnum));
		return -1;
	}

	/* now commit all changes to the local tdb */
	ret = tdb_transaction_commit(db_ctx->ltdb->tdb);
	db_ctx->local_transaction = false;
	if (ret != 0) {
		DEBUG(0,("local failed[%s] commit mseq[%u]\n",
			 db_ctx->ltdb_path, db_ctx->mseqnum));
		return ret;
	}

	/*
	 * and update the cached local seqnum this is needed to
	 * let us cache the master seqnum.
	 */
	db_ctx->lseqnum = tdb_get_seqnum(db_ctx->ltdb->tdb);
	DEBUG(10,("local updated[%s] mseq[%u]\n",
		  db_ctx->ltdb_path, db_ctx->mseqnum));

	return 0;
}

/*
 * Close the master copy, a running transaction on it is cancelled.
 * Always returns 0.
 */
static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx)
{
	if (db_ctx->master_transaction) {
		tdb_transaction_cancel(db_ctx->mtdb->tdb);
	}
	db_ctx->master_transaction = false;
	/* now we can close the master handle */
	TALLOC_FREE(db_ctx->mtdb);

	DEBUG(10,("close_master[%s] ok\n", db_ctx->mtdb_path));
	return 0;
}

/*
 * tdb_traverse_read() callback for db_tdb2_sync_all(): copy one
 * record from the master copy into the local copy and report the
 * current master seqnum via ctx->private_data.
 */
static int db_tdb2_traverse_sync_all_func(TDB_CONTEXT *tdb,
					  TDB_DATA kbuf, TDB_DATA dbuf,
					  void *private_data)
{
	struct db_tdb2_traverse_ctx *ctx =
		(struct db_tdb2_traverse_ctx *)private_data;
	uint32_t *seqnum = (uint32_t *)ctx->private_data;
	int ret;

	DEBUG(10,("sync_entry[%s]\n", ctx->db_ctx->mtdb_path));

	/* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
	if(DEBUGLEVEL >= 10) {
		char *keystr = hex_encode(NULL, (unsigned char*)kbuf.dptr, kbuf.dsize);
		DEBUG(10, (DEBUGLEVEL > 10
			   ? "Locking key %s\n" : "Locking key %.20s\n",
			   keystr));
		TALLOC_FREE(keystr);
	}

	ret = tdb_store(ctx->db_ctx->ltdb->tdb, kbuf, dbuf, TDB_INSERT);
	if (ret != 0) {
		DEBUG(0,("sync_entry[%s] %d: %s\n",
			 ctx->db_ctx->ltdb_path, ret,
			 tdb_errorstr(ctx->db_ctx->ltdb->tdb)));
		return ret;
	}

	*seqnum = tdb_get_seqnum(ctx->db_ctx->mtdb->tdb);

	return 0;
}

/*
 * Wipe the local copy and copy all records of the master copy.
 *
 * *seqnum is only updated if at least one record was copied, so the
 * caller has to initialize it.
 */
static int db_tdb2_sync_all(struct db_tdb2_ctx *db_ctx, uint32_t *seqnum)
{
	struct db_tdb2_traverse_ctx ctx;
	int ret;

	ret = tdb_wipe_all(db_ctx->ltdb->tdb);
	if (ret != 0) {
		DEBUG(0,("tdb_wipe_all[%s] failed %d: %s\n",
			 db_ctx->ltdb_path, ret,
			 tdb_errorstr(db_ctx->ltdb->tdb)));
		return ret;
	}

	ctx.db_ctx = db_ctx;
	ctx.f = NULL;
	ctx.private_data = seqnum;
	ctx.num_filtered = 0;
	ret = tdb_traverse_read(db_ctx->mtdb->tdb,
				db_tdb2_traverse_sync_all_func,
				&ctx);
	DEBUG(10,("db_tdb2_sync_all[%s] count[%d]\n",
		  db_ctx->mtdb_path, ret));
	if (ret < 0) {
		return ret;
	}

	return 0;
}

/*
 * Sync the local copy by only looking at the keys listed in
 * 'changes'.
 *
 * If the changes do not describe the step from our seqnum to the
 * master seqnum, or the list of keys overflowed on the sender side,
 * we fall back to a full sync. On success *seqnum is the master
 * seqnum the local copy corresponds to.
 */
static int db_tdb2_sync_changes(struct db_tdb2_ctx *db_ctx,
				const struct dbwrap_tdb2_changes *changes,
				uint32_t *seqnum)
{
	uint32_t cseqnum;
	uint32_t mseqnum;
	uint32_t i;
	int ret;
	bool need_full_sync = false;

	DEBUG(10,("db_tdb2_sync_changes[%s] changes[%u]\n",
		  changes->name, changes->num_changes));
	if(DEBUGLEVEL >= 10) {
		NDR_PRINT_DEBUG(dbwrap_tdb2_changes, discard_const(changes));
	}

	/* lock the master tdb for reading */
	ret = tdb_lockall_read(db_ctx->mtdb->tdb);
	if (ret != 0) {
		DEBUG(0,("tdb_lockall_read[%s] %d\n", db_ctx->mtdb_path, ret));
		return ret;
	}

	/* first fetch seqnum we know about */
	cseqnum = db_tdb2_get_seqnum(db_ctx->db);

	/* then fetch the master seqnum */
	mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);

	if (cseqnum == mseqnum) {
		DEBUG(10,("db_tdb2_sync_changes[%s] uptodate[%u]\n",
			  db_ctx->mtdb_path, mseqnum));
		/* we hit a race before and now noticed we're uptodate */
		goto done;
	}

	/* now see if the changes describe what we need */
	if (changes->old_seqnum != cseqnum) {
		need_full_sync = true;
	}

	if (changes->new_seqnum != mseqnum) {
		need_full_sync = true;
	}

	/* this was the overflow case */
	if (changes->num_keys == 0) {
		need_full_sync = true;
	}

	if (need_full_sync) {
		tdb_unlockall_read(db_ctx->mtdb->tdb);
		DEBUG(0,("fallback to full sync[%s] seq[%u=>%u] keys[%u]\n",
			db_ctx->ltdb_path, cseqnum, mseqnum,
			changes->num_keys));
		/*
		 * hand the seqnum seen during the copy to our caller,
		 * as the direct full sync in db_tdb2_sync_from_master()
		 * does
		 */
		return db_tdb2_sync_all(db_ctx, seqnum);
	}

	for (i=0; i < changes->num_keys; i++) {
		const char *op = NULL;
		bool del = false;
		TDB_DATA key;
		TDB_DATA val;

		key.dsize = changes->keys[i].length;
		key.dptr = changes->keys[i].data;

		val = tdb_fetch(db_ctx->mtdb->tdb, key);
		if (val.dptr == NULL) {
			/*
			 * only look at the error code if the fetch
			 * returned nothing, a successful fetch does
			 * not reset the code of a previous lookup
			 */
			ret = tdb_error(db_ctx->mtdb->tdb);
			if (ret == TDB_ERR_NOEXIST) {
				del = true;
			} else if (ret != 0) {
				DEBUG(0,("sync_changes[%s] failure %d\n",
					 db_ctx->mtdb_path, ret));
				goto failed;
			}
		}

		if (del) {
			op = "delete";
			ret = tdb_delete(db_ctx->ltdb->tdb, key);
			DEBUG(10,("sync_changes[%s] delete key[%u] %d\n",
				  db_ctx->mtdb_path, i, ret));
		} else {
			op = "store";
			ret = tdb_store(db_ctx->ltdb->tdb, key,
					val, TDB_REPLACE);
			DEBUG(10,("sync_changes[%s] store key[%u] %d\n",
				  db_ctx->mtdb_path, i, ret));
		}
		SAFE_FREE(val.dptr);
		if (ret != 0) {
			DEBUG(0,("sync_changes[%s] %s key[%u] failed %d\n",
				 db_ctx->mtdb_path, op, i, ret));
			goto failed;
		}
	}

done:
	tdb_unlockall_read(db_ctx->mtdb->tdb);

	*seqnum = mseqnum;
	return 0;
failed:
	tdb_unlockall_read(db_ctx->mtdb->tdb);
	return ret;
}

/*
 * Bring the local copy up to date with the (open) master copy.
 *
 * Nothing is done if the seqnums match, unless we don't have a
 * cached local seqnum yet. With 'changes' given only the listed keys
 * are synced, otherwise the whole master copy is copied. The work is
 * done in a transaction on the local copy; on failure the local copy
 * is marked as out of sync.
 */
static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
				    const struct dbwrap_tdb2_changes *changes)
{
	int ret;
	uint32_t cseqnum;
	uint32_t mseqnum;
	bool force = false;

	/* first fetch seqnum we know about */
	cseqnum = db_tdb2_get_seqnum(db_ctx->db);

	/* then fetch the master seqnum */
	mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);

	if (db_ctx->lseqnum == 0) {
		force = true;
	}

	if (!force && cseqnum == mseqnum) {
		DEBUG(10,("uptodate[%s] mseq[%u]\n",
			  db_ctx->ltdb_path, mseqnum));
		/* the local copy is uptodate, close the master db */
		return 0;
	}
	DEBUG(10,("not uptodate[%s] seq[%u=>%u]\n",
		  db_ctx->ltdb_path, cseqnum, mseqnum));

	ret = tdb_transaction_start(db_ctx->ltdb->tdb);
	if (ret != 0) {
		DEBUG(0,("failed to start transaction[%s] %d: %s\n",
			 db_ctx->ltdb_path, ret,
			 tdb_errorstr(db_ctx->ltdb->tdb)));
		db_ctx->out_of_sync = true;
		return ret;
	}
	db_ctx->local_transaction = true;

	if (changes && !force) {
		ret = db_tdb2_sync_changes(db_ctx, changes, &mseqnum);
		if (ret != 0) {
			db_ctx->out_of_sync = true;
			tdb_transaction_cancel(db_ctx->ltdb->tdb);
			db_ctx->local_transaction = false;
			return ret;
		}
	} else {
		ret = db_tdb2_sync_all(db_ctx, &mseqnum);
		if (ret != 0) {
			db_ctx->out_of_sync = true;
			tdb_transaction_cancel(db_ctx->ltdb->tdb);
			db_ctx->local_transaction = false;
			return ret;
		}
	}

	ret = db_tdb2_commit_local(db_ctx, mseqnum);
	if (ret != 0) {
		db_ctx->out_of_sync = true;
		return ret;
	}

	db_ctx->out_of_sync = false;

	return 0;
}

/*
 * talloc destructor: cancel running transactions and close both
 * copies.
 */
static int db_tdb2_ctx_destructor(struct db_tdb2_ctx *db_tdb2)
{
	db_tdb2_close_master(db_tdb2);
	if (db_tdb2->local_transaction) {
		tdb_transaction_cancel(db_tdb2->ltdb->tdb);
	}
	db_tdb2->local_transaction = false;
	TALLOC_FREE(db_tdb2->ltdb);
	return 0;
}

/*
 * Open a database and sync its local copy.
 *
 * Only the basename of 'name' is used, the master copy lives in
 * 'dbwrap_tdb2:master directory' and the local copy (with a ".tdb2"
 * suffix) in 'dbwrap_tdb2:local directory'. 'chgs' may describe the
 * changes that triggered the open and is NULL otherwise.
 *
 * Returns NULL on failure.
 */
static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
					  const char *name,
					  int hash_size, int tdb_flags,
					  int open_flags, mode_t mode,
					  const struct dbwrap_tdb2_changes *chgs)
{
	struct db_context *result = NULL;
	struct db_tdb2_ctx *db_tdb2;
	int ret;
	const char *md;
	const char *ld;
	const char *bn;

	if (name == NULL) {
		DEBUG(0,("db_open_tdb2: no database name given\n"));
		goto fail;
	}

	bn = strrchr_m(name, '/');
	if (bn) {
		bn++;
		DEBUG(3,("db_open_tdb2: use basename[%s] of abspath[%s]:\n",
			bn, name));
	} else {
		bn = name;
	}

	md = lp_parm_const_string(-1, "dbwrap_tdb2", "master directory", NULL);
	if (!md) {
		DEBUG(0,("'dbwrap_tdb2:master directory' empty\n"));
		goto fail;
	}

	ld = lp_parm_const_string(-1, "dbwrap_tdb2", "local directory", NULL);
	if (!ld) {
		DEBUG(0,("'dbwrap_tdb2:local directory' empty\n"));
		goto fail;
	}

	result = TALLOC_ZERO_P(mem_ctx, struct db_context);
	if (result == NULL) {
		DEBUG(0, ("talloc failed\n"));
		goto fail;
	}

	result->private_data = db_tdb2 = TALLOC_ZERO_P(result, struct db_tdb2_ctx);
	if (db_tdb2 == NULL) {
		DEBUG(0, ("talloc failed\n"));
		goto fail;
	}

	db_tdb2->db = result;

	db_tdb2->open.hash_size	= hash_size;
	db_tdb2->open.tdb_flags	= tdb_flags;
	db_tdb2->open.open_flags= open_flags;
	db_tdb2->open.mode	= mode;

	db_tdb2->max_buffer_size = lp_parm_ulong(-1, "dbwrap_tdb2",
						 "notify buffer size", 512);

	db_tdb2->name = talloc_strdup(db_tdb2, bn);
	if (db_tdb2->name == NULL) {
		DEBUG(0, ("talloc_strdup failed\n"));
		goto fail;
	}

	db_tdb2->mtdb_path = talloc_asprintf(db_tdb2, "%s/%s",
					     md, bn);
	if (db_tdb2->mtdb_path == NULL) {
		DEBUG(0, ("talloc_asprintf failed\n"));
		goto fail;
	}

	db_tdb2->ltdb_path = talloc_asprintf(db_tdb2, "%s/%s.tdb2",
					     ld, bn);
	if (db_tdb2->ltdb_path == NULL) {
		DEBUG(0, ("talloc_asprintf failed\n"));
		goto fail;
	}

	db_tdb2->mseqkey = string_term_tdb_data(DB_TDB2_MASTER_SEQNUM_KEYSTR);

	/*
	 * this implicitly opens the local copy if it's not open yet
	 * and syncs it from the master copy.
	 */
	ret = db_tdb2_open_master(db_tdb2, false, chgs);
	if (ret != 0) {
		goto fail;
	}

	ret = db_tdb2_close_master(db_tdb2);
	if (ret != 0) {
		goto fail;
	}

	DEBUG(10,("db_open_tdb2[%s] opened with mseq[%u]\n",
		  db_tdb2->name, db_tdb2->mseqnum));

	result->fetch_locked = db_tdb2_fetch_locked;
	result->fetch = db_tdb2_fetch;
	result->traverse = db_tdb2_traverse;
	result->traverse_read = db_tdb2_traverse_read;
	result->get_seqnum = db_tdb2_get_seqnum;
	result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
	result->transaction_start = db_tdb2_transaction_start;
	result->transaction_commit = db_tdb2_transaction_commit;
	result->transaction_cancel = db_tdb2_transaction_cancel;

	/* from now on the handles are cleaned up by the destructor */
	talloc_set_destructor(db_tdb2, db_tdb2_ctx_destructor);

	return result;

fail:
	/* the tdb handles are talloc children of the context */
	TALLOC_FREE(result);
	return NULL;
}

/*
 * Open a database that consists of a master tdb and a local copy,
 * see db_open_tdb2_ex() for the details.
 */
struct db_context *db_open_tdb2(TALLOC_CTX *mem_ctx,
				const char *name,
				int hash_size, int tdb_flags,
				int open_flags, mode_t mode)
{
	return db_open_tdb2_ex(mem_ctx, name, hash_size,
			       tdb_flags, open_flags, mode, NULL);
}