diff options
-rw-r--r-- | source3/lib/dbwrap_ctdb.c | 196 |
1 files changed, 46 insertions, 150 deletions
diff --git a/source3/lib/dbwrap_ctdb.c b/source3/lib/dbwrap_ctdb.c index a6bda8e403..d46b64dba5 100644 --- a/source3/lib/dbwrap_ctdb.c +++ b/source3/lib/dbwrap_ctdb.c @@ -34,6 +34,7 @@ struct db_ctdb_transaction_handle { }; struct db_ctdb_ctx { + struct db_context *db; struct tdb_wrap *wtdb; uint32 db_id; struct db_ctdb_transaction_handle *transaction; @@ -400,6 +401,40 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct return result; } +static int db_ctdb_record_destructor(struct db_record *rec) +{ + struct db_ctdb_transaction_handle *h = talloc_get_type_abort( + rec->private_data, struct db_ctdb_transaction_handle); + h->ctx->db->transaction_cancel(h->ctx->db); + return 0; +} + +/* + auto-create a transaction for persistent databases + */ +static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx, + TALLOC_CTX *mem_ctx, + TDB_DATA key) +{ + int res; + struct db_record *rec; + + res = db_ctdb_transaction_start(ctx->db); + if (res == -1) { + return NULL; + } + + rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key); + if (rec == NULL) { + ctx->db->transaction_cancel(ctx->db); + return NULL; + } + + /* destroy this transaction when we release the lock */ + talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor); + return rec; +} + /* stores a record inside a transaction @@ -681,130 +716,6 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag) } -/* for persistent databases the store is a bit different. We have to - ask the ctdb daemon to push the record to all nodes after the - store */ -static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag) -{ - struct db_ctdb_rec *crec; - struct db_record *record; - TDB_DATA cdata; - int ret; - NTSTATUS status; - uint32_t count; - int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5); - - for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec; - (count < max_retries) && !NT_STATUS_IS_OK(status); - count++) - { - if (count > 0) { - /* retry */ - /* - * There is a hack here: We use rec as a memory - * context and re-use it as the record struct ptr. - * We don't free the record data allocated - * in each turn. So all gets freed when the caller - * releases the original record. This is because - * we don't get the record passed in by reference - * in the first place and the caller relies on - * having to free the record himself. - */ - record = fetch_locked_internal(crec->ctdb_ctx, - rec, - rec->key, - true /* persistent */); - if (record == NULL) { - DEBUG(5, ("fetch_locked_internal failed.\n")); - status = NT_STATUS_NO_MEMORY; - break; - } - } - - crec = talloc_get_type_abort(record->private_data, - struct db_ctdb_rec); - - cdata.dsize = sizeof(crec->header) + data.dsize; - - if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) { - return NT_STATUS_NO_MEMORY; - } - - crec->header.rsn++; - - memcpy(cdata.dptr, &crec->header, sizeof(crec->header)); - memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize); - - status = ctdbd_start_persistent_update( - messaging_ctdbd_connection(), - crec->ctdb_ctx->db_id, - rec->key, - cdata); - - if (NT_STATUS_IS_OK(status)) { - ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, - cdata, TDB_REPLACE); - status = (ret == 0) - ? NT_STATUS_OK - : tdb_error_to_ntstatus( - crec->ctdb_ctx->wtdb->tdb); - } - - /* - * release the lock *now* in order to prevent deadlocks. - * - * There is a tradeoff: Usually, the record is still locked - * after db->store operation. This lock is usually released - * via the talloc destructor with the TALLOC_FREE to - * the record. So we have two choices: - * - * - Either re-lock the record after the call to persistent_store - * or cancel_persistent update and this way not changing any - * assumptions callers may have about the state, but possibly - * introducing new race conditions. - * - * - Or don't lock the record again but just remove the - * talloc_destructor. This is less racy but assumes that - * the lock is always released via TALLOC_FREE of the record. - * - * I choose the first variant for now since it seems less racy. - * We can't guarantee that we succeed in getting the lock - * anyways. The only real danger here is that a caller - * performs multiple store operations after a fetch_locked() - * which is currently not the case. - */ - tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key); - talloc_set_destructor(record, NULL); - - /* now tell ctdbd to update this record on all other nodes */ - if (NT_STATUS_IS_OK(status)) { - status = ctdbd_persistent_store( - messaging_ctdbd_connection(), - crec->ctdb_ctx->db_id, - rec->key, - cdata); - } else { - ctdbd_cancel_persistent_update( - messaging_ctdbd_connection(), - crec->ctdb_ctx->db_id, - rec->key, - cdata); - break; - } - - SAFE_FREE(cdata.dptr); - } /* retry-loop */ - - if (!NT_STATUS_IS_OK(status)) { - DEBUG(5, ("ctdbd_persistent_store failed after " - "%d retries with error %s - giving up.\n", - count, nt_errstr(status))); - } - - SAFE_FREE(cdata.dptr); - - return status; -} static NTSTATUS db_ctdb_delete(struct db_record *rec) { @@ -821,21 +732,6 @@ static NTSTATUS db_ctdb_delete(struct db_record *rec) } -static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec) -{ - TDB_DATA data; - - /* - * We have to store the header with empty data. TODO: Fix the - * tdb-level cleanup - */ - - ZERO_STRUCT(data); - - return db_ctdb_store_persistent(rec, data, 0); - -} - static int db_ctdb_record_destr(struct db_record* data) { struct db_ctdb_rec *crec = talloc_get_type_abort( @@ -867,10 +763,6 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx, TDB_DATA ctdb_data; int migrate_attempts = 0; - if (ctx->transaction != NULL) { - return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key); - } - if (!(result = talloc(mem_ctx, struct db_record))) { DEBUG(0, ("talloc failed\n")); return NULL; @@ -913,13 +805,8 @@ again: return NULL; } - if (persistent) { - result->store = db_ctdb_store_persistent; - result->delete_rec = db_ctdb_delete_persistent; - } else { - result->store = db_ctdb_store; - result->delete_rec = db_ctdb_delete; - } + result->store = db_ctdb_store; + result->delete_rec = db_ctdb_delete; talloc_set_destructor(result, db_ctdb_record_destr); ctdb_data = tdb_fetch(ctx->wtdb->tdb, key); @@ -988,6 +875,14 @@ static struct db_record *db_ctdb_fetch_locked(struct db_context *db, struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, struct db_ctdb_ctx); + if (ctx->transaction != NULL) { + return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key); + } + + if (db->persistent) { + return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key); + } + return fetch_locked_internal(ctx, mem_ctx, key, db->persistent); } @@ -1210,6 +1105,7 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx, } db_ctdb->transaction = NULL; + db_ctdb->db = result; if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) { DEBUG(0, ("ctdbd_db_attach failed for %s\n", name)); |