diff options
author | Volker Lendecke <vl@samba.org> | 2012-02-15 13:56:23 +0100 |
---|---|---|
committer | Volker Lendecke <vl@samba.org> | 2012-04-17 10:21:01 +0200 |
commit | 74baae782f09fd8f22af133b820680e37049f5b0 (patch) | |
tree | 6488f9e8260fe17c69cbdee600442d45c300869d | |
parent | 41712599e0f33096ccc81e7cd520e66dcf91a424 (diff) | |
download | samba-74baae782f09fd8f22af133b820680e37049f5b0.tar.gz samba-74baae782f09fd8f22af133b820680e37049f5b0.tar.bz2 samba-74baae782f09fd8f22af133b820680e37049f5b0.zip |
s3: Add msg_channel
This is a tevent_based variant of messaging_register
-rw-r--r-- | source3/Makefile.in | 2 | ||||
-rw-r--r-- | source3/lib/msg_channel.c | 369 | ||||
-rw-r--r-- | source3/lib/msg_channel.h | 45 | ||||
-rw-r--r-- | source3/torture/proto.h | 1 | ||||
-rw-r--r-- | source3/torture/test_msg.c | 131 | ||||
-rw-r--r-- | source3/torture/torture.c | 1 | ||||
-rwxr-xr-x | source3/wscript_build | 2 |
7 files changed, 551 insertions, 0 deletions
diff --git a/source3/Makefile.in b/source3/Makefile.in index c8fb256de7..b3137427d4 100644 --- a/source3/Makefile.in +++ b/source3/Makefile.in @@ -434,6 +434,7 @@ LIB_OBJ = $(LIBSAMBAUTIL_OBJ) $(UTIL_OBJ) $(CRYPTO_OBJ) $(LIBTSOCKET_OBJ) \ lib/messages.o librpc/gen_ndr/ndr_messaging.o lib/messages_local.o \ lib/messages_ctdbd.o lib/ctdb_packet.o lib/ctdbd_conn.o \ lib/ctdb_conn.o \ + lib/msg_channel.o \ lib/id_cache.o \ ../lib/socket/interfaces.o lib/memcache.o \ lib/talloc_dict.o \ @@ -1275,6 +1276,7 @@ SMBTORTURE_OBJ1 = torture/torture.o torture/nbio.o torture/scanner.o torture/uta torture/test_authinfo_structs.o \ torture/test_cleanup.o \ torture/test_ctdbconn.o \ + torture/test_msg.o \ torture/t_strappend.o SMBTORTURE_OBJ = $(SMBTORTURE_OBJ1) $(PARAM_OBJ) $(TLDAP_OBJ) \ diff --git a/source3/lib/msg_channel.c b/source3/lib/msg_channel.c new file mode 100644 index 0000000000..9a174c0ed6 --- /dev/null +++ b/source3/lib/msg_channel.c @@ -0,0 +1,369 @@ +/* + Unix SMB/CIFS implementation. + Samba3 message channels + Copyright (C) Volker Lendecke 2012 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "msg_channel.h" +#include "ctdb_conn.h" +#include "lib/util/tevent_unix.h" + +struct msg_channel { + struct ctdb_msg_channel *ctdb_channel; + struct messaging_context *msg; + uint32_t msg_type; + + struct tevent_req *pending_req; + struct tevent_context *ev; + + struct messaging_rec **msgs; +}; + +struct msg_channel_init_state { + struct msg_channel *channel; +}; + +static void msg_channel_init_got_ctdb(struct tevent_req *subreq); +static void msg_channel_init_got_msg(struct messaging_context *msg, + void *priv, uint32_t msg_type, + struct server_id server_id, DATA_BLOB *data); +static void msg_channel_trigger(struct tevent_context *ev, + struct tevent_immediate *im, + void *priv); +static int msg_channel_init_destructor(struct msg_channel *s); + +struct tevent_req *msg_channel_init_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg, + uint32_t msg_type) +{ + struct tevent_req *req, *subreq; + struct msg_channel_init_state *state; + struct server_id pid; + + req = tevent_req_create(mem_ctx, &state, + struct msg_channel_init_state); + if (req == NULL) { + return NULL; + } + + state->channel = talloc_zero(state, struct msg_channel); + if (tevent_req_nomem(state->channel, req)) { + return tevent_req_post(req, ev); + } + state->channel->msg = msg; + state->channel->msg_type = msg_type; + + pid = messaging_server_id(msg); + subreq = ctdb_msg_channel_init_send(state, ev, lp_ctdbd_socket(), + pid.pid); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, msg_channel_init_got_ctdb, req); + return req; +} + +static void msg_channel_init_got_ctdb(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct msg_channel_init_state *state = tevent_req_data( + req, struct msg_channel_init_state); + struct msg_channel *s = state->channel; + NTSTATUS status; + int ret; + + ret = ctdb_msg_channel_init_recv(subreq, s, &s->ctdb_channel); + TALLOC_FREE(subreq); + + if (ret == ENOSYS) { + s->ctdb_channel = NULL; + ret = 0; + } + + if (tevent_req_error(req, ret)) { + return; + } + status = messaging_register(s->msg, s, s->msg_type, + msg_channel_init_got_msg); + if (!NT_STATUS_IS_OK(status)) { + tevent_req_error(req, map_errno_from_nt_status(status)); + return; + } + talloc_set_destructor(s, msg_channel_init_destructor); + tevent_req_done(req); +} + +static int msg_channel_init_destructor(struct msg_channel *s) +{ + messaging_deregister(s->msg, s->msg_type, s); + return 0; +} + +int msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct msg_channel **pchannel) +{ + struct msg_channel_init_state *state = tevent_req_data( + req, struct msg_channel_init_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + return err; + } + *pchannel = talloc_move(mem_ctx, &state->channel); + return 0; +} + +int msg_channel_init(TALLOC_CTX *mem_ctx, struct messaging_context *msg, + uint32_t msgtype, struct msg_channel **pchannel) +{ + TALLOC_CTX *frame = talloc_stackframe(); + struct tevent_context *ev; + struct tevent_req *req; + int err = ENOMEM; + bool ok; + + ev = tevent_context_init(frame); + if (ev == NULL) { + goto fail; + } + req = msg_channel_init_send(frame, ev, msg, msgtype); + if (req == NULL) { + goto fail; + } + ok = tevent_req_poll(req, ev); + if (!ok) { + err = errno; + goto fail; + } + err = msg_channel_init_recv(req, mem_ctx, pchannel); +fail: + TALLOC_FREE(frame); + return err; +} + +static void msg_channel_init_got_msg(struct messaging_context *msg, + void *priv, uint32_t msg_type, + struct server_id server_id, + DATA_BLOB *data) +{ + struct msg_channel *s = talloc_get_type_abort( + priv, struct msg_channel); + struct messaging_rec *rec; + struct messaging_rec **msgs; + size_t num_msgs; + struct tevent_immediate *im; + + rec = talloc(s, struct messaging_rec); + if (rec == NULL) { + goto fail; + } + rec->msg_version = 1; + rec->msg_type = msg_type; + rec->dest = server_id; + rec->src = messaging_server_id(msg); + rec->buf.data = (uint8_t *)talloc_memdup(rec, data->data, + data->length); + if (rec->buf.data == NULL) { + goto fail; + } + rec->buf.length = data->length; + + num_msgs = talloc_array_length(s->msgs); + msgs = talloc_realloc(s, s->msgs, struct messaging_rec *, num_msgs+1); + if (msgs == NULL) { + goto fail; + } + s->msgs = msgs; + s->msgs[num_msgs] = talloc_move(s->msgs, &rec); + + if (s->pending_req == NULL) { + return; + } + + im = tevent_create_immediate(s->ev); + if (im == NULL) { + goto fail; + } + tevent_schedule_immediate(im, s->ev, msg_channel_trigger, s); + return; +fail: + TALLOC_FREE(rec); +} + +struct msg_read_state { + struct tevent_context *ev; + struct tevent_req *req; + struct msg_channel *channel; + struct messaging_rec *rec; +}; + +static int msg_read_state_destructor(struct msg_read_state *s); +static void msg_read_got_ctdb(struct tevent_req *subreq); + +struct tevent_req *msg_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct msg_channel *channel) +{ + struct tevent_req *req; + struct tevent_immediate *im; + struct msg_read_state *state; + void *msg_tdb_event; + size_t num_msgs; + + req = tevent_req_create(mem_ctx, &state, struct msg_read_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->req = req; + state->channel = channel; + + if (channel->pending_req != NULL) { + tevent_req_error(req, EBUSY); + return tevent_req_post(req, ev); + } + channel->pending_req = req; + channel->ev = ev; + talloc_set_destructor(state, msg_read_state_destructor); + + num_msgs = talloc_array_length(channel->msgs); + if (num_msgs != 0) { + im = tevent_create_immediate(channel->ev); + if (tevent_req_nomem(im, req)) { + return tevent_req_post(req, ev); + } + tevent_schedule_immediate(im, channel->ev, msg_channel_trigger, + channel); + return req; + } + + msg_tdb_event = messaging_tdb_event(state, channel->msg, ev); + if (tevent_req_nomem(msg_tdb_event, req)) { + return tevent_req_post(req, ev); + + } + if (channel->ctdb_channel != NULL) { + struct tevent_req *subreq; + + subreq = ctdb_msg_read_send(state, ev, + channel->ctdb_channel); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, msg_read_got_ctdb, req); + } + return req; +} + +static int msg_read_state_destructor(struct msg_read_state *s) +{ + assert(s->channel->pending_req == s->req); + s->channel->pending_req = NULL; + return 0; +} + +static void msg_channel_trigger(struct tevent_context *ev, + struct tevent_immediate *im, + void *priv) +{ + struct msg_channel *channel; + struct tevent_req *req; + struct msg_read_state *state; + size_t num_msgs; + + channel = talloc_get_type_abort(priv, struct msg_channel); + req = channel->pending_req; + state = tevent_req_data(req, struct msg_read_state); + + talloc_set_destructor(state, NULL); + msg_read_state_destructor(state); + + num_msgs = talloc_array_length(channel->msgs); + assert(num_msgs > 0); + + state->rec = talloc_move(state, &channel->msgs[0]); + + memmove(channel->msgs, channel->msgs+1, + sizeof(struct messaging_rec) * (num_msgs-1)); + channel->msgs = talloc_realloc( + channel, channel->msgs, struct messaging_rec *, num_msgs - 1); + + tevent_req_done(req); +} + +static void msg_read_got_ctdb(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct msg_read_state *state = tevent_req_data( + req, struct msg_read_state); + DATA_BLOB blob; + enum ndr_err_code ndr_err; + int ret; + + ret = ctdb_msg_read_recv(subreq, talloc_tos(), + &blob.data, &blob.length); + TALLOC_FREE(subreq); + if (tevent_req_error(req, ret)) { + return; + } + + state->rec = talloc(state, struct messaging_rec); + if (tevent_req_nomem(state->rec, req)) { + return; + } + + ndr_err = ndr_push_struct_blob( + &blob, state->rec, state->rec, + (ndr_push_flags_fn_t)ndr_push_messaging_rec); + + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DEBUG(0, ("ndr_push_struct_blob failed: %s\n", + ndr_errstr(ndr_err))); + tevent_req_error(req, ndr_map_error2errno(ndr_err)); + return; + } + if (state->rec->msg_type == state->channel->msg_type) { + tevent_req_done(req); + return; + } + /* + * Got some unexpected msg type, wait for the next one + */ + subreq = ctdb_msg_read_send(state, state->ev, + state->channel->ctdb_channel); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, msg_read_got_ctdb, req); +} + +int msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct messaging_rec **prec) +{ + struct msg_read_state *state = tevent_req_data( + req, struct msg_read_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + return err; + } + *prec = talloc_move(mem_ctx, &state->rec); + return 0; +} diff --git a/source3/lib/msg_channel.h b/source3/lib/msg_channel.h new file mode 100644 index 0000000000..4c7ae420cb --- /dev/null +++ b/source3/lib/msg_channel.h @@ -0,0 +1,45 @@ +/* + Unix SMB/CIFS implementation. + Samba3 message streams + Copyright (C) Volker Lendecke 2012 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _MSG_STREAM_H_ +#define _MSG_STREAM_H_ + +#include <talloc.h> +#include <tevent.h> +#include "messages.h" +#include "librpc/gen_ndr/messaging.h" + +struct msg_channel; + +struct tevent_req *msg_channel_init_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg, + uint32_t msgtype); +int msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct msg_channel **pchannel); +int msg_channel_init(TALLOC_CTX *mem_ctx, struct messaging_context *msg, + uint32_t msgtype, struct msg_channel **pchannel); + +struct tevent_req *msg_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct msg_channel *channel); +int msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct messaging_rec **prec); + +#endif diff --git a/source3/torture/proto.h b/source3/torture/proto.h index 6e2a00450f..b6da0f477c 100644 --- a/source3/torture/proto.h +++ b/source3/torture/proto.h @@ -105,5 +105,6 @@ bool run_local_sprintf_append(int dummy); bool run_cleanup1(int dummy); bool run_cleanup2(int dummy); bool run_ctdb_conn(int dummy); +bool run_msg_test(int dummy); #endif /* __TORTURE_H__ */ diff --git a/source3/torture/test_msg.c b/source3/torture/test_msg.c new file mode 100644 index 0000000000..88b07e742c --- /dev/null +++ b/source3/torture/test_msg.c @@ -0,0 +1,131 @@ +/* + Unix SMB/CIFS implementation. + Test msg_stream API + Copyright (C) Volker Lendecke 2012 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "torture/proto.h" +#include "lib/util/tevent_unix.h" +#include "msg_channel.h" + +struct msg_test_state { + struct tevent_context *ev; + struct messaging_context *msg; + struct msg_channel *channel; +}; + +static void msg_test_got_channel(struct tevent_req *subreq); +static void msg_test_got_msg(struct tevent_req *subreq); + +static struct tevent_req *msg_test_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev) +{ + struct tevent_req *req, *subreq; + struct msg_test_state *state; + + req = tevent_req_create(mem_ctx, &state, struct msg_test_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + + state->msg = messaging_init(state, state->ev); + if (tevent_req_nomem(state->msg, req)) { + return tevent_req_post(req, ev); + } + subreq = msg_channel_init_send(state, state->ev, state->msg, MSG_PING); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, msg_test_got_channel, req); + return req; +} + +static void msg_test_got_channel(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct msg_test_state *state = tevent_req_data( + req, struct msg_test_state); + int ret; + + ret = msg_channel_init_recv(subreq, state, &state->channel); + TALLOC_FREE(subreq); + if (tevent_req_error(req, ret)) { + return; + } + subreq = msg_read_send(state, state->ev, state->channel); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, msg_test_got_msg, req); +} + +static void msg_test_got_msg(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct msg_test_state *state = tevent_req_data( + req, struct msg_test_state); + struct messaging_rec *msg; + int ret; + + ret = msg_read_recv(subreq, state, &msg); + TALLOC_FREE(subreq); + if (tevent_req_error(req, ret)) { + return; + } + tevent_req_done(req); +} + +static int msg_test_recv(struct tevent_req *req) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + return err; + } + return 0; +} + +bool run_msg_test(int dummy) +{ + struct tevent_context *ev; + struct tevent_req *req; + int ret; + + ev = tevent_context_init(talloc_tos()); + if (ev == NULL) { + fprintf(stderr, "tevent_context_init failed\n"); + return false; + } + req = msg_test_send(ev, ev); + if (req == NULL) { + fprintf(stderr, "msg_test_send failed\n"); + return false; + } + if (!tevent_req_poll(req, ev)) { + fprintf(stderr, "tevent_req_poll failed\n"); + return false; + } + ret = msg_test_recv(req); + TALLOC_FREE(req); + printf("msg_test_recv returned %s\n", + ret ? strerror(ret) : "success"); + TALLOC_FREE(ev); + return (ret == 0); +} diff --git a/source3/torture/torture.c b/source3/torture/torture.c index 0c421b5342..ae0d6d4c08 100644 --- a/source3/torture/torture.c +++ b/source3/torture/torture.c @@ -8918,6 +8918,7 @@ static struct { { "LOCAL-GENCACHE", run_local_gencache, 0}, { "LOCAL-TALLOC-DICT", run_local_talloc_dict, 0}, { "LOCAL-CTDB-CONN", run_ctdb_conn, 0}, + { "LOCAL-MSG", run_msg_test, 0}, { "LOCAL-BASE64", run_local_base64, 0}, { "LOCAL-RBTREE", run_local_rbtree, 0}, { "LOCAL-MEMCACHE", run_local_memcache, 0}, diff --git a/source3/wscript_build b/source3/wscript_build index af8cb84f48..2771358972 100755 --- a/source3/wscript_build +++ b/source3/wscript_build @@ -42,6 +42,7 @@ LIB_SRC = ''' lib/messages.c lib/messages_local.c lib/messages_ctdbd.c lib/ctdb_packet.c lib/ctdbd_conn.c lib/ctdb_conn.c + lib/msg_channel.c lib/id_cache.c lib/talloc_dict.c lib/serverid.c @@ -574,6 +575,7 @@ SMBTORTURE_SRC1 = '''torture/torture.c torture/nbio.c torture/scanner.c torture/ torture/test_smbsock_any_connect.c torture/test_cleanup.c torture/test_ctdbconn.c + torture/test_msg.c torture/t_strappend.c''' SMBTORTURE_SRC = '''${SMBTORTURE_SRC1} |