diff options
author | Andrew Tridgell <tridge@samba.org> | 2007-04-21 07:23:42 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 14:51:17 -0500 |
commit | 650d81b252cc669ef848448afad7e9bb79c4f20e (patch) | |
tree | 7108ada18c4d7fff581470791832850c17eab6bf /source4/cluster/ctdb/common/ctdb_client.c | |
parent | 66a9f1e2764900a2c22a4bfad9f9caf3db729385 (diff) | |
download | samba-650d81b252cc669ef848448afad7e9bb79c4f20e.tar.gz samba-650d81b252cc669ef848448afad7e9bb79c4f20e.tar.bz2 samba-650d81b252cc669ef848448afad7e9bb79c4f20e.zip |
r22421: merged in latest ctdb changes from bzr
(This used to be commit 3633f862b966866819c9a0a6ad0238a858e15e62)
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_client.c')
-rw-r--r-- | source4/cluster/ctdb/common/ctdb_client.c | 612 |
1 files changed, 316 insertions, 296 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_client.c b/source4/cluster/ctdb/common/ctdb_client.c index 3cb27a1165..dbed8d3585 100644 --- a/source4/cluster/ctdb/common/ctdb_client.c +++ b/source4/cluster/ctdb/common/ctdb_client.c @@ -49,94 +49,87 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, } /* - called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + state of a in-progress ctdb call in client +*/ +struct ctdb_client_call_state { + enum call_state state; + uint32_t reqid; + struct ctdb_db_context *ctdb_db; + struct ctdb_call call; +}; + +/* + called when a CTDB_REPLY_CALL packet comes in in the client - This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + This packet comes in response to a CTDB_REQ_CALL request packet. It contains any reply data from the call */ -void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; - struct ctdb_call_state *state; + struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_client_call_state *state; - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; + } state->call.reply_data.dptr = c->data; state->call.reply_data.dsize = c->datalen; - state->call.status = c->state; + state->call.status = c->status; talloc_steal(state, c); - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } } -/* - called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon - - This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It - contains any reply data from the call -*/ -void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) -{ - struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr; - struct ctdb_call_state *state; - - state = idr_find(ctdb->idr, hdr->reqid); - if (state == NULL) return; - - state->call.status = c->state; - - talloc_steal(state, c); - - /* get an extra reference here - this prevents the free in ctdb_recv_pkt() - from freeing the data */ - (void)talloc_reference(state, c); +static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } -} /* this is called in the client, when data comes in from the daemon */ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); - struct ctdb_req_header *hdr; + struct ctdb_req_header *hdr = (struct ctdb_req_header *)data; + TALLOC_CTX *tmp_ctx; + + /* place the packet as a child of a tmp_ctx. We then use + talloc_free() below to free it. If any of the calls want + to keep it, then they will steal it somewhere else, and the + talloc_free() will be a no-op */ + tmp_ctx = talloc_new(ctdb); + talloc_steal(tmp_ctx, hdr); + + if (cnt == 0) { + DEBUG(2,("Daemon has exited - shutting down client\n")); + exit(0); + } if (cnt < sizeof(*hdr)) { - ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); - return; + DEBUG(0,("Bad packet length %d in client\n", cnt)); + goto done; } - hdr = (struct ctdb_req_header *)data; if (cnt != hdr->length) { - ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", hdr->length, cnt); - return; + goto done; } if (hdr->ctdb_magic != CTDB_MAGIC) { - ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); - return; + ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n"); + goto done; } if (hdr->ctdb_version != CTDB_VERSION) { - ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); - return; + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version); + goto done; } switch (hdr->operation) { case CTDB_REPLY_CALL: - ctdb_reply_call(ctdb, hdr); + ctdb_client_reply_call(ctdb, hdr); break; case CTDB_REQ_MESSAGE: @@ -147,23 +140,22 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) ctdb_reply_connect_wait(ctdb, hdr); break; - case CTDB_REPLY_FETCH_LOCK: - ctdb_reply_fetch_lock(ctdb, hdr); - break; - - case CTDB_REPLY_STORE_UNLOCK: - ctdb_reply_store_unlock(ctdb, hdr); + case CTDB_REPLY_STATUS: + ctdb_reply_status(ctdb, hdr); break; default: - printf("bogus operation code:%d\n",hdr->operation); + DEBUG(0,("bogus operation code:%d\n",hdr->operation)); } + +done: + talloc_free(tmp_ctx); } /* connect to a unix domain socket */ -static int ux_socket_connect(struct ctdb_context *ctdb) +int ctdb_socket_connect(struct ctdb_context *ctdb) { struct sockaddr_un addr; @@ -189,6 +181,13 @@ static int ux_socket_connect(struct ctdb_context *ctdb) } +struct ctdb_record_handle { + struct ctdb_db_context *ctdb_db; + TDB_DATA key; + TDB_DATA *data; + struct ctdb_ltdb_header header; +}; + /* make a recv call to the local ctdb daemon - called from client context @@ -196,31 +195,19 @@ static int ux_socket_connect(struct ctdb_context *ctdb) This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call) { - struct ctdb_record_handle *rec; - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->node->ctdb->ev); + event_loop_once(state->ctdb_db->ctdb->ev); } if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + DEBUG(0,(__location__ " ctdb_call_recv failed\n")); talloc_free(state); return -1; } - rec = state->fetch_private; - - /* ugly hack to manage forced migration */ - if (rec != NULL) { - rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); - rec->data->dsize = state->call.reply_data.dsize; - talloc_free(state); - return 0; - } - if (state->call.reply_data.dsize) { - call->reply_data.dptr = talloc_memdup(state->node->ctdb, + call->reply_data.dptr = talloc_memdup(state->ctdb_db, state->call.reply_data.dptr, state->call.reply_data.dsize); call->reply_data.dsize = state->call.reply_data.dsize; @@ -240,13 +227,41 @@ int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) /* destroy a ctdb_call in client */ -static int ctdb_client_call_destructor(struct ctdb_call_state *state) +static int ctdb_client_call_destructor(struct ctdb_client_call_state *state) { - idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + idr_remove(state->ctdb_db->ctdb->idr, state->reqid); return 0; } +/* + construct an event driven local ctdb_call + + this is used so that locally processed ctdb_call requests are processed + in an event driven manner +*/ +static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call, + struct ctdb_ltdb_header *header, + TDB_DATA *data) +{ + struct ctdb_client_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + int ret; + + state = talloc_zero(ctdb_db, struct ctdb_client_call_state); + CTDB_NO_MEMORY_NULL(ctdb, state); + talloc_steal(state, data->dptr); + + state->state = CTDB_CALL_DONE; + state->call = *call; + state->ctdb_db = ctdb_db; + + ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + talloc_steal(state, state->call.reply_data.dptr); + + return state; +} /* make a ctdb call to the local daemon - async send. Called from client context. @@ -254,107 +269,109 @@ static int ctdb_client_call_destructor(struct ctdb_call_state *state) This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) { - struct ctdb_call_state *state; + struct ctdb_client_call_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; struct ctdb_ltdb_header header; TDB_DATA data; int ret; size_t len; + struct ctdb_req_call *c; /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); + ctdb_socket_connect(ctdb); } ret = ctdb_ltdb_lock(ctdb_db, call->key); if (ret != 0) { - printf("failed to lock ltdb record\n"); + DEBUG(0,(__location__ " Failed to get chainlock\n")); return NULL; } ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); if (ret != 0) { ctdb_ltdb_unlock(ctdb_db, call->key); + DEBUG(0,(__location__ " Failed to fetch record\n")); return NULL; } -#if 0 if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { - state = ctdb_call_local_send(ctdb_db, call, &header, &data); + state = ctdb_client_call_local_send(ctdb_db, call, &header, &data); + talloc_free(data.dptr); ctdb_ltdb_unlock(ctdb_db, call->key); return state; } -#endif - state = talloc_zero(ctdb_db, struct ctdb_call_state); + ctdb_ltdb_unlock(ctdb_db, call->key); + talloc_free(data.dptr); + + state = talloc_zero(ctdb_db, struct ctdb_client_call_state); if (state == NULL) { - printf("failed to allocate state\n"); - ctdb_ltdb_unlock(ctdb_db, call->key); + DEBUG(0, (__location__ " failed to allocate state\n")); return NULL; } - talloc_steal(state, data.dptr); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); - ctdb_ltdb_unlock(ctdb_db, call->key); + c = ctdbd_allocate_pkt(state, len); + if (c == NULL) { + DEBUG(0, (__location__ " failed to allocate packet\n")); return NULL; } - talloc_set_name_const(state->c, "ctdbd req_call packet"); - talloc_steal(state, state->c); + talloc_set_name_const(c, "ctdb client req_call packet"); + memset(c, 0, offsetof(struct ctdb_req_call, data)); - state->c->hdr.length = len; - state->c->hdr.ctdb_magic = CTDB_MAGIC; - state->c->hdr.ctdb_version = CTDB_VERSION; - state->c->hdr.operation = CTDB_REQ_CALL; - state->c->hdr.destnode = header.dmaster; - state->c->hdr.srcnode = ctdb->vnn; + c->hdr.length = len; + c->hdr.ctdb_magic = CTDB_MAGIC; + c->hdr.ctdb_version = CTDB_VERSION; + c->hdr.operation = CTDB_REQ_CALL; /* this limits us to 16k outstanding messages - not unreasonable */ - state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - state->c->flags = call->flags; - state->c->db_id = ctdb_db->db_id; - state->c->callid = call->call_id; - state->c->keylen = call->key.dsize; - state->c->calldatalen = call->call_data.dsize; - memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); - memcpy(&state->c->data[call->key.dsize], + c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + c->flags = call->flags; + c->db_id = ctdb_db->db_id; + c->callid = call->call_id; + c->keylen = call->key.dsize; + c->calldatalen = call->call_data.dsize; + memcpy(&c->data[0], call->key.dptr, call->key.dsize); + memcpy(&c->data[call->key.dsize], call->call_data.dptr, call->call_data.dsize); state->call = *call; - state->call.call_data.dptr = &state->c->data[call->key.dsize]; - state->call.key.dptr = &state->c->data[0]; + state->call.call_data.dptr = &c->data[call->key.dsize]; + state->call.key.dptr = &c->data[0]; - state->node = ctdb->nodes[header.dmaster]; state->state = CTDB_CALL_WAIT; - state->header = header; state->ctdb_db = ctdb_db; + state->reqid = c->hdr.reqid; talloc_set_destructor(state, ctdb_client_call_destructor); - ctdb_client_queue_pkt(ctdb, &state->c->hdr); - -/*XXX set up timeout to cleanup if server doesnt respond - event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), - ctdb_call_timeout, state); -*/ + ctdb_client_queue_pkt(ctdb, &c->hdr); - ctdb_ltdb_unlock(ctdb_db, call->key); return state; } +/* + full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + struct ctdb_client_call_state *state; + + state = ctdb_call_send(ctdb_db, call); + return ctdb_call_recv(state, call); +} + /* tell the daemon what messaging srvid we will use, and register the message handler function in the client */ -int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) { struct ctdb_req_register c; @@ -362,7 +379,7 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, /* if the domain socket is not yet open, open it */ if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); + ctdb_socket_connect(ctdb); } ZERO_STRUCT(c); @@ -383,26 +400,10 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, } - -/* - setup handler for receipt of ctdb messages from ctdb_send_message() -*/ -int ctdb_set_message_handler(struct ctdb_context *ctdb, - uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) -{ - if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); - } - return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); -} - - /* send a message - from client context */ -int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, +int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, uint32_t srvid, TDB_DATA data) { struct ctdb_req_message *r; @@ -436,7 +437,7 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, /* wait for all nodes to be connected - from client */ -static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +void ctdb_connect_wait(struct ctdb_context *ctdb) { struct ctdb_req_connect_wait r; int res; @@ -447,216 +448,235 @@ static void ctdb_client_connect_wait(struct ctdb_context *ctdb) r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; r.hdr.operation = CTDB_REQ_CONNECT_WAIT; + + DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n")); + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); + } res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length); if (res != 0) { - printf("Failed to queue a connect wait request\n"); + DEBUG(0,(__location__ " Failed to queue a connect wait request\n")); return; } + DEBUG(3,("ctdb_connect_wait: waiting\n")); + /* now we can go into the normal wait routine, as the reply packet will update the ctdb->num_connected variable */ ctdb_daemon_connect_wait(ctdb); } /* - wait for all nodes to be connected -*/ -void ctdb_connect_wait(struct ctdb_context *ctdb) + cancel a ctdb_fetch_lock operation, releasing the lock + */ +static int fetch_lock_destructor(struct ctdb_record_handle *h) { - if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { - ctdb_daemon_connect_wait(ctdb); - return; - } - - ctdb_client_connect_wait(ctdb); + ctdb_ltdb_unlock(h->ctdb_db, h->key); + return 0; } +/* + force the migration of a record to this node + */ +static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key) +{ + struct ctdb_call call; + ZERO_STRUCT(call); + call.call_id = CTDB_NULL_FUNC; + call.key = key; + call.flags = CTDB_IMMEDIATE_MIGRATION; + return ctdb_call(ctdb_db, &call); +} -struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key) +/* + get a lock on a record, and return the records data. Blocks until it gets the lock + */ +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) { - struct ctdb_call_state *state; - struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_req_fetch_lock *req; - int len, res; + int ret; + struct ctdb_record_handle *h; - /* if the domain socket is not yet open, open it */ - if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); - } + /* + procedure is as follows: - state = talloc_zero(ctdb_db, struct ctdb_call_state); - if (state == NULL) { - printf("failed to allocate state\n"); - return NULL; - } - state->state = CTDB_CALL_WAIT; - state->ctdb_db = ctdb_db; - len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); + 1) get the chain lock. + 2) check if we are dmaster + 3) if we are the dmaster then return handle + 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for + reply from ctdbd + 5) when we get the reply, goto (1) + */ + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { return NULL; } - ZERO_STRUCT(*state->c); - talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); - talloc_steal(state, state->c); - - req = (struct ctdb_req_fetch_lock *)state->c; - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_FETCH_LOCK; - req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - req->db_id = ctdb_db->db_id; - req->keylen = key.dsize; - memcpy(&req->key[0], key.dptr, key.dsize); - - res = ctdb_client_queue_pkt(ctdb, &req->hdr); - if (res != 0) { + + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); return NULL; } + h->data = data; - talloc_free(req); - - return state; -} + DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize, + (const char *)key.dptr)); +again: + /* step 1 - get the chain lock */ + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(0, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } -struct ctdb_call_state *ctdb_client_store_unlock_send( - struct ctdb_record_handle *rh, - TALLOC_CTX *mem_ctx, - TDB_DATA data) -{ - struct ctdb_call_state *state; - struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context); - struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_req_store_unlock *req; - int len, res; + DEBUG(4,("ctdb_fetch_lock: got chain lock\n")); - /* if the domain socket is not yet open, open it */ - if (ctdb->daemon.sd==-1) { - ux_socket_connect(ctdb); - } + talloc_set_destructor(h, fetch_lock_destructor); - state = talloc_zero(ctdb_db, struct ctdb_call_state); - if (state == NULL) { - printf("failed to allocate state\n"); - return NULL; - } - state->state = CTDB_CALL_WAIT; - state->ctdb_db = ctdb_db; - len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { - printf("failed to allocate packet\n"); + ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + talloc_free(h); return NULL; } - ZERO_STRUCT(*state->c); - talloc_set_name_const(state->c, "ctdbd req_store_unlock packet"); - talloc_steal(state, state->c); - - req = (struct ctdb_req_store_unlock *)state->c; - req->hdr.length = len; - req->hdr.ctdb_magic = CTDB_MAGIC; - req->hdr.ctdb_version = CTDB_VERSION; - req->hdr.operation = CTDB_REQ_STORE_UNLOCK; - req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); - req->db_id = ctdb_db->db_id; - req->keylen = rh->key.dsize; - req->datalen = data.dsize; - memcpy(&req->data[0], rh->key.dptr, rh->key.dsize); - memcpy(&req->data[req->keylen], data.dptr, data.dsize); - - res = ctdb_client_queue_pkt(ctdb, &req->hdr); - if (res != 0) { - return NULL; + + /* when torturing, ensure we test the remote path */ + if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) && + random() % 5 == 0) { + h->header.dmaster = (uint32_t)-1; } - talloc_free(req); - return state; + DEBUG(4,("ctdb_fetch_lock: done local fetch\n")); + + if (h->header.dmaster != ctdb_db->ctdb->vnn) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(4,("ctdb_fetch_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n")); + return h; } /* - make a recv call to the local ctdb daemon - called from client context + store some data to the record that was locked with ctdb_fetch_lock() +*/ +int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) +{ + return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); +} - This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the - results. This call will block unless the call has already completed. +/* + wait until we're the only node left. + this function never returns */ -struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +void ctdb_shutdown(struct ctdb_context *ctdb) { - struct ctdb_record_handle *rec; + struct ctdb_req_shutdown r; + int len; - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); - } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); - talloc_free(state); - return NULL; + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); } - rec = talloc(mem_ctx, struct ctdb_record_handle); - CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + len = sizeof(struct ctdb_req_shutdown); + ZERO_STRUCT(r); + r.hdr.length = len; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_SHUTDOWN; + r.hdr.reqid = 0; - rec->ctdb_db = state->ctdb_db; - rec->key = key; - rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); - rec->data = talloc(rec, TDB_DATA); - rec->data->dsize = state->call.reply_data.dsize; - rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + ctdb_client_queue_pkt(ctdb, &(r.hdr)); - if (data) { - *data = *rec->data; + /* this event loop will terminate once we receive the reply */ + while (1) { + event_loop_once(ctdb->ev); } - return rec; } -/* - make a recv call to the local ctdb daemon - called from client context +enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE}; - This is called when the program wants to wait for a ctdb_store_unlock to complete and get the - results. This call will block unless the call has already completed. -*/ -int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec) +struct ctdb_status_state { + uint32_t reqid; + struct ctdb_status *status; + enum ctdb_status_states state; +}; + +/* + handle a ctdb_reply_status reply + */ +static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); - } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr; + struct ctdb_status_state *state; + + state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state); + if (state == NULL) { + DEBUG(0, ("reqid %d not found\n", hdr->reqid)); + return; } - talloc_free(state); - return state->state; + *state->status = r->status; + state->state = CTDB_STATUS_DONE; } -struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, - TDB_DATA *data) +/* + wait until we're the only node left. + this function never returns +*/ +int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status) { - struct ctdb_call_state *state; - struct ctdb_record_handle *rec; + struct ctdb_req_status r; + int ret; + struct ctdb_status_state *state; - state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); - rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ctdb_socket_connect(ctdb); + } - return rec; -} + state = talloc(ctdb, struct ctdb_status_state); + CTDB_NO_MEMORY(ctdb, state); -int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) -{ - struct ctdb_call_state *state; - int res; + state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->status = status; + state->state = CTDB_STATUS_WAIT; + + ZERO_STRUCT(r); + r.hdr.length = sizeof(r); + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.operation = CTDB_REQ_STATUS; + r.hdr.reqid = state->reqid; - state = ctdb_client_store_unlock_send(rec, rec, data); - res = ctdb_client_store_unlock_recv(state, rec); + ret = ctdb_client_queue_pkt(ctdb, &(r.hdr)); + if (ret != 0) { + talloc_free(state); + return -1; + } + + while (state->state == CTDB_STATUS_WAIT) { + event_loop_once(ctdb->ev); + } - talloc_free(rec); + talloc_free(state); - return res; + return 0; } + |