From 656d03fcca9789ffe919d5fe64a4b5462b94f6c8 Mon Sep 17 00:00:00 2001 From: Benjamin Franzke Date: Sat, 28 May 2011 12:44:37 +0200 Subject: Move send_msg and recv_msg to messages.c --- src/Makefile.am | 2 +- src/cmumble.c | 199 +------------------------------------------------------- src/cmumble.h | 67 ++++++++++++++++--- src/messages.c | 139 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 202 insertions(+), 205 deletions(-) create mode 100644 src/messages.c diff --git a/src/Makefile.am b/src/Makefile.am index fdb5c84..389dba1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,7 +1,7 @@ bin_PROGRAMS = cmumble noinst_HEADERS = cmumble.h messages.h mumble.pb-c.h varint.h -cmumble_SOURCES = cmumble.c mumble.pb-c.c varint.c +cmumble_SOURCES = cmumble.c messages.c mumble.pb-c.c varint.c cmumble_LDADD = $(PROTOBUF_LIBS) $(GLIB_LIBS) $(GIO_LIBS) $(GSTREAMER_LIBS) $(CELT_LIBS) AM_CPPFLAGS = $(PROTOBUF_CFLAGS) $(GLIB_CFLAGS) $(GIO_CFLAGS) $(GSTREAMER_CFLAGS) $(CELT_CFLAGS) diff --git a/src/cmumble.c b/src/cmumble.c index f6fa9a5..f2036a8 100644 --- a/src/cmumble.c +++ b/src/cmumble.c @@ -1,71 +1,10 @@ #include "../config.h" -#include #include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include - -#include -#include -#include - -#include "mumble.pb-c.h" #include "varint.h" #include "cmumble.h" -#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) - -#define PREAMBLE_SIZE 6 - -struct context { - GMainLoop *loop; - - uint32_t session; - bool authenticated; - - GSocketClient *sock_client; - GSocketConnection *conn; - GSocket *sock; - GIOStream *iostream; - - uint8_t celt_header_packet[sizeof(CELTHeader)]; - CELTHeader celt_header; - CELTMode *celt_mode; - - GstElement *record_pipeline; - GstAppSink *sink; - - int64_t sequence; - - GList *users; -}; - -struct user { - uint32_t session; - char *name; - uint32_t user_id; - - GstElement *pipeline; - GstAppSrc *src; -}; - -enum udp_message_type { - udp_voice_celt_alpha, - udp_ping, - udp_voice_speex, - udp_voice_celt_beta -}; - static struct user * find_user(struct context *ctx, uint32_t session) { @@ -93,34 +32,6 @@ appsrc_push(GstAppSrc *src, const void *mem, size_t size) gst_app_src_push_buffer(src, gstbuf); } -static void -add_preamble(uint8_t *buffer, uint16_t type, uint32_t len) -{ - buffer[1] = (type) & 0xff; - buffer[0] = (type >> 8) & 0xff; - - buffer[5] = (len) & 0xff; - buffer[4] = (len >> 8) & 0xff; - buffer[3] = (len >> 16) & 0xff; - buffer[2] = (len >> 24) & 0xff; -} - -static void -get_preamble(uint8_t *buffer, int *type, int *len) -{ - uint16_t msgType; - uint32_t msgLen; - - msgType = buffer[1] | (buffer[0] << 8); - msgLen = buffer[5] | (buffer[4] << 8) | (buffer[3] << 16) | (buffer[2] << 24); - *type = (int)msgType; - *len = (int)msgLen; -} - -GStaticMutex write_mutex = G_STATIC_MUTEX_INIT; - -static void -send_msg(struct context *ctx, ProtobufCMessage *msg); static GstFlowReturn pull_buffer(GstAppSink *sink, gpointer user_data) @@ -303,7 +214,6 @@ recv_user_state(MumbleProto__UserState *state, struct context *ctx) ctx->users = g_list_prepend(ctx->users, user); } -typedef void (*callback_t)(void *, void *); static const callback_t callbacks[] = { [Version] = (callback_t) recv_version, @@ -334,116 +244,13 @@ static const callback_t callbacks[] = { [SuggestConfig] = (callback_t) NULL, }; -static void -send_msg(struct context *ctx, ProtobufCMessage *msg) -{ - uint8_t pad[128]; - uint8_t preamble[PREAMBLE_SIZE]; - int type = -1; - int i; - ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad); - GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); - - for (i = 0; i < ARRAY_SIZE(messages); ++i) - if (messages[i].descriptor == msg->descriptor) - type = i; - assert(type >= 0); - - if (type == UDPTunnel) { - MumbleProto__UDPTunnel *tunnel = (MumbleProto__UDPTunnel *) msg; - buffer.data = tunnel->packet.data; - buffer.len = tunnel->packet.len; - buffer.must_free_data = 0; - } else { - protobuf_c_message_pack_to_buffer(msg, &buffer.base); - } - - add_preamble(preamble, type, buffer.len); - - g_static_mutex_lock(&write_mutex); - g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL); - g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL); - g_static_mutex_unlock(&write_mutex); - - PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer); -} - -static void -recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_size) -{ - uint8_t preamble[PREAMBLE_SIZE]; - ProtobufCMessage *msg; - void *data; - int type, len; - gssize ret; - GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); - - ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL); - - if (ret <= 0) { - printf("read failed: %ld\n", ret); - return; - } - - get_preamble(preamble, &type, &len); - - if (!(type >= 0 && type < ARRAY_SIZE(messages))) { - printf("unknown message type: %d\n", type); - return; - } - - if (len <= 0) { - printf("length 0\n"); - return; - } - - data = malloc(len); - if (data == NULL) { - printf("out of mem\n"); - abort(); - } - ret = g_input_stream_read(input, data, len, NULL, NULL); - - /* tunneled udp data - not a regular protobuf message - * create dummy ProtobufCMessage */ - if (type == UDPTunnel) { - MumbleProto__UDPTunnel udptunnel; - mumble_proto__udptunnel__init(&udptunnel); - - udptunnel.packet.len = len; - udptunnel.packet.data = data; - - if (callbacks[UDPTunnel]) - callbacks[UDPTunnel](&udptunnel.base, ctx); - - //handle_udp(ctx, data, len); - - free(data); - return; - } - - msg = protobuf_c_message_unpack(messages[type].descriptor, NULL, - len, data); - if (msg == NULL) { - printf("message unpack failure\n"); - return; - } - - printf("debug: received message: %s type:%d, len:%d\n", messages[type].name, type, len); - if (callbacks[type]) - callbacks[type](msg, ctx); - - protobuf_c_message_free_unpacked(msg, NULL); - free(data); -} - static gboolean do_ping(struct context *ctx) { MumbleProto__Ping ping; - struct timeval tv; + GTimeVal tv; - gettimeofday(&tv, NULL); + g_get_current_time(&tv); mumble_proto__ping__init(&ping); ping.timestamp = tv.tv_sec; @@ -461,7 +268,7 @@ read_cb(GSocket *socket, GIOCondition condition, gpointer data) GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); do { - recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks)); + recv_msg(ctx, callbacks, G_N_ELEMENTS(callbacks)); } while (g_input_stream_has_pending(input)); diff --git a/src/cmumble.h b/src/cmumble.h index 94ac8dd..baab8f3 100644 --- a/src/cmumble.h +++ b/src/cmumble.h @@ -1,21 +1,72 @@ #ifndef _CMUMBLE_H_ #define _CMUMBLE_H_ +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include "mumble.pb-c.h" #include "messages.h" +struct context { + GMainLoop *loop; + + uint32_t session; + gboolean authenticated; + + GSocketClient *sock_client; + GSocketConnection *conn; + GSocket *sock; + GIOStream *iostream; + + uint8_t celt_header_packet[sizeof(CELTHeader)]; + CELTHeader celt_header; + CELTMode *celt_mode; + + GstElement *record_pipeline; + GstAppSink *sink; + + int64_t sequence; + + GList *users; +}; + +struct user { + uint32_t session; + char *name; + uint32_t user_id; + + GstElement *pipeline; + GstAppSrc *src; +}; + +enum udp_message_type { + udp_voice_celt_alpha, + udp_ping, + udp_voice_speex, + udp_voice_celt_beta +}; + enum mumble_message { #define MUMBLE_MSG(a,b,c) a, MUMBLE_MSGS #undef MUMBLE_MSG }; -static const struct { - const ProtobufCMessageDescriptor *descriptor; - const char *name; -} messages[] = { -#define MUMBLE_MSG(a,b,c) { &mumble_proto_##b##__descriptor, c }, - MUMBLE_MSGS -#undef MUMBLE_MSG -}; +typedef void (*callback_t)(ProtobufCMessage *msg, struct context *); + +void +send_msg(struct context *ctx, ProtobufCMessage *msg); + +void +recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_size); #endif diff --git a/src/messages.c b/src/messages.c new file mode 100644 index 0000000..a5f2f6f --- /dev/null +++ b/src/messages.c @@ -0,0 +1,139 @@ +#include "cmumble.h" + +#define PREAMBLE_SIZE 6 + +GStaticMutex write_mutex = G_STATIC_MUTEX_INIT; + +static const struct { + const ProtobufCMessageDescriptor *descriptor; + const char *name; +} messages[] = { +#define MUMBLE_MSG(a,b,c) { &mumble_proto_##b##__descriptor, c }, + MUMBLE_MSGS +#undef MUMBLE_MSG +}; + +static void +add_preamble(uint8_t *buffer, uint16_t type, uint32_t len) +{ + buffer[1] = (type) & 0xff; + buffer[0] = (type >> 8) & 0xff; + + buffer[5] = (len) & 0xff; + buffer[4] = (len >> 8) & 0xff; + buffer[3] = (len >> 16) & 0xff; + buffer[2] = (len >> 24) & 0xff; +} + +static void +get_preamble(uint8_t *buffer, int *type, int *len) +{ + uint16_t msgType; + uint32_t msgLen; + + msgType = buffer[1] | (buffer[0] << 8); + msgLen = buffer[5] | (buffer[4] << 8) | (buffer[3] << 16) | (buffer[2] << 24); + *type = (int)msgType; + *len = (int)msgLen; +} + +void +send_msg(struct context *ctx, ProtobufCMessage *msg) +{ + uint8_t pad[128]; + uint8_t preamble[PREAMBLE_SIZE]; + int type = -1; + int i; + ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad); + GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); + + for (i = 0; i < G_N_ELEMENTS(messages); ++i) + if (messages[i].descriptor == msg->descriptor) + type = i; + assert(type >= 0); + + if (type == UDPTunnel) { + MumbleProto__UDPTunnel *tunnel = (MumbleProto__UDPTunnel *) msg; + buffer.data = tunnel->packet.data; + buffer.len = tunnel->packet.len; + buffer.must_free_data = 0; + } else { + protobuf_c_message_pack_to_buffer(msg, &buffer.base); + } + + add_preamble(preamble, type, buffer.len); + + g_static_mutex_lock(&write_mutex); + g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL); + g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL); + g_static_mutex_unlock(&write_mutex); + + PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer); +} + +void +recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_size) +{ + uint8_t preamble[PREAMBLE_SIZE]; + ProtobufCMessage *msg; + gchar *data; + int type, len; + gssize ret; + GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); + + ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL); + + if (ret <= 0) { + g_printerr("read failed: %ld\n", ret); + return; + } + + get_preamble(preamble, &type, &len); + + if (!(type >= 0 && type < G_N_ELEMENTS(messages))) { + printf("unknown message type: %d\n", type); + return; + } + + if (len <= 0) { + g_printerr("length 0\n"); + return; + } + + data = g_malloc(len); + if (data == NULL) { + g_printerr("out of mem\n"); + g_main_loop_quit (ctx->loop); + } + ret = g_input_stream_read(input, data, len, NULL, NULL); + + /* tunneled udp data - not a regular protobuf message + * create dummy ProtobufCMessage */ + if (type == UDPTunnel) { + MumbleProto__UDPTunnel udptunnel; + mumble_proto__udptunnel__init(&udptunnel); + + udptunnel.packet.len = len; + udptunnel.packet.data = data; + + if (callbacks[UDPTunnel]) + callbacks[UDPTunnel](&udptunnel.base, ctx); + + g_free(data); + return; + } + + msg = protobuf_c_message_unpack(messages[type].descriptor, NULL, + len, data); + if (msg == NULL) { + g_printerr("message unpack failure\n"); + return; + } + + g_print("debug: received message: %s type:%d, len:%d\n", messages[type].name, type, len); + if (callbacks[type]) + callbacks[type](msg, ctx); + + protobuf_c_message_free_unpacked(msg, NULL); + g_free(data); +} -- cgit