From 66156220ebf5dc212e9aa86015b7301d1b665b50 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Thu, 5 Apr 2007 03:51:49 +0000 Subject: r22082: merged the ctdb changes from bzr added opendb ctdb backend from ronnie (This used to be commit b0da25cb79f860bfa14ba7a8419c7996d936292b) --- source4/cluster/ctdb/common/ctdb_call.c | 93 ++++++++++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 6 deletions(-) (limited to 'source4/cluster/ctdb/common/ctdb_call.c') diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c index 75355f7ae7..0b4195140c 100644 --- a/source4/cluster/ctdb/common/ctdb_call.c +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -189,7 +189,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, r->hdr.srcnode = ctdb->vnn; r->hdr.reqid = c->hdr.reqid; r->db_id = c->db_id; - r->dmaster = header->laccessor; + r->dmaster = c->hdr.srcnode; r->keylen = key->dsize; r->datalen = data->dsize; memcpy(&r->data[0], key->dptr, key->dsize); @@ -239,7 +239,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id); return; } - + /* fetch the current record */ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, &data2); if (ret != 0) { @@ -329,9 +329,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) } /* if this nodes has done enough consecutive calls on the same record - then give them the record */ - if (header.laccessor == c->hdr.srcnode && - header.lacount >= ctdb->max_lacount) { + then give them the record + or if the node requested an immediate migration + */ + if ( (header.laccessor == c->hdr.srcnode + && header.lacount >= ctdb->max_lacount) + || c->flags&CTDB_IMMEDIATE_MIGRATION ) { ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data); talloc_free(data.dptr); return; @@ -373,6 +376,7 @@ struct ctdb_call_state { struct ctdb_call call; int redirect_count; struct ctdb_ltdb_header header; + void *fetch_private; }; @@ -419,7 +423,6 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) } ctdb_db = state->ctdb_db; - data.dptr = c->data; data.dsize = c->datalen; @@ -578,6 +581,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct c state->c->hdr.srcnode = ctdb->vnn; /* this limits us to 16k outstanding messages - not unreasonable */ state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->c->flags = call->flags; state->c->db_id = ctdb_db->db_id; state->c->callid = call->call_id; state->c->keylen = call->key.dsize; @@ -604,6 +608,13 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct c } + +struct ctdb_record_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; +}; + /* make a remote ctdb call - async recv. @@ -612,6 +623,8 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct c */ int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { + struct ctdb_record_handle *rec; + while (state->state < CTDB_CALL_DONE) { event_loop_once(state->node->ctdb->ev); } @@ -620,6 +633,18 @@ int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) talloc_free(state); return -1; } + + rec = state->fetch_private; + + /* ugly hack to manage forced migration */ + if (rec != NULL) { + rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, + state->call.reply_data.dsize); + rec->data->dsize = state->call.reply_data.dsize; + talloc_free(state); + return 0; + } + if (state->call.reply_data.dsize) { call->reply_data.dptr = talloc_memdup(state->node->ctdb, state->call.reply_data.dptr, @@ -643,3 +668,59 @@ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) state = ctdb_call_send(ctdb_db, call); return ctdb_call_recv(state, call); } + + + + + + +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_call call; + struct ctdb_record_handle *rec; + struct ctdb_call_state *state; + int ret; + + ZERO_STRUCT(call); + call.call_id = CTDB_FETCH_FUNC; + call.key = key; + call.flags = CTDB_IMMEDIATE_MIGRATION; + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + + rec->ctdb_db = ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = data; + + state = ctdb_call_send(ctdb_db, &call); + state->fetch_private = rec; + + ret = ctdb_call_recv(state, &call); + if (ret != 0) { + talloc_free(rec); + return NULL; + } + + return rec; +} + + +int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data) +{ + int ret; + struct ctdb_ltdb_header header; + + /* should be avoided if possible hang header off rec ? */ + ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL); + if (ret) { + ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed"); + return ret; + } + + ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data); + + return ret; +} -- cgit