From de565785f5e1f097bd75f57331425c4185185f80 Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Sun, 10 Jun 2007 17:02:09 +0000 Subject: r23410: Merge the core of the cluster code. I'm 100% certain I've forgotten to merge something, but the main code should be in. It's mainly in dbwrap_ctdb.c, ctdbd_conn.c and messages_ctdbd.c. There should be no changes to the non-cluster case, it does survive make test on my laptop. It survives some very basic tests with ctdbd enables, I did not do the full test suite for clusters yet. Phew... Volker (This used to be commit 15553d6327a3aecdd2b0b94a3656d04bf4106323) --- source3/lib/ctdbd_conn.c | 1167 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1167 insertions(+) create mode 100644 source3/lib/ctdbd_conn.c (limited to 'source3/lib/ctdbd_conn.c') diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c new file mode 100644 index 0000000000..8c1aab8f37 --- /dev/null +++ b/source3/lib/ctdbd_conn.c @@ -0,0 +1,1167 @@ +/* + Unix SMB/CIFS implementation. + Samba internal messaging functions + Copyright (C) 2007 by Volker Lendecke + Copyright (C) 2007 by Andrew Tridgell + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" + +#ifdef CLUSTER_SUPPORT + +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/ndr_messaging.h" + +/* paths to these include files come from --with-ctdb= in configure */ +#include "ctdb.h" +#include "ctdb_private.h" + +struct ctdbd_connection { + struct messaging_context *msg_ctx; + uint32 reqid; + uint32 our_vnn; + uint64 rand_srvid; + struct packet_context *pkt; + struct fd_event *fde; + + void (*release_ip_handler)(const char *ip_addr, void *private_data); + void *release_ip_priv; +}; + +static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, + uint32_t vnn, uint32 opcode, + uint64_t srvid, TDB_DATA data, + TALLOC_CTX *mem_ctx, TDB_DATA *outdata, + int *cstatus); + +/* + * exit on fatal communications errors with the ctdbd daemon + */ +static void cluster_fatal(const char *why) +{ + DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why)); + /* we don't use smb_panic() as we don't want to delay to write + a core file. We need to release this process id immediately + so that someone else can take over without getting sharing + violations */ + _exit(0); +} + +/* + * Register a srvid with ctdbd + */ +static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, + uint64_t srvid) +{ + + int cstatus; + return ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_REGISTER_SRVID, srvid, + tdb_null, NULL, NULL, &cstatus); +} + +/* + * get our vnn from the cluster + */ +static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn) +{ + int32_t cstatus=-1; + NTSTATUS status; + status = ctdbd_control(conn, + CTDB_CURRENT_NODE, CTDB_CONTROL_GET_VNN, 0, + tdb_null, NULL, NULL, &cstatus); + if (!NT_STATUS_IS_OK(status)) { + cluster_fatal("ctdbd_control failed\n"); + } + *vnn = (uint32_t)cstatus; + return status; +} + +uint32 ctdbd_vnn(const struct ctdbd_connection *conn) +{ + return conn->our_vnn; +} + +/* + * Get us a ctdb connection + */ + +static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx, + struct packet_context **presult) +{ + struct packet_context *result; + const char *sockname = lp_ctdbd_socket(); + struct sockaddr_un addr; + int fd; + + if (!sockname || !*sockname) { + sockname = CTDB_PATH; + } + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + DEBUG(3, ("Could not create socket: %s\n", strerror(errno))); + return map_nt_error_from_unix(errno); + } + + ZERO_STRUCT(addr); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, sockname, sizeof(addr.sun_path)); + + if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + DEBUG(0, ("connect(%s) failed: %s\n", sockname, + strerror(errno))); + close(fd); + return map_nt_error_from_unix(errno); + } + + if (!(result = packet_init(mem_ctx, fd))) { + close(fd); + return NT_STATUS_NO_MEMORY; + } + + *presult = result; + return NT_STATUS_OK; +} + +/* + * Do we have a complete ctdb packet in the queue? + */ + +static BOOL ctdb_req_complete(const struct data_blob *data, + size_t *length, + void *private_data) +{ + uint32 msglen; + + if (data->length < sizeof(msglen)) { + return False; + } + + msglen = *((uint32 *)data->data); + + DEBUG(10, ("msglen = %d\n", msglen)); + + if (msglen < sizeof(struct ctdb_req_header)) { + DEBUG(0, ("Got invalid msglen: %d, expected at least %d for " + "the req_header\n", msglen, + sizeof(struct ctdb_req_header))); + cluster_fatal("ctdbd protocol error\n"); + } + + if (data->length >= msglen) { + *length = msglen; + return True; + } + + return False; +} + +/* + * State necessary to defer an incoming message while we are waiting for a + * ctdb reply. + */ + +struct deferred_msg_state { + struct messaging_context *msg_ctx; + struct messaging_rec *rec; +}; + +/* + * Timed event handler for the deferred message + */ + +static void deferred_message_dispatch(struct event_context *event_ctx, + struct timed_event *te, + const struct timeval *now, + void *private_data) +{ + struct deferred_msg_state *state = talloc_get_type_abort( + private_data, struct deferred_msg_state); + + messaging_dispatch_rec(state->msg_ctx, state->rec); + TALLOC_FREE(state); + TALLOC_FREE(te); +} + +struct req_pull_state { + TALLOC_CTX *mem_ctx; + DATA_BLOB req; +}; + +/* + * Pull a ctdb request out of the incoming packet queue + */ + +static NTSTATUS ctdb_req_pull(const struct data_blob *data, + void *private_data) +{ + struct req_pull_state *state = (struct req_pull_state *)private_data; + + state->req = data_blob_talloc(state->mem_ctx, data->data, + data->length); + if (state->req.data == NULL) { + return NT_STATUS_NO_MEMORY; + } + return NT_STATUS_OK; +} + +/* + * Fetch a messaging_rec from an incoming ctdb style message + */ + +static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx, + size_t overall_length, + struct ctdb_req_message *msg) +{ + struct messaging_rec *result; + DATA_BLOB blob; + NTSTATUS status; + + if ((overall_length < offsetof(struct ctdb_req_message, data)) + || (overall_length + < offsetof(struct ctdb_req_message, data) + msg->datalen)) { + + cluster_fatal("got invalid msg length"); + } + + if (!(result = TALLOC_P(mem_ctx, struct messaging_rec))) { + DEBUG(0, ("talloc failed\n")); + return NULL; + } + + blob = data_blob_const(msg->data, msg->datalen); + + status = ndr_pull_struct_blob( + &blob, result, result, + (ndr_pull_flags_fn_t)ndr_pull_messaging_rec); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ndr_pull_struct_blob failed: %s\n", + nt_errstr(status))); + TALLOC_FREE(result); + return NULL; + } + + return result; +} + +/* + * Read a full ctdbd request. If we have a messaging context, defer incoming + * messages that might come in between. + */ + +static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid, + TALLOC_CTX *mem_ctx, void *result) +{ + struct ctdb_req_header *hdr; + struct req_pull_state state; + NTSTATUS status; + + again: + + status = packet_fd_read_sync(conn->pkt); + + if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) { + /* EAGAIN */ + goto again; + } + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_fd_read failed: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } + + ZERO_STRUCT(state); + state.mem_ctx = mem_ctx; + + if (!packet_handler(conn->pkt, ctdb_req_complete, ctdb_req_pull, + &state, &status)) { + /* + * Not enough data + */ + goto again; + } + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("Could not read packet: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } + + hdr = (struct ctdb_req_header *)state.req.data; + + if (hdr->operation == CTDB_REQ_MESSAGE) { + struct timed_event *evt; + struct deferred_msg_state *msg_state; + struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr; + + if (conn->msg_ctx == NULL) { + DEBUG(1, ("Got a message without having a msg ctx, " + "dropping msg %llu\n", msg->srvid)); + goto again; + } + + if ((conn->release_ip_handler != NULL) + && (msg->srvid == CTDB_SRVID_RELEASE_IP)) { + /* must be dispatched immediately */ + DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n")); + conn->release_ip_handler((const char *)msg->data, + conn->release_ip_priv); + TALLOC_FREE(hdr); + goto again; + } + + if (!(msg_state = TALLOC_P(NULL, struct deferred_msg_state))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(hdr); + goto again; + } + + if (!(msg_state->rec = ctdb_pull_messaging_rec( + msg_state, state.req.length, msg))) { + DEBUG(0, ("ctdbd_pull_messaging_rec failed\n")); + TALLOC_FREE(msg_state); + TALLOC_FREE(hdr); + goto again; + } + + TALLOC_FREE(hdr); + + msg_state->msg_ctx = conn->msg_ctx; + + /* + * We're waiting for a call reply, but an async message has + * crossed. Defer dispatching to the toplevel event loop. + */ + evt = event_add_timed(conn->msg_ctx->event_ctx, + conn->msg_ctx->event_ctx, + timeval_zero(), + "deferred_message_dispatch", + deferred_message_dispatch, + msg_state); + if (evt == NULL) { + DEBUG(0, ("event_add_timed failed\n")); + TALLOC_FREE(msg_state); + TALLOC_FREE(hdr); + goto again; + } + + goto again; + } + + if (hdr->reqid != reqid) { + /* we got the wrong reply */ + DEBUG(0,("Discarding mismatched ctdb reqid %u should have " + "been %u\n", hdr->reqid, reqid)); + TALLOC_FREE(hdr); + goto again; + } + + *((void **)result) = talloc_move(mem_ctx, &hdr); + + return NT_STATUS_OK; +} + +/* + * Get us a ctdbd connection + */ + +NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, + struct ctdbd_connection **pconn) +{ + struct ctdbd_connection *conn; + NTSTATUS status; + + if (!(conn = TALLOC_ZERO_P(mem_ctx, struct ctdbd_connection))) { + DEBUG(0, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + status = ctdbd_connect(conn, &conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status))); + goto fail; + } + + status = get_cluster_vnn(conn, &conn->our_vnn); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status))); + goto fail; + } + + generate_random_buffer((unsigned char *)&conn->rand_srvid, + sizeof(conn->rand_srvid)); + + status = register_with_ctdbd(conn, conn->rand_srvid); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(5, ("Could not register random srvid: %s\n", + nt_errstr(status))); + goto fail; + } + + *pconn = conn; + return NT_STATUS_OK; + + fail: + TALLOC_FREE(conn); + return status; +} + +/* + * Get us a ctdbd connection and register us as a process + */ + +NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx, + struct ctdbd_connection **pconn) +{ + struct ctdbd_connection *conn; + NTSTATUS status; + + status = ctdbd_init_connection(mem_ctx, &conn); + + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + status = register_with_ctdbd(conn, (uint64_t)sys_getpid()); + if (!NT_STATUS_IS_OK(status)) { + goto fail; + } + + status = register_with_ctdbd(conn, MSG_SRVID_SAMBA); + if (!NT_STATUS_IS_OK(status)) { + goto fail; + } + + *pconn = conn; + return NT_STATUS_OK; + + fail: + TALLOC_FREE(conn); + return status; +} + +/* + * Packet handler to receive and handle a ctdb message + */ +static NTSTATUS ctdb_handle_message(const struct data_blob *data, + void *private_data) +{ + struct ctdbd_connection *conn = talloc_get_type_abort( + private_data, struct ctdbd_connection); + struct ctdb_req_message *msg; + struct messaging_rec *msg_rec; + + msg = (struct ctdb_req_message *)data->data; + + if (msg->hdr.operation != CTDB_REQ_MESSAGE) { + DEBUG(0, ("Received async msg of type %u, discarding\n", + msg->hdr.operation)); + return NT_STATUS_INVALID_PARAMETER; + } + + if ((conn->release_ip_handler != NULL) + && (msg->srvid == CTDB_SRVID_RELEASE_IP)) { + /* must be dispatched immediately */ + DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n")); + conn->release_ip_handler((const char *)msg->data, + conn->release_ip_priv); + return NT_STATUS_OK; + } + + SMB_ASSERT(conn->msg_ctx != NULL); + + if (msg->srvid == CTDB_SRVID_RECONFIGURE) { + DEBUG(0,("Got cluster reconfigure message\n")); + /* + * when the cluster is reconfigured, we need to clean the brl + * database + */ + messaging_send(conn->msg_ctx, procid_self(), + MSG_SMB_BRL_VALIDATE, &data_blob_null); + + /* + * it's possible that we have just rejoined the cluster after + * an outage. In that case our pending locks could have been + * removed from the lockdb, so retry them once more + */ + message_send_all(conn->msg_ctx, MSG_SMB_UNLOCK, NULL, 0, NULL); + + return NT_STATUS_OK; + + } + + /* only messages to our pid or the broadcast are valid here */ + if (msg->srvid != sys_getpid() && msg->srvid != MSG_SRVID_SAMBA) { + DEBUG(0,("Got unexpected message with srvid=%llu\n", + (unsigned long long)msg->srvid)); + return NT_STATUS_OK; + } + + if (!(msg_rec = ctdb_pull_messaging_rec(NULL, data->length, msg))) { + DEBUG(10, ("ctdb_pull_messaging_rec failed\n")); + return NT_STATUS_NO_MEMORY; + } + + messaging_dispatch_rec(conn->msg_ctx, msg_rec); + + TALLOC_FREE(msg_rec); + return NT_STATUS_OK; +} + +/* + * The ctdbd socket is readable asynchronuously + */ + +static void ctdbd_socket_handler(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *private_data) +{ + struct ctdbd_connection *conn = talloc_get_type_abort( + private_data, struct ctdbd_connection); + + NTSTATUS status; + + status = packet_fd_read(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } + + while (packet_handler(conn->pkt, ctdb_req_complete, + ctdb_handle_message, conn, &status)) { + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("could not handle incoming message: %s\n", + nt_errstr(status))); + } + } +} + +/* + * Prepare a ctdbd connection to receive messages + */ + +NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn, + struct messaging_context *msg_ctx) +{ + SMB_ASSERT(conn->msg_ctx == NULL); + SMB_ASSERT(conn->fde == NULL); + + if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn, + packet_get_fd(conn->pkt), + EVENT_FD_READ, + ctdbd_socket_handler, + conn))) { + DEBUG(0, ("event_add_fd failed\n")); + return NT_STATUS_NO_MEMORY; + } + + conn->msg_ctx = msg_ctx; + + return NT_STATUS_OK; +} + +/* + * Send a messaging message across a ctdbd + */ + +NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn, + uint32 dst_vnn, uint64 dst_srvid, + struct messaging_rec *msg) +{ + struct ctdb_req_message r; + TALLOC_CTX *mem_ctx; + DATA_BLOB blob; + NTSTATUS status; + + if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) { + DEBUG(0, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + status = ndr_push_struct_blob( + &blob, mem_ctx, msg, + (ndr_push_flags_fn_t)ndr_push_messaging_rec); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ndr_push_struct_blob failed: %s\n", + nt_errstr(status))); + goto fail; + } + + r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.generation = 1; + r.hdr.operation = CTDB_REQ_MESSAGE; + r.hdr.destnode = dst_vnn; + r.hdr.srcnode = conn->our_vnn; + r.hdr.reqid = 0; + r.srvid = dst_srvid; + r.datalen = blob.length; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&r, offsetof(struct ctdb_req_message, data)), + blob); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("packet_send failed: %s\n", nt_errstr(status))); + goto fail; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon msg write error\n"); + } + + status = NT_STATUS_OK; + fail: + TALLOC_FREE(mem_ctx); + return status; +} + +/* + * send/recv a generic ctdb control message + */ +static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, + uint32_t vnn, uint32 opcode, + uint64_t srvid, TDB_DATA data, + TALLOC_CTX *mem_ctx, TDB_DATA *outdata, + int *cstatus) +{ + struct ctdb_req_control req; + struct ctdb_reply_control *reply = NULL; + struct ctdbd_connection *new_conn = NULL; + NTSTATUS status; + + if (conn == NULL) { + status = ctdbd_init_connection(NULL, &new_conn); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("Could not init temp connection: %s\n", + nt_errstr(status))); + goto fail; + } + + conn = new_conn; + } + + ZERO_STRUCT(req); + req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize; + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CONTROL; + req.hdr.reqid = ++conn->reqid; + req.hdr.destnode = vnn; + req.opcode = opcode; + req.srvid = srvid; + req.datalen = data.dsize; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&req, offsetof(struct ctdb_req_control, data)), + data); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + goto fail; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon control write error\n"); + } + + status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CONTROL) { + DEBUG(0, ("received invalid reply\n")); + goto fail; + } + + if (outdata) { + if (!(outdata->dptr = (uint8 *)talloc_memdup( + mem_ctx, reply->data, reply->datalen))) { + TALLOC_FREE(reply); + return NT_STATUS_NO_MEMORY; + } + outdata->dsize = reply->datalen; + } + if (cstatus) { + (*cstatus) = reply->status; + } + + status = NT_STATUS_OK; + + fail: + TALLOC_FREE(new_conn); + TALLOC_FREE(reply); + return status; +} + +/* + * see if a remote process exists + */ +BOOL ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid) +{ + NTSTATUS status; + TDB_DATA data; + int32_t cstatus; + + data.dptr = (uint8_t*)&pid; + data.dsize = sizeof(pid); + + status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, + data, NULL, NULL, &cstatus); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, (__location__ " ctdb_control for process_exists " + "failed\n")); + return False; + } + + return cstatus == 0; +} + +/* + * Get a db path + */ +char *ctdbd_dbpath(struct ctdbd_connection *conn, + TALLOC_CTX *mem_ctx, uint32_t db_id) +{ + NTSTATUS status; + TDB_DATA data; + int32_t cstatus; + + data.dptr = (uint8_t*)&db_id; + data.dsize = sizeof(db_id); + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_GETDBPATH, 0, data, + mem_ctx, &data, &cstatus); + if (!NT_STATUS_IS_OK(status) || cstatus != 0) { + DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n")); + return NULL; + } + + return (char *)data.dptr; +} + +/* + * attach to a ctdb database + */ +NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn, + const char *name, uint32_t *db_id, int tdb_flags) +{ + NTSTATUS status; + TDB_DATA data; + int32_t cstatus; + + data.dptr = (uint8_t*)name; + data.dsize = strlen(name)+1; + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_DB_ATTACH, 0, data, + NULL, &data, &cstatus); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, (__location__ " ctdb_control for db_attach " + "failed: %s\n", nt_errstr(status))); + return status; + } + + if (cstatus != 0 || data.dsize != sizeof(uint32_t)) { + DEBUG(0,(__location__ " ctdb_control for db_attach failed\n")); + return NT_STATUS_INTERNAL_ERROR; + } + + *db_id = *(uint32_t *)data.dptr; + talloc_free(data.dptr); + + if (!(tdb_flags & TDB_SEQNUM)) { + return NT_STATUS_OK; + } + + data.dptr = (uint8_t *)db_id; + data.dsize = sizeof(*db_id); + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_ENABLE_SEQNUM, 0, data, + NULL, NULL, &cstatus); + if (!NT_STATUS_IS_OK(status) || cstatus != 0) { + DEBUG(0,(__location__ " ctdb_control for enable seqnum " + "failed\n")); + return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR : + status; + } + + return NT_STATUS_OK; +} + +/* + * force the migration of a record to this node + */ +NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, + TDB_DATA key) +{ + struct ctdb_req_call req; + struct ctdb_reply_call *reply; + NTSTATUS status; + + ZERO_STRUCT(req); + + req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize; + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CALL; + req.hdr.reqid = ++conn->reqid; + req.flags = CTDB_IMMEDIATE_MIGRATION; + req.callid = CTDB_NULL_FUNC; + req.db_id = db_id; + req.keylen = key.dsize; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&req, offsetof(struct ctdb_req_call, data)), + key); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + return status; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon control write error\n"); + } + + status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CALL) { + DEBUG(0, ("received invalid reply\n")); + status = NT_STATUS_INTERNAL_ERROR; + goto fail; + } + + status = NT_STATUS_OK; + fail: + + TALLOC_FREE(reply); + return status; +} + +/* + * remotely fetch a record without locking it or forcing a migration + */ +NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id, + TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + struct ctdb_req_call req; + struct ctdb_reply_call *reply; + NTSTATUS status; + + ZERO_STRUCT(req); + + req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize; + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CALL; + req.hdr.reqid = ++conn->reqid; + req.flags = 0; + req.callid = CTDB_FETCH_FUNC; + req.db_id = db_id; + req.keylen = key.dsize; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&req, offsetof(struct ctdb_req_call, data)), + key); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + return status; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon control write error\n"); + } + + status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CALL) { + DEBUG(0, ("received invalid reply\n")); + status = NT_STATUS_INTERNAL_ERROR; + goto fail; + } + + data->dsize = reply->datalen; + if (data->dsize == 0) { + data->dptr = NULL; + goto done; + } + + data->dptr = (uint8 *)talloc_memdup(mem_ctx, &reply->data[0], + reply->datalen); + if (data->dptr == NULL) { + DEBUG(0, ("talloc failed\n")); + status = NT_STATUS_NO_MEMORY; + goto fail; + } + + done: + status = NT_STATUS_OK; + fail: + TALLOC_FREE(reply); + return status; +} + +struct ctdbd_traverse_state { + void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data); + void *private_data; +}; + +/* + * Handle a traverse record coming in on the ctdbd connection + */ + +static NTSTATUS ctdb_traverse_handler(const struct data_blob *blob, + void *private_data) +{ + struct ctdbd_traverse_state *state = + (struct ctdbd_traverse_state *)private_data; + + struct ctdb_req_message *m; + struct ctdb_rec_data *d; + TDB_DATA key, data; + + m = (struct ctdb_req_message *)blob->data; + + if (blob->length < sizeof(*m) || m->hdr.length != blob->length) { + DEBUG(0, ("Got invalid message of length %d\n", + (int)blob->length)); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + + d = (struct ctdb_rec_data *)&m->data[0]; + if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) { + DEBUG(0, ("Got invalid traverse data of length %d\n", + (int)m->datalen)); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + + key.dsize = d->keylen; + key.dptr = &d->data[0]; + data.dsize = d->datalen; + data.dptr = &d->data[d->keylen]; + + if (key.dsize == 0 && data.dsize == 0) { + /* end of traverse */ + return NT_STATUS_END_OF_FILE; + } + + if (data.dsize < sizeof(struct ctdb_ltdb_header)) { + DEBUG(0, ("Got invalid ltdb header length %d\n", + (int)data.dsize)); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + data.dsize -= sizeof(struct ctdb_ltdb_header); + data.dptr += sizeof(struct ctdb_ltdb_header); + + if (state->fn) { + state->fn(key, data, state->private_data); + } + + return NT_STATUS_OK; +} + +/* + Traverse a ctdb database. This uses a kind-of hackish way to open a second + connection to ctdbd to avoid the hairy recursive and async problems with + everything in-line. +*/ + +NTSTATUS ctdbd_traverse(uint32 db_id, + void (*fn)(TDB_DATA key, TDB_DATA data, + void *private_data), + void *private_data) +{ + struct ctdbd_connection *conn; + NTSTATUS status; + + TDB_DATA data; + struct ctdb_traverse_start t; + int cstatus; + struct ctdbd_traverse_state state; + + status = ctdbd_init_connection(NULL, &conn); + + t.db_id = db_id; + t.srvid = conn->rand_srvid; + t.reqid = ++conn->reqid; + + data.dptr = (uint8_t *)&t; + data.dsize = sizeof(t); + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, + data, NULL, NULL, &cstatus); + + if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) { + + DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status), + cstatus)); + + if (NT_STATUS_IS_OK(status)) { + /* + * We need a mapping here + */ + status = NT_STATUS_UNSUCCESSFUL; + } + goto done; + } + + state.fn = fn; + state.private_data = private_data; + + while (True) { + + status = NT_STATUS_OK; + + if (packet_handler(conn->pkt, ctdb_req_complete, + ctdb_traverse_handler, &state, &status)) { + + if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { + status = NT_STATUS_OK; + break; + } + + /* + * There might be more in the queue + */ + continue; + } + + if (!NT_STATUS_IS_OK(status)) { + break; + } + + status = packet_fd_read_sync(conn->pkt); + + if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { + status = NT_STATUS_OK; + } + + if (!NT_STATUS_IS_OK(status)) { + cluster_fatal("ctdbd died\n"); + } + } + + done: + TALLOC_FREE(conn); + return status; +} + +/* + * Register us as a server for a particular tcp connection + */ + +NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn, + const struct sockaddr_in *server, + const struct sockaddr_in *client, + void (*release_ip_handler)(const char *ip_addr, + void *private_data), + void *private_data) +{ + struct ctdb_control_tcp p; + TDB_DATA data; + NTSTATUS status; + + /* + * Only one connection so far + */ + SMB_ASSERT(conn->release_ip_handler == NULL); + + /* + * We want to be told about IP releases + */ + + status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP); + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + p.dest = *server; + p.src = *client; + + /* + * inform ctdb of our tcp connection, so if IP takeover happens ctdb + * can send an extra ack to trigger a reset for our client, so it + * immediately reconnects + */ + data.dptr = (uint8_t *)&p; + data.dsize = sizeof(p); + + return ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_TCP_CLIENT, + CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL); +} + +/* + * We want to handle reconfigure events + */ +NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn) +{ + return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE); +} + +#else + +NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, + struct ctdbd_connection **pconn) +{ + return NT_STATUS_NOT_IMPLEMENTED; +} + +#endif -- cgit