From 54de55f1fc556a7689ee0189ec4ee79c01a690f9 Mon Sep 17 00:00:00 2001 From: Benjamin Franzke Date: Fri, 27 May 2011 10:19:16 +0200 Subject: Rename socket.c to cmumble.c --- src/cmumble.c | 614 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/socket.c | 614 ---------------------------------------------------------- 2 files changed, 614 insertions(+), 614 deletions(-) create mode 100644 src/cmumble.c delete mode 100644 src/socket.c diff --git a/src/cmumble.c b/src/cmumble.c new file mode 100644 index 0000000..0191ac5 --- /dev/null +++ b/src/cmumble.c @@ -0,0 +1,614 @@ +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include "mumble.pb-c.h" +#include "varint.h" +#include "messages.h" + +#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) + +#define PREAMBLE_SIZE 6 + +struct context { + GMainLoop *loop; + + uint32_t session; + bool authenticated; + + GSocketClient *sock_client; + GSocketConnection *conn; + GSocket *sock; + GIOStream *iostream; + + CELTHeader celt_header; + CELTMode *celt_mode; + + GstElement *playback_pipeline; + GstElement *record_pipeline; + GstAppSrc *src; + GstAppSink *sink; + + int64_t sequence; +}; + +enum udp_message_type { + udp_voice_celt_alpha, + udp_ping, + udp_voice_speex, + udp_voice_celt_beta +}; + +static void +appsrc_push(GstAppSrc *src, const void *mem, size_t size) +{ + GstBuffer *gstbuf; + + gstbuf = gst_app_buffer_new(g_memdup(mem, size), size, + g_free, NULL); + gst_app_src_push_buffer(src, gstbuf); +} + +static void +add_preamble(uint8_t *buffer, uint16_t type, uint32_t len) +{ + buffer[1] = (type) & 0xff; + buffer[0] = (type >> 8) & 0xff; + + buffer[5] = (len) & 0xff; + buffer[4] = (len >> 8) & 0xff; + buffer[3] = (len >> 16) & 0xff; + buffer[2] = (len >> 24) & 0xff; +} + +static void +get_preamble(uint8_t *buffer, int *type, int *len) +{ + uint16_t msgType; + uint32_t msgLen; + + msgType = buffer[1] | (buffer[0] << 8); + msgLen = buffer[5] | (buffer[4] << 8) | (buffer[3] << 16) | (buffer[2] << 24); + *type = (int)msgType; + *len = (int)msgLen; +} + +GStaticMutex write_mutex = G_STATIC_MUTEX_INIT; + +static GstFlowReturn +pull_buffer(GstAppSink *sink, gpointer user_data) +{ + struct context *ctx = user_data; + GstBuffer *buf; + uint8_t data[1024]; + uint32_t write = 0; + uint32_t pos = 0; + GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); + + static uint64_t seq = 0; + + /* header will be written at the end */ + pos = PREAMBLE_SIZE; + + buf = gst_app_sink_pull_buffer(ctx->sink); + + ++seq; + if (seq <= 2) { + gst_buffer_unref(buf); + return GST_FLOW_OK; + } + if (GST_BUFFER_SIZE(buf) > 127) { + g_printerr("GOT TOO BIG BUFFER\n"); + return GST_FLOW_ERROR; + } + + data[pos++] = (udp_voice_celt_alpha) | (0 << 4); + + encode_varint(&data[pos], &write, ++ctx->sequence, 1024-pos); + pos += write; + + data[pos++] = 0x00 /*: 0x80 */ | (GST_BUFFER_SIZE(buf) & 0x7F); + memcpy(&data[pos], GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf)); + pos += GST_BUFFER_SIZE(buf); + + gst_buffer_unref(buf); + + add_preamble(&data[0], 1, pos-PREAMBLE_SIZE); + g_static_mutex_lock(&write_mutex); + g_output_stream_write(output, data, PREAMBLE_SIZE, NULL, NULL); + g_output_stream_write(output, &data[PREAMBLE_SIZE], pos-PREAMBLE_SIZE, NULL, NULL); + g_static_mutex_unlock(&write_mutex); + + return GST_FLOW_OK; +} + +static void +handle_udp(struct context *ctx, uint8_t *data, uint32_t len) +{ + int64_t session; + int64_t sequence; + uint32_t pos = 1; + uint32_t read = 0; + uint8_t frame_len, terminator; + + session = decode_varint(&data[pos], &read, len-pos); + pos += read; + sequence = decode_varint(&data[pos], &read, len-pos); + pos += read; + + do { + frame_len = data[pos] & 0x7F; + terminator = data[pos] & 0x80; + pos += 1; + + if (frame_len == 0 || frame_len > len-pos) + break; + + appsrc_push(ctx->src, &data[pos], frame_len); + + pos += frame_len; + sequence++; + } while (terminator); +} + +static void +recv_version(MumbleProto__Version *version, struct context *ctx) +{ + printf("version: 0x%x\n", version->version); + printf("release: %s\n", version->release); +} + +static void +recv_channel_state(MumbleProto__ChannelState *state, struct context *ctx) +{ + printf("channel: id: %u, parent: %u, name: %s, description: %s, temporary: %d, position: %d\n", + state->channel_id, state->parent, state->name, state->description, state->temporary, state->position); +} + +static void +recv_server_sync(MumbleProto__ServerSync *sync, struct context *ctx) +{ + ctx->session = sync->session; + + printf("got session: %d\n", ctx->session); + +} + +static void +recv_crypt_setup(MumbleProto__CryptSetup *crypt, struct context *ctx) +{ + int i; + + if (crypt->has_key) { + printf("key: 0x"); + for (i = 0; i < crypt->key.len; ++i) + printf("%x", crypt->key.data[i]); + printf("\n"); + + } + if (crypt->has_client_nonce) { + printf("client nonce: 0x"); + for (i = 0; i < crypt->client_nonce.len; ++i) + printf("%x", crypt->client_nonce.data[i]); + printf("\n"); + + } + if (crypt->has_server_nonce) { + printf("server nonce: 0x"); + for (i = 0; i < crypt->server_nonce.len; ++i) + printf("%x", crypt->server_nonce.data[i]); + printf("\n"); + + } +} + +static void +recv_codec_version(MumbleProto__CodecVersion *codec, struct context *ctx) +{ + printf("Codec Version: alpha: %d, beta: %d, pefer_alpha: %d\n", + codec->alpha, codec->beta, codec->prefer_alpha); +} + +typedef void (*callback_t)(void *, void *); + +static const callback_t callbacks[] = { + /* VERSION */ (callback_t) recv_version, + [5] = (callback_t) recv_server_sync, + [7] = (callback_t) recv_channel_state, + [15] = (callback_t) recv_crypt_setup, + [21] = (callback_t) recv_codec_version, + [127] = NULL, +}; + +static void +send_msg(struct context *ctx, ProtobufCMessage *msg) +{ + uint8_t pad[128]; + uint8_t preamble[PREAMBLE_SIZE]; + int type = -1; + int i; + ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad); + GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); + + for (i = 0; i < ARRAY_SIZE(messages); ++i) + if (messages[i].descriptor == msg->descriptor) + type = i; + assert(type >= 0); + + protobuf_c_message_pack_to_buffer(msg, &buffer.base); + add_preamble(preamble, type, buffer.len); + + g_static_mutex_lock(&write_mutex); + g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL); + g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL); + g_static_mutex_unlock(&write_mutex); + + PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer); +} + +static void +recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_size) +{ + uint8_t preamble[PREAMBLE_SIZE]; + ProtobufCMessage *msg; + void *data; + int type, len; + gssize ret; + GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); + + ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL); + + if (ret <= 0) { + printf("read failed: %ld\n", ret); + return; + } + + get_preamble(preamble, &type, &len); + + if (!(type >= 0 && type < ARRAY_SIZE(messages))) { + printf("unknown message type: %d\n", type); + return; + } + + if (len <= 0) { + printf("length 0\n"); + return; + } + + data = malloc(len); + if (data == NULL) { + printf("out of mem\n"); + abort(); + } + ret = g_input_stream_read(input, data, len, NULL, NULL); + + /* tunneled udp data - not a regular protobuf message */ + if (type == 1) { + handle_udp(ctx, data, len); + free(data); + return; + } + + msg = protobuf_c_message_unpack(messages[type].descriptor, NULL, + len, data); + if (msg == NULL) { + printf("message unpack failure\n"); + return; + } + + printf("debug: received message: %s type:%d, len:%d\n", messages[type].name, type, len); + if (callbacks[type]) + callbacks[type](msg, ctx); + + protobuf_c_message_free_unpacked(msg, NULL); + free(data); +} + +static gboolean +do_ping(struct context *ctx) +{ + MumbleProto__Ping ping; + struct timeval tv; + + gettimeofday(&tv, NULL); + mumble_proto__ping__init(&ping); + + ping.timestamp = tv.tv_sec; + ping.resync = 1; + + send_msg(ctx, &ping.base); + + return TRUE; +} + +static gboolean +read_cb(GSocket *socket, GIOCondition condition, gpointer data) +{ + struct context *ctx = data; + GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); + + do { + recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks)); + } while (g_input_stream_has_pending(input)); + + + /* FIXME */ + static int i = 0; + if (i++ < 2) + do_ping(ctx); + return TRUE; +} + +static gboolean +bus_call(GstBus *bus, GstMessage *msg, gpointer data) +{ + struct context *ctx = data; + GMainLoop *loop = ctx->loop; + + switch (GST_MESSAGE_TYPE (msg)) { + + case GST_MESSAGE_EOS: + g_print ("End of stream\n"); + g_main_loop_quit (loop); + break; + + case GST_MESSAGE_ERROR: + { + char *debug; + GError *error; + + gst_message_parse_error (msg, &error, &debug); + g_free (debug); + + g_printerr ("Error: %s\n", error->message); + g_error_free (error); + + g_main_loop_quit (loop); + break; + } + default: + g_print("unhandled message: %d %s\n", GST_MESSAGE_TYPE(msg), + gst_message_type_get_name(GST_MESSAGE_TYPE(msg))); + break; + } + + return TRUE; +} + +static void +app_need_data(GstAppSrc *src, guint length, gpointer user_data) +{ +#if 0 + struct context *ctx = user_data; +#endif +} + +static void +app_enough_data(GstAppSrc *src, gpointer user_data) +{ +#if 0 + struct context *ctx = user_data; +#endif +} + +static GstAppSrcCallbacks app_callbacks = { + app_need_data, + app_enough_data, + NULL +}; + +static int +setup_playback_gst_pipeline(struct context *ctx) +{ + GstElement *pipeline, *src, *decoder, *conv, *sink; + GstBus *bus; + + pipeline = gst_pipeline_new("cmumble-output"); + src = gst_element_factory_make("appsrc", "input"); + decoder = gst_element_factory_make("celtdec", "celt-decoder"); + conv = gst_element_factory_make("audioconvert", "converter"); + sink = gst_element_factory_make("autoaudiosink", "audio-output"); + + if (!pipeline || !src || !decoder || !conv || !sink) { + g_printerr("failed to initialize pipeline\n"); + return -1; + } + + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, bus_call, ctx); + gst_object_unref(bus); + + gst_bin_add_many(GST_BIN(pipeline), + src, decoder, conv, sink, NULL); + gst_element_link_many(src, decoder, conv, sink, NULL); + + ctx->src = GST_APP_SRC(src); + ctx->playback_pipeline = pipeline; + + /* Important! */ + gst_base_src_set_live(GST_BASE_SRC(ctx->src), TRUE); + gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx->src), TRUE); + gst_base_src_set_format(GST_BASE_SRC(ctx->src), GST_FORMAT_TIME); + + gst_app_src_set_stream_type(ctx->src, GST_APP_STREAM_TYPE_STREAM); + gst_app_src_set_callbacks(ctx->src, &app_callbacks, ctx, NULL); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + { /* Setup Celt Decoder */ +#define SAMPLERATE 48000 +#define CHANNELS 1 + uint8_t celt_header_packet[sizeof(CELTHeader)]; + + ctx->celt_mode = celt_mode_create(SAMPLERATE, + SAMPLERATE / 100, NULL); + celt_header_init(&ctx->celt_header, ctx->celt_mode, CHANNELS); + celt_header_to_packet(&ctx->celt_header, + celt_header_packet, sizeof(CELTHeader)); + + appsrc_push(ctx->src, celt_header_packet, sizeof(CELTHeader)); + /* fake vorbiscomment buffer */ + appsrc_push(ctx->src, NULL, 0); + } + + return 0; +} + +static int +setup_recording_gst_pipeline(struct context *ctx) +{ + GstElement *pipeline, *src, *cutter, *resample, + *conv, *capsfilter, *encoder, *sink; + GstBus *bus; + GstCaps *caps; + + pipeline = gst_pipeline_new("cmumble-input"); + src = gst_element_factory_make("autoaudiosrc", "audio-input"); + cutter = gst_element_factory_make("cutter", "cutter"); + resample = gst_element_factory_make("audioresample", "resample"); + conv = gst_element_factory_make("audioconvert", "converter"); + capsfilter = gst_element_factory_make("capsfilter", "capsfilter"); + encoder = gst_element_factory_make("celtenc", "celt-encoder"); + sink = gst_element_factory_make("appsink", "output"); + + if (!pipeline || !src || !cutter || !resample || !conv || + !capsfilter || !encoder || !sink) { + g_printerr("failed to initialize pipeline\n"); + return -1; + } + + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, bus_call, ctx); + gst_object_unref(bus); + + gst_bin_add_many(GST_BIN(pipeline), + src, cutter, resample, conv, + capsfilter, encoder, sink, NULL); + gst_element_link_many(src, cutter, resample, conv, + capsfilter, encoder, sink, NULL); + + ctx->sink = GST_APP_SINK(sink); + ctx->record_pipeline = pipeline; + + caps = gst_caps_new_simple("audio/x-raw-int", + "channels", G_TYPE_INT, 1, + "depth", G_TYPE_INT, 16, + "rate", G_TYPE_INT, 48000, + "width", G_TYPE_INT, 16, + "signed", G_TYPE_BOOLEAN, TRUE, + NULL); + g_object_set(G_OBJECT(capsfilter), "caps", caps, NULL); + gst_caps_unref(caps); + + g_object_set(G_OBJECT(cutter), + "threshold_dB", -45.0, "leaky", TRUE, NULL); + + gst_app_sink_set_emit_signals(ctx->sink, TRUE); + gst_app_sink_set_drop(ctx->sink, FALSE);; + g_signal_connect(sink, "new-buffer", G_CALLBACK(pull_buffer), ctx); + + caps = gst_caps_new_simple("audio/x-celt", + "rate", G_TYPE_INT, SAMPLERATE, + "channels", G_TYPE_INT, 1, + "frame-size", G_TYPE_INT, SAMPLERATE/100, + NULL); + gst_app_sink_set_caps(ctx->sink, caps); + gst_caps_unref(caps); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + ctx->sequence = 0; + + return 0; +} + +int main(int argc, char **argv) +{ +#if 0 + char *host = "localhost"; + unsigned int port = 64738; +#else + char *host = "85.214.21.153"; + unsigned int port = 33321; +#endif + struct context ctx; + GError *error = NULL; + GSource *source; + + memset(&ctx, 0, sizeof(ctx)); + + g_type_init(); + ctx.sock_client = g_socket_client_new(); + g_socket_client_set_tls(ctx.sock_client, TRUE); + g_socket_client_set_tls_validation_flags(ctx.sock_client, + G_TLS_CERTIFICATE_INSECURE); + g_socket_client_set_family(ctx.sock_client, G_SOCKET_FAMILY_IPV4); + g_socket_client_set_protocol(ctx.sock_client, G_SOCKET_PROTOCOL_TCP); + g_socket_client_set_socket_type(ctx.sock_client, G_SOCKET_TYPE_STREAM); + + ctx.conn = g_socket_client_connect_to_host(ctx.sock_client, + host, port, NULL, &error); + g_object_get(G_OBJECT(ctx.conn), "base-io-stream", &ctx.iostream, NULL); + + { + MumbleProto__Version version; + mumble_proto__version__init(&version); + version.version = 0x010203; + version.release = "cmumble 0.1"; + version.os = "Gentoo/Linux"; + send_msg(&ctx, &version.base); + } + + { + MumbleProto__Authenticate authenticate; + mumble_proto__authenticate__init(&authenticate); + authenticate.username = argv[1]; + authenticate.password = ""; + authenticate.n_celt_versions = 1; + authenticate.celt_versions = (int32_t[]) { 0x8000000b }; + send_msg(&ctx, &authenticate.base); + } + + gst_init(&argc, &argv); + + ctx.loop = g_main_loop_new(NULL, FALSE); + + if (setup_playback_gst_pipeline(&ctx) < 0) + return 1; + + if (setup_recording_gst_pipeline(&ctx) < 0) + return 1; + + ctx.sock = g_socket_connection_get_socket(ctx.conn); + source = g_socket_create_source(ctx.sock, G_IO_IN | G_IO_ERR, NULL); + g_source_set_callback(source, (GSourceFunc)read_cb, &ctx, NULL); + g_source_attach(source, NULL); + g_source_unref(source); + + source = g_timeout_source_new_seconds(5); + g_source_set_callback(source, (GSourceFunc)do_ping, &ctx, NULL); + g_source_attach(source, NULL); + g_source_unref(source); + + g_main_loop_run(ctx.loop); + + g_main_loop_unref(ctx.loop); + + return 0; +} diff --git a/src/socket.c b/src/socket.c deleted file mode 100644 index 0191ac5..0000000 --- a/src/socket.c +++ /dev/null @@ -1,614 +0,0 @@ -#include -#include -#include - -#include -#include - -#include -#include -#include - -#include -#include -#include -#include - -#include -#include -#include - -#include "mumble.pb-c.h" -#include "varint.h" -#include "messages.h" - -#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) - -#define PREAMBLE_SIZE 6 - -struct context { - GMainLoop *loop; - - uint32_t session; - bool authenticated; - - GSocketClient *sock_client; - GSocketConnection *conn; - GSocket *sock; - GIOStream *iostream; - - CELTHeader celt_header; - CELTMode *celt_mode; - - GstElement *playback_pipeline; - GstElement *record_pipeline; - GstAppSrc *src; - GstAppSink *sink; - - int64_t sequence; -}; - -enum udp_message_type { - udp_voice_celt_alpha, - udp_ping, - udp_voice_speex, - udp_voice_celt_beta -}; - -static void -appsrc_push(GstAppSrc *src, const void *mem, size_t size) -{ - GstBuffer *gstbuf; - - gstbuf = gst_app_buffer_new(g_memdup(mem, size), size, - g_free, NULL); - gst_app_src_push_buffer(src, gstbuf); -} - -static void -add_preamble(uint8_t *buffer, uint16_t type, uint32_t len) -{ - buffer[1] = (type) & 0xff; - buffer[0] = (type >> 8) & 0xff; - - buffer[5] = (len) & 0xff; - buffer[4] = (len >> 8) & 0xff; - buffer[3] = (len >> 16) & 0xff; - buffer[2] = (len >> 24) & 0xff; -} - -static void -get_preamble(uint8_t *buffer, int *type, int *len) -{ - uint16_t msgType; - uint32_t msgLen; - - msgType = buffer[1] | (buffer[0] << 8); - msgLen = buffer[5] | (buffer[4] << 8) | (buffer[3] << 16) | (buffer[2] << 24); - *type = (int)msgType; - *len = (int)msgLen; -} - -GStaticMutex write_mutex = G_STATIC_MUTEX_INIT; - -static GstFlowReturn -pull_buffer(GstAppSink *sink, gpointer user_data) -{ - struct context *ctx = user_data; - GstBuffer *buf; - uint8_t data[1024]; - uint32_t write = 0; - uint32_t pos = 0; - GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); - - static uint64_t seq = 0; - - /* header will be written at the end */ - pos = PREAMBLE_SIZE; - - buf = gst_app_sink_pull_buffer(ctx->sink); - - ++seq; - if (seq <= 2) { - gst_buffer_unref(buf); - return GST_FLOW_OK; - } - if (GST_BUFFER_SIZE(buf) > 127) { - g_printerr("GOT TOO BIG BUFFER\n"); - return GST_FLOW_ERROR; - } - - data[pos++] = (udp_voice_celt_alpha) | (0 << 4); - - encode_varint(&data[pos], &write, ++ctx->sequence, 1024-pos); - pos += write; - - data[pos++] = 0x00 /*: 0x80 */ | (GST_BUFFER_SIZE(buf) & 0x7F); - memcpy(&data[pos], GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf)); - pos += GST_BUFFER_SIZE(buf); - - gst_buffer_unref(buf); - - add_preamble(&data[0], 1, pos-PREAMBLE_SIZE); - g_static_mutex_lock(&write_mutex); - g_output_stream_write(output, data, PREAMBLE_SIZE, NULL, NULL); - g_output_stream_write(output, &data[PREAMBLE_SIZE], pos-PREAMBLE_SIZE, NULL, NULL); - g_static_mutex_unlock(&write_mutex); - - return GST_FLOW_OK; -} - -static void -handle_udp(struct context *ctx, uint8_t *data, uint32_t len) -{ - int64_t session; - int64_t sequence; - uint32_t pos = 1; - uint32_t read = 0; - uint8_t frame_len, terminator; - - session = decode_varint(&data[pos], &read, len-pos); - pos += read; - sequence = decode_varint(&data[pos], &read, len-pos); - pos += read; - - do { - frame_len = data[pos] & 0x7F; - terminator = data[pos] & 0x80; - pos += 1; - - if (frame_len == 0 || frame_len > len-pos) - break; - - appsrc_push(ctx->src, &data[pos], frame_len); - - pos += frame_len; - sequence++; - } while (terminator); -} - -static void -recv_version(MumbleProto__Version *version, struct context *ctx) -{ - printf("version: 0x%x\n", version->version); - printf("release: %s\n", version->release); -} - -static void -recv_channel_state(MumbleProto__ChannelState *state, struct context *ctx) -{ - printf("channel: id: %u, parent: %u, name: %s, description: %s, temporary: %d, position: %d\n", - state->channel_id, state->parent, state->name, state->description, state->temporary, state->position); -} - -static void -recv_server_sync(MumbleProto__ServerSync *sync, struct context *ctx) -{ - ctx->session = sync->session; - - printf("got session: %d\n", ctx->session); - -} - -static void -recv_crypt_setup(MumbleProto__CryptSetup *crypt, struct context *ctx) -{ - int i; - - if (crypt->has_key) { - printf("key: 0x"); - for (i = 0; i < crypt->key.len; ++i) - printf("%x", crypt->key.data[i]); - printf("\n"); - - } - if (crypt->has_client_nonce) { - printf("client nonce: 0x"); - for (i = 0; i < crypt->client_nonce.len; ++i) - printf("%x", crypt->client_nonce.data[i]); - printf("\n"); - - } - if (crypt->has_server_nonce) { - printf("server nonce: 0x"); - for (i = 0; i < crypt->server_nonce.len; ++i) - printf("%x", crypt->server_nonce.data[i]); - printf("\n"); - - } -} - -static void -recv_codec_version(MumbleProto__CodecVersion *codec, struct context *ctx) -{ - printf("Codec Version: alpha: %d, beta: %d, pefer_alpha: %d\n", - codec->alpha, codec->beta, codec->prefer_alpha); -} - -typedef void (*callback_t)(void *, void *); - -static const callback_t callbacks[] = { - /* VERSION */ (callback_t) recv_version, - [5] = (callback_t) recv_server_sync, - [7] = (callback_t) recv_channel_state, - [15] = (callback_t) recv_crypt_setup, - [21] = (callback_t) recv_codec_version, - [127] = NULL, -}; - -static void -send_msg(struct context *ctx, ProtobufCMessage *msg) -{ - uint8_t pad[128]; - uint8_t preamble[PREAMBLE_SIZE]; - int type = -1; - int i; - ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad); - GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); - - for (i = 0; i < ARRAY_SIZE(messages); ++i) - if (messages[i].descriptor == msg->descriptor) - type = i; - assert(type >= 0); - - protobuf_c_message_pack_to_buffer(msg, &buffer.base); - add_preamble(preamble, type, buffer.len); - - g_static_mutex_lock(&write_mutex); - g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL); - g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL); - g_static_mutex_unlock(&write_mutex); - - PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer); -} - -static void -recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_size) -{ - uint8_t preamble[PREAMBLE_SIZE]; - ProtobufCMessage *msg; - void *data; - int type, len; - gssize ret; - GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); - - ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL); - - if (ret <= 0) { - printf("read failed: %ld\n", ret); - return; - } - - get_preamble(preamble, &type, &len); - - if (!(type >= 0 && type < ARRAY_SIZE(messages))) { - printf("unknown message type: %d\n", type); - return; - } - - if (len <= 0) { - printf("length 0\n"); - return; - } - - data = malloc(len); - if (data == NULL) { - printf("out of mem\n"); - abort(); - } - ret = g_input_stream_read(input, data, len, NULL, NULL); - - /* tunneled udp data - not a regular protobuf message */ - if (type == 1) { - handle_udp(ctx, data, len); - free(data); - return; - } - - msg = protobuf_c_message_unpack(messages[type].descriptor, NULL, - len, data); - if (msg == NULL) { - printf("message unpack failure\n"); - return; - } - - printf("debug: received message: %s type:%d, len:%d\n", messages[type].name, type, len); - if (callbacks[type]) - callbacks[type](msg, ctx); - - protobuf_c_message_free_unpacked(msg, NULL); - free(data); -} - -static gboolean -do_ping(struct context *ctx) -{ - MumbleProto__Ping ping; - struct timeval tv; - - gettimeofday(&tv, NULL); - mumble_proto__ping__init(&ping); - - ping.timestamp = tv.tv_sec; - ping.resync = 1; - - send_msg(ctx, &ping.base); - - return TRUE; -} - -static gboolean -read_cb(GSocket *socket, GIOCondition condition, gpointer data) -{ - struct context *ctx = data; - GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); - - do { - recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks)); - } while (g_input_stream_has_pending(input)); - - - /* FIXME */ - static int i = 0; - if (i++ < 2) - do_ping(ctx); - return TRUE; -} - -static gboolean -bus_call(GstBus *bus, GstMessage *msg, gpointer data) -{ - struct context *ctx = data; - GMainLoop *loop = ctx->loop; - - switch (GST_MESSAGE_TYPE (msg)) { - - case GST_MESSAGE_EOS: - g_print ("End of stream\n"); - g_main_loop_quit (loop); - break; - - case GST_MESSAGE_ERROR: - { - char *debug; - GError *error; - - gst_message_parse_error (msg, &error, &debug); - g_free (debug); - - g_printerr ("Error: %s\n", error->message); - g_error_free (error); - - g_main_loop_quit (loop); - break; - } - default: - g_print("unhandled message: %d %s\n", GST_MESSAGE_TYPE(msg), - gst_message_type_get_name(GST_MESSAGE_TYPE(msg))); - break; - } - - return TRUE; -} - -static void -app_need_data(GstAppSrc *src, guint length, gpointer user_data) -{ -#if 0 - struct context *ctx = user_data; -#endif -} - -static void -app_enough_data(GstAppSrc *src, gpointer user_data) -{ -#if 0 - struct context *ctx = user_data; -#endif -} - -static GstAppSrcCallbacks app_callbacks = { - app_need_data, - app_enough_data, - NULL -}; - -static int -setup_playback_gst_pipeline(struct context *ctx) -{ - GstElement *pipeline, *src, *decoder, *conv, *sink; - GstBus *bus; - - pipeline = gst_pipeline_new("cmumble-output"); - src = gst_element_factory_make("appsrc", "input"); - decoder = gst_element_factory_make("celtdec", "celt-decoder"); - conv = gst_element_factory_make("audioconvert", "converter"); - sink = gst_element_factory_make("autoaudiosink", "audio-output"); - - if (!pipeline || !src || !decoder || !conv || !sink) { - g_printerr("failed to initialize pipeline\n"); - return -1; - } - - bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); - gst_bus_add_watch(bus, bus_call, ctx); - gst_object_unref(bus); - - gst_bin_add_many(GST_BIN(pipeline), - src, decoder, conv, sink, NULL); - gst_element_link_many(src, decoder, conv, sink, NULL); - - ctx->src = GST_APP_SRC(src); - ctx->playback_pipeline = pipeline; - - /* Important! */ - gst_base_src_set_live(GST_BASE_SRC(ctx->src), TRUE); - gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx->src), TRUE); - gst_base_src_set_format(GST_BASE_SRC(ctx->src), GST_FORMAT_TIME); - - gst_app_src_set_stream_type(ctx->src, GST_APP_STREAM_TYPE_STREAM); - gst_app_src_set_callbacks(ctx->src, &app_callbacks, ctx, NULL); - - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - { /* Setup Celt Decoder */ -#define SAMPLERATE 48000 -#define CHANNELS 1 - uint8_t celt_header_packet[sizeof(CELTHeader)]; - - ctx->celt_mode = celt_mode_create(SAMPLERATE, - SAMPLERATE / 100, NULL); - celt_header_init(&ctx->celt_header, ctx->celt_mode, CHANNELS); - celt_header_to_packet(&ctx->celt_header, - celt_header_packet, sizeof(CELTHeader)); - - appsrc_push(ctx->src, celt_header_packet, sizeof(CELTHeader)); - /* fake vorbiscomment buffer */ - appsrc_push(ctx->src, NULL, 0); - } - - return 0; -} - -static int -setup_recording_gst_pipeline(struct context *ctx) -{ - GstElement *pipeline, *src, *cutter, *resample, - *conv, *capsfilter, *encoder, *sink; - GstBus *bus; - GstCaps *caps; - - pipeline = gst_pipeline_new("cmumble-input"); - src = gst_element_factory_make("autoaudiosrc", "audio-input"); - cutter = gst_element_factory_make("cutter", "cutter"); - resample = gst_element_factory_make("audioresample", "resample"); - conv = gst_element_factory_make("audioconvert", "converter"); - capsfilter = gst_element_factory_make("capsfilter", "capsfilter"); - encoder = gst_element_factory_make("celtenc", "celt-encoder"); - sink = gst_element_factory_make("appsink", "output"); - - if (!pipeline || !src || !cutter || !resample || !conv || - !capsfilter || !encoder || !sink) { - g_printerr("failed to initialize pipeline\n"); - return -1; - } - - bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); - gst_bus_add_watch(bus, bus_call, ctx); - gst_object_unref(bus); - - gst_bin_add_many(GST_BIN(pipeline), - src, cutter, resample, conv, - capsfilter, encoder, sink, NULL); - gst_element_link_many(src, cutter, resample, conv, - capsfilter, encoder, sink, NULL); - - ctx->sink = GST_APP_SINK(sink); - ctx->record_pipeline = pipeline; - - caps = gst_caps_new_simple("audio/x-raw-int", - "channels", G_TYPE_INT, 1, - "depth", G_TYPE_INT, 16, - "rate", G_TYPE_INT, 48000, - "width", G_TYPE_INT, 16, - "signed", G_TYPE_BOOLEAN, TRUE, - NULL); - g_object_set(G_OBJECT(capsfilter), "caps", caps, NULL); - gst_caps_unref(caps); - - g_object_set(G_OBJECT(cutter), - "threshold_dB", -45.0, "leaky", TRUE, NULL); - - gst_app_sink_set_emit_signals(ctx->sink, TRUE); - gst_app_sink_set_drop(ctx->sink, FALSE);; - g_signal_connect(sink, "new-buffer", G_CALLBACK(pull_buffer), ctx); - - caps = gst_caps_new_simple("audio/x-celt", - "rate", G_TYPE_INT, SAMPLERATE, - "channels", G_TYPE_INT, 1, - "frame-size", G_TYPE_INT, SAMPLERATE/100, - NULL); - gst_app_sink_set_caps(ctx->sink, caps); - gst_caps_unref(caps); - - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - ctx->sequence = 0; - - return 0; -} - -int main(int argc, char **argv) -{ -#if 0 - char *host = "localhost"; - unsigned int port = 64738; -#else - char *host = "85.214.21.153"; - unsigned int port = 33321; -#endif - struct context ctx; - GError *error = NULL; - GSource *source; - - memset(&ctx, 0, sizeof(ctx)); - - g_type_init(); - ctx.sock_client = g_socket_client_new(); - g_socket_client_set_tls(ctx.sock_client, TRUE); - g_socket_client_set_tls_validation_flags(ctx.sock_client, - G_TLS_CERTIFICATE_INSECURE); - g_socket_client_set_family(ctx.sock_client, G_SOCKET_FAMILY_IPV4); - g_socket_client_set_protocol(ctx.sock_client, G_SOCKET_PROTOCOL_TCP); - g_socket_client_set_socket_type(ctx.sock_client, G_SOCKET_TYPE_STREAM); - - ctx.conn = g_socket_client_connect_to_host(ctx.sock_client, - host, port, NULL, &error); - g_object_get(G_OBJECT(ctx.conn), "base-io-stream", &ctx.iostream, NULL); - - { - MumbleProto__Version version; - mumble_proto__version__init(&version); - version.version = 0x010203; - version.release = "cmumble 0.1"; - version.os = "Gentoo/Linux"; - send_msg(&ctx, &version.base); - } - - { - MumbleProto__Authenticate authenticate; - mumble_proto__authenticate__init(&authenticate); - authenticate.username = argv[1]; - authenticate.password = ""; - authenticate.n_celt_versions = 1; - authenticate.celt_versions = (int32_t[]) { 0x8000000b }; - send_msg(&ctx, &authenticate.base); - } - - gst_init(&argc, &argv); - - ctx.loop = g_main_loop_new(NULL, FALSE); - - if (setup_playback_gst_pipeline(&ctx) < 0) - return 1; - - if (setup_recording_gst_pipeline(&ctx) < 0) - return 1; - - ctx.sock = g_socket_connection_get_socket(ctx.conn); - source = g_socket_create_source(ctx.sock, G_IO_IN | G_IO_ERR, NULL); - g_source_set_callback(source, (GSourceFunc)read_cb, &ctx, NULL); - g_source_attach(source, NULL); - g_source_unref(source); - - source = g_timeout_source_new_seconds(5); - g_source_set_callback(source, (GSourceFunc)do_ping, &ctx, NULL); - g_source_attach(source, NULL); - g_source_unref(source); - - g_main_loop_run(ctx.loop); - - g_main_loop_unref(ctx.loop); - - return 0; -} -- cgit