From 5ddf678e0113f81aa2b5f99134cda4fe8c01afb7 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Fri, 23 Jul 2004 06:40:49 +0000 Subject: r1578: the first stage of the async client rewrite. Up to now the client code has had an async API, and operated asynchronously at the packet level, but was not truly async in that it assumed that it could always write to the socket and when a partial packet came in that it could block waiting for the rest of the packet. This change makes the SMB client library full async, by adding a separate outgoing packet queue, using non-blocking socket IO and having a input buffer that can fill asynchonously until the full packet has arrived. The main complexity was in dealing with the events structure when using the CIFS proxy backend. In that case the same events structure needs to be used in both the client library and the main smbd server, so that when the client library is waiting for a reply that the main server keeps processing packets. This required some changes in the events library code. Next step is to make the generated rpc client code use these new capabilities. (This used to be commit 96bf4da3edc4d64b0f58ef520269f3b385b8da02) --- source4/libcli/raw/clisocket.c | 8 +- source4/libcli/raw/clitransport.c | 337 +++++++++++++++++++++++++++++++++----- source4/libcli/raw/rawrequest.c | 186 +-------------------- 3 files changed, 306 insertions(+), 225 deletions(-) (limited to 'source4/libcli') diff --git a/source4/libcli/raw/clisocket.c b/source4/libcli/raw/clisocket.c index 5cd6f33689..eb5d3c0342 100644 --- a/source4/libcli/raw/clisocket.c +++ b/source4/libcli/raw/clisocket.c @@ -72,7 +72,13 @@ BOOL cli_sock_connect(struct cli_socket *sock, struct in_addr *ip, int port) &sock->dest_ip, sock->port, LONG_CONNECT_TIMEOUT); - return (sock->fd != -1); + if (sock->fd == -1) { + return False; + } + + set_blocking(sock->fd, False); + + return True; } diff --git a/source4/libcli/raw/clitransport.c b/source4/libcli/raw/clitransport.c index 18784fe33a..96d5a18a71 100644 --- a/source4/libcli/raw/clitransport.c +++ b/source4/libcli/raw/clitransport.c @@ -21,6 +21,17 @@ #include "includes.h" +/* + an event has happened on the socket +*/ +static void cli_transport_event_handler(struct event_context *ev, struct fd_event *fde, + time_t t, uint16_t flags) +{ + struct cli_transport *transport = fde->private; + + cli_transport_process(transport); +} + /* create a transport structure based on an established socket */ @@ -28,6 +39,7 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock) { TALLOC_CTX *mem_ctx; struct cli_transport *transport; + struct fd_event fde; mem_ctx = talloc_init("cli_transport"); if (!mem_ctx) return NULL; @@ -35,6 +47,12 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock) transport = talloc_zero(mem_ctx, sizeof(*transport)); if (!transport) return NULL; + transport->event.ctx = event_context_init(); + if (transport->event.ctx == NULL) { + talloc_destroy(mem_ctx); + return NULL; + } + transport->mem_ctx = mem_ctx; transport->socket = sock; transport->negotiate.protocol = PROTOCOL_NT1; @@ -47,6 +65,14 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock) ZERO_STRUCT(transport->called); + fde.fd = sock->fd; + fde.flags = EVENT_FD_READ; + fde.handler = cli_transport_event_handler; + fde.private = transport; + fde.ref_count = 1; + + transport->event.fde = event_add_fd(transport->event.ctx, &fde); + return transport; } @@ -59,6 +85,9 @@ void cli_transport_close(struct cli_transport *transport) transport->reference_count--; if (transport->reference_count <= 0) { cli_sock_close(transport->socket); + event_remove_fd(transport->event.ctx, transport->event.fde); + event_remove_timed(transport->event.ctx, transport->event.te); + event_context_destroy(transport->event.ctx); talloc_destroy(transport->mem_ctx); } } @@ -72,6 +101,21 @@ void cli_transport_dead(struct cli_transport *transport) } +/* + enable select for write on a transport +*/ +static void cli_transport_write_enable(struct cli_transport *transport) +{ + transport->event.fde->flags |= EVENT_FD_WRITE; +} + +/* + disable select for write on a transport +*/ +static void cli_transport_write_disable(struct cli_transport *transport) +{ + transport->event.fde->flags &= ~EVENT_FD_WRITE; +} /**************************************************************************** send a session request (if appropriate) @@ -145,7 +189,7 @@ again: /* the zero mid is reserved for requests that don't have a mid */ if (mid == 0) mid = 1; - for (req=transport->pending_requests; req; req=req->next) { + for (req=transport->pending_recv; req; req=req->next) { if (req->mid == mid) { mid++; goto again; @@ -156,81 +200,284 @@ again: return mid; } +static void idle_handler(struct event_context *ev, + struct timed_event *te, time_t t) +{ + struct cli_transport *transport = te->private; + te->next_event = t + transport->idle.period; + transport->idle.func(transport, transport->idle.private); +} + /* setup the idle handler for a transport + the period is in seconds */ void cli_transport_idle_handler(struct cli_transport *transport, void (*idle_func)(struct cli_transport *, void *), uint_t period, void *private) { + struct timed_event te; transport->idle.func = idle_func; transport->idle.private = private; transport->idle.period = period; -} + if (transport->event.te != NULL) { + event_remove_timed(transport->event.ctx, transport->event.te); + } + + te.next_event = time(NULL) + period; + te.handler = idle_handler; + te.private = transport; + transport->event.te = event_add_timed(transport->event.ctx, &te); +} /* - determine if a packet is pending for receive on a transport + process some pending sends */ -BOOL cli_transport_pending(struct cli_transport *transport) +static void cli_transport_process_send(struct cli_transport *transport) { - return socket_pending(transport->socket->fd); -} - + while (transport->pending_send) { + struct cli_request *req = transport->pending_send; + ssize_t ret; + ret = cli_sock_write(transport->socket, req->out.buffer, req->out.size); + if (ret == -1) { + if (errno == EAGAIN || errno == EINTR) { + return; + } + cli_transport_dead(transport); + } + req->out.buffer += ret; + req->out.size -= ret; + if (req->out.size == 0) { + req->state = CLI_REQUEST_RECV; + DLIST_REMOVE(transport->pending_send, req); + DLIST_ADD(transport->pending_recv, req); + } + } + /* we're out of requests to send, so don't wait for write + events any more */ + cli_transport_write_disable(transport); +} /* - wait for data on a transport, periodically calling a wait function - if one has been defined - return True if a packet is received -*/ -BOOL cli_transport_select(struct cli_transport *transport) + we have a full request in our receive buffer - match it to a pending request + and process + */ +static void cli_transport_finish_recv(struct cli_transport *transport) { - fd_set fds; - int selrtn; - int fd; - struct timeval timeout; + uint8_t *buffer, *hdr, *vwv; + int len; + uint16_t wct, mid = 0; + struct cli_request *req; - fd = transport->socket->fd; + buffer = transport->recv_buffer.buffer; + len = transport->recv_buffer.req_size; - if (fd == -1) { - return False; + hdr = buffer+NBT_HDR_SIZE; + vwv = hdr + HDR_VWV; + + /* see if it could be an oplock break request */ + if (handle_oplock_break(transport, len, hdr, vwv)) { + talloc_free(transport->mem_ctx, buffer); + ZERO_STRUCT(transport->recv_buffer); + return; } - do { - uint_t period = 1000; + /* at this point we need to check for a readbraw reply, as + these can be any length */ + if (transport->readbraw_pending) { + transport->readbraw_pending = 0; + + /* it must match the first entry in the pending queue + as the client is not allowed to have outstanding + readbraw requests */ + req = transport->pending_recv; + if (!req) goto error; + + req->in.buffer = buffer; + talloc_steal(transport->mem_ctx, req->mem_ctx, buffer); + req->in.size = len + NBT_HDR_SIZE; + req->in.allocated = req->in.size; + goto async; + } - FD_ZERO(&fds); - FD_SET(fd,&fds); - - if (transport->idle.func) { - period = transport->idle.period; + if (len >= MIN_SMB_SIZE) { + /* extract the mid for matching to pending requests */ + mid = SVAL(hdr, HDR_MID); + wct = CVAL(hdr, HDR_WCT); + } + + /* match the incoming request against the list of pending requests */ + for (req=transport->pending_recv; req; req=req->next) { + if (req->mid == mid) break; + } + + if (!req) { + DEBUG(1,("Discarding unmatched reply with mid %d\n", mid)); + goto error; + } + + /* fill in the 'in' portion of the matching request */ + req->in.buffer = buffer; + talloc_steal(transport->mem_ctx, req->mem_ctx, buffer); + req->in.size = len + NBT_HDR_SIZE; + req->in.allocated = req->in.size; + + /* handle non-SMB replies */ + if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE) { + req->state = CLI_REQUEST_ERROR; + goto error; + } + + if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) { + DEBUG(2,("bad reply size for mid %d\n", mid)); + req->status = NT_STATUS_UNSUCCESSFUL; + req->state = CLI_REQUEST_ERROR; + goto error; + } + + req->in.hdr = hdr; + req->in.vwv = vwv; + req->in.wct = wct; + if (req->in.size >= NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) { + req->in.data = req->in.vwv + VWV(wct) + 2; + req->in.data_size = SVAL(req->in.vwv, VWV(wct)); + if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct) + req->in.data_size) { + DEBUG(3,("bad data size for mid %d\n", mid)); + /* blergh - w2k3 gives a bogus data size values in some + openX replies */ + req->in.data_size = req->in.size - (NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)); } + } + req->in.ptr = req->in.data; + req->flags2 = SVAL(req->in.hdr, HDR_FLG2); + + if (!(req->flags2 & FLAGS2_32_BIT_ERROR_CODES)) { + transport->error.etype = ETYPE_DOS; + transport->error.e.dos.eclass = CVAL(req->in.hdr,HDR_RCLS); + transport->error.e.dos.ecode = SVAL(req->in.hdr,HDR_ERR); + req->status = dos_to_ntstatus(transport->error.e.dos.eclass, + transport->error.e.dos.ecode); + } else { + transport->error.etype = ETYPE_NT; + transport->error.e.nt_status = NT_STATUS(IVAL(req->in.hdr, HDR_RCLS)); + req->status = transport->error.e.nt_status; + } + + if (!cli_request_check_sign_mac(req)) { + transport->error.etype = ETYPE_SOCKET; + transport->error.e.socket_error = SOCKET_READ_BAD_SIG; + req->state = CLI_REQUEST_ERROR; + goto error; + }; + +async: + /* if this request has an async handler then call that to + notify that the reply has been received. This might destroy + the request so it must happen last */ + ZERO_STRUCT(transport->recv_buffer); + DLIST_REMOVE(transport->pending_recv, req); + req->state = CLI_REQUEST_DONE; + if (req->async.fn) { + req->async.fn(req); + } + return; + +error: + if (req) { + DLIST_REMOVE(transport->pending_recv, req); + req->state = CLI_REQUEST_ERROR; + } + ZERO_STRUCT(transport->recv_buffer); +} - timeout.tv_sec = period / 1000; - timeout.tv_usec = 1000*(period%1000); - - selrtn = sys_select_intr(fd+1,&fds,NULL,NULL,&timeout); - - if (selrtn == 1) { - /* the fd is readable */ - return True; +/* + process some pending receives +*/ +static void cli_transport_process_recv(struct cli_transport *transport) +{ + /* a incoming packet goes through 2 stages - first we read the + 4 byte header, which tells us how much more is coming. Then + we read the rest */ + if (transport->recv_buffer.received < NBT_HDR_SIZE) { + ssize_t ret; + ret = cli_sock_read(transport->socket, + transport->recv_buffer.header + + transport->recv_buffer.received, + NBT_HDR_SIZE - transport->recv_buffer.received); + if (ret == -1) { + if (errno == EINTR || errno == EAGAIN) { + return; + } + cli_transport_dead(transport); + return; } - - if (selrtn == -1) { - /* sys_select_intr() already handles EINTR, so this - is an error. The socket is probably dead */ - return False; + + transport->recv_buffer.received += ret; + + if (transport->recv_buffer.received == NBT_HDR_SIZE) { + /* we've got a full header */ + transport->recv_buffer.req_size = smb_len(transport->recv_buffer.header) + NBT_HDR_SIZE; + transport->recv_buffer.buffer = talloc(transport->mem_ctx, + NBT_HDR_SIZE+transport->recv_buffer.req_size); + if (transport->recv_buffer.buffer == NULL) { + cli_transport_dead(transport); + return; + } + memcpy(transport->recv_buffer.buffer, transport->recv_buffer.header, NBT_HDR_SIZE); } - - /* only other possibility is that we timed out - call the idle function - if there is one */ - if (transport->idle.func) { - transport->idle.func(transport, transport->idle.private); + } + + if (transport->recv_buffer.received < transport->recv_buffer.req_size) { + ssize_t ret; + ret = cli_sock_read(transport->socket, + transport->recv_buffer.buffer + + transport->recv_buffer.received, + transport->recv_buffer.req_size - + transport->recv_buffer.received); + if (ret == -1) { + if (errno == EINTR || errno == EAGAIN) { + return; + } + cli_transport_dead(transport); + return; } - } while (selrtn == 0); + transport->recv_buffer.received += ret; + } + + if (transport->recv_buffer.received != 0 && + transport->recv_buffer.received == transport->recv_buffer.req_size) { + cli_transport_finish_recv(transport); + } +} +/* + process some read/write requests that are pending + return False if the socket is dead +*/ +BOOL cli_transport_process(struct cli_transport *transport) +{ + cli_transport_process_send(transport); + cli_transport_process_recv(transport); + if (transport->socket->fd == -1) { + return False; + } return True; } + + +/* + put a request into the send queue +*/ +void cli_transport_send(struct cli_request *req) +{ + /* put it on the outgoing socket queue */ + req->state = CLI_REQUEST_SEND; + DLIST_ADD(req->transport->pending_send, req); + + /* make sure we look for write events */ + cli_transport_write_enable(req->transport); +} diff --git a/source4/libcli/raw/rawrequest.c b/source4/libcli/raw/rawrequest.c index c31f07505f..ce6cd0a1a4 100644 --- a/source4/libcli/raw/rawrequest.c +++ b/source4/libcli/raw/rawrequest.c @@ -43,7 +43,7 @@ NTSTATUS cli_request_destroy(struct cli_request *req) if (req->transport) { /* remove it from the list of pending requests (a null op if its not in the list) */ - DLIST_REMOVE(req->transport->pending_requests, req); + DLIST_REMOVE(req->transport->pending_recv, req); } /* ahh, its so nice to destroy a complex structure in such a @@ -79,6 +79,7 @@ struct cli_request *cli_request_setup_nonsmb(struct cli_transport *transport, ui ZERO_STRUCTP(req); /* setup the request context */ + req->state = CLI_REQUEST_INIT; req->mem_ctx = mem_ctx; req->transport = transport; req->session = NULL; @@ -266,32 +267,20 @@ static void cli_req_grow_data(struct cli_request *req, uint_t new_size) SSVAL(req->out.vwv, VWV(req->out.wct), new_size); } + /* send a message */ BOOL cli_request_send(struct cli_request *req) { - uint_t ret; - if (IVAL(req->out.buffer, 0) == 0) { _smb_setlen(req->out.buffer, req->out.size - NBT_HDR_SIZE); } cli_request_calculate_sign_mac(req); - ret = cli_sock_write(req->transport->socket, req->out.buffer, req->out.size); - - if (req->out.size != ret) { - req->transport->error.etype = ETYPE_SOCKET; - req->transport->error.e.socket_error = SOCKET_WRITE_ERROR; - DEBUG(0,("Error writing %d bytes to server - %s\n", - (int)req->out.size, strerror(errno))); - return False; - } + cli_transport_send(req); - /* add it to the list of pending requests */ - DLIST_ADD(req->transport->pending_requests, req); - return True; } @@ -306,17 +295,8 @@ BOOL cli_request_receive(struct cli_request *req) if (!req) return False; /* keep receiving packets until this one is replied to */ - while (!req->in.buffer) { - if (!cli_transport_select(req->transport)) { - req->status = NT_STATUS_UNSUCCESSFUL; - return False; - } - - if (!cli_request_receive_next(req->transport)) { - cli_transport_dead(req->transport); - req->status = NT_STATUS_UNEXPECTED_NETWORK_ERROR; - return False; - } + while (req->state <= CLI_REQUEST_RECV) { + event_loop_once(req->transport->event.ctx); } return True; @@ -327,7 +307,7 @@ BOOL cli_request_receive(struct cli_request *req) handle oplock break requests from the server - return True if the request was an oplock break */ -static BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, const char *hdr, const char *vwv) +BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, const char *hdr, const char *vwv) { /* we must be very fussy about what we consider an oplock break to avoid matching readbraw replies */ @@ -350,158 +330,6 @@ static BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, con return True; } - -/* - receive an async message from the server - this function assumes that the caller already knows that the socket is readable - and that there is a packet waiting - - The packet is not actually returned by this function, instead any - registered async message handlers are called - - return True if a packet was successfully received and processed - return False if the socket appears to be dead -*/ -BOOL cli_request_receive_next(struct cli_transport *transport) -{ - BOOL ret; - int len; - char header[NBT_HDR_SIZE]; - char *buffer, *hdr, *vwv; - TALLOC_CTX *mem_ctx; - struct cli_request *req; - uint16_t wct, mid = 0; - - len = cli_sock_read(transport->socket, header, 4); - if (len != 4) { - return False; - } - - len = smb_len(header); - - mem_ctx = talloc_init("cli_request_receive_next"); - - /* allocate the incoming buffer at the right size */ - buffer = talloc(mem_ctx, len+NBT_HDR_SIZE); - if (!buffer) { - talloc_destroy(mem_ctx); - return False; - } - - /* fill in the already received header */ - memcpy(buffer, header, NBT_HDR_SIZE); - - ret = cli_sock_read(transport->socket, buffer + NBT_HDR_SIZE, len); - /* If the server is not responding, note that now */ - if (ret != len) { - return False; - } - - hdr = buffer+NBT_HDR_SIZE; - vwv = hdr + HDR_VWV; - - /* see if it could be an oplock break request */ - if (handle_oplock_break(transport, len, hdr, vwv)) { - goto done; - } - - /* at this point we need to check for a readbraw reply, as these can be any length */ - if (transport->readbraw_pending) { - transport->readbraw_pending = 0; - - /* it must match the first entry in the pending queue as the client is not allowed - to have outstanding readbraw requests */ - req = transport->pending_requests; - if (!req) goto done; - - req->in.buffer = buffer; - talloc_steal(mem_ctx, req->mem_ctx, buffer); - req->in.size = len + NBT_HDR_SIZE; - req->in.allocated = req->in.size; - goto async; - } - - if (len >= MIN_SMB_SIZE) { - /* extract the mid for matching to pending requests */ - mid = SVAL(hdr, HDR_MID); - wct = CVAL(hdr, HDR_WCT); - } - - /* match the incoming request against the list of pending requests */ - for (req=transport->pending_requests; req; req=req->next) { - if (req->mid == mid) break; - } - - if (!req) { - DEBUG(3,("Discarding unmatched reply with mid %d\n", mid)); - goto done; - } - - /* fill in the 'in' portion of the matching request */ - req->in.buffer = buffer; - talloc_steal(mem_ctx, req->mem_ctx, buffer); - req->in.size = len + NBT_HDR_SIZE; - req->in.allocated = req->in.size; - - /* handle non-SMB replies */ - if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE) { - goto done; - } - - if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) { - DEBUG(2,("bad reply size for mid %d\n", mid)); - req->status = NT_STATUS_UNSUCCESSFUL; - goto done; - } - - req->in.hdr = hdr; - req->in.vwv = vwv; - req->in.wct = wct; - if (req->in.size >= NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) { - req->in.data = req->in.vwv + VWV(wct) + 2; - req->in.data_size = SVAL(req->in.vwv, VWV(wct)); - if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct) + req->in.data_size) { - DEBUG(3,("bad data size for mid %d\n", mid)); - /* blergh - w2k3 gives a bogus data size values in some - openX replies */ - req->in.data_size = req->in.size - (NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)); - } - } - req->in.ptr = req->in.data; - req->flags2 = SVAL(req->in.hdr, HDR_FLG2); - - if (!(req->flags2 & FLAGS2_32_BIT_ERROR_CODES)) { - transport->error.etype = ETYPE_DOS; - transport->error.e.dos.eclass = CVAL(req->in.hdr,HDR_RCLS); - transport->error.e.dos.ecode = SVAL(req->in.hdr,HDR_ERR); - req->status = dos_to_ntstatus(transport->error.e.dos.eclass, - transport->error.e.dos.ecode); - } else { - transport->error.etype = ETYPE_NT; - transport->error.e.nt_status = NT_STATUS(IVAL(req->in.hdr, HDR_RCLS)); - req->status = transport->error.e.nt_status; - } - - if (!cli_request_check_sign_mac(req)) { - transport->error.etype = ETYPE_SOCKET; - transport->error.e.socket_error = SOCKET_READ_BAD_SIG; - return False; - }; - -async: - /* if this request has an async handler then call that to - notify that the reply has been received. This might destroy - the request so it must happen last */ - if (req->async.fn) { - req->async.fn(req); - } - -done: - talloc_destroy(mem_ctx); - return True; -} - - /* wait for a reply to be received for a packet that just returns an error code and nothing more -- cgit