summaryrefslogtreecommitdiff
path: root/source4
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2004-07-23 06:40:49 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 12:57:42 -0500
commit5ddf678e0113f81aa2b5f99134cda4fe8c01afb7 (patch)
tree68e10d04766a1ad8e05f30c4d58e4398343ac7fb /source4
parent1ce4a2d5fecae297e5b6f6e8f0d68534d9dc7c92 (diff)
downloadsamba-5ddf678e0113f81aa2b5f99134cda4fe8c01afb7.tar.gz
samba-5ddf678e0113f81aa2b5f99134cda4fe8c01afb7.tar.bz2
samba-5ddf678e0113f81aa2b5f99134cda4fe8c01afb7.zip
r1578: the first stage of the async client rewrite.
Up to now the client code has had an async API, and operated asynchronously at the packet level, but was not truly async in that it assumed that it could always write to the socket and when a partial packet came in that it could block waiting for the rest of the packet. This change makes the SMB client library full async, by adding a separate outgoing packet queue, using non-blocking socket IO and having a input buffer that can fill asynchonously until the full packet has arrived. The main complexity was in dealing with the events structure when using the CIFS proxy backend. In that case the same events structure needs to be used in both the client library and the main smbd server, so that when the client library is waiting for a reply that the main server keeps processing packets. This required some changes in the events library code. Next step is to make the generated rpc client code use these new capabilities. (This used to be commit 96bf4da3edc4d64b0f58ef520269f3b385b8da02)
Diffstat (limited to 'source4')
-rw-r--r--source4/client/client.c25
-rw-r--r--source4/include/cli_context.h42
-rw-r--r--source4/include/dlinklist.h14
-rw-r--r--source4/include/events.h2
-rw-r--r--source4/include/rewrite.h7
-rw-r--r--source4/lib/events.c344
-rw-r--r--source4/libcli/raw/clisocket.c8
-rw-r--r--source4/libcli/raw/clitransport.c337
-rw-r--r--source4/libcli/raw/rawrequest.c186
-rw-r--r--source4/ntvfs/cifs/vfs_cifs.c29
-rw-r--r--source4/smb_server/smb_server.c1
-rw-r--r--source4/torture/basic/scanner.c6
-rw-r--r--source4/torture/gentest.c19
13 files changed, 592 insertions, 428 deletions
diff --git a/source4/client/client.c b/source4/client/client.c
index 0ad3eed889..abc4033f29 100644
--- a/source4/client/client.c
+++ b/source4/client/client.c
@@ -2677,8 +2677,6 @@ make sure we swallow keepalives during idle time
****************************************************************************/
static void readline_callback(void)
{
- fd_set fds;
- struct timeval timeout;
static time_t last_t;
time_t t;
@@ -2688,28 +2686,7 @@ static void readline_callback(void)
last_t = t;
- again:
- if (cli->transport->socket->fd == -1)
- return;
-
- FD_ZERO(&fds);
- FD_SET(cli->transport->socket->fd, &fds);
-
- timeout.tv_sec = 0;
- timeout.tv_usec = 0;
- sys_select_intr(cli->transport->socket->fd+1,&fds,NULL,NULL,&timeout);
-
- /* We deliberately use cli_swallow_keepalives instead of
- client_receive_smb as we want to receive
- session keepalives and then drop them here.
- */
- if (FD_ISSET(cli->transport->socket->fd, &fds)) {
- if (!cli_request_receive_next(cli->transport)) {
- d_printf("Lost connection to server\n");
- exit(1);
- }
- goto again;
- }
+ cli_transport_process(cli->transport);
if (cli->tree) {
cli_chkpath(cli->tree, "\\");
diff --git a/source4/include/cli_context.h b/source4/include/cli_context.h
index 22a9898188..e0bf1689ad 100644
--- a/source4/include/cli_context.h
+++ b/source4/include/cli_context.h
@@ -134,7 +134,7 @@ struct cli_transport {
uint_t readbraw_pending:1;
/* an idle function - if this is defined then it will be
- called once every period milliseconds while we are waiting
+ called once every period seconds while we are waiting
for a packet */
struct {
void (*func)(struct cli_transport *, void *);
@@ -151,7 +151,11 @@ struct cli_transport {
uint16_t ecode;
} dos;
NTSTATUS nt_status;
- enum socket_error socket_error;
+ enum {SOCKET_READ_TIMEOUT,
+ SOCKET_READ_EOF,
+ SOCKET_READ_ERROR,
+ SOCKET_WRITE_ERROR,
+ SOCKET_READ_BAD_SIG} socket_error;
uint_t nbt_error;
} e;
} error;
@@ -164,12 +168,30 @@ struct cli_transport {
void *private;
} oplock;
- /* a list of async requests that are pending on this connection */
- struct cli_request *pending_requests;
+ /* a list of async requests that are pending for send on this connection */
+ struct cli_request *pending_send;
+
+ /* a list of async requests that are pending for receive on this connection */
+ struct cli_request *pending_recv;
/* remember the called name - some sub-protocols require us to
know the server name */
struct nmb_name called;
+
+ /* a buffer for partially received SMB packets. */
+ struct {
+ uint8_t header[NBT_HDR_SIZE];
+ size_t req_size;
+ size_t received;
+ uint8_t *buffer;
+ } recv_buffer;
+
+ /* the event handle for waiting for socket IO */
+ struct {
+ struct event_context *ctx;
+ struct fd_event *fde;
+ struct timed_event *te;
+ } event;
};
/* this is the context for the user */
@@ -216,6 +238,15 @@ struct cli_tree {
};
+/*
+ a client request moves between the following 4 states.
+*/
+enum cli_request_state {CLI_REQUEST_INIT, /* we are creating the request */
+ CLI_REQUEST_SEND, /* the request is in the outgoing socket Q */
+ CLI_REQUEST_RECV, /* we are waiting for a matching reply */
+ CLI_REQUEST_DONE, /* the request is finished */
+ CLI_REQUEST_ERROR}; /* a packet or transport level error has occurred */
+
/* the context for a single SMB request. This is passed to any request-context
* functions (similar to context.h, the server version).
* This will allow requests to be multi-threaded. */
@@ -226,6 +257,9 @@ struct cli_request {
/* a talloc context for the lifetime of this request */
TALLOC_CTX *mem_ctx;
+ /* each request is in one of 4 possible states */
+ enum cli_request_state state;
+
/* a request always has a transport context, nearly always has
a session context and usually has a tree context */
struct cli_transport *transport;
diff --git a/source4/include/dlinklist.h b/source4/include/dlinklist.h
index 6191299384..40f7f0a0c7 100644
--- a/source4/include/dlinklist.h
+++ b/source4/include/dlinklist.h
@@ -77,3 +77,17 @@ do { \
DLIST_REMOVE(list, p); \
DLIST_ADD_END(list, p, tmp); \
} while (0)
+
+/* concatenate two lists - putting all elements of the 2nd list at the
+ end of the first list */
+#define DLIST_CONCATENATE(list1, list2, type) \
+do { \
+ if (!(list1)) { \
+ (list1) = (list2); \
+ } else { \
+ type tmp; \
+ for (tmp = (list1); tmp->next; tmp = tmp->next) ; \
+ tmp->next = (list2); \
+ (list2)->prev = tmp; \
+ } \
+} while (0)
diff --git a/source4/include/events.h b/source4/include/events.h
index 7dde3b2ba0..edded2632b 100644
--- a/source4/include/events.h
+++ b/source4/include/events.h
@@ -67,6 +67,8 @@ struct event_context {
BOOL exit_now;
int code;
} exit;
+
+ int ref_count;
};
diff --git a/source4/include/rewrite.h b/source4/include/rewrite.h
index c8587f5e4e..21cc7342d1 100644
--- a/source4/include/rewrite.h
+++ b/source4/include/rewrite.h
@@ -50,13 +50,6 @@ typedef int BOOL;
/* Debugging stuff */
#include "debug.h"
-/* types of socket errors */
-enum socket_error {SOCKET_READ_TIMEOUT,
- SOCKET_READ_EOF,
- SOCKET_READ_ERROR,
- SOCKET_WRITE_ERROR,
- SOCKET_READ_BAD_SIG};
-
#include "doserr.h"
/*
diff --git a/source4/lib/events.c b/source4/lib/events.c
index 13a9a444e8..a6099db5c5 100644
--- a/source4/lib/events.c
+++ b/source4/lib/events.c
@@ -81,25 +81,50 @@ struct event_context *event_context_init(void)
/* start off with no events */
ZERO_STRUCTP(ev);
+ ev->ref_count = 1;
+
return ev;
}
-
-
/*
- add a fd based event
- return NULL on failure (memory allocation error)
+ destroy an events context, also destroying any remaining events
*/
-struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e)
+void event_context_destroy(struct event_context *ev)
{
- e = memdup(e, sizeof(*e));
- if (!e) return NULL;
- DLIST_ADD(ev->fd_events, e);
- e->ref_count = 1;
- if (e->fd > ev->maxfd) {
- ev->maxfd = e->fd;
+ struct fd_event *fde;
+ struct timed_event *te;
+ struct loop_event *le;
+
+ ev->ref_count--;
+ if (ev->ref_count != 0) {
+ return;
}
- return e;
+
+ for (fde=ev->fd_events; fde;) {
+ struct fd_event *next = fde->next;
+ event_remove_fd(ev, fde);
+ if (fde->ref_count == 0) {
+ free(fde);
+ }
+ fde=next;
+ }
+ for (te=ev->timed_events; te;) {
+ struct timed_event *next = te->next;
+ event_remove_timed(ev, te);
+ if (te->ref_count == 0) {
+ free(te);
+ }
+ te=next;
+ }
+ for (le=ev->loop_events; le;) {
+ struct loop_event *next = le->next;
+ event_remove_loop(ev, le);
+ if (le->ref_count == 0) {
+ free(le);
+ }
+ le=next;
+ }
+ free(ev);
}
@@ -118,6 +143,47 @@ static void calc_maxfd(struct event_context *ev)
}
}
+/*
+ move the event structures from ev2 into ev, upping the reference
+ count on ev. The event context ev2 is then destroyed.
+
+ this is used by modules that need to call on the events of a lower module
+*/
+void event_context_merge(struct event_context *ev, struct event_context *ev2)
+{
+ DLIST_CONCATENATE(ev->fd_events, ev2->fd_events, struct fd_event *);
+ DLIST_CONCATENATE(ev->timed_events, ev2->timed_events, struct timed_event *);
+ DLIST_CONCATENATE(ev->loop_events, ev2->loop_events, struct loop_event *);
+
+ ev->ref_count++;
+
+ ev2->fd_events = NULL;
+ ev2->timed_events = NULL;
+ ev2->loop_events = NULL;
+
+ event_context_destroy(ev2);
+
+ calc_maxfd(ev);
+}
+
+
+/*
+ add a fd based event
+ return NULL on failure (memory allocation error)
+*/
+struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e)
+{
+ e = memdup(e, sizeof(*e));
+ if (!e) return NULL;
+ DLIST_ADD(ev->fd_events, e);
+ e->ref_count = 1;
+ if (e->fd > ev->maxfd) {
+ ev->maxfd = e->fd;
+ }
+ return e;
+}
+
+
/* to mark the ev->maxfd invalid
* this means we need to recalculate it
*/
@@ -242,150 +308,158 @@ void event_loop_exit(struct event_context *ev, int code)
}
/*
- go into an event loop using the events defined in ev this function
- will return with the specified code if one of the handlers calls
- event_loop_exit()
-
- also return (with code 0) if all fd events are removed
+ do a single event loop using the events defined in ev this function
*/
-int event_loop_wait(struct event_context *ev)
+void event_loop_once(struct event_context *ev)
{
time_t t;
-
- ZERO_STRUCT(ev->exit);
- ev->maxfd = EVENT_INVALID_MAXFD;
+ fd_set r_fds, w_fds;
+ struct fd_event *fe;
+ struct loop_event *le;
+ struct timed_event *te;
+ int selrtn;
+ struct timeval tval;
t = time(NULL);
- while (ev->fd_events && !ev->exit.exit_now) {
- fd_set r_fds, w_fds;
- struct fd_event *fe;
- struct loop_event *le;
- struct timed_event *te;
- int selrtn;
- struct timeval tval;
-
- /* the loop events are called on each loop. Be careful to allow the
- event to remove itself */
- for (le=ev->loop_events;le;) {
- struct loop_event *next = le->next;
- if (le->ref_count == 0) {
- DLIST_REMOVE(ev->loop_events, le);
- free(le);
- } else {
- le->ref_count++;
- le->handler(ev, le, t);
- le->ref_count--;
- }
- le = next;
+ /* the loop events are called on each loop. Be careful to allow the
+ event to remove itself */
+ for (le=ev->loop_events;le;) {
+ struct loop_event *next = le->next;
+ if (le->ref_count == 0) {
+ DLIST_REMOVE(ev->loop_events, le);
+ free(le);
+ } else {
+ le->ref_count++;
+ le->handler(ev, le, t);
+ le->ref_count--;
}
+ le = next;
+ }
- ZERO_STRUCT(tval);
- FD_ZERO(&r_fds);
- FD_ZERO(&w_fds);
-
- /* setup any fd events */
- for (fe=ev->fd_events; fe; ) {
- struct fd_event *next = fe->next;
- if (fe->ref_count == 0) {
- DLIST_REMOVE(ev->fd_events, fe);
- if (ev->maxfd == fe->fd) {
- ev->maxfd = EVENT_INVALID_MAXFD;
- }
- free(fe);
- } else {
- if (fe->flags & EVENT_FD_READ) {
- FD_SET(fe->fd, &r_fds);
- }
- if (fe->flags & EVENT_FD_WRITE) {
- FD_SET(fe->fd, &w_fds);
- }
+ ZERO_STRUCT(tval);
+ FD_ZERO(&r_fds);
+ FD_ZERO(&w_fds);
+
+ /* setup any fd events */
+ for (fe=ev->fd_events; fe; ) {
+ struct fd_event *next = fe->next;
+ if (fe->ref_count == 0) {
+ DLIST_REMOVE(ev->fd_events, fe);
+ if (ev->maxfd == fe->fd) {
+ ev->maxfd = EVENT_INVALID_MAXFD;
+ }
+ free(fe);
+ } else {
+ if (fe->flags & EVENT_FD_READ) {
+ FD_SET(fe->fd, &r_fds);
+ }
+ if (fe->flags & EVENT_FD_WRITE) {
+ FD_SET(fe->fd, &w_fds);
}
- fe = next;
}
+ fe = next;
+ }
- /* start with a reasonable max timeout */
- tval.tv_sec = 600;
+ /* start with a reasonable max timeout */
+ tval.tv_sec = 600;
- /* work out the right timeout for all timed events */
- for (te=ev->timed_events;te;te=te->next) {
- int timeout = te->next_event - t;
- if (timeout < 0) {
- timeout = 0;
- }
- if (te->ref_count &&
- timeout < tval.tv_sec) {
- tval.tv_sec = timeout;
- }
+ /* work out the right timeout for all timed events */
+ for (te=ev->timed_events;te;te=te->next) {
+ int timeout = te->next_event - t;
+ if (timeout < 0) {
+ timeout = 0;
}
+ if (te->ref_count &&
+ timeout < tval.tv_sec) {
+ tval.tv_sec = timeout;
+ }
+ }
- /* only do a select() if there're fd_events
- * otherwise we would block for a the time in tval,
- * and if there're no fd_events present anymore we want to
- * leave the event loop directly
+ /* only do a select() if there're fd_events
+ * otherwise we would block for a the time in tval,
+ * and if there're no fd_events present anymore we want to
+ * leave the event loop directly
+ */
+ if (ev->fd_events) {
+ /* we maybe need to recalculate the maxfd */
+ if (ev->maxfd == EVENT_INVALID_MAXFD) {
+ calc_maxfd(ev);
+ }
+
+ /* TODO:
+ * we don't use sys_select() as it isn't thread
+ * safe. We need to replace the magic pipe handling in
+ * sys_select() with something in the events
+ * structure - for now just use select()
*/
- if (ev->fd_events) {
- /* we maybe need to recalculate the maxfd */
- if (ev->maxfd == EVENT_INVALID_MAXFD) {
- calc_maxfd(ev);
- }
-
- /* TODO:
- * we don't use sys_select() as it isn't thread
- * safe. We need to replace the magic pipe handling in
- * sys_select() with something in the events
- * structure - for now just use select()
- */
- selrtn = select(ev->maxfd+1, &r_fds, &w_fds, NULL, &tval);
-
- t = time(NULL);
-
- if (selrtn == -1 && errno == EBADF) {
- /* the socket is dead! this should never
- happen as the socket should have first been
- made readable and that should have removed
- the event, so this must be a bug. This is a
- fatal error. */
- DEBUG(0,("EBADF on event_loop_wait - exiting\n"));
- return -1;
- }
-
- if (selrtn > 0) {
- /* at least one file descriptor is ready - check
- which ones and call the handler, being careful to allow
- the handler to remove itself when called */
- for (fe=ev->fd_events; fe; fe=fe->next) {
- uint16_t flags = 0;
- if (FD_ISSET(fe->fd, &r_fds)) flags |= EVENT_FD_READ;
- if (FD_ISSET(fe->fd, &w_fds)) flags |= EVENT_FD_WRITE;
- if (fe->ref_count && flags) {
- fe->ref_count++;
- fe->handler(ev, fe, t, flags);
- fe->ref_count--;
- }
+ selrtn = select(ev->maxfd+1, &r_fds, &w_fds, NULL, &tval);
+
+ t = time(NULL);
+
+ if (selrtn == -1 && errno == EBADF) {
+ /* the socket is dead! this should never
+ happen as the socket should have first been
+ made readable and that should have removed
+ the event, so this must be a bug. This is a
+ fatal error. */
+ DEBUG(0,("EBADF on event_loop_wait - exiting\n"));
+ return;
+ }
+
+ if (selrtn > 0) {
+ /* at least one file descriptor is ready - check
+ which ones and call the handler, being careful to allow
+ the handler to remove itself when called */
+ for (fe=ev->fd_events; fe; fe=fe->next) {
+ uint16_t flags = 0;
+ if (FD_ISSET(fe->fd, &r_fds)) flags |= EVENT_FD_READ;
+ if (FD_ISSET(fe->fd, &w_fds)) flags |= EVENT_FD_WRITE;
+ if (fe->ref_count && flags) {
+ fe->ref_count++;
+ fe->handler(ev, fe, t, flags);
+ fe->ref_count--;
}
}
}
+ }
- /* call any timed events that are now due */
- for (te=ev->timed_events;te;) {
- struct timed_event *next = te->next;
- if (te->ref_count == 0) {
- DLIST_REMOVE(ev->timed_events, te);
- free(te);
- } else if (te->next_event <= t) {
- te->ref_count++;
- te->handler(ev, te, t);
- te->ref_count--;
- if (te->next_event <= t) {
- /* the handler didn't set a time for the
- next event - remove the event */
- event_remove_timed(ev, te);
- }
+ /* call any timed events that are now due */
+ for (te=ev->timed_events;te;) {
+ struct timed_event *next = te->next;
+ if (te->ref_count == 0) {
+ DLIST_REMOVE(ev->timed_events, te);
+ free(te);
+ } else if (te->next_event <= t) {
+ te->ref_count++;
+ te->handler(ev, te, t);
+ te->ref_count--;
+ if (te->next_event <= t) {
+ /* the handler didn't set a time for the
+ next event - remove the event */
+ event_remove_timed(ev, te);
}
- te = next;
- }
+ }
+ te = next;
+ }
+}
+
+/*
+ go into an event loop using the events defined in ev this function
+ will return with the specified code if one of the handlers calls
+ event_loop_exit()
+
+ also return (with code 0) if all fd events are removed
+*/
+int event_loop_wait(struct event_context *ev)
+{
+ ZERO_STRUCT(ev->exit);
+ ev->maxfd = EVENT_INVALID_MAXFD;
+
+ ev->exit.exit_now = False;
+ while (ev->fd_events && !ev->exit.exit_now) {
+ event_loop_once(ev);
}
return ev->exit.code;
diff --git a/source4/libcli/raw/clisocket.c b/source4/libcli/raw/clisocket.c
index 5cd6f33689..eb5d3c0342 100644
--- a/source4/libcli/raw/clisocket.c
+++ b/source4/libcli/raw/clisocket.c
@@ -72,7 +72,13 @@ BOOL cli_sock_connect(struct cli_socket *sock, struct in_addr *ip, int port)
&sock->dest_ip,
sock->port,
LONG_CONNECT_TIMEOUT);
- return (sock->fd != -1);
+ if (sock->fd == -1) {
+ return False;
+ }
+
+ set_blocking(sock->fd, False);
+
+ return True;
}
diff --git a/source4/libcli/raw/clitransport.c b/source4/libcli/raw/clitransport.c
index 18784fe33a..96d5a18a71 100644
--- a/source4/libcli/raw/clitransport.c
+++ b/source4/libcli/raw/clitransport.c
@@ -22,12 +22,24 @@
#include "includes.h"
/*
+ an event has happened on the socket
+*/
+static void cli_transport_event_handler(struct event_context *ev, struct fd_event *fde,
+ time_t t, uint16_t flags)
+{
+ struct cli_transport *transport = fde->private;
+
+ cli_transport_process(transport);
+}
+
+/*
create a transport structure based on an established socket
*/
struct cli_transport *cli_transport_init(struct cli_socket *sock)
{
TALLOC_CTX *mem_ctx;
struct cli_transport *transport;
+ struct fd_event fde;
mem_ctx = talloc_init("cli_transport");
if (!mem_ctx) return NULL;
@@ -35,6 +47,12 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock)
transport = talloc_zero(mem_ctx, sizeof(*transport));
if (!transport) return NULL;
+ transport->event.ctx = event_context_init();
+ if (transport->event.ctx == NULL) {
+ talloc_destroy(mem_ctx);
+ return NULL;
+ }
+
transport->mem_ctx = mem_ctx;
transport->socket = sock;
transport->negotiate.protocol = PROTOCOL_NT1;
@@ -47,6 +65,14 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock)
ZERO_STRUCT(transport->called);
+ fde.fd = sock->fd;
+ fde.flags = EVENT_FD_READ;
+ fde.handler = cli_transport_event_handler;
+ fde.private = transport;
+ fde.ref_count = 1;
+
+ transport->event.fde = event_add_fd(transport->event.ctx, &fde);
+
return transport;
}
@@ -59,6 +85,9 @@ void cli_transport_close(struct cli_transport *transport)
transport->reference_count--;
if (transport->reference_count <= 0) {
cli_sock_close(transport->socket);
+ event_remove_fd(transport->event.ctx, transport->event.fde);
+ event_remove_timed(transport->event.ctx, transport->event.te);
+ event_context_destroy(transport->event.ctx);
talloc_destroy(transport->mem_ctx);
}
}
@@ -72,6 +101,21 @@ void cli_transport_dead(struct cli_transport *transport)
}
+/*
+ enable select for write on a transport
+*/
+static void cli_transport_write_enable(struct cli_transport *transport)
+{
+ transport->event.fde->flags |= EVENT_FD_WRITE;
+}
+
+/*
+ disable select for write on a transport
+*/
+static void cli_transport_write_disable(struct cli_transport *transport)
+{
+ transport->event.fde->flags &= ~EVENT_FD_WRITE;
+}
/****************************************************************************
send a session request (if appropriate)
@@ -145,7 +189,7 @@ again:
/* the zero mid is reserved for requests that don't have a mid */
if (mid == 0) mid = 1;
- for (req=transport->pending_requests; req; req=req->next) {
+ for (req=transport->pending_recv; req; req=req->next) {
if (req->mid == mid) {
mid++;
goto again;
@@ -156,81 +200,284 @@ again:
return mid;
}
+static void idle_handler(struct event_context *ev,
+ struct timed_event *te, time_t t)
+{
+ struct cli_transport *transport = te->private;
+ te->next_event = t + transport->idle.period;
+ transport->idle.func(transport, transport->idle.private);
+}
+
/*
setup the idle handler for a transport
+ the period is in seconds
*/
void cli_transport_idle_handler(struct cli_transport *transport,
void (*idle_func)(struct cli_transport *, void *),
uint_t period,
void *private)
{
+ struct timed_event te;
transport->idle.func = idle_func;
transport->idle.private = private;
transport->idle.period = period;
-}
+ if (transport->event.te != NULL) {
+ event_remove_timed(transport->event.ctx, transport->event.te);
+ }
+
+ te.next_event = time(NULL) + period;
+ te.handler = idle_handler;
+ te.private = transport;
+ transport->event.te = event_add_timed(transport->event.ctx, &te);
+}
/*
- determine if a packet is pending for receive on a transport
+ process some pending sends
*/
-BOOL cli_transport_pending(struct cli_transport *transport)
+static void cli_transport_process_send(struct cli_transport *transport)
{
- return socket_pending(transport->socket->fd);
-}
-
+ while (transport->pending_send) {
+ struct cli_request *req = transport->pending_send;
+ ssize_t ret;
+ ret = cli_sock_write(transport->socket, req->out.buffer, req->out.size);
+ if (ret == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ return;
+ }
+ cli_transport_dead(transport);
+ }
+ req->out.buffer += ret;
+ req->out.size -= ret;
+ if (req->out.size == 0) {
+ req->state = CLI_REQUEST_RECV;
+ DLIST_REMOVE(transport->pending_send, req);
+ DLIST_ADD(transport->pending_recv, req);
+ }
+ }
+ /* we're out of requests to send, so don't wait for write
+ events any more */
+ cli_transport_write_disable(transport);
+}
/*
- wait for data on a transport, periodically calling a wait function
- if one has been defined
- return True if a packet is received
-*/
-BOOL cli_transport_select(struct cli_transport *transport)
+ we have a full request in our receive buffer - match it to a pending request
+ and process
+ */
+static void cli_transport_finish_recv(struct cli_transport *transport)
{
- fd_set fds;
- int selrtn;
- int fd;
- struct timeval timeout;
+ uint8_t *buffer, *hdr, *vwv;
+ int len;
+ uint16_t wct, mid = 0;
+ struct cli_request *req;
- fd = transport->socket->fd;
+ buffer = transport->recv_buffer.buffer;
+ len = transport->recv_buffer.req_size;
- if (fd == -1) {
- return False;
+ hdr = buffer+NBT_HDR_SIZE;
+ vwv = hdr + HDR_VWV;
+
+ /* see if it could be an oplock break request */
+ if (handle_oplock_break(transport, len, hdr, vwv)) {
+ talloc_free(transport->mem_ctx, buffer);
+ ZERO_STRUCT(transport->recv_buffer);
+ return;
}
- do {
- uint_t period = 1000;
+ /* at this point we need to check for a readbraw reply, as
+ these can be any length */
+ if (transport->readbraw_pending) {
+ transport->readbraw_pending = 0;
+
+ /* it must match the first entry in the pending queue
+ as the client is not allowed to have outstanding
+ readbraw requests */
+ req = transport->pending_recv;
+ if (!req) goto error;
+
+ req->in.buffer = buffer;
+ talloc_steal(transport->mem_ctx, req->mem_ctx, buffer);
+ req->in.size = len + NBT_HDR_SIZE;
+ req->in.allocated = req->in.size;
+ goto async;
+ }
- FD_ZERO(&fds);
- FD_SET(fd,&fds);
-
- if (transport->idle.func) {
- period = transport->idle.period;
+ if (len >= MIN_SMB_SIZE) {
+ /* extract the mid for matching to pending requests */
+ mid = SVAL(hdr, HDR_MID);
+ wct = CVAL(hdr, HDR_WCT);
+ }
+
+ /* match the incoming request against the list of pending requests */
+ for (req=transport->pending_recv; req; req=req->next) {
+ if (req->mid == mid) break;
+ }
+
+ if (!req) {
+ DEBUG(1,("Discarding unmatched reply with mid %d\n", mid));
+ goto error;
+ }
+
+ /* fill in the 'in' portion of the matching request */
+ req->in.buffer = buffer;
+ talloc_steal(transport->mem_ctx, req->mem_ctx, buffer);
+ req->in.size = len + NBT_HDR_SIZE;
+ req->in.allocated = req->in.size;
+
+ /* handle non-SMB replies */
+ if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE) {
+ req->state = CLI_REQUEST_ERROR;
+ goto error;
+ }
+
+ if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
+ DEBUG(2,("bad reply size for mid %d\n", mid));
+ req->status = NT_STATUS_UNSUCCESSFUL;
+ req->state = CLI_REQUEST_ERROR;
+ goto error;
+ }
+
+ req->in.hdr = hdr;
+ req->in.vwv = vwv;
+ req->in.wct = wct;
+ if (req->in.size >= NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
+ req->in.data = req->in.vwv + VWV(wct) + 2;
+ req->in.data_size = SVAL(req->in.vwv, VWV(wct));
+ if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct) + req->in.data_size) {
+ DEBUG(3,("bad data size for mid %d\n", mid));
+ /* blergh - w2k3 gives a bogus data size values in some
+ openX replies */
+ req->in.data_size = req->in.size - (NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct));
}
+ }
+ req->in.ptr = req->in.data;
+ req->flags2 = SVAL(req->in.hdr, HDR_FLG2);
+
+ if (!(req->flags2 & FLAGS2_32_BIT_ERROR_CODES)) {
+ transport->error.etype = ETYPE_DOS;
+ transport->error.e.dos.eclass = CVAL(req->in.hdr,HDR_RCLS);
+ transport->error.e.dos.ecode = SVAL(req->in.hdr,HDR_ERR);
+ req->status = dos_to_ntstatus(transport->error.e.dos.eclass,
+ transport->error.e.dos.ecode);
+ } else {
+ transport->error.etype = ETYPE_NT;
+ transport->error.e.nt_status = NT_STATUS(IVAL(req->in.hdr, HDR_RCLS));
+ req->status = transport->error.e.nt_status;
+ }
+
+ if (!cli_request_check_sign_mac(req)) {
+ transport->error.etype = ETYPE_SOCKET;
+ transport->error.e.socket_error = SOCKET_READ_BAD_SIG;
+ req->state = CLI_REQUEST_ERROR;
+ goto error;
+ };
+
+async:
+ /* if this request has an async handler then call that to
+ notify that the reply has been received. This might destroy
+ the request so it must happen last */
+ ZERO_STRUCT(transport->recv_buffer);
+ DLIST_REMOVE(transport->pending_recv, req);
+ req->state = CLI_REQUEST_DONE;
+ if (req->async.fn) {
+ req->async.fn(req);
+ }
+ return;
+
+error:
+ if (req) {
+ DLIST_REMOVE(transport->pending_recv, req);
+ req->state = CLI_REQUEST_ERROR;
+ }
+ ZERO_STRUCT(transport->recv_buffer);
+}
- timeout.tv_sec = period / 1000;
- timeout.tv_usec = 1000*(period%1000);
-
- selrtn = sys_select_intr(fd+1,&fds,NULL,NULL,&timeout);
-
- if (selrtn == 1) {
- /* the fd is readable */
- return True;
+/*
+ process some pending receives
+*/
+static void cli_transport_process_recv(struct cli_transport *transport)
+{
+ /* a incoming packet goes through 2 stages - first we read the
+ 4 byte header, which tells us how much more is coming. Then
+ we read the rest */
+ if (transport->recv_buffer.received < NBT_HDR_SIZE) {
+ ssize_t ret;
+ ret = cli_sock_read(transport->socket,
+ transport->recv_buffer.header +
+ transport->recv_buffer.received,
+ NBT_HDR_SIZE - transport->recv_buffer.received);
+ if (ret == -1) {
+ if (errno == EINTR || errno == EAGAIN) {
+ return;
+ }
+ cli_transport_dead(transport);
+ return;
}
-
- if (selrtn == -1) {
- /* sys_select_intr() already handles EINTR, so this
- is an error. The socket is probably dead */
- return False;
+
+ transport->recv_buffer.received += ret;
+
+ if (transport->recv_buffer.received == NBT_HDR_SIZE) {
+ /* we've got a full header */
+ transport->recv_buffer.req_size = smb_len(transport->recv_buffer.header) + NBT_HDR_SIZE;
+ transport->recv_buffer.buffer = talloc(transport->mem_ctx,
+ NBT_HDR_SIZE+transport->recv_buffer.req_size);
+ if (transport->recv_buffer.buffer == NULL) {
+ cli_transport_dead(transport);
+ return;
+ }
+ memcpy(transport->recv_buffer.buffer, transport->recv_buffer.header, NBT_HDR_SIZE);
}
-
- /* only other possibility is that we timed out - call the idle function
- if there is one */
- if (transport->idle.func) {
- transport->idle.func(transport, transport->idle.private);
+ }
+
+ if (transport->recv_buffer.received < transport->recv_buffer.req_size) {
+ ssize_t ret;
+ ret = cli_sock_read(transport->socket,
+ transport->recv_buffer.buffer +
+ transport->recv_buffer.received,
+ transport->recv_buffer.req_size -
+ transport->recv_buffer.received);
+ if (ret == -1) {
+ if (errno == EINTR || errno == EAGAIN) {
+ return;
+ }
+ cli_transport_dead(transport);
+ return;
}
- } while (selrtn == 0);
+ transport->recv_buffer.received += ret;
+ }
+
+ if (transport->recv_buffer.received != 0 &&
+ transport->recv_buffer.received == transport->recv_buffer.req_size) {
+ cli_transport_finish_recv(transport);
+ }
+}
+/*
+ process some read/write requests that are pending
+ return False if the socket is dead
+*/
+BOOL cli_transport_process(struct cli_transport *transport)
+{
+ cli_transport_process_send(transport);
+ cli_transport_process_recv(transport);
+ if (transport->socket->fd == -1) {
+ return False;
+ }
return True;
}
+
+
+/*
+ put a request into the send queue
+*/
+void cli_transport_send(struct cli_request *req)
+{
+ /* put it on the outgoing socket queue */
+ req->state = CLI_REQUEST_SEND;
+ DLIST_ADD(req->transport->pending_send, req);
+
+ /* make sure we look for write events */
+ cli_transport_write_enable(req->transport);
+}
diff --git a/source4/libcli/raw/rawrequest.c b/source4/libcli/raw/rawrequest.c
index c31f07505f..ce6cd0a1a4 100644
--- a/source4/libcli/raw/rawrequest.c
+++ b/source4/libcli/raw/rawrequest.c
@@ -43,7 +43,7 @@ NTSTATUS cli_request_destroy(struct cli_request *req)
if (req->transport) {
/* remove it from the list of pending requests (a null op if
its not in the list) */
- DLIST_REMOVE(req->transport->pending_requests, req);
+ DLIST_REMOVE(req->transport->pending_recv, req);
}
/* ahh, its so nice to destroy a complex structure in such a
@@ -79,6 +79,7 @@ struct cli_request *cli_request_setup_nonsmb(struct cli_transport *transport, ui
ZERO_STRUCTP(req);
/* setup the request context */
+ req->state = CLI_REQUEST_INIT;
req->mem_ctx = mem_ctx;
req->transport = transport;
req->session = NULL;
@@ -266,32 +267,20 @@ static void cli_req_grow_data(struct cli_request *req, uint_t new_size)
SSVAL(req->out.vwv, VWV(req->out.wct), new_size);
}
+
/*
send a message
*/
BOOL cli_request_send(struct cli_request *req)
{
- uint_t ret;
-
if (IVAL(req->out.buffer, 0) == 0) {
_smb_setlen(req->out.buffer, req->out.size - NBT_HDR_SIZE);
}
cli_request_calculate_sign_mac(req);
- ret = cli_sock_write(req->transport->socket, req->out.buffer, req->out.size);
-
- if (req->out.size != ret) {
- req->transport->error.etype = ETYPE_SOCKET;
- req->transport->error.e.socket_error = SOCKET_WRITE_ERROR;
- DEBUG(0,("Error writing %d bytes to server - %s\n",
- (int)req->out.size, strerror(errno)));
- return False;
- }
+ cli_transport_send(req);
- /* add it to the list of pending requests */
- DLIST_ADD(req->transport->pending_requests, req);
-
return True;
}
@@ -306,17 +295,8 @@ BOOL cli_request_receive(struct cli_request *req)
if (!req) return False;
/* keep receiving packets until this one is replied to */
- while (!req->in.buffer) {
- if (!cli_transport_select(req->transport)) {
- req->status = NT_STATUS_UNSUCCESSFUL;
- return False;
- }
-
- if (!cli_request_receive_next(req->transport)) {
- cli_transport_dead(req->transport);
- req->status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
- return False;
- }
+ while (req->state <= CLI_REQUEST_RECV) {
+ event_loop_once(req->transport->event.ctx);
}
return True;
@@ -327,7 +307,7 @@ BOOL cli_request_receive(struct cli_request *req)
handle oplock break requests from the server - return True if the request was
an oplock break
*/
-static BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, const char *hdr, const char *vwv)
+BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, const char *hdr, const char *vwv)
{
/* we must be very fussy about what we consider an oplock break to avoid
matching readbraw replies */
@@ -350,158 +330,6 @@ static BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, con
return True;
}
-
-/*
- receive an async message from the server
- this function assumes that the caller already knows that the socket is readable
- and that there is a packet waiting
-
- The packet is not actually returned by this function, instead any
- registered async message handlers are called
-
- return True if a packet was successfully received and processed
- return False if the socket appears to be dead
-*/
-BOOL cli_request_receive_next(struct cli_transport *transport)
-{
- BOOL ret;
- int len;
- char header[NBT_HDR_SIZE];
- char *buffer, *hdr, *vwv;
- TALLOC_CTX *mem_ctx;
- struct cli_request *req;
- uint16_t wct, mid = 0;
-
- len = cli_sock_read(transport->socket, header, 4);
- if (len != 4) {
- return False;
- }
-
- len = smb_len(header);
-
- mem_ctx = talloc_init("cli_request_receive_next");
-
- /* allocate the incoming buffer at the right size */
- buffer = talloc(mem_ctx, len+NBT_HDR_SIZE);
- if (!buffer) {
- talloc_destroy(mem_ctx);
- return False;
- }
-
- /* fill in the already received header */
- memcpy(buffer, header, NBT_HDR_SIZE);
-
- ret = cli_sock_read(transport->socket, buffer + NBT_HDR_SIZE, len);
- /* If the server is not responding, note that now */
- if (ret != len) {
- return False;
- }
-
- hdr = buffer+NBT_HDR_SIZE;
- vwv = hdr + HDR_VWV;
-
- /* see if it could be an oplock break request */
- if (handle_oplock_break(transport, len, hdr, vwv)) {
- goto done;
- }
-
- /* at this point we need to check for a readbraw reply, as these can be any length */
- if (transport->readbraw_pending) {
- transport->readbraw_pending = 0;
-
- /* it must match the first entry in the pending queue as the client is not allowed
- to have outstanding readbraw requests */
- req = transport->pending_requests;
- if (!req) goto done;
-
- req->in.buffer = buffer;
- talloc_steal(mem_ctx, req->mem_ctx, buffer);
- req->in.size = len + NBT_HDR_SIZE;
- req->in.allocated = req->in.size;
- goto async;
- }
-
- if (len >= MIN_SMB_SIZE) {
- /* extract the mid for matching to pending requests */
- mid = SVAL(hdr, HDR_MID);
- wct = CVAL(hdr, HDR_WCT);
- }
-
- /* match the incoming request against the list of pending requests */
- for (req=transport->pending_requests; req; req=req->next) {
- if (req->mid == mid) break;
- }
-
- if (!req) {
- DEBUG(3,("Discarding unmatched reply with mid %d\n", mid));
- goto done;
- }
-
- /* fill in the 'in' portion of the matching request */
- req->in.buffer = buffer;
- talloc_steal(mem_ctx, req->mem_ctx, buffer);
- req->in.size = len + NBT_HDR_SIZE;
- req->in.allocated = req->in.size;
-
- /* handle non-SMB replies */
- if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE) {
- goto done;
- }
-
- if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
- DEBUG(2,("bad reply size for mid %d\n", mid));
- req->status = NT_STATUS_UNSUCCESSFUL;
- goto done;
- }
-
- req->in.hdr = hdr;
- req->in.vwv = vwv;
- req->in.wct = wct;
- if (req->in.size >= NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
- req->in.data = req->in.vwv + VWV(wct) + 2;
- req->in.data_size = SVAL(req->in.vwv, VWV(wct));
- if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct) + req->in.data_size) {
- DEBUG(3,("bad data size for mid %d\n", mid));
- /* blergh - w2k3 gives a bogus data size values in some
- openX replies */
- req->in.data_size = req->in.size - (NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct));
- }
- }
- req->in.ptr = req->in.data;
- req->flags2 = SVAL(req->in.hdr, HDR_FLG2);
-
- if (!(req->flags2 & FLAGS2_32_BIT_ERROR_CODES)) {
- transport->error.etype = ETYPE_DOS;
- transport->error.e.dos.eclass = CVAL(req->in.hdr,HDR_RCLS);
- transport->error.e.dos.ecode = SVAL(req->in.hdr,HDR_ERR);
- req->status = dos_to_ntstatus(transport->error.e.dos.eclass,
- transport->error.e.dos.ecode);
- } else {
- transport->error.etype = ETYPE_NT;
- transport->error.e.nt_status = NT_STATUS(IVAL(req->in.hdr, HDR_RCLS));
- req->status = transport->error.e.nt_status;
- }
-
- if (!cli_request_check_sign_mac(req)) {
- transport->error.etype = ETYPE_SOCKET;
- transport->error.e.socket_error = SOCKET_READ_BAD_SIG;
- return False;
- };
-
-async:
- /* if this request has an async handler then call that to
- notify that the reply has been received. This might destroy
- the request so it must happen last */
- if (req->async.fn) {
- req->async.fn(req);
- }
-
-done:
- talloc_destroy(mem_ctx);
- return True;
-}
-
-
/*
wait for a reply to be received for a packet that just returns an error
code and nothing more
diff --git a/source4/ntvfs/cifs/vfs_cifs.c b/source4/ntvfs/cifs/vfs_cifs.c
index b6d3486ad8..fd94a923c9 100644
--- a/source4/ntvfs/cifs/vfs_cifs.c
+++ b/source4/ntvfs/cifs/vfs_cifs.c
@@ -68,17 +68,17 @@ static BOOL oplock_handler(struct cli_transport *transport, uint16_t tid, uint16
return req_send_oplock_break(private->tcon, fnum, level);
}
-/*
+ /*
a handler for read events on a connection to a backend server
*/
static void cifs_socket_handler(struct event_context *ev, struct fd_event *fde, time_t t, uint16_t flags)
{
struct cvfs_private *private = fde->private;
struct smbsrv_tcon *tcon = private->tcon;
-
+
DEBUG(5,("cifs_socket_handler event on fd %d\n", fde->fd));
-
- if (!cli_request_receive_next(private->transport)) {
+
+ if (!cli_transport_process(private->transport)) {
/* the connection to our server is dead */
close_cnum(tcon);
}
@@ -93,7 +93,6 @@ static NTSTATUS cvfs_connect(struct smbsrv_request *req, const char *sharename)
NTSTATUS status;
struct cvfs_private *private;
const char *map_calls;
- struct fd_event fde;
const char *host, *user, *pass, *domain, *remote_share;
/* Here we need to determine which server to connect to.
@@ -157,18 +156,17 @@ static NTSTATUS cvfs_connect(struct smbsrv_request *req, const char *sharename)
tcon->ntvfs_ops = ops;
}
- /* we need to tell the event loop that we wish to receive read events
- on our SMB connection to the server */
- fde.fd = private->transport->socket->fd;
- fde.flags = EVENT_FD_READ;
- fde.private = private;
- fde.handler = cifs_socket_handler;
-
- event_add_fd(tcon->smb_conn->connection->event.ctx, &fde);
-
/* we need to receive oplock break requests from the server */
cli_oplock_handler(private->transport, oplock_handler, private);
- cli_transport_idle_handler(private->transport, idle_func, 100, private);
+ cli_transport_idle_handler(private->transport, idle_func, 1, private);
+
+ private->transport->event.fde->handler = cifs_socket_handler;
+ private->transport->event.fde->private = private;
+
+ event_context_merge(tcon->smb_conn->connection->event.ctx,
+ private->transport->event.ctx);
+
+ private->transport->event.ctx = tcon->smb_conn->connection->event.ctx;
return NT_STATUS_OK;
}
@@ -180,7 +178,6 @@ static NTSTATUS cvfs_disconnect(struct smbsrv_tcon *tcon)
{
struct cvfs_private *private = tcon->ntvfs_private;
- event_remove_fd_all(tcon->smb_conn->connection->event.ctx, private->transport->socket->fd);
smb_tree_disconnect(private->tree);
cli_tree_close(private->tree);
diff --git a/source4/smb_server/smb_server.c b/source4/smb_server/smb_server.c
index fd9e35074d..ca36dc3aa9 100644
--- a/source4/smb_server/smb_server.c
+++ b/source4/smb_server/smb_server.c
@@ -745,6 +745,7 @@ static void smbsrv_recv(struct server_connection *conn, time_t t, uint16_t flags
req = receive_smb_request(smb_conn);
if (!req) {
+ conn->event.fde->flags = 0;
smbsrv_terminate_connection(smb_conn, "receive error");
return;
}
diff --git a/source4/torture/basic/scanner.c b/source4/torture/basic/scanner.c
index cf513414e8..20a467100b 100644
--- a/source4/torture/basic/scanner.c
+++ b/source4/torture/basic/scanner.c
@@ -537,7 +537,8 @@ BOOL torture_smb_scan(int dummy)
}
usleep(10000);
- if (cli_transport_pending(cli->transport)) {
+ cli_transport_process(cli->transport);
+ if (req->state > CLI_REQUEST_RECV) {
status = cli_request_simple_recv(req);
printf("op=0x%x status=%s\n", op, nt_errstr(status));
torture_close_connection(cli);
@@ -545,7 +546,8 @@ BOOL torture_smb_scan(int dummy)
}
sleep(1);
- if (cli_transport_pending(cli->transport)) {
+ cli_transport_process(cli->transport);
+ if (req->state > CLI_REQUEST_RECV) {
status = cli_request_simple_recv(req);
printf("op=0x%x status=%s\n", op, nt_errstr(status));
} else {
diff --git a/source4/torture/gentest.c b/source4/torture/gentest.c
index 016c19fd5b..e45d9f0124 100644
--- a/source4/torture/gentest.c
+++ b/source4/torture/gentest.c
@@ -185,7 +185,7 @@ static BOOL connect_servers(void)
}
cli_oplock_handler(servers[i].cli[j]->transport, oplock_handler, NULL);
- cli_transport_idle_handler(servers[i].cli[j]->transport, idle_func, 10, NULL);
+ cli_transport_idle_handler(servers[i].cli[j]->transport, idle_func, 1, NULL);
}
}
@@ -764,13 +764,8 @@ static void idle_func(struct cli_transport *transport, void *private)
for (i=0;i<NSERVERS;i++) {
for (j=0;j<NINSTANCES;j++) {
if (servers[i].cli[j] &&
- transport != servers[i].cli[j]->transport &&
- cli_transport_pending(servers[i].cli[j]->transport)) {
- if (!cli_request_receive_next(servers[i].cli[j]->transport)) {
- printf("Connection to server %d instance %d died!\n",
- i, j);
- exit(1);
- }
+ transport != servers[i].cli[j]->transport) {
+ cli_transport_process(servers[i].cli[j]->transport);
}
}
}
@@ -808,13 +803,7 @@ static void check_pending(void)
for (j=0;j<NINSTANCES;j++) {
for (i=0;i<NSERVERS;i++) {
- if (cli_transport_pending(servers[i].cli[j]->transport)) {
- if (!cli_request_receive_next(servers[i].cli[j]->transport)) {
- printf("Connection to server %d instance %d died!\n",
- i, j);
- exit(1);
- }
- }
+ cli_transport_process(servers[i].cli[j]->transport);
}
}
}