/*
   Unix SMB/CIFS implementation.
   Samba internal messaging functions
   Copyright (C) 2007 by Volker Lendecke
   Copyright (C) 2007 by Andrew Tridgell

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"

#ifdef CLUSTER_SUPPORT

#include "librpc/gen_ndr/messaging.h"
#include "librpc/gen_ndr/ndr_messaging.h"

/* paths to these include files come from --with-ctdb= in configure */
#include "ctdb.h"
#include "ctdb_private.h"

/*
 * State of one connection to the ctdbd daemon.
 */
struct ctdbd_connection {
	/* messaging context, set by ctdbd_register_msg_ctx() */
	struct messaging_context *msg_ctx;
	/* counter, pre-incremented to stamp each outgoing request */
	uint32 reqid;
	/* our node number, filled in by get_cluster_vnn() */
	uint32 our_vnn;
	/* random srvid generated and registered in ctdbd_init_connection() */
	uint64 rand_srvid;
	/* packet queue wrapping the unix domain socket to ctdbd */
	struct packet_context *pkt;
	/* fd event for asynchronous reads, set by ctdbd_register_msg_ctx() */
	struct fd_event *fde;

	/* callback and its argument for CTDB_SRVID_RELEASE_IP messages */
	void (*release_ip_handler)(const char *ip_addr, void *private_data);
	void *release_ip_priv;
};

static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
			      uint32_t vnn, uint32 opcode,
			      uint64_t srvid, TDB_DATA data,
			      TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			      int *cstatus);

/*
 * exit on fatal communications errors with the ctdbd daemon
 *
 * Logs "why" at debug level 0 and terminates the process; never returns.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
	/* we don't use smb_panic() as we don't want to delay to write
	   a core file. We need to release this process id immediately
	   so that someone else can take over without getting sharing
	   violations */
	_exit(0);
}

/*
 * Register a srvid with ctdbd
 *
 * Only the transport status of the control is returned; the control's
 * own status code is fetched but not inspected.
 */
static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn,
				    uint64_t srvid)
{
	int cstatus;

	return ctdbd_control(conn, CTDB_CURRENT_NODE,
			     CTDB_CONTROL_REGISTER_SRVID, srvid,
			     tdb_null, NULL, NULL, &cstatus);
}

/*
 * get our vnn from the cluster
 *
 * The node number is delivered in the status field of the control
 * reply. A failing control is fatal for the process.
 */
static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn)
{
	int32_t cstatus=-1;
	NTSTATUS status;

	status = ctdbd_control(conn,
			       CTDB_CURRENT_NODE, CTDB_CONTROL_GET_VNN, 0,
			       tdb_null, NULL, NULL, &cstatus);
	if (!NT_STATUS_IS_OK(status)) {
		cluster_fatal("ctdbd_control failed\n");
	}
	*vnn = (uint32_t)cstatus;
	return status;
}

/*
 * Return the node number cached in the connection
 */
uint32 ctdbd_vnn(const struct ctdbd_connection *conn)
{
	return conn->our_vnn;
}

/*
 * Get us a ctdb connection
 *
 * Connects a unix domain stream socket to the path returned by
 * lp_ctdbd_socket() (CTDB_PATH if that is unset or empty) and wraps it
 * in a packet context allocated on mem_ctx, returned via *presult.
 *
 * Returns NT_STATUS_INVALID_PARAMETER if the socket name does not fit
 * into sockaddr_un.sun_path, the mapped errno if socket() or connect()
 * fail, and NT_STATUS_NO_MEMORY if packet_init() fails.
 */
static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx,
			      struct packet_context **presult)
{
	struct packet_context *result;
	const char *sockname = lp_ctdbd_socket();
	struct sockaddr_un addr;
	int saved_errno;
	int fd;

	if (!sockname || !*sockname) {
		sockname = CTDB_PATH;
	}

	/*
	 * Refuse names that would be truncated or left without a
	 * terminating NUL; connecting to a truncated path would silently
	 * talk to the wrong socket.
	 */
	if (strlen(sockname) >= sizeof(addr.sun_path)) {
		DEBUG(0, ("ctdbd socket name %s is too long\n", sockname));
		return NT_STATUS_INVALID_PARAMETER;
	}

	ZERO_STRUCT(addr);
	addr.sun_family = AF_UNIX;
	/* addr is zeroed, so copying at most size-1 bytes keeps the NUL */
	strncpy(addr.sun_path, sockname, sizeof(addr.sun_path) - 1);

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd == -1) {
		/* save errno: DEBUG may overwrite it */
		saved_errno = errno;
		DEBUG(3, ("Could not create socket: %s\n",
			  strerror(saved_errno)));
		return map_nt_error_from_unix(saved_errno);
	}

	if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
		/* save errno: DEBUG and close() may overwrite it */
		saved_errno = errno;
		DEBUG(0, ("connect(%s) failed: %s\n", sockname,
			  strerror(saved_errno)));
		close(fd);
		return map_nt_error_from_unix(saved_errno);
	}

	if (!(result = packet_init(mem_ctx, fd))) {
		close(fd);
		return NT_STATUS_NO_MEMORY;
	}

	*presult = result;
	return NT_STATUS_OK;
}

/*
 * Do we have a complete ctdb packet in the queue?
*/ static BOOL ctdb_req_complete(const struct data_blob *data, size_t *length, void *private_data) { uint32 msglen; if (data->length < sizeof(msglen)) { return False; } msglen = *((uint32 *)data->data); DEBUG(10, ("msglen = %d\n", msglen)); if (msglen < sizeof(struct ctdb_req_header)) { DEBUG(0, ("Got invalid msglen: %d, expected at least %d for " "the req_header\n", (int)msglen, (int)sizeof(struct ctdb_req_header))); cluster_fatal("ctdbd protocol error\n"); } if (data->length >= msglen) { *length = msglen; return True; } return False; } /* * State necessary to defer an incoming message while we are waiting for a * ctdb reply. */ struct deferred_msg_state { struct messaging_context *msg_ctx; struct messaging_rec *rec; }; /* * Timed event handler for the deferred message */ static void deferred_message_dispatch(struct event_context *event_ctx, struct timed_event *te, const struct timeval *now, void *private_data) { struct deferred_msg_state *state = talloc_get_type_abort( private_data, struct deferred_msg_state); messaging_dispatch_rec(state->msg_ctx, state->rec); TALLOC_FREE(state); TALLOC_FREE(te); } struct req_pull_state { TALLOC_CTX *mem_ctx; DATA_BLOB req; }; /* * Pull a ctdb request out of the incoming packet queue */ static NTSTATUS ctdb_req_pull(const struct data_blob *data, void *private_data) { struct req_pull_state *state = (struct req_pull_state *)private_data; state->req = data_blob_talloc(state->mem_ctx, data->data, data->length); if (state->req.data == NULL) { return NT_STATUS_NO_MEMORY; } return NT_STATUS_OK; } /* * Fetch a messaging_rec from an incoming ctdb style message */ static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx, size_t overall_length, struct ctdb_req_message *msg) { struct messaging_rec *result; DATA_BLOB blob; NTSTATUS status; if ((overall_length < offsetof(struct ctdb_req_message, data)) || (overall_length < offsetof(struct ctdb_req_message, data) + msg->datalen)) { cluster_fatal("got invalid msg length"); } if 
(!(result = TALLOC_P(mem_ctx, struct messaging_rec))) { DEBUG(0, ("talloc failed\n")); return NULL; } blob = data_blob_const(msg->data, msg->datalen); status = ndr_pull_struct_blob( &blob, result, result, (ndr_pull_flags_fn_t)ndr_pull_messaging_rec); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("ndr_pull_struct_blob failed: %s\n", nt_errstr(status))); TALLOC_FREE(result); return NULL; } return result; } /* * Read a full ctdbd request. If we have a messaging context, defer incoming * messages that might come in between. */ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid, TALLOC_CTX *mem_ctx, void *result) { struct ctdb_req_header *hdr; struct req_pull_state state; NTSTATUS status; again: status = packet_fd_read_sync(conn->pkt); if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) { /* EAGAIN */ goto again; } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) { /* EAGAIN */ goto again; } if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } ZERO_STRUCT(state); state.mem_ctx = mem_ctx; if (!packet_handler(conn->pkt, ctdb_req_complete, ctdb_req_pull, &state, &status)) { /* * Not enough data */ goto again; } if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("Could not read packet: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } hdr = (struct ctdb_req_header *)state.req.data; if (hdr->operation == CTDB_REQ_MESSAGE) { struct timed_event *evt; struct deferred_msg_state *msg_state; struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr; if (conn->msg_ctx == NULL) { DEBUG(1, ("Got a message without having a msg ctx, " "dropping msg %llu\n", (long long unsigned)msg->srvid)); goto again; } if ((conn->release_ip_handler != NULL) && (msg->srvid == CTDB_SRVID_RELEASE_IP)) { /* must be dispatched immediately */ DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n")); conn->release_ip_handler((const char *)msg->data, conn->release_ip_priv); TALLOC_FREE(hdr); goto again; } if 
(!(msg_state = TALLOC_P(NULL, struct deferred_msg_state))) { DEBUG(0, ("talloc failed\n")); TALLOC_FREE(hdr); goto again; } if (!(msg_state->rec = ctdb_pull_messaging_rec( msg_state, state.req.length, msg))) { DEBUG(0, ("ctdbd_pull_messaging_rec failed\n")); TALLOC_FREE(msg_state); TALLOC_FREE(hdr); goto again; } TALLOC_FREE(hdr); msg_state->msg_ctx = conn->msg_ctx; /* * We're waiting for a call reply, but an async message has * crossed. Defer dispatching to the toplevel event loop. */ evt = event_add_timed(conn->msg_ctx->event_ctx, conn->msg_ctx->event_ctx, timeval_zero(), "deferred_message_dispatch", deferred_message_dispatch, msg_state); if (evt == NULL) { DEBUG(0, ("event_add_timed failed\n")); TALLOC_FREE(msg_state); TALLOC_FREE(hdr); goto again; } goto again; } if (hdr->reqid != reqid) { /* we got the wrong reply */ DEBUG(0,("Discarding mismatched ctdb reqid %u should have " "been %u\n", hdr->reqid, reqid)); TALLOC_FREE(hdr); goto again; } *((void **)result) = talloc_move(mem_ctx, &hdr); return NT_STATUS_OK; } /* * Get us a ctdbd connection */ NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, struct ctdbd_connection **pconn) { struct ctdbd_connection *conn; NTSTATUS status; if (!(conn = TALLOC_ZERO_P(mem_ctx, struct ctdbd_connection))) { DEBUG(0, ("talloc failed\n")); return NT_STATUS_NO_MEMORY; } status = ctdbd_connect(conn, &conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status))); goto fail; } status = get_cluster_vnn(conn, &conn->our_vnn); if (!NT_STATUS_IS_OK(status)) { DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status))); goto fail; } generate_random_buffer((unsigned char *)&conn->rand_srvid, sizeof(conn->rand_srvid)); status = register_with_ctdbd(conn, conn->rand_srvid); if (!NT_STATUS_IS_OK(status)) { DEBUG(5, ("Could not register random srvid: %s\n", nt_errstr(status))); goto fail; } *pconn = conn; return NT_STATUS_OK; fail: TALLOC_FREE(conn); return status; } /* * Get us a ctdbd connection 
and register us as a process */ NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx, struct ctdbd_connection **pconn) { struct ctdbd_connection *conn; NTSTATUS status; status = ctdbd_init_connection(mem_ctx, &conn); if (!NT_STATUS_IS_OK(status)) { return status; } status = register_with_ctdbd(conn, (uint64_t)sys_getpid()); if (!NT_STATUS_IS_OK(status)) { goto fail; } status = register_with_ctdbd(conn, MSG_SRVID_SAMBA); if (!NT_STATUS_IS_OK(status)) { goto fail; } *pconn = conn; return NT_STATUS_OK; fail: TALLOC_FREE(conn); return status; } /* * Packet handler to receive and handle a ctdb message */ static NTSTATUS ctdb_handle_message(const struct data_blob *data, void *private_data) { struct ctdbd_connection *conn = talloc_get_type_abort( private_data, struct ctdbd_connection); struct ctdb_req_message *msg; struct messaging_rec *msg_rec; msg = (struct ctdb_req_message *)data->data; if (msg->hdr.operation != CTDB_REQ_MESSAGE) { DEBUG(0, ("Received async msg of type %u, discarding\n", msg->hdr.operation)); return NT_STATUS_INVALID_PARAMETER; } if ((conn->release_ip_handler != NULL) && (msg->srvid == CTDB_SRVID_RELEASE_IP)) { /* must be dispatched immediately */ DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n")); conn->release_ip_handler((const char *)msg->data, conn->release_ip_priv); return NT_STATUS_OK; } SMB_ASSERT(conn->msg_ctx != NULL); if (msg->srvid == CTDB_SRVID_RECONFIGURE) { DEBUG(0,("Got cluster reconfigure message\n")); /* * when the cluster is reconfigured, we need to clean the brl * database */ messaging_send(conn->msg_ctx, procid_self(), MSG_SMB_BRL_VALIDATE, &data_blob_null); /* * it's possible that we have just rejoined the cluster after * an outage. 
In that case our pending locks could have been * removed from the lockdb, so retry them once more */ message_send_all(conn->msg_ctx, MSG_SMB_UNLOCK, NULL, 0, NULL); return NT_STATUS_OK; } /* only messages to our pid or the broadcast are valid here */ if (msg->srvid != sys_getpid() && msg->srvid != MSG_SRVID_SAMBA) { DEBUG(0,("Got unexpected message with srvid=%llu\n", (unsigned long long)msg->srvid)); return NT_STATUS_OK; } if (!(msg_rec = ctdb_pull_messaging_rec(NULL, data->length, msg))) { DEBUG(10, ("ctdb_pull_messaging_rec failed\n")); return NT_STATUS_NO_MEMORY; } messaging_dispatch_rec(conn->msg_ctx, msg_rec); TALLOC_FREE(msg_rec); return NT_STATUS_OK; } /* * The ctdbd socket is readable asynchronuously */ static void ctdbd_socket_handler(struct event_context *event_ctx, struct fd_event *event, uint16 flags, void *private_data) { struct ctdbd_connection *conn = talloc_get_type_abort( private_data, struct ctdbd_connection); NTSTATUS status; status = packet_fd_read(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } while (packet_handler(conn->pkt, ctdb_req_complete, ctdb_handle_message, conn, &status)) { if (!NT_STATUS_IS_OK(status)) { DEBUG(10, ("could not handle incoming message: %s\n", nt_errstr(status))); } } } /* * Prepare a ctdbd connection to receive messages */ NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn, struct messaging_context *msg_ctx) { SMB_ASSERT(conn->msg_ctx == NULL); SMB_ASSERT(conn->fde == NULL); if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn, packet_get_fd(conn->pkt), EVENT_FD_READ, ctdbd_socket_handler, conn))) { DEBUG(0, ("event_add_fd failed\n")); return NT_STATUS_NO_MEMORY; } conn->msg_ctx = msg_ctx; return NT_STATUS_OK; } /* * Send a messaging message across a ctdbd */ NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn, uint32 dst_vnn, uint64 dst_srvid, struct messaging_rec *msg) { struct ctdb_req_message r; 
TALLOC_CTX *mem_ctx; DATA_BLOB blob; NTSTATUS status; if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) { DEBUG(0, ("talloc failed\n")); return NT_STATUS_NO_MEMORY; } status = ndr_push_struct_blob( &blob, mem_ctx, msg, (ndr_push_flags_fn_t)ndr_push_messaging_rec); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("ndr_push_struct_blob failed: %s\n", nt_errstr(status))); goto fail; } r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length; r.hdr.ctdb_magic = CTDB_MAGIC; r.hdr.ctdb_version = CTDB_VERSION; r.hdr.generation = 1; r.hdr.operation = CTDB_REQ_MESSAGE; r.hdr.destnode = dst_vnn; r.hdr.srcnode = conn->our_vnn; r.hdr.reqid = 0; r.srvid = dst_srvid; r.datalen = blob.length; status = packet_send( conn->pkt, 2, data_blob_const(&r, offsetof(struct ctdb_req_message, data)), blob); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("packet_send failed: %s\n", nt_errstr(status))); goto fail; } status = packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); cluster_fatal("cluster dispatch daemon msg write error\n"); } status = NT_STATUS_OK; fail: TALLOC_FREE(mem_ctx); return status; } /* * send/recv a generic ctdb control message */ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, uint32_t vnn, uint32 opcode, uint64_t srvid, TDB_DATA data, TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int *cstatus) { struct ctdb_req_control req; struct ctdb_reply_control *reply = NULL; struct ctdbd_connection *new_conn = NULL; NTSTATUS status; if (conn == NULL) { status = ctdbd_init_connection(NULL, &new_conn); if (!NT_STATUS_IS_OK(status)) { DEBUG(10, ("Could not init temp connection: %s\n", nt_errstr(status))); goto fail; } conn = new_conn; } ZERO_STRUCT(req); req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize; req.hdr.ctdb_magic = CTDB_MAGIC; req.hdr.ctdb_version = CTDB_VERSION; req.hdr.operation = CTDB_REQ_CONTROL; req.hdr.reqid = ++conn->reqid; req.hdr.destnode = vnn; req.opcode = opcode; 
req.srvid = srvid; req.datalen = data.dsize; status = packet_send( conn->pkt, 2, data_blob_const(&req, offsetof(struct ctdb_req_control, data)), data_blob_const(data.dptr, data.dsize)); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); goto fail; } status = packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); cluster_fatal("cluster dispatch daemon control write error\n"); } status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); if (!NT_STATUS_IS_OK(status)) { DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status))); goto fail; } if (reply->hdr.operation != CTDB_REPLY_CONTROL) { DEBUG(0, ("received invalid reply\n")); goto fail; } if (outdata) { if (!(outdata->dptr = (uint8 *)talloc_memdup( mem_ctx, reply->data, reply->datalen))) { TALLOC_FREE(reply); return NT_STATUS_NO_MEMORY; } outdata->dsize = reply->datalen; } if (cstatus) { (*cstatus) = reply->status; } status = NT_STATUS_OK; fail: TALLOC_FREE(new_conn); TALLOC_FREE(reply); return status; } /* * see if a remote process exists */ BOOL ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid) { NTSTATUS status; TDB_DATA data; int32_t cstatus; data.dptr = (uint8_t*)&pid; data.dsize = sizeof(pid); status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, data, NULL, NULL, &cstatus); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, (__location__ " ctdb_control for process_exists " "failed\n")); return False; } return cstatus == 0; } /* * Get a db path */ char *ctdbd_dbpath(struct ctdbd_connection *conn, TALLOC_CTX *mem_ctx, uint32_t db_id) { NTSTATUS status; TDB_DATA data; int32_t cstatus; data.dptr = (uint8_t*)&db_id; data.dsize = sizeof(db_id); status = ctdbd_control(conn, CTDB_CURRENT_NODE, CTDB_CONTROL_GETDBPATH, 0, data, mem_ctx, &data, &cstatus); if (!NT_STATUS_IS_OK(status) || cstatus != 0) { DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n")); return NULL; } 
return (char *)data.dptr; } /* * attach to a ctdb database */ NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn, const char *name, uint32_t *db_id, int tdb_flags) { NTSTATUS status; TDB_DATA data; int32_t cstatus; data.dptr = (uint8_t*)name; data.dsize = strlen(name)+1; status = ctdbd_control(conn, CTDB_CURRENT_NODE, CTDB_CONTROL_DB_ATTACH, 0, data, NULL, &data, &cstatus); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, (__location__ " ctdb_control for db_attach " "failed: %s\n", nt_errstr(status))); return status; } if (cstatus != 0 || data.dsize != sizeof(uint32_t)) { DEBUG(0,(__location__ " ctdb_control for db_attach failed\n")); return NT_STATUS_INTERNAL_ERROR; } *db_id = *(uint32_t *)data.dptr; talloc_free(data.dptr); if (!(tdb_flags & TDB_SEQNUM)) { return NT_STATUS_OK; } data.dptr = (uint8_t *)db_id; data.dsize = sizeof(*db_id); status = ctdbd_control(conn, CTDB_CURRENT_NODE, CTDB_CONTROL_ENABLE_SEQNUM, 0, data, NULL, NULL, &cstatus); if (!NT_STATUS_IS_OK(status) || cstatus != 0) { DEBUG(0,(__location__ " ctdb_control for enable seqnum " "failed\n")); return NT_STATUS_IS_OK(status) ? 
NT_STATUS_INTERNAL_ERROR : status; } return NT_STATUS_OK; } /* * force the migration of a record to this node */ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, TDB_DATA key) { struct ctdb_req_call req; struct ctdb_reply_call *reply; NTSTATUS status; ZERO_STRUCT(req); req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize; req.hdr.ctdb_magic = CTDB_MAGIC; req.hdr.ctdb_version = CTDB_VERSION; req.hdr.operation = CTDB_REQ_CALL; req.hdr.reqid = ++conn->reqid; req.flags = CTDB_IMMEDIATE_MIGRATION; req.callid = CTDB_NULL_FUNC; req.db_id = db_id; req.keylen = key.dsize; status = packet_send( conn->pkt, 2, data_blob_const(&req, offsetof(struct ctdb_req_call, data)), data_blob_const(key.dptr, key.dsize)); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); return status; } status = packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); cluster_fatal("cluster dispatch daemon control write error\n"); } status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status))); goto fail; } if (reply->hdr.operation != CTDB_REPLY_CALL) { DEBUG(0, ("received invalid reply\n")); status = NT_STATUS_INTERNAL_ERROR; goto fail; } status = NT_STATUS_OK; fail: TALLOC_FREE(reply); return status; } /* * remotely fetch a record without locking it or forcing a migration */ NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id, TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data) { struct ctdb_req_call req; struct ctdb_reply_call *reply; NTSTATUS status; ZERO_STRUCT(req); req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize; req.hdr.ctdb_magic = CTDB_MAGIC; req.hdr.ctdb_version = CTDB_VERSION; req.hdr.operation = CTDB_REQ_CALL; req.hdr.reqid = ++conn->reqid; req.flags = 0; req.callid = CTDB_FETCH_FUNC; req.db_id = db_id; req.keylen = key.dsize; status = 
packet_send( conn->pkt, 2, data_blob_const(&req, offsetof(struct ctdb_req_call, data)), data_blob_const(key.dptr, key.dsize)); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); return status; } status = packet_flush(conn->pkt); if (!NT_STATUS_IS_OK(status)) { DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); cluster_fatal("cluster dispatch daemon control write error\n"); } status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status))); goto fail; } if (reply->hdr.operation != CTDB_REPLY_CALL) { DEBUG(0, ("received invalid reply\n")); status = NT_STATUS_INTERNAL_ERROR; goto fail; } data->dsize = reply->datalen; if (data->dsize == 0) { data->dptr = NULL; goto done; } data->dptr = (uint8 *)talloc_memdup(mem_ctx, &reply->data[0], reply->datalen); if (data->dptr == NULL) { DEBUG(0, ("talloc failed\n")); status = NT_STATUS_NO_MEMORY; goto fail; } done: status = NT_STATUS_OK; fail: TALLOC_FREE(reply); return status; } struct ctdbd_traverse_state { void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data); void *private_data; }; /* * Handle a traverse record coming in on the ctdbd connection */ static NTSTATUS ctdb_traverse_handler(const struct data_blob *blob, void *private_data) { struct ctdbd_traverse_state *state = (struct ctdbd_traverse_state *)private_data; struct ctdb_req_message *m; struct ctdb_rec_data *d; TDB_DATA key, data; m = (struct ctdb_req_message *)blob->data; if (blob->length < sizeof(*m) || m->hdr.length != blob->length) { DEBUG(0, ("Got invalid message of length %d\n", (int)blob->length)); return NT_STATUS_UNEXPECTED_IO_ERROR; } d = (struct ctdb_rec_data *)&m->data[0]; if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) { DEBUG(0, ("Got invalid traverse data of length %d\n", (int)m->datalen)); return NT_STATUS_UNEXPECTED_IO_ERROR; } key.dsize = d->keylen; key.dptr = &d->data[0]; data.dsize = 
d->datalen; data.dptr = &d->data[d->keylen]; if (key.dsize == 0 && data.dsize == 0) { /* end of traverse */ return NT_STATUS_END_OF_FILE; } if (data.dsize < sizeof(struct ctdb_ltdb_header)) { DEBUG(0, ("Got invalid ltdb header length %d\n", (int)data.dsize)); return NT_STATUS_UNEXPECTED_IO_ERROR; } data.dsize -= sizeof(struct ctdb_ltdb_header); data.dptr += sizeof(struct ctdb_ltdb_header); if (state->fn) { state->fn(key, data, state->private_data); } return NT_STATUS_OK; } /* Traverse a ctdb database. This uses a kind-of hackish way to open a second connection to ctdbd to avoid the hairy recursive and async problems with everything in-line. */ NTSTATUS ctdbd_traverse(uint32 db_id, void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data), void *private_data) { struct ctdbd_connection *conn; NTSTATUS status; TDB_DATA data; struct ctdb_traverse_start t; int cstatus; struct ctdbd_traverse_state state; status = ctdbd_init_connection(NULL, &conn); t.db_id = db_id; t.srvid = conn->rand_srvid; t.reqid = ++conn->reqid; data.dptr = (uint8_t *)&t; data.dsize = sizeof(t); status = ctdbd_control(conn, CTDB_CURRENT_NODE, CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, data, NULL, NULL, &cstatus); if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) { DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status), cstatus)); if (NT_STATUS_IS_OK(status)) { /* * We need a mapping here */ status = NT_STATUS_UNSUCCESSFUL; } goto done; } state.fn = fn; state.private_data = private_data; while (True) { status = NT_STATUS_OK; if (packet_handler(conn->pkt, ctdb_req_complete, ctdb_traverse_handler, &state, &status)) { if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { status = NT_STATUS_OK; break; } /* * There might be more in the queue */ continue; } if (!NT_STATUS_IS_OK(status)) { break; } status = packet_fd_read_sync(conn->pkt); if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) { /* * There might be more in the queue */ continue; } if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { status 
= NT_STATUS_OK; } if (!NT_STATUS_IS_OK(status)) { DEBUG(0, ("packet_fd_read_sync failed: %s\n", nt_errstr(status))); cluster_fatal("ctdbd died\n"); } } done: TALLOC_FREE(conn); return status; } /* * Register us as a server for a particular tcp connection */ NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn, const struct sockaddr_in *server, const struct sockaddr_in *client, void (*release_ip_handler)(const char *ip_addr, void *private_data), void *private_data) { struct ctdb_control_tcp p; TDB_DATA data; NTSTATUS status; /* * Only one connection so far */ SMB_ASSERT(conn->release_ip_handler == NULL); /* * We want to be told about IP releases */ status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP); if (!NT_STATUS_IS_OK(status)) { return status; } p.dest = *server; p.src = *client; /* * inform ctdb of our tcp connection, so if IP takeover happens ctdb * can send an extra ack to trigger a reset for our client, so it * immediately reconnects */ data.dptr = (uint8_t *)&p; data.dsize = sizeof(p); return ctdbd_control(conn, CTDB_CURRENT_NODE, CTDB_CONTROL_TCP_CLIENT, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL); } /* * We want to handle reconfigure events */ NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn) { return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE); } #else NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, struct ctdbd_connection **pconn) { return NT_STATUS_NOT_IMPLEMENTED; } #endif