summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/common/ctdb_call.c
diff options
context:
space:
mode:
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_call.c')
-rw-r--r--source4/cluster/ctdb/common/ctdb_call.c265
1 files changed, 82 insertions, 183 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c
index ab5c2cce3b..76a7e97a87 100644
--- a/source4/cluster/ctdb/common/ctdb_call.c
+++ b/source4/cluster/ctdb/common/ctdb_call.c
@@ -47,9 +47,9 @@
/*
local version of ctdb_call
*/
-static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
- struct ctdb_ltdb_header *header, TDB_DATA *data,
- uint32_t caller)
+int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
+ struct ctdb_ltdb_header *header, TDB_DATA *data,
+ uint32_t caller)
{
struct ctdb_call_info *c;
struct ctdb_registered_call *fn;
@@ -105,6 +105,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca
if (c->reply_data) {
call->reply_data = *c->reply_data;
talloc_steal(ctdb, call->reply_data.dptr);
+ talloc_set_name_const(call->reply_data.dptr, __location__);
} else {
call->reply_data.dptr = NULL;
call->reply_data.dsize = 0;
@@ -140,7 +141,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
msglen = strlen(msg)+1;
len = offsetof(struct ctdb_reply_error, msg);
- r = ctdb->methods->allocate_pkt(ctdb, len + msglen);
+ r = ctdb->methods->allocate_pkt(msg, len + msglen);
CTDB_NO_MEMORY_FATAL(ctdb, r);
talloc_set_name_const(r, "send_error packet");
@@ -155,11 +156,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
r->msglen = msglen;
memcpy(&r->msg[0], msg, msglen);
- talloc_free(msg);
-
ctdb_queue_packet(ctdb, &r->hdr);
- talloc_free(r);
+ talloc_free(msg);
}
@@ -223,16 +222,12 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
memcpy(&r->data[0], key->dptr, key->dsize);
memcpy(&r->data[key->dsize], data->dptr, data->dsize);
- if (r->hdr.destnode == ctdb->vnn) {
- /* we are the lmaster - don't send to ourselves */
- ctdb_request_dmaster(ctdb, &r->hdr);
- } else {
- ctdb_queue_packet(ctdb, &r->hdr);
-
- /* update the ltdb to record the new dmaster */
- header->dmaster = r->hdr.destnode;
- ctdb_ltdb_store(ctdb_db, *key, header, *data);
- }
+ /* XXX - probably not necessary when lmaster==dmaster
+ update the ltdb to record the new dmaster */
+ header->dmaster = r->hdr.destnode;
+ ctdb_ltdb_store(ctdb_db, *key, header, *data);
+
+ ctdb_queue_packet(ctdb, &r->hdr);
talloc_free(r);
}
@@ -252,6 +247,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
struct ctdb_ltdb_header header;
struct ctdb_db_context *ctdb_db;
int ret, len;
+ TALLOC_CTX *tmp_ctx;
key.dptr = c->data;
key.dsize = c->keylen;
@@ -267,28 +263,41 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
}
/* fetch the current record */
- ret = ctdb_ltdb_fetch(ctdb_db, key, &header, hdr, &data2);
- if (ret != 0) {
+ ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
+ ctdb_recv_raw_pkt, ctdb);
+ if (ret == -1) {
ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
return;
}
-
+ if (ret == -2) {
+ DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
+ return;
+ }
+
/* its a protocol error if the sending node is not the current dmaster */
- if (header.dmaster != hdr->srcnode) {
+ if (header.dmaster != hdr->srcnode &&
+ hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) {
ctdb_fatal(ctdb, "dmaster request from non-master");
return;
}
-
+
header.dmaster = c->dmaster;
- if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+ ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
+ ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
return;
}
+ /* put the packet on a temporary context, allowing us to safely free
+ it below even if ctdb_reply_dmaster() has freed it already */
+ tmp_ctx = talloc_new(ctdb);
+
/* send the CTDB_REPLY_DMASTER */
len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
+ r = ctdb->methods->allocate_pkt(tmp_ctx, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
+
talloc_set_name_const(r, "reply_dmaster packet");
r->hdr.length = len;
r->hdr.ctdb_magic = CTDB_MAGIC;
@@ -300,13 +309,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
r->datalen = data.dsize;
memcpy(&r->data[0], data.dptr, data.dsize);
- if (r->hdr.destnode == r->hdr.srcnode) {
- ctdb_reply_dmaster(ctdb, &r->hdr);
- } else {
- ctdb_queue_packet(ctdb, &r->hdr);
- }
+ ctdb_queue_packet(ctdb, &r->hdr);
- talloc_free(r);
+ talloc_free(tmp_ctx);
}
@@ -341,17 +346,23 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
fetches the record data (if any), thus avoiding a 2nd fetch of the data
if the call will be answered locally */
- ret = ctdb_ltdb_fetch(ctdb_db, call.key, &header, hdr, &data);
- if (ret != 0) {
+ ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
+ ctdb_recv_raw_pkt, ctdb);
+ if (ret == -1) {
ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
return;
}
+ if (ret == -2) {
+ DEBUG(2,(__location__ " deferred ctdb_request_call\n"));
+ return;
+ }
/* if we are not the dmaster, then send a redirect to the
requesting node */
if (header.dmaster != ctdb->vnn) {
ctdb_call_send_redirect(ctdb, c, &header);
talloc_free(data.dptr);
+ ctdb_ltdb_unlock(ctdb_db, call.key);
return;
}
@@ -364,11 +375,14 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|| c->flags&CTDB_IMMEDIATE_MIGRATION ) {
ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
talloc_free(data.dptr);
+ ctdb_ltdb_unlock(ctdb_db, call.key);
return;
}
ctdb_call_local(ctdb_db, &call, &header, &data, c->hdr.srcnode);
+ ctdb_ltdb_unlock(ctdb_db, call.key);
+
len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
@@ -396,15 +410,18 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
called when a CTDB_REPLY_CALL packet comes in
This packet comes in response to a CTDB_REQ_CALL request packet. It
- contains any reply data freom the call
+ contains any reply data from the call
*/
void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
struct ctdb_call_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
- if (state == NULL) return;
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
+ if (state == NULL) {
+ DEBUG(0, ("reqid %d not found\n", hdr->reqid));
+ return;
+ }
state->call.reply_data.dptr = c->data;
state->call.reply_data.dsize = c->datalen;
@@ -412,10 +429,6 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
talloc_steal(state, c);
- /* get an extra reference here - this prevents the free in ctdb_recv_pkt()
- from freeing the data */
- (void)talloc_reference(state, c);
-
state->state = CTDB_CALL_DONE;
if (state->async.fn) {
state->async.fn(state);
@@ -435,13 +448,25 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_call_state *state;
struct ctdb_db_context *ctdb_db;
TDB_DATA data;
+ int ret;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) {
return;
}
+
ctdb_db = state->ctdb_db;
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr,
+ ctdb_recv_raw_pkt, ctdb);
+ if (ret == -2) {
+ return;
+ }
+ if (ret != 0) {
+ DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
+ return;
+ }
+
data.dptr = c->data;
data.dsize = c->datalen;
@@ -452,12 +477,17 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
state->header.dmaster = ctdb->vnn;
if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
+ ctdb_ltdb_unlock(ctdb_db, state->call.key);
ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
return;
}
ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
+ ctdb_ltdb_unlock(ctdb_db, state->call.key);
+
+ talloc_steal(state, state->call.reply_data.dptr);
+
state->state = CTDB_CALL_DONE;
if (state->async.fn) {
state->async.fn(state);
@@ -473,7 +503,7 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
struct ctdb_call_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) return;
talloc_steal(state, c);
@@ -498,7 +528,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
struct ctdb_call_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) return;
talloc_steal(state, c);
@@ -510,6 +540,7 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
/* send it off again */
state->node = ctdb->nodes[c->dmaster];
+ state->c->hdr.destnode = c->dmaster;
ctdb_queue_packet(ctdb, &state->c->hdr);
}
@@ -578,6 +609,7 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
state->ctdb_db = ctdb_db;
ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
+ talloc_steal(state, state->call.reply_data.dptr);
event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
@@ -591,45 +623,27 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
This constructs a ctdb_call request and queues it for processing.
This call never blocks.
*/
-static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call)
+struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
+ struct ctdb_call *call,
+ struct ctdb_ltdb_header *header)
{
uint32_t len;
struct ctdb_call_state *state;
- int ret;
- struct ctdb_ltdb_header header;
- TDB_DATA data;
struct ctdb_context *ctdb = ctdb_db->ctdb;
- /*
- if we are the dmaster for this key then we don't need to
- send it off at all, we can bypass the network and handle it
- locally. To find out if we are the dmaster we need to look
- in our ltdb
- */
- ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
- if (ret != 0) return NULL;
-
- if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- return ctdb_call_local_send(ctdb_db, call, &header, &data);
- }
-
state = talloc_zero(ctdb_db, struct ctdb_call_state);
CTDB_NO_MEMORY_NULL(ctdb, state);
- talloc_steal(state, data.dptr);
-
len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
- state->c = ctdb->methods->allocate_pkt(ctdb, len);
+ state->c = ctdb->methods->allocate_pkt(state, len);
CTDB_NO_MEMORY_NULL(ctdb, state->c);
talloc_set_name_const(state->c, "req_call packet");
- talloc_steal(state, state->c);
state->c->hdr.length = len;
state->c->hdr.ctdb_magic = CTDB_MAGIC;
state->c->hdr.ctdb_version = CTDB_VERSION;
state->c->hdr.operation = CTDB_REQ_CALL;
- state->c->hdr.destnode = header.dmaster;
+ state->c->hdr.destnode = header->dmaster;
state->c->hdr.srcnode = ctdb->vnn;
/* this limits us to 16k outstanding messages - not unreasonable */
state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
@@ -645,9 +659,9 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd
state->call.call_data.dptr = &state->c->data[call->key.dsize];
state->call.key.dptr = &state->c->data[0];
- state->node = ctdb->nodes[header.dmaster];
+ state->node = ctdb->nodes[header->dmaster];
state->state = CTDB_CALL_WAIT;
- state->header = header;
+ state->header = *header;
state->ctdb_db = ctdb_db;
talloc_set_destructor(state, ctdb_call_destructor);
@@ -660,29 +674,13 @@ static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctd
}
/*
- make a remote ctdb call - async send
-
- This constructs a ctdb_call request and queues it for processing.
- This call never blocks.
-*/
-struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
-{
- if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
- return ctdb_client_call_send(ctdb_db, call);
- }
- return ctdb_daemon_call_send(ctdb_db, call);
-}
-
-/*
make a remote ctdb call - async recv - called in daemon context
This is called when the program wants to wait for a ctdb_call to complete and get the
results. This call will block unless the call has already completed.
*/
-static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
- struct ctdb_record_handle *rec;
-
while (state->state < CTDB_CALL_DONE) {
event_loop_once(state->node->ctdb->ev);
}
@@ -692,16 +690,6 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call
return -1;
}
- rec = state->fetch_private;
-
- /* ugly hack to manage forced migration */
- if (rec != NULL) {
- rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
- rec->data->dsize = state->call.reply_data.dsize;
- talloc_free(state);
- return 0;
- }
-
if (state->call.reply_data.dsize) {
call->reply_data.dptr = talloc_memdup(state->node->ctdb,
state->call.reply_data.dptr,
@@ -717,92 +705,3 @@ static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call
}
-/*
- make a remote ctdb call - async recv.
-
- This is called when the program wants to wait for a ctdb_call to complete and get the
- results. This call will block unless the call has already completed.
-*/
-int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
-{
- if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
- return ctdb_client_call_recv(state, call);
- }
- return ctdb_daemon_call_recv(state, call);
-}
-
-/*
- full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
-*/
-int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
-{
- struct ctdb_call_state *state;
-
- state = ctdb_call_send(ctdb_db, call);
- return ctdb_call_recv(state, call);
-}
-
-
-
-struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
- TDB_DATA key, TDB_DATA *data)
-{
- struct ctdb_call call;
- struct ctdb_record_handle *rec;
- struct ctdb_call_state *state;
- int ret;
-
- if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
- return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data);
- }
-
- ZERO_STRUCT(call);
- call.call_id = CTDB_FETCH_FUNC;
- call.key = key;
- call.flags = CTDB_IMMEDIATE_MIGRATION;
-
- rec = talloc(mem_ctx, struct ctdb_record_handle);
- CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
-
- rec->ctdb_db = ctdb_db;
- rec->key = key;
- rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
- rec->data = data;
-
- state = ctdb_call_send(ctdb_db, &call);
- state->fetch_private = rec;
-
- ret = ctdb_call_recv(state, &call);
- if (ret != 0) {
- talloc_free(rec);
- return NULL;
- }
-
- return rec;
-}
-
-
-int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
-{
- int ret;
- struct ctdb_ltdb_header header;
- struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context);
-
- if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
- return ctdb_client_store_unlock(rec, data);
- }
-
- /* should be avoided if possible hang header off rec ? */
- ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL);
- if (ret) {
- ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed");
- talloc_free(rec);
- return ret;
- }
-
- ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data);
-
- talloc_free(rec);
-
- return ret;
-}