summaryrefslogtreecommitdiff
path: root/source4/cluster/ctdb/common/ctdb_ltdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'source4/cluster/ctdb/common/ctdb_ltdb.c')
-rw-r--r--source4/cluster/ctdb/common/ctdb_ltdb.c144
1 files changed, 136 insertions, 8 deletions
diff --git a/source4/cluster/ctdb/common/ctdb_ltdb.c b/source4/cluster/ctdb/common/ctdb_ltdb.c
index 785ccad9b3..cb07a72375 100644
--- a/source4/cluster/ctdb/common/ctdb_ltdb.c
+++ b/source4/cluster/ctdb/common/ctdb_ltdb.c
@@ -45,9 +45,8 @@ struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *na
/*
this is the dummy null procedure that all databases support
*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
+static int ctdb_null_func(struct ctdb_call_info *call)
{
- call->reply_data = &call->record_data;
return 0;
}
@@ -82,10 +81,21 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name,
}
}
+ if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
+ DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
+ ctdb->db_directory));
+ talloc_free(ctdb_db);
+ return NULL;
+ }
+
+ /* add the node id to the database name, so when we run on loopback
+ we don't conflict in the local filesystem */
+ name = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name);
+
/* when we have a separate daemon this will need to be a real
file, not a TDB_INTERNAL, so the parent can access it to
for ltdb bypass */
- ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_INTERNAL, open_flags, mode);
+ ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_CLEAR_IF_FIRST, open_flags, mode);
if (ctdb_db->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
talloc_free(ctdb_db);
@@ -94,9 +104,10 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name,
/*
- all databases support the "fetch" function. we need this in order to do forced migration of records
+ all databases support the "null" function. we need this in
+ order to do forced migration of records
*/
- ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
+ ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
if (ret != 0) {
talloc_free(ctdb_db);
return NULL;
@@ -145,13 +156,15 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
if (rec.dsize < sizeof(*header)) {
+ TDB_DATA d2;
/* return an initial header */
- free(rec.dptr);
+ if (rec.dptr) free(rec.dptr);
ltdb_initial_header(ctdb_db, key, header);
+ ZERO_STRUCT(d2);
if (data) {
- data->dptr = NULL;
- data->dsize = 0;
+ *data = d2;
}
+ ctdb_ltdb_store(ctdb_db, key, header, d2);
return 0;
}
@@ -215,3 +228,118 @@ int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
}
+struct lock_fetch_state {
+ struct ctdb_context *ctdb;
+ void (*recv_pkt)(void *, uint8_t *, uint32_t);
+ void *recv_context;
+ struct ctdb_req_header *hdr;
+};
+
+/*
+ called when we should retry the operation
+ */
+static void lock_fetch_callback(void *p)
+{
+ struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
+ state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
+ talloc_free(state);
+ DEBUG(2,(__location__ " PACKET REQUEUED\n"));
+}
+
+
+/*
+ do a non-blocking ltdb_lock, deferring this ctdb request until we
+ have the chainlock
+
+ It does the following:
+
+ 1) tries to get the chainlock. If it succeeds, then it returns 0
+
+ 2) if it fails to get a chainlock immediately then it sets up a
+ non-blocking chainlock via ctdb_lockwait, and when it gets the
+ chainlock it re-submits this ctdb request to the main packet
+ receive function
+
+ This effectively queues all ctdb requests that cannot be
+ immediately satisfied until it can get the lock. This means that
+ the main ctdb daemon will not block waiting for a chainlock held by
+ a client
+
+ There are 3 possible return values:
+
+ 0: means that it got the lock immediately.
+ -1: means that it failed to get the lock, and won't retry
+ -2: means that it failed to get the lock immediately, but will retry
+ */
+int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_req_header *hdr,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
+{
+ int ret;
+ struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+ struct lockwait_handle *h;
+ struct lock_fetch_state *state;
+
+ ret = tdb_chainlock_nonblock(tdb, key);
+
+ if (ret != 0 &&
+ !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
+ /* a hard failure - don't try again */
+ return -1;
+ }
+
+ /* when torturing, ensure we test the contended path */
+ if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+ random() % 5 == 0) {
+ ret = -1;
+ tdb_chainunlock(tdb, key);
+ }
+
+ /* first the non-contended path */
+ if (ret == 0) {
+ return 0;
+ }
+
+ state = talloc(ctdb_db, struct lock_fetch_state);
+ state->ctdb = ctdb_db->ctdb;
+ state->hdr = hdr;
+ state->recv_pkt = recv_pkt;
+ state->recv_context = recv_context;
+
+ /* now the contended path */
+ h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
+ if (h == NULL) {
+ tdb_chainunlock(tdb, key);
+ return -1;
+ }
+
+ /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
+ so it won't be freed yet */
+ talloc_steal(state, hdr);
+ talloc_steal(state, h);
+
+ /* now tell the caller than we will retry asynchronously */
+ return -2;
+}
+
+/*
+ a varient of ctdb_ltdb_lock_requeue that also fetches the record
+ */
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ struct ctdb_req_header *hdr, TDB_DATA *data,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
+{
+ int ret;
+
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
+ if (ret == 0) {
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ }
+ }
+ return ret;
+}