/* 
   ctdb database library
   Utility functions to read/write blobs of data from a file descriptor
   and handle the case where we might need multiple read/writes to get all the
   data.

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "lib/tdb/include/tdb.h"
#include "lib/events/events.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "../include/ctdb.h"

/* structures for packet queueing - see common/ctdb_io.c */
struct ctdb_partial {
	uint8_t *data;
	uint32_t length;
};

struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next, *prev;
	uint8_t *data;
	uint32_t length;
	uint32_t full_length;
};

struct ctdb_queue {
	struct ctdb_context *ctdb;
	struct ctdb_partial partial; /* partial input packet */
	struct ctdb_queue_pkt *out_queue;
	struct fd_event *fde;
	int fd;
	size_t alignment;
	void *private_data;
	ctdb_queue_cb_fn_t callback;
};



/*
  called when an incoming connection is readable
*/
static void queue_io_read(struct ctdb_queue *queue)
{
	int num_ready = 0;
	ssize_t nread;
	uint8_t *data, *data_base;

	if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
		return;
	}
	if (num_ready == 0) {
		/* the descriptor has been closed */
		goto failed;
	}


	queue->partial.data = talloc_realloc(queue, queue->partial.data, 
					     uint8_t, 
					     num_ready + queue->partial.length);

	if (queue->partial.data == NULL) {
		DEBUG(0,("read error alloc failed for %u\n", 
			 num_ready + queue->partial.length));
		goto failed;
	}

	nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
	if (nread <= 0) {
		DEBUG(0,("read error nread=%d\n", (int)nread));
		goto failed;
	}


	data = queue->partial.data;
	nread += queue->partial.length;

	queue->partial.data = NULL;
	queue->partial.length = 0;

	if (nread >= 4 && *(uint32_t *)data == nread) {
		/* it is the responsibility of the incoming packet
		 function to free 'data' */
		queue->callback(data, nread, queue->private_data);
		return;
	}

	data_base = data;

	while (nread >= 4 && *(uint32_t *)data <= nread) {
		/* we have at least one packet */
		uint8_t *d2;
		uint32_t len;
		len = *(uint32_t *)data;
		if (len == 0) {
			/* bad packet! treat as EOF */
			DEBUG(0,("Invalid packet of length 0\n"));
			goto failed;
		}
		d2 = (uint8_t *)talloc_memdup(queue, data, len);
		if (d2 == NULL) {
			DEBUG(0,("read error memdup failed for %u\n", len));
			/* sigh */
			goto failed;
		}
		queue->callback(d2, len, queue->private_data);
		data += len;
		nread -= len;		
	}

	if (nread > 0) {
		/* we have only part of a packet */
		if (data_base == data) {
			queue->partial.data = data;
			queue->partial.length = nread;
		} else {
			queue->partial.data = (uint8_t *)talloc_memdup(queue, data, nread);
			if (queue->partial.data == NULL) {
				DEBUG(0,("read error memdup partial failed for %u\n", 
					 (unsigned)nread));
				goto failed;
			}
			queue->partial.length = nread;
			talloc_free(data_base);
		}
		return;
	}

	talloc_free(data_base);
	return;

failed:
	queue->callback(NULL, 0, queue->private_data);
}


/* used when an event triggers a dead queue */
static void queue_dead(struct event_context *ev, struct timed_event *te, 
		       struct timeval t, void *private_data)
{
	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
	queue->callback(NULL, 0, queue->private_data);
}


/*
  called when an incoming connection is writeable
*/
static void queue_io_write(struct ctdb_queue *queue)
{
	while (queue->out_queue) {
		struct ctdb_queue_pkt *pkt = queue->out_queue;
		ssize_t n;
		if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
			n = write(queue->fd, pkt->data, 1);
		} else {
			n = write(queue->fd, pkt->data, pkt->length);
		}

		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			if (pkt->length != pkt->full_length) {
				/* partial packet sent - we have to drop it */
				DLIST_REMOVE(queue->out_queue, pkt);
				talloc_free(pkt);
			}
			talloc_free(queue->fde);
			queue->fde = NULL;
			queue->fd = -1;
			event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
					queue_dead, queue);
			return;
		}
		if (n <= 0) return;
		
		if (n != pkt->length) {
			pkt->length -= n;
			pkt->data += n;
			return;
		}

		DLIST_REMOVE(queue->out_queue, pkt);
		talloc_free(pkt);
	}

	EVENT_FD_NOT_WRITEABLE(queue->fde);
}

/*
  called when an incoming connection is readable or writeable
*/
static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
			     uint16_t flags, void *private_data)
{
	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

	if (flags & EVENT_FD_READ) {
		queue_io_read(queue);
	} else {
		queue_io_write(queue);
	}
}


/*
  queue a packet for sending
*/
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
	struct ctdb_queue_pkt *pkt;
	uint32_t length2, full_length;

	if (queue->alignment) {
		/* enforce the length and alignment rules from the tcp packet allocator */
		length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
		*(uint32_t *)data = length2;
	} else {
		length2 = length;
	}

	if (length2 != length) {
		memset(data+length, 0, length2-length);
	}

	full_length = length2;
	
	/* if the queue is empty then try an immediate write, avoiding
	   queue overhead. This relies on non-blocking sockets */
	if (queue->out_queue == NULL && queue->fd != -1 &&
	    !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
		ssize_t n = write(queue->fd, data, length2);
		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			talloc_free(queue->fde);
			queue->fde = NULL;
			queue->fd = -1;
			event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
					queue_dead, queue);
			/* yes, we report success, as the dead node is 
			   handled via a separate event */
			return 0;
		}
		if (n > 0) {
			data += n;
			length2 -= n;
		}
		if (length2 == 0) return 0;
	}

	pkt = talloc(queue, struct ctdb_queue_pkt);
	CTDB_NO_MEMORY(queue->ctdb, pkt);

	pkt->data = (uint8_t *)talloc_memdup(pkt, data, length2);
	CTDB_NO_MEMORY(queue->ctdb, pkt->data);

	pkt->length = length2;
	pkt->full_length = full_length;

	if (queue->out_queue == NULL && queue->fd != -1) {
		EVENT_FD_WRITEABLE(queue->fde);
	}

	DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);

	return 0;
}


/*
  setup the fd used by the queue
 */
int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
{
	queue->fd = fd;
	talloc_free(queue->fde);
	queue->fde = NULL;

	if (fd != -1) {
		queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
					  queue_io_handler, queue);
		if (queue->fde == NULL) {
			return -1;
		}

		if (queue->out_queue) {
			EVENT_FD_WRITEABLE(queue->fde);		
		}
	}

	return 0;
}



/*
  setup a packet queue on a socket
 */
struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
				    TALLOC_CTX *mem_ctx, int fd, int alignment,
				    
				    ctdb_queue_cb_fn_t callback,
				    void *private_data)
{
	struct ctdb_queue *queue;

	queue = talloc_zero(mem_ctx, struct ctdb_queue);
	CTDB_NO_MEMORY_NULL(ctdb, queue);

	queue->ctdb = ctdb;
	queue->fd = fd;
	queue->alignment = alignment;
	queue->private_data = private_data;
	queue->callback = callback;
	if (fd != -1) {
		if (ctdb_queue_set_fd(queue, fd) != 0) {
			talloc_free(queue);
			return NULL;
		}
	}

	return queue;
}