summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/common/ctdb_call.c
diff options
context:
space:
mode:
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_call.c')
-rw-r--r--source4/cluster/ctdb/common/ctdb_call.c151
1 files changed, 104 insertions, 47 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_call.c b/source4/cluster/ctdb/common/ctdb_call.c
index 77ec872852..ab5c2cce3b 100644
--- a/source4/cluster/ctdb/common/ctdb_call.c
+++ b/source4/cluster/ctdb/common/ctdb_call.c
@@ -29,6 +29,22 @@
#include "../include/ctdb_private.h"
/*
+ find the ctdb_db from a db index
+ */
+ struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
+{
+ struct ctdb_db_context *ctdb_db;
+
+ for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
+ if (ctdb_db->db_id == id) {
+ break;
+ }
+ }
+ return ctdb_db;
+}
+
+
+/*
local version of ctdb_call
*/
static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
@@ -38,7 +54,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca
struct ctdb_call_info *c;
struct ctdb_registered_call *fn;
struct ctdb_context *ctdb = ctdb_db->ctdb;
-
+
c = talloc(ctdb, struct ctdb_call_info);
CTDB_NO_MEMORY(ctdb, c);
@@ -242,13 +258,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
data.dptr = c->data + c->keylen;
data.dsize = c->datalen;
- for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
- if (ctdb_db->db_id == c->db_id) {
- break;
- }
- }
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
if (!ctdb_db) {
- ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id);
+ ctdb_send_error(ctdb, hdr, -1,
+ "Unknown database in request. db_id==0x%08x",
+ c->db_id);
return;
}
@@ -309,13 +323,11 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_call call;
struct ctdb_db_context *ctdb_db;
- for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
- if (ctdb_db->db_id == c->db_id) {
- break;
- }
- }
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
if (!ctdb_db) {
- ctdb_send_error(ctdb, hdr, ret, "Unknown database in request. db_id==0x%08x",c->db_id);
+ ctdb_send_error(ctdb, hdr, -1,
+ "Unknown database in request. db_id==0x%08x",
+ c->db_id);
return;
}
@@ -380,24 +392,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
talloc_free(r);
}
-enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
-
-/*
- state of a in-progress ctdb call
-*/
-struct ctdb_call_state {
- enum call_state state;
- struct ctdb_req_call *c;
- struct ctdb_db_context *ctdb_db;
- struct ctdb_node *node;
- const char *errmsg;
- struct ctdb_call call;
- int redirect_count;
- struct ctdb_ltdb_header header;
- void *fetch_private;
-};
-
-
/*
called when a CTDB_REPLY_CALL packet comes in
@@ -418,7 +412,14 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
talloc_steal(state, c);
+ /* get an extra reference here - this prevents the free in ctdb_recv_pkt()
+ from freeing the data */
+ (void)talloc_reference(state, c);
+
state->state = CTDB_CALL_DONE;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
/*
@@ -458,6 +459,9 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
state->state = CTDB_CALL_DONE;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
@@ -476,6 +480,9 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
state->state = CTDB_CALL_ERROR;
state->errmsg = (char *)c->msg;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
@@ -521,15 +528,31 @@ static int ctdb_call_destructor(struct ctdb_call_state *state)
called when a ctdb_call times out
*/
void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private)
+ struct timeval t, void *private_data)
{
- struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+ struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
state->state = CTDB_CALL_ERROR;
ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
state->c->hdr.reqid);
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
/*
+ this allows the caller to setup a async.fn
+*/
+static void call_local_trigger(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
+}
+
+
+/*
construct an event driven local ctdb_call
this is used so that locally processed ctdb_call requests are processed
@@ -556,17 +579,20 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
+ event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+
return state;
}
/*
- make a remote ctdb call - async send
+ make a remote ctdb call - async send. Called in daemon context.
This constructs a ctdb_call request and queues it for processing.
This call never blocks.
*/
-struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db,
+ struct ctdb_call *call)
{
uint32_t len;
struct ctdb_call_state *state;
@@ -633,21 +659,27 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct c
return state;
}
+/*
+ make a remote ctdb call - async send
-
-struct ctdb_record_handle {
- struct ctdb_db_context *ctdb_db;
- TDB_DATA key;
- TDB_DATA *data;
-};
+ This constructs a ctdb_call request and queues it for processing.
+ This call never blocks.
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+ if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_call_send(ctdb_db, call);
+ }
+ return ctdb_daemon_call_send(ctdb_db, call);
+}
/*
- make a remote ctdb call - async recv.
+ make a remote ctdb call - async recv - called in daemon context
This is called when the program wants to wait for a ctdb_call to complete and get the
results. This call will block unless the call has already completed.
*/
-int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
struct ctdb_record_handle *rec;
@@ -684,21 +716,34 @@ int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
return 0;
}
+
+/*
+ make a remote ctdb call - async recv.
+
+ This is called when the program wants to wait for a ctdb_call to complete and get the
+ results. This call will block unless the call has already completed.
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+{
+ if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_call_recv(state, call);
+ }
+ return ctdb_daemon_call_recv(state, call);
+}
+
/*
full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
*/
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
{
struct ctdb_call_state *state;
+
state = ctdb_call_send(ctdb_db, call);
return ctdb_call_recv(state, call);
}
-
-
-
struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data)
{
@@ -707,6 +752,10 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
struct ctdb_call_state *state;
int ret;
+ if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data);
+ }
+
ZERO_STRUCT(call);
call.call_id = CTDB_FETCH_FUNC;
call.key = key;
@@ -733,19 +782,27 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
}
-int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data)
+int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
{
int ret;
struct ctdb_ltdb_header header;
+ struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context);
+
+ if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_store_unlock(rec, data);
+ }
/* should be avoided if possible hang header off rec ? */
ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL);
if (ret) {
ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed");
+ talloc_free(rec);
return ret;
}
ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data);
+ talloc_free(rec);
+
return ret;
}