From 52049f3d0a61e93879a8c783f2ac8225d9322905 Mon Sep 17 00:00:00 2001 From: Benjamin Franzke Date: Wed, 25 May 2011 13:59:21 +0200 Subject: Talk! --- src/socket.c | 427 +++++++++++++++++++++++++++++++++++++---------------------- src/varint.c | 103 ++++++++++++++ src/varint.h | 12 ++ 3 files changed, 381 insertions(+), 161 deletions(-) create mode 100644 src/varint.c create mode 100644 src/varint.h (limited to 'src') diff --git a/src/socket.c b/src/socket.c index 332c60d..5c1fa5e 100644 --- a/src/socket.c +++ b/src/socket.c @@ -3,8 +3,7 @@ #include #include - -#include "mumble.pb-c.h" +#include #include "polarssl/net.h" #include "polarssl/ssl.h" @@ -16,11 +15,16 @@ #include #include +#include #include #include #include +#include + +#include "mumble.pb-c.h" +#include "varint.h" #include "messages.h" #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) @@ -42,109 +46,29 @@ struct context { CELTHeader celt_header; CELTMode *celt_mode; - GstElement *pipeline; + GstElement *playback_pipeline; + GstElement *record_pipeline; GstAppSrc *src; + GstAppSink *sink; + + int64_t sequence; }; enum udp_message_type { udp_voice_celt_alpha, udp_ping, udp_voice_speex, - udp_voice_celt_beta, + udp_voice_celt_beta }; -int64_t -decode_varint(uint8_t *data, uint32_t *read, uint32_t left) -{ - int64_t varint = 0; - - /* 1 byte with 7 · 8 + 1 leading zeroes */ - if ((data[0] & 0x80) == 0x00) { - varint = data[0] & 0x7F; - *read = 1; - /* 2 bytes with 6 · 8 + 2 leading zeroes */ - } else if ((data[0] & 0xC0) == 0x80) { - varint = ((data[0] & 0x3F) << 8) | data[1]; - *read = 2; - /* 3 bytes with 5 · 8 + 3 leading zeroes */ - } else if ((data[0] & 0xE0) == 0xC0) { - varint = (((data[0] & 0x1F) << 16) | - (data[1] << 8) | (data[2])); - *read = 3; - /* 4 bytes with 4 · 8 + 4 leading zeroes */ - } else if ((data[0] & 0xF0) == 0xE0) { - varint = (((data[0] & 0x0F) << 24) | (data[1] << 16) | - (data[2] << 8) | (data[3])); - *read = 4; - } else /* if ((data[pos] & 0xF0) == 0xF0) */ { - switch (data[0] & 0xFC) { - /* 32-bit positive number */ - case 0xF0: - varint = ((data[1] << 24) | (data[2] << 16) | - (data[3] << 8) | data[4]); - *read = 1 + 4; - break; - /* 64-bit number */ - case 0xF4: - varint = - ((int64_t)data[1] << 56) | ((int64_t)data[2] << 48) | - ((int64_t)data[3] << 40) | ((int64_t)data[4] << 32) | - (data[5] << 24) | (data[6] << 16) | - (data[7] << 8) | (data[8] << 0); - *read = 1 + 8; - break; - /* Negative varint */ - case 0xF8: - /* FIXME: handle endless recursion */ - varint = -decode_varint(&data[1], read, left - 1); - *read += 1; - break; - /* Negative two bit number */ - case 0xFC: - varint = -(int)(data[0] & 0x03); - *read = 1; - break; - } - } - - return varint; -} - static void -handle_udp(struct context *ctx, uint8_t *data, uint32_t len) +appsrc_push(GstAppSrc *src, const void *mem, size_t size) { - int64_t session; - int64_t sequence; - int pos = 1; - int read = 0; - - int frame_len, term; - static int iseq = 0; - - session = decode_varint(&data[pos], &read, len-pos); - pos += read; - sequence = decode_varint(&data[pos], &read, len-pos); - pos += read; - printf("session: %ld, sequence: %ld\n", session, sequence); - - do { - frame_len = (data[pos] & 0x7F); - term = (data[pos] & 0x80) == 0x80; - printf("_len: %d, term: %d\n", frame_len, term); - pos += 1; - - if (frame_len == 0) - break; - - void *src = malloc(frame_len); - memcpy(src, &data[pos], frame_len); - - GstBuffer *gstbuf = gst_app_buffer_new(src, frame_len, free, NULL); - gst_app_src_push_buffer(ctx->src, gstbuf); - - pos += frame_len; - sequence++; - } while (term); + GstBuffer *gstbuf; + + gstbuf = gst_app_buffer_new(g_memdup(mem, size), size, + g_free, NULL); + gst_app_src_push_buffer(src, gstbuf); } static void @@ -171,6 +95,99 @@ get_preamble(uint8_t *buffer, int *type, int *len) *len = (int)msgLen; } +pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; + +static GstFlowReturn +pull_buffer(GstAppSink *sink, gpointer user_data) +{ + struct context *ctx = user_data; + GstBuffer *buf; + uint8_t data[1024]; + uint32_t write = 0; + uint32_t pos = 0; + int ret; + + static uint64_t seq = 0; + + /* header will be written at the end */ + pos = PREAMBLE_SIZE; + + buf = gst_app_sink_pull_buffer(ctx->sink); + + ++seq; + if (seq <= 2) { + gst_buffer_unref(buf); + pthread_mutex_unlock(&mutex1); + return GST_FLOW_OK; + } + if (GST_BUFFER_SIZE(buf) > 127) { + g_printerr("GOT TOO BIG BUFFER\n"); + pthread_mutex_unlock(&mutex1); + return GST_FLOW_ERROR; + } + + data[pos++] = (udp_voice_celt_alpha) | (0 << 4); + + encode_varint(&data[pos], &write, ++ctx->sequence, 1024-pos); + pos += write; + + data[pos++] = 0x00 /*: 0x80 */ | (GST_BUFFER_SIZE(buf) & 0x7F); + memcpy(&data[pos], GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf)); + pos += GST_BUFFER_SIZE(buf); + + gst_buffer_unref(buf); + + add_preamble(&data[0], 1, pos-PREAMBLE_SIZE); + pthread_mutex_lock(&mutex1); + while ((ret = ssl_write(&ctx->ssl, data, PREAMBLE_SIZE)) < PREAMBLE_SIZE) { + if (ret != POLARSSL_ERR_NET_TRY_AGAIN) { + printf("write failed: %d\n", ret); + abort(); + } + } + while ((ret = ssl_write(&ctx->ssl, &data[PREAMBLE_SIZE], pos-PREAMBLE_SIZE)) < (pos-PREAMBLE_SIZE)) { + if (ret != POLARSSL_ERR_NET_TRY_AGAIN) { + printf("write failed: %d\n", ret); + abort(); + } + } + pthread_mutex_unlock(&mutex1); + + return GST_FLOW_OK; +} + +static void +handle_udp(struct context *ctx, uint8_t *data, uint32_t len) +{ + int64_t session; + int64_t sequence; + uint32_t pos = 1; + uint32_t read = 0; + uint8_t frame_len, terminator; + + printf("data[0]: 0x%x\n", data[0]); + printf("len: %u\n", len); + session = decode_varint(&data[pos], &read, len-pos); + pos += read; + sequence = decode_varint(&data[pos], &read, len-pos); + pos += read; + printf("session: %ld, sequence: %ld\n", session, sequence); + + do { + frame_len = data[pos] & 0x7F; + terminator = data[pos] & 0x80; + pos += 1; + + if (frame_len == 0 || frame_len > len-pos) + break; + + appsrc_push(ctx->src, &data[pos], frame_len); + + pos += frame_len; + sequence++; + } while (terminator); +} + static void recv_version(MumbleProto__Version *version, struct context *ctx) { @@ -185,6 +202,15 @@ recv_channel_state(MumbleProto__ChannelState *state, struct context *ctx) state->channel_id, state->parent, state->name, state->description, state->temporary, state->position); } +static void +recv_server_sync(MumbleProto__ServerSync *sync, struct context *ctx) +{ + ctx->session = sync->session; + + printf("got session: %d\n", ctx->session); + +} + static void send_msg(struct context *ctx, ProtobufCMessage *msg) { @@ -203,6 +229,7 @@ send_msg(struct context *ctx, ProtobufCMessage *msg) protobuf_c_message_pack_to_buffer(msg, &buffer.base); add_preamble(preamble, type, buffer.len); + pthread_mutex_lock(&mutex1); while ((ret = ssl_write(&ctx->ssl, preamble, PREAMBLE_SIZE)) <= 0) { if (ret != POLARSSL_ERR_NET_TRY_AGAIN) { printf("write failed: %d\n", ret); @@ -215,6 +242,7 @@ send_msg(struct context *ctx, ProtobufCMessage *msg) abort(); } } + pthread_mutex_unlock(&mutex1); PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer); } @@ -228,15 +256,13 @@ recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_siz ProtobufCMessage *msg; void *data; int type, len; - int ret, i; - - printf("recv msg\n"); + int ret; do { ret = ssl_read(&ctx->ssl, preamble, 6); if (ret == POLARSSL_ERR_NET_CONN_RESET) { printf("conn reset\n"); - exit(1); + g_main_loop_quit (ctx->loop); } } while (ret == POLARSSL_ERR_NET_TRY_AGAIN); @@ -314,6 +340,7 @@ do_ping(struct context *ctx) static const callback_t callbacks[] = { /* VERSION */ (callback_t) recv_version, + [5] = (callback_t) recv_server_sync, [7] = (callback_t) recv_channel_state, [127] = NULL, }; @@ -327,20 +354,21 @@ _recv(GIOChannel *source, GIOCondition condition, gpointer data) recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks)); } while (ssl_get_bytes_avail(&ctx->ssl) > 0); - do_ping(ctx); + //do_ping(ctx); return TRUE; } static gboolean bus_call(GstBus *bus, GstMessage *msg, gpointer data) { - GMainLoop *loop = (GMainLoop *) data; + struct context *ctx = data; + GMainLoop *loop = ctx->loop; switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_EOS: g_print ("End of stream\n"); - //g_main_loop_quit (loop); + g_main_loop_quit (loop); break; case GST_MESSAGE_ERROR: @@ -358,21 +386,27 @@ bus_call(GstBus *bus, GstMessage *msg, gpointer data) break; } default: - g_print("unhandled message: %d\n", GST_MESSAGE_TYPE(msg)); + g_print("unhandled message: %d %s\n", GST_MESSAGE_TYPE(msg), gst_message_type_get_name(GST_MESSAGE_TYPE(msg))); break; } + + return TRUE; } static void app_need_data(GstAppSrc *src, guint length, gpointer user_data) { +#if 0 struct context *ctx = user_data; +#endif } static void app_enough_data(GstAppSrc *src, gpointer user_data) { +#if 0 struct context *ctx = user_data; +#endif } static GstAppSrcCallbacks app_callbacks = { @@ -381,6 +415,130 @@ static GstAppSrcCallbacks app_callbacks = { NULL }; +static int +setup_playback_gst_pipeline(struct context *ctx) +{ + GstElement *pipeline, *src, *decoder, *conv, *sink; + GstBus *bus; + + pipeline = gst_pipeline_new("cmumble-output"); + src = gst_element_factory_make("appsrc", "input"); + decoder = gst_element_factory_make("celtdec", "celt-decoder"); + conv = gst_element_factory_make("audioconvert", "converter"); + sink = gst_element_factory_make("autoaudiosink", "audio-output"); + + if (!pipeline || !src || !decoder || !conv || !sink) { + g_printerr("failed to initialize pipeline\n"); + return -1; + } + + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, bus_call, ctx); + gst_object_unref(bus); + + gst_bin_add_many(GST_BIN(pipeline), + src, decoder, conv, sink, NULL); + gst_element_link_many(src, decoder, conv, sink, NULL); + + ctx->src = GST_APP_SRC(src); + ctx->playback_pipeline = pipeline; + + /* Important! */ + gst_base_src_set_live(GST_BASE_SRC(ctx->src), TRUE); + gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx->src), TRUE); + gst_base_src_set_format(GST_BASE_SRC(ctx->src), GST_FORMAT_TIME); + + gst_app_src_set_stream_type(ctx->src, GST_APP_STREAM_TYPE_STREAM); + gst_app_src_set_callbacks(ctx->src, &app_callbacks, ctx, NULL); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + { /* Setup Celt Decoder */ +#define SAMPLERATE 48000 +#define CHANNELS 1 + uint8_t celt_header_packet[sizeof(CELTHeader)]; + + ctx->celt_mode = celt_mode_create(SAMPLERATE, + SAMPLERATE / 100, NULL); + celt_header_init(&ctx->celt_header, ctx->celt_mode, CHANNELS); + celt_header_to_packet(&ctx->celt_header, + celt_header_packet, sizeof(CELTHeader)); + + appsrc_push(ctx->src, celt_header_packet, sizeof(CELTHeader)); + /* fake vorbiscomment buffer */ + appsrc_push(ctx->src, NULL, 0); + } + + return 0; +} + +static int +setup_recording_gst_pipeline(struct context *ctx) +{ + GstElement *pipeline, *src, *cutter, *resample, + *conv, *capsfilter, *encoder, *sink; + GstBus *bus; + GstCaps *caps; + + pipeline = gst_pipeline_new("cmumble-input"); + src = gst_element_factory_make("autoaudiosrc", "audio-input"); + cutter = gst_element_factory_make("cutter", "cutter"); + resample = gst_element_factory_make("audioresample", "resample"); + conv = gst_element_factory_make("audioconvert", "converter"); + capsfilter = gst_element_factory_make("capsfilter", "capsfilter"); + encoder = gst_element_factory_make("celtenc", "celt-encoder"); + sink = gst_element_factory_make("appsink", "output"); + + if (!pipeline || !src || !cutter || !resample || !conv || + !capsfilter || !encoder || !sink) { + g_printerr("failed to initialize pipeline\n"); + return -1; + } + + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, bus_call, ctx); + gst_object_unref(bus); + + gst_bin_add_many(GST_BIN(pipeline), + src, cutter, resample, conv, capsfilter, encoder, sink, NULL); + gst_element_link_many(src, cutter, resample, conv, capsfilter, encoder, sink, NULL); + + ctx->sink = GST_APP_SINK(sink); + ctx->record_pipeline = pipeline; + + caps = gst_caps_new_simple("audio/x-raw-int", + "channels", G_TYPE_INT, 1, + "depth", G_TYPE_INT, 16, + "rate", G_TYPE_INT, 48000, + "width", G_TYPE_INT, 16, + "signed", G_TYPE_BOOLEAN, TRUE, + NULL); + g_object_set(G_OBJECT(capsfilter), "caps", caps, NULL); + gst_caps_unref(caps); + + g_object_set(G_OBJECT(cutter), + "threshold_dB", -45.0, "leaky", TRUE, + "runlength", 0.5, "prelength", 1.0, NULL); + + gst_app_sink_set_emit_signals(ctx->sink, TRUE); + gst_app_sink_set_drop(ctx->sink, FALSE);; + g_signal_connect(sink, "new-buffer", G_CALLBACK(pull_buffer), ctx); + + caps = gst_caps_new_simple("audio/x-celt", + "rate", G_TYPE_INT, SAMPLERATE, + "channels", G_TYPE_INT, 1, + "frame-size", G_TYPE_INT, SAMPLERATE/100, + NULL); + gst_app_sink_set_caps(ctx->sink, caps); + gst_caps_unref(caps); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + ctx->sequence = 0; + + return 0; +} + int main(int argc, char **argv) { #if 1 @@ -406,7 +564,7 @@ int main(int argc, char **argv) ssl_set_dbg(&ctx.ssl, my_debug, NULL); ssl_set_bio(&ctx.ssl, net_recv, &ctx.sock, net_send, &ctx.sock); - //ssl_set_session(&ctx.ssl, 1, 600, &ssn); + /* ssl_set_session(&ctx.ssl, 1, 600, &ssn); */ ssl_set_session(&ctx.ssl, 0, 0, &ctx.ssn); ssl_set_ciphers(&ctx.ssl, ssl_default_ciphers); @@ -422,7 +580,7 @@ int main(int argc, char **argv) { MumbleProto__Authenticate authenticate; mumble_proto__authenticate__init(&authenticate); - authenticate.username = "ben2"; + authenticate.username = argv[1]; authenticate.password = ""; authenticate.n_celt_versions = 1; authenticate.celt_versions = (int32_t[]) { 0x8000000b }; @@ -431,70 +589,16 @@ int main(int argc, char **argv) do_ping(&ctx); - GstElement *pipeline, *src, *decoder, *conv, *sink; - g_type_init(); gst_init(&argc, &argv); ctx.loop = g_main_loop_new(NULL, FALSE); - pipeline = gst_pipeline_new("cmumble-output"); - src = gst_element_factory_make("appsrc", "input"); - decoder = gst_element_factory_make("celtdec", "celt-decoder"); - conv = gst_element_factory_make("audioconvert", "converter"); - sink = gst_element_factory_make("autoaudiosink", "audio-output"); - //sink = gst_element_factory_make("filesink", "output"); - g_object_set(G_OBJECT(sink), "location", "foo.raw", NULL); - //sink = gst_element_factory_make("alsasink", "audio-output"); - - - if (!pipeline || !src || !decoder || !conv || !sink) { - g_printerr("failed to initialize pipeline\n"); + if (setup_playback_gst_pipeline(&ctx) < 0) return 1; - } - { - GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); - gst_bus_add_watch(bus, bus_call, ctx.loop); - gst_object_unref(bus); - } - - gst_bin_add_many(GST_BIN(pipeline), - src, decoder, conv, sink, NULL); - gst_element_link_many(src, decoder, conv, sink, NULL); - - ctx.src = GST_APP_SRC(src); - ctx.pipeline = pipeline; - - /* Important! */ - gst_base_src_set_live(GST_BASE_SRC(ctx.src), TRUE); - gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx.src), TRUE); - gst_base_src_set_format(GST_BASE_SRC(ctx.src), GST_FORMAT_TIME); - - gst_app_src_set_stream_type(ctx.src, GST_APP_STREAM_TYPE_STREAM); - gst_app_src_set_callbacks(ctx.src, &app_callbacks, &ctx, NULL); - - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - { /* Setup Celt Decoder */ -#define SAMPLERATE 48000 -#define CHANNELS 1 - GstBuffer *gst_buf; - uint8_t celt_header_packet[sizeof(CELTHeader)]; - - ctx.celt_mode = celt_mode_create(SAMPLERATE, - SAMPLERATE / 100, NULL); - celt_header_init(&ctx.celt_header, ctx.celt_mode, CHANNELS); - celt_header_to_packet(&ctx.celt_header, - celt_header_packet, sizeof(CELTHeader)); - - gst_buf = gst_app_buffer_new(celt_header_packet, - sizeof(CELTHeader), NULL, &ctx); - gst_app_src_push_buffer(ctx.src, gst_buf); - /* fake vorbiscomment buffer */ - gst_buf = gst_app_buffer_new(NULL, 0, NULL, &ctx); - gst_app_src_push_buffer(ctx.src, gst_buf); - } + if (setup_recording_gst_pipeline(&ctx) < 0) + return 1; ctx.sock_channel = g_io_channel_unix_new(ctx.sock); g_io_add_watch(ctx.sock_channel, G_IO_IN | G_IO_ERR, _recv, &ctx); @@ -507,4 +611,5 @@ int main(int argc, char **argv) ssl_free(&ctx.ssl); memset(&ctx.ssl, 0, sizeof(ctx.ssl)); + return 0; } diff --git a/src/varint.c b/src/varint.c new file mode 100644 index 0000000..da7e480 --- /dev/null +++ b/src/varint.c @@ -0,0 +1,103 @@ +#include + +void +encode_varint(uint8_t *data, uint32_t *write, int64_t value, uint32_t left) +{ + uint32_t pos = 0; + + if (value < 0) { + *write = 0; + return; + } + + if (value < 0x80) { + data[pos++] = value & 0xFF; + } else if (value < 0x4000) { + data[pos++] = 0x80 | ((value & 0xFF00 ) >> 8); + data[pos++] = value & 0xFF; + } else if (value < 0x200000) { + data[pos++] = 0xC0 | ((value & 0xFF0000) >> 16); + data[pos++] = ((value & 0xFF00) >> 8) & 0xFF; + data[pos++] = value & 0xFF; + } else if (value < 0x10000000) { + data[pos++] = 0xE0 | ((value ) >> 24); + data[pos++] = (value >> 16) & 0xFF; + data[pos++] = (value >> 8) & 0xFF; + data[pos++] = value & 0xFF; + } else if (value < 0x100000000LL) { + data[pos++] = 0xF0; + data[pos++] = (value >> 24) & 0xFF; + data[pos++] = (value >> 16) & 0xFF; + data[pos++] = (value >> 8) & 0xFF; + data[pos++] = value & 0xFF; + } else { + data[pos++] = 0xF4; + data[pos++] = (value >> 56) & 0xFF; + data[pos++] = (value >> 48) & 0xFF; + data[pos++] = (value >> 40) & 0xFF; + data[pos++] = (value >> 32) & 0xFF; + data[pos++] = (value >> 24) & 0xFF; + data[pos++] = (value >> 16) & 0xFF; + data[pos++] = (value >> 8) & 0xFF; + data[pos++] = value & 0xFF; + } + + *write = pos; +} + +int64_t +decode_varint(uint8_t *data, uint32_t *read, uint32_t left) +{ + int64_t varint = 0; + + /* 1 byte with 7 · 8 + 1 leading zeroes */ + if ((data[0] & 0x80) == 0x00) { + varint = data[0] & 0x7F; + *read = 1; + /* 2 bytes with 6 · 8 + 2 leading zeroes */ + } else if ((data[0] & 0xC0) == 0x80) { + varint = ((data[0] & 0x3F) << 8) | data[1]; + *read = 2; + /* 3 bytes with 5 · 8 + 3 leading zeroes */ + } else if ((data[0] & 0xE0) == 0xC0) { + varint = (((data[0] & 0x1F) << 16) | + (data[1] << 8) | (data[2])); + *read = 3; + /* 4 bytes with 4 · 8 + 4 leading zeroes */ + } else if ((data[0] & 0xF0) == 0xE0) { + varint = (((data[0] & 0x0F) << 24) | (data[1] << 16) | + (data[2] << 8) | (data[3])); + *read = 4; + } else /* if ((data[pos] & 0xF0) == 0xF0) */ { + switch (data[0] & 0xFC) { + /* 32-bit positive number */ + case 0xF0: + varint = ((data[1] << 24) | (data[2] << 16) | + (data[3] << 8) | data[4]); + *read = 1 + 4; + break; + /* 64-bit number */ + case 0xF4: + varint = + ((int64_t)data[1] << 56) | ((int64_t)data[2] << 48) | + ((int64_t)data[3] << 40) | ((int64_t)data[4] << 32) | + (data[5] << 24) | (data[6] << 16) | + (data[7] << 8) | (data[8] << 0); + *read = 1 + 8; + break; + /* Negative varint */ + case 0xF8: + /* FIXME: handle endless recursion */ + varint = -decode_varint(&data[1], read, left - 1); + *read += 1; + break; + /* Negative two bit number */ + case 0xFC: + varint = -(int)(data[0] & 0x03); + *read = 1; + break; + } + } + + return varint; +} diff --git a/src/varint.h b/src/varint.h new file mode 100644 index 0000000..49b7975 --- /dev/null +++ b/src/varint.h @@ -0,0 +1,12 @@ +#ifndef _VARINT_H_ +#define _VARINT_H_ + +#include + +void +encode_varint(uint8_t *data, uint32_t *write, int64_t value, uint32_t left); + +int64_t +decode_varint(uint8_t *data, uint32_t *read, uint32_t left); + +#endif -- cgit