diff options
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_call.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_call.c | 265 |
1 files changed, 82 insertions, 183 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c index ab5c2cce3b..76a7e97a87 100644 --- a/source4/cluster/ctdb/common/ctdb_call.c +++ b/source4/cluster/ctdb/common/ctdb_call.c @@ -47,9 +47,9 @@ /* local version of ctdb_call */ -static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, - struct ctdb_ltdb_header *header, TDB_DATA *data, - uint32_t caller) +int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, + struct ctdb_ltdb_header *header, TDB_DATA *data, + uint32_t caller) { struct ctdb_call_info *c; struct ctdb_registered_call *fn; @@ -105,6 +105,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca if (c->reply_data) { call->reply_data = *c->reply_data; talloc_steal(ctdb, call->reply_data.dptr); + talloc_set_name_const(call->reply_data.dptr, __location__); } else { call->reply_data.dptr = NULL; call->reply_data.dsize = 0; @@ -140,7 +141,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb, msglen = strlen(msg)+1; len = offsetof(struct ctdb_reply_error, msg); - r = ctdb->methods->allocate_pkt(ctdb, len + msglen); + r = ctdb->methods->allocate_pkt(msg, len + msglen); CTDB_NO_MEMORY_FATAL(ctdb, r); talloc_set_name_const(r, "send_error packet"); @@ -155,11 +156,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb, r->msglen = msglen; memcpy(&r->msg[0], msg, msglen); - talloc_free(msg); - ctdb_queue_packet(ctdb, &r->hdr); - talloc_free(r); + talloc_free(msg); } @@ -223,16 +222,12 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, memcpy(&r->data[0], key->dptr, key->dsize); memcpy(&r->data[key->dsize], data->dptr, data->dsize); - if (r->hdr.destnode == ctdb->vnn) { - /* we are the lmaster - don't send to ourselves */ - ctdb_request_dmaster(ctdb, &r->hdr); - } else { - ctdb_queue_packet(ctdb, &r->hdr); - - /* update the ltdb to record the new dmaster */ - header->dmaster = r->hdr.destnode; - ctdb_ltdb_store(ctdb_db, *key, header, *data); - } + /* XXX - probably not necessary when lmaster==dmaster + update the ltdb to record the new dmaster */ + header->dmaster = r->hdr.destnode; + ctdb_ltdb_store(ctdb_db, *key, header, *data); + + ctdb_queue_packet(ctdb, &r->hdr); talloc_free(r); } @@ -252,6 +247,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr struct ctdb_ltdb_header header; struct ctdb_db_context *ctdb_db; int ret, len; + TALLOC_CTX *tmp_ctx; key.dptr = c->data; key.dsize = c->keylen; @@ -267,28 +263,41 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr } /* fetch the current record */ - ret = ctdb_ltdb_fetch(ctdb_db, key, &header, hdr, &data2); - if (ret != 0) { + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2, + ctdb_recv_raw_pkt, ctdb); + if (ret == -1) { ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record"); return; } - + if (ret == -2) { + DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n")); + return; + } + /* its a protocol error if the sending node is not the current dmaster */ - if (header.dmaster != hdr->srcnode) { + if (header.dmaster != hdr->srcnode && + hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) { ctdb_fatal(ctdb, "dmaster request from non-master"); return; } - + header.dmaster = c->dmaster; - if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) { + ret = ctdb_ltdb_store(ctdb_db, key, &header, data); + ctdb_ltdb_unlock(ctdb_db, key); + if (ret != 0) { ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster"); return; } + /* put the packet on a temporary context, allowing us to safely free + it below even if ctdb_reply_dmaster() has freed it already */ + tmp_ctx = talloc_new(ctdb); + /* send the CTDB_REPLY_DMASTER */ len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize; - r = ctdb->methods->allocate_pkt(ctdb, len); + r = ctdb->methods->allocate_pkt(tmp_ctx, len); CTDB_NO_MEMORY_FATAL(ctdb, r); + talloc_set_name_const(r, "reply_dmaster packet"); r->hdr.length = len; r->hdr.ctdb_magic = CTDB_MAGIC; @@ -300,13 +309,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - if (r->hdr.destnode == r->hdr.srcnode) { - ctdb_reply_dmaster(ctdb, &r->hdr); - } else { - ctdb_queue_packet(ctdb, &r->hdr); - } + ctdb_queue_packet(ctdb, &r->hdr); - talloc_free(r); + talloc_free(tmp_ctx); } @@ -341,17 +346,23 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) fetches the record data (if any), thus avoiding a 2nd fetch of the data if the call will be answered locally */ - ret = ctdb_ltdb_fetch(ctdb_db, call.key, &header, hdr, &data); - if (ret != 0) { + ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data, + ctdb_recv_raw_pkt, ctdb); + if (ret == -1) { ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); return; } + if (ret == -2) { + DEBUG(2,(__location__ " deferred ctdb_request_call\n")); + return; + } /* if we are not the dmaster, then send a redirect to the requesting node */ if (header.dmaster != ctdb->vnn) { ctdb_call_send_redirect(ctdb, c, &header); talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); return; } @@ -364,11 +375,14 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) || c->flags&CTDB_IMMEDIATE_MIGRATION ) { ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data); talloc_free(data.dptr); + ctdb_ltdb_unlock(ctdb_db, call.key); return; } ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode); + ctdb_ltdb_unlock(ctdb_db, call.key); + len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize; r = ctdb->methods->allocate_pkt(ctdb, len); CTDB_NO_MEMORY_FATAL(ctdb, r); @@ -396,15 +410,18 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) called when a CTDB_REPLY_CALL packet comes in This packet comes in response to a CTDB_REQ_CALL request packet. It - contains any reply data freom the call + contains any reply data from the call */ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; + } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; @@ -412,10 +429,6 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) talloc_steal(state, c); - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); - state->state = CTDB_CALL_DONE; if (state->async.fn) { state->async.fn(state); @@ -435,13 +448,25 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; TDB_DATA data; + int ret; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) { return; } + ctdb_db = state->ctdb_db; + ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr, + ctdb_recv_raw_pkt, ctdb); + if (ret == -2) { + return; + } + if (ret != 0) { + DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n")); + return; + } + data.dptr = c->data; data.dsize = c->datalen; @@ -452,12 +477,17 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state->header.dmaster = ctdb->vnn; if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) { + ctdb_ltdb_unlock(ctdb_db, state->call.key); ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); return; } ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); + ctdb_ltdb_unlock(ctdb_db, state->call.key); + + talloc_steal(state, state->call.reply_data.dptr); + state->state = CTDB_CALL_DONE; if (state->async.fn) { state->async.fn(state); @@ -473,7 +503,7 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) return; talloc_steal(state, c); @@ -498,7 +528,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr; struct ctdb_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) return; talloc_steal(state, c); @@ -510,6 +540,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) /* send it off again */ state->node = ctdb->nodes[c->dmaster]; + state->c->hdr.destnode = c->dmaster; ctdb_queue_packet(ctdb, &state->c->hdr); } @@ -578,6 +609,7 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, state->ctdb_db = ctdb_db; ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + talloc_steal(state, state->call.reply_data.dptr); event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); @@ -591,45 +623,27 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header) { uint32_t len; struct ctdb_call_state *state; - int ret; - struct ctdb_ltdb_header header; - TDB_DATA data; struct ctdb_context *ctdb = ctdb_db->ctdb; - /* - if we are the dmaster for this key then we don't need to - send it off at all, we can bypass the network and handle it - locally. To find out if we are the dmaster we need to look - in our ltdb - */ - ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); - if (ret != 0) return NULL; - - if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - return ctdb_call_local_send(ctdb_db, call, &header, &data); - } - state = talloc_zero(ctdb_db, struct ctdb_call_state); CTDB_NO_MEMORY_NULL(ctdb, state); - talloc_steal(state, data.dptr); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; - state->c = ctdb->methods->allocate_pkt(ctdb, len); + state->c = ctdb->methods->allocate_pkt(state, len); CTDB_NO_MEMORY_NULL(ctdb, state->c); talloc_set_name_const(state->c, "req_call packet"); - talloc_steal(state, state->c); state->c->hdr.length = len; state->c->hdr.ctdb_magic = CTDB_MAGIC; state->c->hdr.ctdb_version = CTDB_VERSION; state->c->hdr.operation = CTDB_REQ_CALL; - state->c->hdr.destnode = header.dmaster; + state->c->hdr.destnode = header->dmaster; state->c->hdr.srcnode = ctdb->vnn; /* this limits us to 16k outstanding messages - not unreasonable */ state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); @@ -645,9 +659,9 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd state->call.call_data.dptr = &state->c->data[call->key.dsize]; state->call.key.dptr = &state->c->data[0]; - state->node = ctdb->nodes[header.dmaster]; + state->node = ctdb->nodes[header->dmaster]; state->state = CTDB_CALL_WAIT; - state->header = header; + state->header = *header; state->ctdb_db = ctdb_db; talloc_set_destructor(state, ctdb_call_destructor); @@ -660,29 +674,13 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd } /* - make a remote ctdb call - async send - - This constructs a ctdb_call request and queues it for processing. - This call never blocks. -*/ -struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_call_send(ctdb_db, call); - } - return ctdb_daemon_call_send(ctdb_db, call); -} - -/* make a remote ctdb call - async recv - called in daemon context This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { - struct ctdb_record_handle *rec; - while (state->state < CTDB_CALL_DONE) { event_loop_once(state->node->ctdb->ev); } @@ -692,16 +690,6 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call return -1; } - rec = state->fetch_private; - - /* ugly hack to manage forced migration */ - if (rec != NULL) { - rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); - rec->data->dsize = state->call.reply_data.dsize; - talloc_free(state); - return 0; - } - if (state->call.reply_data.dsize) { call->reply_data.dptr = talloc_memdup(state->node->ctdb, state->call.reply_data.dptr, @@ -717,92 +705,3 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call } -/* - make a remote ctdb call - async recv. - - This is called when the program wants to wait for a ctdb_call to complete and get the - results. This call will block unless the call has already completed. -*/ -int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) -{ - if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_call_recv(state, call); - } - return ctdb_daemon_call_recv(state, call); -} - -/* - full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() -*/ -int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) -{ - struct ctdb_call_state *state; - - state = ctdb_call_send(ctdb_db, call); - return ctdb_call_recv(state, call); -} - - - -struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, - TDB_DATA key, TDB_DATA *data) -{ - struct ctdb_call call; - struct ctdb_record_handle *rec; - struct ctdb_call_state *state; - int ret; - - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data); - } - - ZERO_STRUCT(call); - call.call_id = CTDB_FETCH_FUNC; - call.key = key; - call.flags = CTDB_IMMEDIATE_MIGRATION; - - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); - - rec->ctdb_db = ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = data; - - state = ctdb_call_send(ctdb_db, &call); - state->fetch_private = rec; - - ret = ctdb_call_recv(state, &call); - if (ret != 0) { - talloc_free(rec); - return NULL; - } - - return rec; -} - - -int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) -{ - int ret; - struct ctdb_ltdb_header header; - struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context); - - if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_store_unlock(rec, data); - } - - /* should be avoided if possible hang header off rec ? */ - ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL); - if (ret) { - ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed"); - talloc_free(rec); - return ret; - } - - ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data); - - talloc_free(rec); - - return ret; -} |