diff options
author | Volker Lendecke <vlendec@samba.org> | 2007-06-10 17:02:09 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 12:23:14 -0500 |
commit | de565785f5e1f097bd75f57331425c4185185f80 (patch) | |
tree | af24bb44388f9ffa2077435cc3fea134557c186b /source3/lib | |
parent | b05c7b97830f6029c264fc44831c2f5eae4f0c83 (diff) | |
download | samba-de565785f5e1f097bd75f57331425c4185185f80.tar.gz samba-de565785f5e1f097bd75f57331425c4185185f80.tar.bz2 samba-de565785f5e1f097bd75f57331425c4185185f80.zip |
r23410: Merge the core of the cluster code.
I'm 100% certain I've forgotten to merge something, but the main code
should be in. It's mainly in dbwrap_ctdb.c, ctdbd_conn.c and
messages_ctdbd.c.
There should be no changes to the non-cluster case, it does survive make
test on my laptop.
It survives some very basic tests with ctdbd enables, I did not do the
full test suite for clusters yet.
Phew...
Volker
(This used to be commit 15553d6327a3aecdd2b0b94a3656d04bf4106323)
Diffstat (limited to 'source3/lib')
-rw-r--r-- | source3/lib/ctdbd_conn.c | 1167 | ||||
-rw-r--r-- | source3/lib/dbwrap.c | 26 | ||||
-rw-r--r-- | source3/lib/dbwrap_ctdb.c | 451 | ||||
-rw-r--r-- | source3/lib/messages.c | 41 | ||||
-rw-r--r-- | source3/lib/messages_ctdbd.c | 119 | ||||
-rw-r--r-- | source3/lib/packet.c | 256 | ||||
-rw-r--r-- | source3/lib/substitute.c | 6 | ||||
-rw-r--r-- | source3/lib/util.c | 81 |
8 files changed, 2135 insertions, 12 deletions
diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c new file mode 100644 index 0000000000..8c1aab8f37 --- /dev/null +++ b/source3/lib/ctdbd_conn.c @@ -0,0 +1,1167 @@ +/* + Unix SMB/CIFS implementation. + Samba internal messaging functions + Copyright (C) 2007 by Volker Lendecke + Copyright (C) 2007 by Andrew Tridgell + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" + +#ifdef CLUSTER_SUPPORT + +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/ndr_messaging.h" + +/* paths to these include files come from --with-ctdb= in configure */ +#include "ctdb.h" +#include "ctdb_private.h" + +struct ctdbd_connection { + struct messaging_context *msg_ctx; + uint32 reqid; + uint32 our_vnn; + uint64 rand_srvid; + struct packet_context *pkt; + struct fd_event *fde; + + void (*release_ip_handler)(const char *ip_addr, void *private_data); + void *release_ip_priv; +}; + +static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, + uint32_t vnn, uint32 opcode, + uint64_t srvid, TDB_DATA data, + TALLOC_CTX *mem_ctx, TDB_DATA *outdata, + int *cstatus); + +/* + * exit on fatal communications errors with the ctdbd daemon + */ +static void cluster_fatal(const char *why) +{ + DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why)); + /* we don't use smb_panic() as we don't want to delay to write + a core file. We need to release this process id immediately + so that someone else can take over without getting sharing + violations */ + _exit(0); +} + +/* + * Register a srvid with ctdbd + */ +static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, + uint64_t srvid) +{ + + int cstatus; + return ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_REGISTER_SRVID, srvid, + tdb_null, NULL, NULL, &cstatus); +} + +/* + * get our vnn from the cluster + */ +static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn) +{ + int32_t cstatus=-1; + NTSTATUS status; + status = ctdbd_control(conn, + CTDB_CURRENT_NODE, CTDB_CONTROL_GET_VNN, 0, + tdb_null, NULL, NULL, &cstatus); + if (!NT_STATUS_IS_OK(status)) { + cluster_fatal("ctdbd_control failed\n"); + } + *vnn = (uint32_t)cstatus; + return status; +} + +uint32 ctdbd_vnn(const struct ctdbd_connection *conn) +{ + return conn->our_vnn; +} + +/* + * Get us a ctdb connection + */ + +static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx, + struct packet_context **presult) +{ + struct packet_context *result; + const char *sockname = lp_ctdbd_socket(); + struct sockaddr_un addr; + int fd; + + if (!sockname || !*sockname) { + sockname = CTDB_PATH; + } + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + DEBUG(3, ("Could not create socket: %s\n", strerror(errno))); + return map_nt_error_from_unix(errno); + } + + ZERO_STRUCT(addr); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, sockname, sizeof(addr.sun_path)); + + if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + DEBUG(0, ("connect(%s) failed: %s\n", sockname, + strerror(errno))); + close(fd); + return map_nt_error_from_unix(errno); + } + + if (!(result = packet_init(mem_ctx, fd))) { + close(fd); + return NT_STATUS_NO_MEMORY; + } + + *presult = result; + return NT_STATUS_OK; +} + +/* + * Do we have a complete ctdb packet in the queue? + */ + +static BOOL ctdb_req_complete(const struct data_blob *data, + size_t *length, + void *private_data) +{ + uint32 msglen; + + if (data->length < sizeof(msglen)) { + return False; + } + + msglen = *((uint32 *)data->data); + + DEBUG(10, ("msglen = %d\n", msglen)); + + if (msglen < sizeof(struct ctdb_req_header)) { + DEBUG(0, ("Got invalid msglen: %d, expected at least %d for " + "the req_header\n", msglen, + sizeof(struct ctdb_req_header))); + cluster_fatal("ctdbd protocol error\n"); + } + + if (data->length >= msglen) { + *length = msglen; + return True; + } + + return False; +} + +/* + * State necessary to defer an incoming message while we are waiting for a + * ctdb reply. + */ + +struct deferred_msg_state { + struct messaging_context *msg_ctx; + struct messaging_rec *rec; +}; + +/* + * Timed event handler for the deferred message + */ + +static void deferred_message_dispatch(struct event_context *event_ctx, + struct timed_event *te, + const struct timeval *now, + void *private_data) +{ + struct deferred_msg_state *state = talloc_get_type_abort( + private_data, struct deferred_msg_state); + + messaging_dispatch_rec(state->msg_ctx, state->rec); + TALLOC_FREE(state); + TALLOC_FREE(te); +} + +struct req_pull_state { + TALLOC_CTX *mem_ctx; + DATA_BLOB req; +}; + +/* + * Pull a ctdb request out of the incoming packet queue + */ + +static NTSTATUS ctdb_req_pull(const struct data_blob *data, + void *private_data) +{ + struct req_pull_state *state = (struct req_pull_state *)private_data; + + state->req = data_blob_talloc(state->mem_ctx, data->data, + data->length); + if (state->req.data == NULL) { + return NT_STATUS_NO_MEMORY; + } + return NT_STATUS_OK; +} + +/* + * Fetch a messaging_rec from an incoming ctdb style message + */ + +static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx, + size_t overall_length, + struct ctdb_req_message *msg) +{ + struct messaging_rec *result; + DATA_BLOB blob; + NTSTATUS status; + + if ((overall_length < offsetof(struct ctdb_req_message, data)) + || (overall_length + < offsetof(struct ctdb_req_message, data) + msg->datalen)) { + + cluster_fatal("got invalid msg length"); + } + + if (!(result = TALLOC_P(mem_ctx, struct messaging_rec))) { + DEBUG(0, ("talloc failed\n")); + return NULL; + } + + blob = data_blob_const(msg->data, msg->datalen); + + status = ndr_pull_struct_blob( + &blob, result, result, + (ndr_pull_flags_fn_t)ndr_pull_messaging_rec); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ndr_pull_struct_blob failed: %s\n", + nt_errstr(status))); + TALLOC_FREE(result); + return NULL; + } + + return result; +} + +/* + * Read a full ctdbd request. If we have a messaging context, defer incoming + * messages that might come in between. + */ + +static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid, + TALLOC_CTX *mem_ctx, void *result) +{ + struct ctdb_req_header *hdr; + struct req_pull_state state; + NTSTATUS status; + + again: + + status = packet_fd_read_sync(conn->pkt); + + if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) { + /* EAGAIN */ + goto again; + } + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_fd_read failed: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } + + ZERO_STRUCT(state); + state.mem_ctx = mem_ctx; + + if (!packet_handler(conn->pkt, ctdb_req_complete, ctdb_req_pull, + &state, &status)) { + /* + * Not enough data + */ + goto again; + } + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("Could not read packet: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } + + hdr = (struct ctdb_req_header *)state.req.data; + + if (hdr->operation == CTDB_REQ_MESSAGE) { + struct timed_event *evt; + struct deferred_msg_state *msg_state; + struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr; + + if (conn->msg_ctx == NULL) { + DEBUG(1, ("Got a message without having a msg ctx, " + "dropping msg %llu\n", msg->srvid)); + goto again; + } + + if ((conn->release_ip_handler != NULL) + && (msg->srvid == CTDB_SRVID_RELEASE_IP)) { + /* must be dispatched immediately */ + DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n")); + conn->release_ip_handler((const char *)msg->data, + conn->release_ip_priv); + TALLOC_FREE(hdr); + goto again; + } + + if (!(msg_state = TALLOC_P(NULL, struct deferred_msg_state))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(hdr); + goto again; + } + + if (!(msg_state->rec = ctdb_pull_messaging_rec( + msg_state, state.req.length, msg))) { + DEBUG(0, ("ctdbd_pull_messaging_rec failed\n")); + TALLOC_FREE(msg_state); + TALLOC_FREE(hdr); + goto again; + } + + TALLOC_FREE(hdr); + + msg_state->msg_ctx = conn->msg_ctx; + + /* + * We're waiting for a call reply, but an async message has + * crossed. Defer dispatching to the toplevel event loop. + */ + evt = event_add_timed(conn->msg_ctx->event_ctx, + conn->msg_ctx->event_ctx, + timeval_zero(), + "deferred_message_dispatch", + deferred_message_dispatch, + msg_state); + if (evt == NULL) { + DEBUG(0, ("event_add_timed failed\n")); + TALLOC_FREE(msg_state); + TALLOC_FREE(hdr); + goto again; + } + + goto again; + } + + if (hdr->reqid != reqid) { + /* we got the wrong reply */ + DEBUG(0,("Discarding mismatched ctdb reqid %u should have " + "been %u\n", hdr->reqid, reqid)); + TALLOC_FREE(hdr); + goto again; + } + + *((void **)result) = talloc_move(mem_ctx, &hdr); + + return NT_STATUS_OK; +} + +/* + * Get us a ctdbd connection + */ + +NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, + struct ctdbd_connection **pconn) +{ + struct ctdbd_connection *conn; + NTSTATUS status; + + if (!(conn = TALLOC_ZERO_P(mem_ctx, struct ctdbd_connection))) { + DEBUG(0, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + status = ctdbd_connect(conn, &conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status))); + goto fail; + } + + status = get_cluster_vnn(conn, &conn->our_vnn); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status))); + goto fail; + } + + generate_random_buffer((unsigned char *)&conn->rand_srvid, + sizeof(conn->rand_srvid)); + + status = register_with_ctdbd(conn, conn->rand_srvid); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(5, ("Could not register random srvid: %s\n", + nt_errstr(status))); + goto fail; + } + + *pconn = conn; + return NT_STATUS_OK; + + fail: + TALLOC_FREE(conn); + return status; +} + +/* + * Get us a ctdbd connection and register us as a process + */ + +NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx, + struct ctdbd_connection **pconn) +{ + struct ctdbd_connection *conn; + NTSTATUS status; + + status = ctdbd_init_connection(mem_ctx, &conn); + + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + status = register_with_ctdbd(conn, (uint64_t)sys_getpid()); + if (!NT_STATUS_IS_OK(status)) { + goto fail; + } + + status = register_with_ctdbd(conn, MSG_SRVID_SAMBA); + if (!NT_STATUS_IS_OK(status)) { + goto fail; + } + + *pconn = conn; + return NT_STATUS_OK; + + fail: + TALLOC_FREE(conn); + return status; +} + +/* + * Packet handler to receive and handle a ctdb message + */ +static NTSTATUS ctdb_handle_message(const struct data_blob *data, + void *private_data) +{ + struct ctdbd_connection *conn = talloc_get_type_abort( + private_data, struct ctdbd_connection); + struct ctdb_req_message *msg; + struct messaging_rec *msg_rec; + + msg = (struct ctdb_req_message *)data->data; + + if (msg->hdr.operation != CTDB_REQ_MESSAGE) { + DEBUG(0, ("Received async msg of type %u, discarding\n", + msg->hdr.operation)); + return NT_STATUS_INVALID_PARAMETER; + } + + if ((conn->release_ip_handler != NULL) + && (msg->srvid == CTDB_SRVID_RELEASE_IP)) { + /* must be dispatched immediately */ + DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n")); + conn->release_ip_handler((const char *)msg->data, + conn->release_ip_priv); + return NT_STATUS_OK; + } + + SMB_ASSERT(conn->msg_ctx != NULL); + + if (msg->srvid == CTDB_SRVID_RECONFIGURE) { + DEBUG(0,("Got cluster reconfigure message\n")); + /* + * when the cluster is reconfigured, we need to clean the brl + * database + */ + messaging_send(conn->msg_ctx, procid_self(), + MSG_SMB_BRL_VALIDATE, &data_blob_null); + + /* + * it's possible that we have just rejoined the cluster after + * an outage. In that case our pending locks could have been + * removed from the lockdb, so retry them once more + */ + message_send_all(conn->msg_ctx, MSG_SMB_UNLOCK, NULL, 0, NULL); + + return NT_STATUS_OK; + + } + + /* only messages to our pid or the broadcast are valid here */ + if (msg->srvid != sys_getpid() && msg->srvid != MSG_SRVID_SAMBA) { + DEBUG(0,("Got unexpected message with srvid=%llu\n", + (unsigned long long)msg->srvid)); + return NT_STATUS_OK; + } + + if (!(msg_rec = ctdb_pull_messaging_rec(NULL, data->length, msg))) { + DEBUG(10, ("ctdb_pull_messaging_rec failed\n")); + return NT_STATUS_NO_MEMORY; + } + + messaging_dispatch_rec(conn->msg_ctx, msg_rec); + + TALLOC_FREE(msg_rec); + return NT_STATUS_OK; +} + +/* + * The ctdbd socket is readable asynchronuously + */ + +static void ctdbd_socket_handler(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *private_data) +{ + struct ctdbd_connection *conn = talloc_get_type_abort( + private_data, struct ctdbd_connection); + + NTSTATUS status; + + status = packet_fd_read(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status))); + cluster_fatal("ctdbd died\n"); + } + + while (packet_handler(conn->pkt, ctdb_req_complete, + ctdb_handle_message, conn, &status)) { + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("could not handle incoming message: %s\n", + nt_errstr(status))); + } + } +} + +/* + * Prepare a ctdbd connection to receive messages + */ + +NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn, + struct messaging_context *msg_ctx) +{ + SMB_ASSERT(conn->msg_ctx == NULL); + SMB_ASSERT(conn->fde == NULL); + + if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn, + packet_get_fd(conn->pkt), + EVENT_FD_READ, + ctdbd_socket_handler, + conn))) { + DEBUG(0, ("event_add_fd failed\n")); + return NT_STATUS_NO_MEMORY; + } + + conn->msg_ctx = msg_ctx; + + return NT_STATUS_OK; +} + +/* + * Send a messaging message across a ctdbd + */ + +NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn, + uint32 dst_vnn, uint64 dst_srvid, + struct messaging_rec *msg) +{ + struct ctdb_req_message r; + TALLOC_CTX *mem_ctx; + DATA_BLOB blob; + NTSTATUS status; + + if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) { + DEBUG(0, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + status = ndr_push_struct_blob( + &blob, mem_ctx, msg, + (ndr_push_flags_fn_t)ndr_push_messaging_rec); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ndr_push_struct_blob failed: %s\n", + nt_errstr(status))); + goto fail; + } + + r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length; + r.hdr.ctdb_magic = CTDB_MAGIC; + r.hdr.ctdb_version = CTDB_VERSION; + r.hdr.generation = 1; + r.hdr.operation = CTDB_REQ_MESSAGE; + r.hdr.destnode = dst_vnn; + r.hdr.srcnode = conn->our_vnn; + r.hdr.reqid = 0; + r.srvid = dst_srvid; + r.datalen = blob.length; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&r, offsetof(struct ctdb_req_message, data)), + blob); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("packet_send failed: %s\n", nt_errstr(status))); + goto fail; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon msg write error\n"); + } + + status = NT_STATUS_OK; + fail: + TALLOC_FREE(mem_ctx); + return status; +} + +/* + * send/recv a generic ctdb control message + */ +static NTSTATUS ctdbd_control(struct ctdbd_connection *conn, + uint32_t vnn, uint32 opcode, + uint64_t srvid, TDB_DATA data, + TALLOC_CTX *mem_ctx, TDB_DATA *outdata, + int *cstatus) +{ + struct ctdb_req_control req; + struct ctdb_reply_control *reply = NULL; + struct ctdbd_connection *new_conn = NULL; + NTSTATUS status; + + if (conn == NULL) { + status = ctdbd_init_connection(NULL, &new_conn); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("Could not init temp connection: %s\n", + nt_errstr(status))); + goto fail; + } + + conn = new_conn; + } + + ZERO_STRUCT(req); + req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize; + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CONTROL; + req.hdr.reqid = ++conn->reqid; + req.hdr.destnode = vnn; + req.opcode = opcode; + req.srvid = srvid; + req.datalen = data.dsize; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&req, offsetof(struct ctdb_req_control, data)), + data); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + goto fail; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon control write error\n"); + } + + status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CONTROL) { + DEBUG(0, ("received invalid reply\n")); + goto fail; + } + + if (outdata) { + if (!(outdata->dptr = (uint8 *)talloc_memdup( + mem_ctx, reply->data, reply->datalen))) { + TALLOC_FREE(reply); + return NT_STATUS_NO_MEMORY; + } + outdata->dsize = reply->datalen; + } + if (cstatus) { + (*cstatus) = reply->status; + } + + status = NT_STATUS_OK; + + fail: + TALLOC_FREE(new_conn); + TALLOC_FREE(reply); + return status; +} + +/* + * see if a remote process exists + */ +BOOL ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid) +{ + NTSTATUS status; + TDB_DATA data; + int32_t cstatus; + + data.dptr = (uint8_t*)&pid; + data.dsize = sizeof(pid); + + status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, + data, NULL, NULL, &cstatus); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, (__location__ " ctdb_control for process_exists " + "failed\n")); + return False; + } + + return cstatus == 0; +} + +/* + * Get a db path + */ +char *ctdbd_dbpath(struct ctdbd_connection *conn, + TALLOC_CTX *mem_ctx, uint32_t db_id) +{ + NTSTATUS status; + TDB_DATA data; + int32_t cstatus; + + data.dptr = (uint8_t*)&db_id; + data.dsize = sizeof(db_id); + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_GETDBPATH, 0, data, + mem_ctx, &data, &cstatus); + if (!NT_STATUS_IS_OK(status) || cstatus != 0) { + DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n")); + return NULL; + } + + return (char *)data.dptr; +} + +/* + * attach to a ctdb database + */ +NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn, + const char *name, uint32_t *db_id, int tdb_flags) +{ + NTSTATUS status; + TDB_DATA data; + int32_t cstatus; + + data.dptr = (uint8_t*)name; + data.dsize = strlen(name)+1; + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_DB_ATTACH, 0, data, + NULL, &data, &cstatus); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, (__location__ " ctdb_control for db_attach " + "failed: %s\n", nt_errstr(status))); + return status; + } + + if (cstatus != 0 || data.dsize != sizeof(uint32_t)) { + DEBUG(0,(__location__ " ctdb_control for db_attach failed\n")); + return NT_STATUS_INTERNAL_ERROR; + } + + *db_id = *(uint32_t *)data.dptr; + talloc_free(data.dptr); + + if (!(tdb_flags & TDB_SEQNUM)) { + return NT_STATUS_OK; + } + + data.dptr = (uint8_t *)db_id; + data.dsize = sizeof(*db_id); + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_ENABLE_SEQNUM, 0, data, + NULL, NULL, &cstatus); + if (!NT_STATUS_IS_OK(status) || cstatus != 0) { + DEBUG(0,(__location__ " ctdb_control for enable seqnum " + "failed\n")); + return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR : + status; + } + + return NT_STATUS_OK; +} + +/* + * force the migration of a record to this node + */ +NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id, + TDB_DATA key) +{ + struct ctdb_req_call req; + struct ctdb_reply_call *reply; + NTSTATUS status; + + ZERO_STRUCT(req); + + req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize; + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CALL; + req.hdr.reqid = ++conn->reqid; + req.flags = CTDB_IMMEDIATE_MIGRATION; + req.callid = CTDB_NULL_FUNC; + req.db_id = db_id; + req.keylen = key.dsize; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&req, offsetof(struct ctdb_req_call, data)), + key); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + return status; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon control write error\n"); + } + + status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CALL) { + DEBUG(0, ("received invalid reply\n")); + status = NT_STATUS_INTERNAL_ERROR; + goto fail; + } + + status = NT_STATUS_OK; + fail: + + TALLOC_FREE(reply); + return status; +} + +/* + * remotely fetch a record without locking it or forcing a migration + */ +NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id, + TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data) +{ + struct ctdb_req_call req; + struct ctdb_reply_call *reply; + NTSTATUS status; + + ZERO_STRUCT(req); + + req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize; + req.hdr.ctdb_magic = CTDB_MAGIC; + req.hdr.ctdb_version = CTDB_VERSION; + req.hdr.operation = CTDB_REQ_CALL; + req.hdr.reqid = ++conn->reqid; + req.flags = 0; + req.callid = CTDB_FETCH_FUNC; + req.db_id = db_id; + req.keylen = key.dsize; + + status = packet_send( + conn->pkt, 2, + data_blob_const(&req, offsetof(struct ctdb_req_call, data)), + key); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status))); + return status; + } + + status = packet_flush(conn->pkt); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status))); + cluster_fatal("cluster dispatch daemon control write error\n"); + } + + status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status))); + goto fail; + } + + if (reply->hdr.operation != CTDB_REPLY_CALL) { + DEBUG(0, ("received invalid reply\n")); + status = NT_STATUS_INTERNAL_ERROR; + goto fail; + } + + data->dsize = reply->datalen; + if (data->dsize == 0) { + data->dptr = NULL; + goto done; + } + + data->dptr = (uint8 *)talloc_memdup(mem_ctx, &reply->data[0], + reply->datalen); + if (data->dptr == NULL) { + DEBUG(0, ("talloc failed\n")); + status = NT_STATUS_NO_MEMORY; + goto fail; + } + + done: + status = NT_STATUS_OK; + fail: + TALLOC_FREE(reply); + return status; +} + +struct ctdbd_traverse_state { + void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data); + void *private_data; +}; + +/* + * Handle a traverse record coming in on the ctdbd connection + */ + +static NTSTATUS ctdb_traverse_handler(const struct data_blob *blob, + void *private_data) +{ + struct ctdbd_traverse_state *state = + (struct ctdbd_traverse_state *)private_data; + + struct ctdb_req_message *m; + struct ctdb_rec_data *d; + TDB_DATA key, data; + + m = (struct ctdb_req_message *)blob->data; + + if (blob->length < sizeof(*m) || m->hdr.length != blob->length) { + DEBUG(0, ("Got invalid message of length %d\n", + (int)blob->length)); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + + d = (struct ctdb_rec_data *)&m->data[0]; + if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) { + DEBUG(0, ("Got invalid traverse data of length %d\n", + (int)m->datalen)); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + + key.dsize = d->keylen; + key.dptr = &d->data[0]; + data.dsize = d->datalen; + data.dptr = &d->data[d->keylen]; + + if (key.dsize == 0 && data.dsize == 0) { + /* end of traverse */ + return NT_STATUS_END_OF_FILE; + } + + if (data.dsize < sizeof(struct ctdb_ltdb_header)) { + DEBUG(0, ("Got invalid ltdb header length %d\n", + (int)data.dsize)); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + data.dsize -= sizeof(struct ctdb_ltdb_header); + data.dptr += sizeof(struct ctdb_ltdb_header); + + if (state->fn) { + state->fn(key, data, state->private_data); + } + + return NT_STATUS_OK; +} + +/* + Traverse a ctdb database. This uses a kind-of hackish way to open a second + connection to ctdbd to avoid the hairy recursive and async problems with + everything in-line. +*/ + +NTSTATUS ctdbd_traverse(uint32 db_id, + void (*fn)(TDB_DATA key, TDB_DATA data, + void *private_data), + void *private_data) +{ + struct ctdbd_connection *conn; + NTSTATUS status; + + TDB_DATA data; + struct ctdb_traverse_start t; + int cstatus; + struct ctdbd_traverse_state state; + + status = ctdbd_init_connection(NULL, &conn); + + t.db_id = db_id; + t.srvid = conn->rand_srvid; + t.reqid = ++conn->reqid; + + data.dptr = (uint8_t *)&t; + data.dsize = sizeof(t); + + status = ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, + data, NULL, NULL, &cstatus); + + if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) { + + DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status), + cstatus)); + + if (NT_STATUS_IS_OK(status)) { + /* + * We need a mapping here + */ + status = NT_STATUS_UNSUCCESSFUL; + } + goto done; + } + + state.fn = fn; + state.private_data = private_data; + + while (True) { + + status = NT_STATUS_OK; + + if (packet_handler(conn->pkt, ctdb_req_complete, + ctdb_traverse_handler, &state, &status)) { + + if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { + status = NT_STATUS_OK; + break; + } + + /* + * There might be more in the queue + */ + continue; + } + + if (!NT_STATUS_IS_OK(status)) { + break; + } + + status = packet_fd_read_sync(conn->pkt); + + if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) { + status = NT_STATUS_OK; + } + + if (!NT_STATUS_IS_OK(status)) { + cluster_fatal("ctdbd died\n"); + } + } + + done: + TALLOC_FREE(conn); + return status; +} + +/* + * Register us as a server for a particular tcp connection + */ + +NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn, + const struct sockaddr_in *server, + const struct sockaddr_in *client, + void (*release_ip_handler)(const char *ip_addr, + void *private_data), + void *private_data) +{ + struct ctdb_control_tcp p; + TDB_DATA data; + NTSTATUS status; + + /* + * Only one connection so far + */ + SMB_ASSERT(conn->release_ip_handler == NULL); + + /* + * We want to be told about IP releases + */ + + status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP); + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + p.dest = *server; + p.src = *client; + + /* + * inform ctdb of our tcp connection, so if IP takeover happens ctdb + * can send an extra ack to trigger a reset for our client, so it + * immediately reconnects + */ + data.dptr = (uint8_t *)&p; + data.dsize = sizeof(p); + + return ctdbd_control(conn, CTDB_CURRENT_NODE, + CTDB_CONTROL_TCP_CLIENT, + CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL); +} + +/* + * We want to handle reconfigure events + */ +NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn) +{ + return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE); +} + +#else + +NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx, + struct ctdbd_connection **pconn) +{ + return NT_STATUS_NOT_IMPLEMENTED; +} + +#endif diff --git a/source3/lib/dbwrap.c b/source3/lib/dbwrap.c index 9f74a9eb48..c06cd4bb16 100644 --- a/source3/lib/dbwrap.c +++ b/source3/lib/dbwrap.c @@ -48,6 +48,32 @@ struct db_context *db_open(TALLOC_CTX *mem_ctx, { struct db_context *result = NULL; +#ifdef CLUSTER_SUPPORT + + if (lp_clustering()) { + const char *partname; + /* ctdb only wants the file part of the name */ + partname = strrchr(name, '/'); + if (partname) { + partname++; + } else { + partname = name; + } + /* allow ctdb for individual databases to be disabled */ + if (lp_parm_bool(-1, "ctdb", partname, True)) { + result = db_open_ctdb(mem_ctx, partname, hash_size, + tdb_flags, open_flags, mode); + if (result == NULL) { + DEBUG(0,("failed to attach to ctdb %s\n", + partname)); + smb_panic("failed to attach to a ctdb " + "database"); + } + } + } + +#endif + if (result == NULL) { result = db_open_tdb(mem_ctx, name, hash_size, tdb_flags, open_flags, mode); diff --git a/source3/lib/dbwrap_ctdb.c b/source3/lib/dbwrap_ctdb.c new file mode 100644 index 0000000000..124485e539 --- /dev/null +++ b/source3/lib/dbwrap_ctdb.c @@ -0,0 +1,451 @@ +/* + Unix SMB/CIFS implementation. + Database interface wrapper around ctdbd + Copyright (C) Volker Lendecke 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" + +#ifdef CLUSTER_SUPPORT + +#include "ctdb.h" +#include "ctdb_private.h" + +struct db_ctdb_ctx { + struct tdb_wrap *wtdb; + uint32 db_id; + struct ctdbd_connection *conn; +}; + +struct db_ctdb_rec { + struct db_ctdb_ctx *ctdb_ctx; + struct ctdb_ltdb_header header; +}; + +static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx); + +static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag) +{ + struct db_ctdb_rec *crec = talloc_get_type_abort( + rec->private_data, struct db_ctdb_rec); + TDB_DATA cdata; + int ret; + + cdata.dsize = sizeof(crec->header) + data.dsize; + + if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) { + return NT_STATUS_NO_MEMORY; + } + + memcpy(cdata.dptr, &crec->header, sizeof(crec->header)); + memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize); + + ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE); + + SAFE_FREE(cdata.dptr); + + return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION; +} + +static NTSTATUS db_ctdb_delete(struct db_record *rec) +{ + struct db_ctdb_rec *crec = talloc_get_type_abort( + rec->private_data, struct db_ctdb_rec); + TDB_DATA data; + int ret; + + /* + * We have to store the header with empty data. TODO: Fix the + * tdb-level cleanup + */ + + data.dptr = (uint8 *)&crec->header; + data.dsize = sizeof(crec->header); + + ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE); + + return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION; +} + +static int db_ctdb_record_destr(struct db_record* data) +{ + struct db_ctdb_rec *crec = talloc_get_type_abort( + data->private_data, struct db_ctdb_rec); + + DEBUG(10, ("Unlocking key %s\n", + hex_encode(data, (unsigned char *)data->key.dptr, + data->key.dsize))); + + if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) { + DEBUG(0, ("tdb_chainunlock failed\n")); + return -1; + } + + return 0; +} + +static struct db_record *db_ctdb_fetch_locked(struct db_context *db, + TALLOC_CTX *mem_ctx, + TDB_DATA key) +{ + struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, + struct db_ctdb_ctx); + struct db_record *result; + struct db_ctdb_rec *crec; + NTSTATUS status; + TDB_DATA ctdb_data; + + if (!(result = talloc(mem_ctx, struct db_record))) { + DEBUG(0, ("talloc failed\n")); + return NULL; + } + + if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(result); + return NULL; + } + + result->private_data = (void *)crec; + crec->ctdb_ctx = ctx; + + result->key.dsize = key.dsize; + result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize); + if (result->key.dptr == NULL) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(result); + return NULL; + } + + /* + * Do a blocking lock on the record + */ +again: + + DEBUG(10, ("Locking key %s\n", + hex_encode(result, (unsigned char *)key.dptr, + key.dsize))); + + if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) { + DEBUG(3, ("tdb_chainlock failed\n")); + TALLOC_FREE(result); + return NULL; + } + + result->store = db_ctdb_store; + result->delete_rec = db_ctdb_delete; + talloc_set_destructor(result, db_ctdb_record_destr); + + ctdb_data = tdb_fetch(ctx->wtdb->tdb, key); + + /* + * See if we have a valid record and we are the dmaster. If so, we can + * take the shortcut and just return it. + */ + + if ((ctdb_data.dptr == NULL) || + (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) || + ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn() +#if 0 + || (random() % 2 != 0) +#endif +) { + SAFE_FREE(ctdb_data.dptr); + tdb_chainunlock(ctx->wtdb->tdb, key); + talloc_set_destructor(result, NULL); + + DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n", + ctdb_data.dptr, ctdb_data.dptr ? + ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1, + get_my_vnn())); + + status = ctdbd_migrate(db_ctdbd_conn(ctx), ctx->db_id, key); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(5, ("ctdb_migrate failed: %s\n", + nt_errstr(status))); + TALLOC_FREE(result); + return NULL; + } + /* now its migrated, try again */ + goto again; + } + + memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header)); + + result->value.dsize = ctdb_data.dsize - sizeof(crec->header); + result->value.dptr = NULL; + + if ((result->value.dsize != 0) + && !(result->value.dptr = (uint8 *)talloc_memdup( + result, ctdb_data.dptr + sizeof(crec->header), + result->value.dsize))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(result); + } + + SAFE_FREE(ctdb_data.dptr); + + return result; +} + +/* + fetch (unlocked, no migration) operation on ctdb + */ +static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, + struct db_ctdb_ctx); + NTSTATUS status; + TDB_DATA ctdb_data; + + /* try a direct fetch */ + ctdb_data = tdb_fetch(ctx->wtdb->tdb, key); + + /* + * See if we have a valid record and we are the dmaster. If so, we can + * take the shortcut and just return it. + */ + if ((ctdb_data.dptr != NULL) && + (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) && + ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn()) { + /* we are the dmaster - avoid the ctdb protocol op */ + + data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header); + if (data->dsize == 0) { + SAFE_FREE(ctdb_data.dptr); + data->dptr = NULL; + return 0; + } + + data->dptr = (uint8 *)talloc_memdup( + mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header), + data->dsize); + + SAFE_FREE(ctdb_data.dptr); + + if (data->dptr == NULL) { + return -1; + } + return 0; + } + + SAFE_FREE(ctdb_data.dptr); + + /* we weren't able to get it locally - ask ctdb to fetch it for us */ + status = ctdbd_fetch(db_ctdbd_conn(ctx), ctx->db_id, key, mem_ctx, + data); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status))); + return -1; + } + + return 0; +} + +struct traverse_state { + struct db_context *db; + int (*fn)(struct db_record *rec, void *private_data); + void *private_data; +}; + +static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data) +{ + struct traverse_state *state = (struct traverse_state *)private_data; + struct db_record *rec; + TALLOC_CTX *tmp_ctx = talloc_new(state->db); + /* we have to give them a locked record to prevent races */ + rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key); + if (rec && rec->value.dsize > 0) { + state->fn(rec, state->private_data); + } + talloc_free(tmp_ctx); +} + +static int db_ctdb_traverse(struct db_context *db, + int (*fn)(struct db_record *rec, + void *private_data), + void *private_data) +{ + struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, + struct db_ctdb_ctx); + struct traverse_state state; + + state.db = db; + state.fn = fn; + state.private_data = private_data; + + ctdbd_traverse(ctx->db_id, traverse_callback, &state); + return 0; +} + +static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag) +{ + return NT_STATUS_MEDIA_WRITE_PROTECTED; +} + +static NTSTATUS db_ctdb_delete_deny(struct db_record *rec) +{ + return NT_STATUS_MEDIA_WRITE_PROTECTED; +} + +static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data) +{ + struct traverse_state *state = (struct traverse_state *)private_data; + struct db_record rec; + rec.key = key; + rec.value = data; + rec.store = db_ctdb_store_deny; + rec.delete_rec = db_ctdb_delete_deny; + rec.private_data = state->db; + state->fn(&rec, state->private_data); +} + +static int db_ctdb_traverse_read(struct db_context *db, + int (*fn)(struct db_record *rec, + void *private_data), + void *private_data) +{ + struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, + struct db_ctdb_ctx); + struct traverse_state state; + + state.db = db; + state.fn = fn; + state.private_data = private_data; + + ctdbd_traverse(ctx->db_id, traverse_read_callback, &state); + return 0; +} + +static int db_ctdb_get_seqnum(struct db_context *db) +{ + struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data, + struct db_ctdb_ctx); + return tdb_get_seqnum(ctx->wtdb->tdb); +} + +/* + * Get the ctdbd connection for a database. If possible, re-use the messaging + * ctdbd connection + */ +static struct ctdbd_connection *db_ctdbd_conn(struct db_ctdb_ctx *ctx) +{ + struct ctdbd_connection *result; + + result = messaging_ctdbd_connection(); + + if (result != NULL) { + + if (ctx->conn == NULL) { + /* + * Someone has initialized messaging since we + * initialized our own connection, we don't need it + * anymore. + */ + TALLOC_FREE(ctx->conn); + } + + return result; + } + + if (ctx->conn == NULL) { + ctdbd_init_connection(ctx, &ctx->conn); + set_my_vnn(ctdbd_vnn(ctx->conn)); + } + + return ctx->conn; +} + +struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx, + const char *name, + int hash_size, int tdb_flags, + int open_flags, mode_t mode) +{ + struct db_context *result; + struct db_ctdb_ctx *db_ctdb; + char *db_path; + NTSTATUS status; + + if (!lp_clustering()) { + DEBUG(10, ("Clustering disabled -- no ctdb\n")); + return NULL; + } + + if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(result); + return NULL; + } + + if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(result); + return NULL; + } + + db_ctdb->conn = NULL; + + status = ctdbd_db_attach(db_ctdbd_conn(db_ctdb), name, + &db_ctdb->db_id, tdb_flags); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name, + nt_errstr(status))); + TALLOC_FREE(result); + return NULL; + } + + db_path = ctdbd_dbpath(db_ctdbd_conn(db_ctdb), db_ctdb, + db_ctdb->db_id); + + /* only pass through specific flags */ + tdb_flags &= TDB_SEQNUM; + + db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0); + if (db_ctdb->wtdb == NULL) { + DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno))); + TALLOC_FREE(result); + return NULL; + } + talloc_free(db_path); + + result->private_data = (void *)db_ctdb; + result->fetch_locked = db_ctdb_fetch_locked; + result->fetch = db_ctdb_fetch; + result->traverse = db_ctdb_traverse; + result->traverse_read = db_ctdb_traverse_read; + result->get_seqnum = db_ctdb_get_seqnum; + + DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n", + name, db_ctdb->db_id)); + + return result; +} + +#else + +struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx, + const char *name, + int hash_size, int tdb_flags, + int open_flags, mode_t mode) +{ + DEBUG(0, ("no clustering compiled in\n")); + return NULL; +} + +#endif diff --git a/source3/lib/messages.c b/source3/lib/messages.c index b796e1472c..54657d8d56 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -205,6 +205,19 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, return NULL; } +#ifdef CLUSTER_SUPPORT + if (lp_clustering()) { + status = messaging_ctdbd_init(ctx, ctx, &ctx->remote); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("messaging_ctdb_init failed: %s\n", + nt_errstr(status))); + TALLOC_FREE(ctx); + return NULL; + } + } +#endif + messaging_register(ctx, NULL, MSG_PING, ping_message); /* Register some debugging related messages */ @@ -217,6 +230,34 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, } /* + * re-init after a fork + */ +NTSTATUS messaging_reinit(struct messaging_context *msg_ctx) +{ +#ifdef CLUSTER_SUPPORT + + TALLOC_FREE(msg_ctx->remote); + + if (lp_clustering()) { + NTSTATUS status; + + status = messaging_ctdbd_init(msg_ctx, msg_ctx, + &msg_ctx->remote); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("messaging_ctdb_init failed: %s\n", + nt_errstr(status))); + return status; + } + } + +#endif + + return NT_STATUS_OK; +} + + +/* * Register a dispatch function for a particular message type. Allow multiple * registrants */ diff --git a/source3/lib/messages_ctdbd.c b/source3/lib/messages_ctdbd.c new file mode 100644 index 0000000000..4ac9ea9904 --- /dev/null +++ b/source3/lib/messages_ctdbd.c @@ -0,0 +1,119 @@ +/* + Unix SMB/CIFS implementation. + Samba internal messaging functions + Copyright (C) 2007 by Volker Lendecke + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" + +#ifdef CLUSTER_SUPPORT + +#include "librpc/gen_ndr/messaging.h" + +struct messaging_ctdbd_context { + struct ctdbd_connection *conn; +}; + +/* + * This is a Samba3 hack/optimization. Routines like process_exists need to + * talk to ctdbd, and they don't get handed a messaging context. + */ +struct ctdbd_connection *global_ctdbd_connection; + +struct ctdbd_connection *messaging_ctdbd_connection(void) +{ + return global_ctdbd_connection; +} + +static NTSTATUS messaging_ctdb_send(struct messaging_context *msg_ctx, + struct server_id pid, int msg_type, + const DATA_BLOB *data, + struct messaging_backend *backend) +{ + struct messaging_ctdbd_context *ctx = talloc_get_type_abort( + backend->private_data, struct messaging_ctdbd_context); + + struct messaging_rec msg; + + msg.msg_version = MESSAGE_VERSION; + msg.msg_type = msg_type; + msg.dest = pid; + msg.src = procid_self(); + msg.buf = *data; + + return ctdbd_messaging_send(ctx->conn, pid.vnn, pid.pid, &msg); +} + +static int messaging_ctdbd_destructor(struct messaging_ctdbd_context *ctx) +{ + /* + * The global connection just went away + */ + global_ctdbd_connection = NULL; + return 0; +} + +NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx, + TALLOC_CTX *mem_ctx, + struct messaging_backend **presult) +{ + struct messaging_backend *result; + struct messaging_ctdbd_context *ctx; + NTSTATUS status; + + if (!(result = TALLOC_P(mem_ctx, struct messaging_backend))) { + DEBUG(0, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + if (!(ctx = TALLOC_P(result, struct messaging_ctdbd_context))) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(result); + return NT_STATUS_NO_MEMORY; + } + + status = ctdbd_messaging_connection(ctx, &ctx->conn); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("ctdbd_init_connection failed: %s\n", + nt_errstr(status))); + TALLOC_FREE(result); + return status; + } + + global_ctdbd_connection = ctx->conn; + talloc_set_destructor(ctx, messaging_ctdbd_destructor); + + set_my_vnn(ctdbd_vnn(ctx->conn)); + + result->send_fn = messaging_ctdb_send; + result->private_data = (void *)ctx; + + *presult = result; + return NT_STATUS_OK; +} + +#else + +NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx, + TALLOC_CTX *mem_ctx, + struct messaging_backend **presult) +{ + return NT_STATUS_NOT_IMPLEMENTED; +} + +#endif diff --git a/source3/lib/packet.c b/source3/lib/packet.c new file mode 100644 index 0000000000..2473a771e0 --- /dev/null +++ b/source3/lib/packet.c @@ -0,0 +1,256 @@ +/* + Unix SMB/CIFS implementation. + Packet handling + Copyright (C) Volker Lendecke 2007 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" + +struct packet_context { + int fd; + struct data_blob in, out; +}; + +/* + * Close the underlying fd + */ +static int packet_context_destructor(struct packet_context *ctx) +{ + return close(ctx->fd); +} + +/* + * Initialize a packet context. The fd is given to the packet context, meaning + * that it is automatically closed when the packet context is freed. + */ +struct packet_context *packet_init(TALLOC_CTX *mem_ctx, int fd) +{ + struct packet_context *result; + + if (!(result = TALLOC_ZERO_P(mem_ctx, struct packet_context))) { + return NULL; + } + + result->fd = fd; + talloc_set_destructor(result, packet_context_destructor); + return result; +} + +/* + * Pull data from the fd + */ +NTSTATUS packet_fd_read(struct packet_context *ctx) +{ + int res, available; + size_t new_size; + uint8 *in; + + res = ioctl(ctx->fd, FIONREAD, &available); + + if (res == -1) { + DEBUG(10, ("ioctl(FIONREAD) failed: %s\n", strerror(errno))); + return map_nt_error_from_unix(errno); + } + + SMB_ASSERT(available >= 0); + + if (available == 0) { + return NT_STATUS_END_OF_FILE; + } + + new_size = ctx->in.length + available; + + if (new_size < ctx->in.length) { + DEBUG(0, ("integer wrap\n")); + return NT_STATUS_NO_MEMORY; + } + + if (!(in = TALLOC_REALLOC_ARRAY(ctx, ctx->in.data, uint8, new_size))) { + DEBUG(10, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + res = recv(ctx->fd, in + ctx->in.length, available, 0); + + if (res < 0) { + DEBUG(10, ("recv failed: %s\n", strerror(errno))); + return map_nt_error_from_unix(errno); + } + + if (res == 0) { + return NT_STATUS_END_OF_FILE; + } + + ctx->in.data = in; + ctx->in.length += available; + + return NT_STATUS_OK; +} + +NTSTATUS packet_fd_read_sync(struct packet_context *ctx) +{ + int res; + fd_set r_fds; + + FD_ZERO(&r_fds); + FD_SET(ctx->fd, &r_fds); + + res = sys_select(ctx->fd+1, &r_fds, NULL, NULL, NULL); + + if (res == -1) { + DEBUG(10, ("select returned %s\n", strerror(errno))); + return map_nt_error_from_unix(errno); + } + + return packet_fd_read(ctx); +} + +BOOL packet_handler(struct packet_context *ctx, + BOOL (*full_req)(const struct data_blob *data, + size_t *length, + void *private_data), + NTSTATUS (*callback)(const struct data_blob *data, + void *private_data), + void *private_data, + NTSTATUS *status) +{ + size_t length; + struct data_blob data; + + if (!full_req(&ctx->in, &length, private_data)) { + return False; + } + + SMB_ASSERT(length <= ctx->in.length); + + data.data = ctx->in.data; + data.length = length; + + *status = callback(&data, private_data); + + memmove(ctx->in.data, ctx->in.data + length, + ctx->in.length - length); + ctx->in.length -= length; + + return True; +} + +/* + * How many bytes of outgoing data do we have pending? + */ +size_t packet_outgoing_bytes(struct packet_context *ctx) +{ + return ctx->out.length; +} + +/* + * Push data to the fd + */ +NTSTATUS packet_fd_write(struct packet_context *ctx) +{ + ssize_t sent; + + sent = send(ctx->fd, ctx->out.data, ctx->out.length, 0); + + if (sent == -1) { + DEBUG(0, ("send failed: %s\n", strerror(errno))); + return map_nt_error_from_unix(errno); + } + + memmove(ctx->out.data, ctx->out.data + sent, + ctx->out.length - sent); + ctx->out.length -= sent; + + return NT_STATUS_OK; +} + +/* + * Sync flush all outgoing bytes + */ +NTSTATUS packet_flush(struct packet_context *ctx) +{ + while (ctx->out.length != 0) { + NTSTATUS status = packet_fd_write(ctx); + if (!NT_STATUS_IS_OK(status)) { + return status; + } + } + return NT_STATUS_OK; +} + +/* + * Send a list of DATA_BLOBs + * + * Example: packet_send(ctx, 2, data_blob_const(&size, sizeof(size)), + * data_blob_const(buf, size)); + */ +NTSTATUS packet_send(struct packet_context *ctx, int num_blobs, ...) +{ + va_list ap; + int i; + size_t len; + uint8 *out; + + len = ctx->out.length; + + va_start(ap, num_blobs); + for (i=0; i<num_blobs; i++) { + size_t tmp; + struct data_blob blob = va_arg(ap, struct data_blob); + + tmp = len + blob.length; + if (tmp < len) { + DEBUG(0, ("integer overflow\n")); + va_end(ap); + return NT_STATUS_NO_MEMORY; + } + len = tmp; + } + va_end(ap); + + if (len == 0) { + return NT_STATUS_OK; + } + + if (!(out = TALLOC_REALLOC_ARRAY(ctx, ctx->out.data, uint8, len))) { + DEBUG(0, ("talloc failed\n")); + return NT_STATUS_NO_MEMORY; + } + + ctx->out.data = out; + + va_start(ap, num_blobs); + for (i=0; i<num_blobs; i++) { + struct data_blob blob = va_arg(ap, struct data_blob); + + memcpy(ctx->out.data+ctx->out.length, blob.data, blob.length); + ctx->out.length += blob.length; + } + va_end(ap); + + SMB_ASSERT(ctx->out.length == len); + return NT_STATUS_OK; +} + +/* + * Get the packet context's file descriptor + */ +int packet_get_fd(struct packet_context *ctx) +{ + return ctx->fd; +} + diff --git a/source3/lib/substitute.c b/source3/lib/substitute.c index 25a6a2c4c8..708c184475 100644 --- a/source3/lib/substitute.c +++ b/source3/lib/substitute.c @@ -453,7 +453,7 @@ char *alloc_sub_basic(const char *smb_name, const char *domain_name, const char *str) { char *b, *p, *s, *r, *a_string; - fstring pidstr; + fstring pidstr, vnnstr; struct passwd *pass; const char *local_machine_name = get_local_machine_name(); @@ -552,6 +552,10 @@ char *alloc_sub_basic(const char *smb_name, const char *domain_name, case '(': a_string = realloc_expand_longvar( a_string, p ); break; + case 'V' : + slprintf(vnnstr,sizeof(vnnstr)-1, "%u", get_my_vnn()); + a_string = realloc_string_sub(a_string, "%V", vnnstr); + break; default: break; } diff --git a/source3/lib/util.c b/source3/lib/util.c index 36396d9f83..3d72eb5d29 100644 --- a/source3/lib/util.c +++ b/source3/lib/util.c @@ -1532,20 +1532,24 @@ BOOL process_exists(const struct server_id pid) return True; } - if (!procid_is_local(&pid)) { - /* This *SEVERELY* needs fixing. */ - return True; + if (procid_is_local(&pid)) { + return (kill(pid.pid,0) == 0 || errno != ESRCH); } - /* Doing kill with a non-positive pid causes messages to be - * sent to places we don't want. */ - SMB_ASSERT(pid.pid > 0); - return(kill(pid.pid,0) == 0 || errno != ESRCH); +#ifdef CLUSTER_SUPPORT + return ctdbd_process_exists(messaging_ctdbd_connection(), pid.vnn, + pid.pid); +#else + return False; +#endif } BOOL process_exists_by_pid(pid_t pid) { - return process_exists(pid_to_procid(pid)); + /* Doing kill with a non-positive pid causes messages to be + * sent to places we don't want. */ + SMB_ASSERT(pid > 0); + return(kill(pid,0) == 0 || errno != ESRCH); } /******************************************************************* @@ -3065,10 +3069,26 @@ pid_t procid_to_pid(const struct server_id *proc) return proc->pid; } +static uint32 my_vnn = NONCLUSTER_VNN; + +void set_my_vnn(uint32 vnn) +{ + DEBUG(10, ("vnn pid %d = %u\n", (int)sys_getpid(), (unsigned int)vnn)); + my_vnn = vnn; +} + +uint32 get_my_vnn(void) +{ + return my_vnn; +} + struct server_id pid_to_procid(pid_t pid) { struct server_id result; result.pid = pid; +#ifdef CLUSTER_SUPPORT + result.vnn = my_vnn; +#endif return result; } @@ -3084,7 +3104,13 @@ struct server_id server_id_self(void) BOOL procid_equal(const struct server_id *p1, const struct server_id *p2) { - return (p1->pid == p2->pid); + if (p1->pid != p2->pid) + return False; +#ifdef CLUSTER_SUPPORT + if (p1->vnn != p2->vnn) + return False; +#endif + return True; } BOOL cluster_id_equal(const struct server_id *id1, @@ -3095,18 +3121,47 @@ BOOL cluster_id_equal(const struct server_id *id1, BOOL procid_is_me(const struct server_id *pid) { - return (pid->pid == sys_getpid()); + if (pid->pid != sys_getpid()) + return False; +#ifdef CLUSTER_SUPPORT + if (pid->vnn != my_vnn) + return False; +#endif + return True; } struct server_id interpret_pid(const char *pid_string) { +#ifdef CLUSTER_SUPPORT + unsigned int vnn, pid; + struct server_id result; + if (sscanf(pid_string, "%u:%u", &vnn, &pid) == 2) { + result.vnn = vnn; + result.pid = pid; + } + else { + result.vnn = NONCLUSTER_VNN; + result.pid = -1; + } + return result; +#else return pid_to_procid(atoi(pid_string)); +#endif } char *procid_str_static(const struct server_id *pid) { static fstring str; - fstr_sprintf(str, "%d", pid->pid); +#ifdef CLUSTER_SUPPORT + if (pid->vnn == NONCLUSTER_VNN) { + fstr_sprintf(str, "%d", (int)pid->pid); + } + else { + fstr_sprintf(str, "%u:%d", (unsigned)pid->vnn, (int)pid->pid); + } +#else + fstr_sprintf(str, "%d", (int)pid->pid); +#endif return str; } @@ -3122,7 +3177,11 @@ BOOL procid_valid(const struct server_id *pid) BOOL procid_is_local(const struct server_id *pid) { +#ifdef CLUSTER_SUPPORT + return pid->vnn == my_vnn; +#else return True; +#endif } int this_is_smp(void) |