summaryrefslogtreecommitdiff
path: root/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket.c')
-rw-r--r--src/socket.c427
1 files changed, 266 insertions, 161 deletions
diff --git a/src/socket.c b/src/socket.c
index 332c60d..5c1fa5e 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -3,8 +3,7 @@
#include <stdbool.h>
#include <sys/types.h>
-
-#include "mumble.pb-c.h"
+#include <sys/time.h>
#include "polarssl/net.h"
#include "polarssl/ssl.h"
@@ -16,11 +15,16 @@
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappsink.h>
#include <gst/app/gstappbuffer.h>
#include <glib.h>
#include <glib-object.h>
+#include <pthread.h>
+
+#include "mumble.pb-c.h"
+#include "varint.h"
#include "messages.h"
#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
@@ -42,109 +46,29 @@ struct context {
CELTHeader celt_header;
CELTMode *celt_mode;
- GstElement *pipeline;
+ GstElement *playback_pipeline;
+ GstElement *record_pipeline;
GstAppSrc *src;
+ GstAppSink *sink;
+
+ int64_t sequence;
};
enum udp_message_type {
udp_voice_celt_alpha,
udp_ping,
udp_voice_speex,
- udp_voice_celt_beta,
+ udp_voice_celt_beta
};
-int64_t
-decode_varint(uint8_t *data, uint32_t *read, uint32_t left)
-{
- int64_t varint = 0;
-
- /* 1 byte with 7 · 8 + 1 leading zeroes */
- if ((data[0] & 0x80) == 0x00) {
- varint = data[0] & 0x7F;
- *read = 1;
- /* 2 bytes with 6 · 8 + 2 leading zeroes */
- } else if ((data[0] & 0xC0) == 0x80) {
- varint = ((data[0] & 0x3F) << 8) | data[1];
- *read = 2;
- /* 3 bytes with 5 · 8 + 3 leading zeroes */
- } else if ((data[0] & 0xE0) == 0xC0) {
- varint = (((data[0] & 0x1F) << 16) |
- (data[1] << 8) | (data[2]));
- *read = 3;
- /* 4 bytes with 4 · 8 + 4 leading zeroes */
- } else if ((data[0] & 0xF0) == 0xE0) {
- varint = (((data[0] & 0x0F) << 24) | (data[1] << 16) |
- (data[2] << 8) | (data[3]));
- *read = 4;
- } else /* if ((data[pos] & 0xF0) == 0xF0) */ {
- switch (data[0] & 0xFC) {
- /* 32-bit positive number */
- case 0xF0:
- varint = ((data[1] << 24) | (data[2] << 16) |
- (data[3] << 8) | data[4]);
- *read = 1 + 4;
- break;
- /* 64-bit number */
- case 0xF4:
- varint =
- ((int64_t)data[1] << 56) | ((int64_t)data[2] << 48) |
- ((int64_t)data[3] << 40) | ((int64_t)data[4] << 32) |
- (data[5] << 24) | (data[6] << 16) |
- (data[7] << 8) | (data[8] << 0);
- *read = 1 + 8;
- break;
- /* Negative varint */
- case 0xF8:
- /* FIXME: handle endless recursion */
- varint = -decode_varint(&data[1], read, left - 1);
- *read += 1;
- break;
- /* Negative two bit number */
- case 0xFC:
- varint = -(int)(data[0] & 0x03);
- *read = 1;
- break;
- }
- }
-
- return varint;
-}
-
static void
-handle_udp(struct context *ctx, uint8_t *data, uint32_t len)
+appsrc_push(GstAppSrc *src, const void *mem, size_t size)
{
- int64_t session;
- int64_t sequence;
- int pos = 1;
- int read = 0;
-
- int frame_len, term;
- static int iseq = 0;
-
- session = decode_varint(&data[pos], &read, len-pos);
- pos += read;
- sequence = decode_varint(&data[pos], &read, len-pos);
- pos += read;
- printf("session: %ld, sequence: %ld\n", session, sequence);
-
- do {
- frame_len = (data[pos] & 0x7F);
- term = (data[pos] & 0x80) == 0x80;
- printf("_len: %d, term: %d\n", frame_len, term);
- pos += 1;
-
- if (frame_len == 0)
- break;
-
- void *src = malloc(frame_len);
- memcpy(src, &data[pos], frame_len);
-
- GstBuffer *gstbuf = gst_app_buffer_new(src, frame_len, free, NULL);
- gst_app_src_push_buffer(ctx->src, gstbuf);
-
- pos += frame_len;
- sequence++;
- } while (term);
+ GstBuffer *gstbuf;
+
+ gstbuf = gst_app_buffer_new(g_memdup(mem, size), size,
+ g_free, NULL);
+ gst_app_src_push_buffer(src, gstbuf);
}
static void
@@ -171,6 +95,99 @@ get_preamble(uint8_t *buffer, int *type, int *len)
*len = (int)msgLen;
}
+pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
+
+static GstFlowReturn
+pull_buffer(GstAppSink *sink, gpointer user_data)
+{
+ struct context *ctx = user_data;
+ GstBuffer *buf;
+ uint8_t data[1024];
+ uint32_t write = 0;
+ uint32_t pos = 0;
+ int ret;
+
+ static uint64_t seq = 0;
+
+ /* header will be written at the end */
+ pos = PREAMBLE_SIZE;
+
+ buf = gst_app_sink_pull_buffer(ctx->sink);
+
+ ++seq;
+ if (seq <= 2) {
+ gst_buffer_unref(buf);
+ pthread_mutex_unlock(&mutex1);
+ return GST_FLOW_OK;
+ }
+ if (GST_BUFFER_SIZE(buf) > 127) {
+ g_printerr("GOT TOO BIG BUFFER\n");
+ pthread_mutex_unlock(&mutex1);
+ return GST_FLOW_ERROR;
+ }
+
+ data[pos++] = (udp_voice_celt_alpha) | (0 << 4);
+
+ encode_varint(&data[pos], &write, ++ctx->sequence, 1024-pos);
+ pos += write;
+
+ data[pos++] = 0x00 /*: 0x80 */ | (GST_BUFFER_SIZE(buf) & 0x7F);
+ memcpy(&data[pos], GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf));
+ pos += GST_BUFFER_SIZE(buf);
+
+ gst_buffer_unref(buf);
+
+ add_preamble(&data[0], 1, pos-PREAMBLE_SIZE);
+ pthread_mutex_lock(&mutex1);
+ while ((ret = ssl_write(&ctx->ssl, data, PREAMBLE_SIZE)) < PREAMBLE_SIZE) {
+ if (ret != POLARSSL_ERR_NET_TRY_AGAIN) {
+ printf("write failed: %d\n", ret);
+ abort();
+ }
+ }
+ while ((ret = ssl_write(&ctx->ssl, &data[PREAMBLE_SIZE], pos-PREAMBLE_SIZE)) < (pos-PREAMBLE_SIZE)) {
+ if (ret != POLARSSL_ERR_NET_TRY_AGAIN) {
+ printf("write failed: %d\n", ret);
+ abort();
+ }
+ }
+ pthread_mutex_unlock(&mutex1);
+
+ return GST_FLOW_OK;
+}
+
+static void
+handle_udp(struct context *ctx, uint8_t *data, uint32_t len)
+{
+ int64_t session;
+ int64_t sequence;
+ uint32_t pos = 1;
+ uint32_t read = 0;
+ uint8_t frame_len, terminator;
+
+ printf("data[0]: 0x%x\n", data[0]);
+ printf("len: %u\n", len);
+ session = decode_varint(&data[pos], &read, len-pos);
+ pos += read;
+ sequence = decode_varint(&data[pos], &read, len-pos);
+ pos += read;
+ printf("session: %ld, sequence: %ld\n", session, sequence);
+
+ do {
+ frame_len = data[pos] & 0x7F;
+ terminator = data[pos] & 0x80;
+ pos += 1;
+
+ if (frame_len == 0 || frame_len > len-pos)
+ break;
+
+ appsrc_push(ctx->src, &data[pos], frame_len);
+
+ pos += frame_len;
+ sequence++;
+ } while (terminator);
+}
+
static void
recv_version(MumbleProto__Version *version, struct context *ctx)
{
@@ -186,6 +203,15 @@ recv_channel_state(MumbleProto__ChannelState *state, struct context *ctx)
}
static void
+recv_server_sync(MumbleProto__ServerSync *sync, struct context *ctx)
+{
+ ctx->session = sync->session;
+
+ printf("got session: %d\n", ctx->session);
+
+}
+
+static void
send_msg(struct context *ctx, ProtobufCMessage *msg)
{
uint8_t pad[128];
@@ -203,6 +229,7 @@ send_msg(struct context *ctx, ProtobufCMessage *msg)
protobuf_c_message_pack_to_buffer(msg, &buffer.base);
add_preamble(preamble, type, buffer.len);
+ pthread_mutex_lock(&mutex1);
while ((ret = ssl_write(&ctx->ssl, preamble, PREAMBLE_SIZE)) <= 0) {
if (ret != POLARSSL_ERR_NET_TRY_AGAIN) {
printf("write failed: %d\n", ret);
@@ -215,6 +242,7 @@ send_msg(struct context *ctx, ProtobufCMessage *msg)
abort();
}
}
+ pthread_mutex_unlock(&mutex1);
PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer);
}
@@ -228,15 +256,13 @@ recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_siz
ProtobufCMessage *msg;
void *data;
int type, len;
- int ret, i;
-
- printf("recv msg\n");
+ int ret;
do {
ret = ssl_read(&ctx->ssl, preamble, 6);
if (ret == POLARSSL_ERR_NET_CONN_RESET) {
printf("conn reset\n");
- exit(1);
+ g_main_loop_quit (ctx->loop);
}
} while (ret == POLARSSL_ERR_NET_TRY_AGAIN);
@@ -314,6 +340,7 @@ do_ping(struct context *ctx)
static const callback_t callbacks[] = {
/* VERSION */ (callback_t) recv_version,
+ [5] = (callback_t) recv_server_sync,
[7] = (callback_t) recv_channel_state,
[127] = NULL,
};
@@ -327,20 +354,21 @@ _recv(GIOChannel *source, GIOCondition condition, gpointer data)
recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks));
} while (ssl_get_bytes_avail(&ctx->ssl) > 0);
- do_ping(ctx);
+ //do_ping(ctx);
return TRUE;
}
static gboolean
bus_call(GstBus *bus, GstMessage *msg, gpointer data)
{
- GMainLoop *loop = (GMainLoop *) data;
+ struct context *ctx = data;
+ GMainLoop *loop = ctx->loop;
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_EOS:
g_print ("End of stream\n");
- //g_main_loop_quit (loop);
+ g_main_loop_quit (loop);
break;
case GST_MESSAGE_ERROR:
@@ -358,21 +386,27 @@ bus_call(GstBus *bus, GstMessage *msg, gpointer data)
break;
}
default:
- g_print("unhandled message: %d\n", GST_MESSAGE_TYPE(msg));
+ g_print("unhandled message: %d %s\n", GST_MESSAGE_TYPE(msg), gst_message_type_get_name(GST_MESSAGE_TYPE(msg)));
break;
}
+
+ return TRUE;
}
static void
app_need_data(GstAppSrc *src, guint length, gpointer user_data)
{
+#if 0
struct context *ctx = user_data;
+#endif
}
static void
app_enough_data(GstAppSrc *src, gpointer user_data)
{
+#if 0
struct context *ctx = user_data;
+#endif
}
static GstAppSrcCallbacks app_callbacks = {
@@ -381,6 +415,130 @@ static GstAppSrcCallbacks app_callbacks = {
NULL
};
+static int
+setup_playback_gst_pipeline(struct context *ctx)
+{
+ GstElement *pipeline, *src, *decoder, *conv, *sink;
+ GstBus *bus;
+
+ pipeline = gst_pipeline_new("cmumble-output");
+ src = gst_element_factory_make("appsrc", "input");
+ decoder = gst_element_factory_make("celtdec", "celt-decoder");
+ conv = gst_element_factory_make("audioconvert", "converter");
+ sink = gst_element_factory_make("autoaudiosink", "audio-output");
+
+ if (!pipeline || !src || !decoder || !conv || !sink) {
+ g_printerr("failed to initialize pipeline\n");
+ return -1;
+ }
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
+ gst_bus_add_watch(bus, bus_call, ctx);
+ gst_object_unref(bus);
+
+ gst_bin_add_many(GST_BIN(pipeline),
+ src, decoder, conv, sink, NULL);
+ gst_element_link_many(src, decoder, conv, sink, NULL);
+
+ ctx->src = GST_APP_SRC(src);
+ ctx->playback_pipeline = pipeline;
+
+ /* Important! */
+ gst_base_src_set_live(GST_BASE_SRC(ctx->src), TRUE);
+ gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx->src), TRUE);
+ gst_base_src_set_format(GST_BASE_SRC(ctx->src), GST_FORMAT_TIME);
+
+ gst_app_src_set_stream_type(ctx->src, GST_APP_STREAM_TYPE_STREAM);
+ gst_app_src_set_callbacks(ctx->src, &app_callbacks, ctx, NULL);
+
+ gst_element_set_state(pipeline, GST_STATE_PLAYING);
+
+ { /* Setup Celt Decoder */
+#define SAMPLERATE 48000
+#define CHANNELS 1
+ uint8_t celt_header_packet[sizeof(CELTHeader)];
+
+ ctx->celt_mode = celt_mode_create(SAMPLERATE,
+ SAMPLERATE / 100, NULL);
+ celt_header_init(&ctx->celt_header, ctx->celt_mode, CHANNELS);
+ celt_header_to_packet(&ctx->celt_header,
+ celt_header_packet, sizeof(CELTHeader));
+
+ appsrc_push(ctx->src, celt_header_packet, sizeof(CELTHeader));
+ /* fake vorbiscomment buffer */
+ appsrc_push(ctx->src, NULL, 0);
+ }
+
+ return 0;
+}
+
+static int
+setup_recording_gst_pipeline(struct context *ctx)
+{
+ GstElement *pipeline, *src, *cutter, *resample,
+ *conv, *capsfilter, *encoder, *sink;
+ GstBus *bus;
+ GstCaps *caps;
+
+ pipeline = gst_pipeline_new("cmumble-input");
+ src = gst_element_factory_make("autoaudiosrc", "audio-input");
+ cutter = gst_element_factory_make("cutter", "cutter");
+ resample = gst_element_factory_make("audioresample", "resample");
+ conv = gst_element_factory_make("audioconvert", "converter");
+ capsfilter = gst_element_factory_make("capsfilter", "capsfilter");
+ encoder = gst_element_factory_make("celtenc", "celt-encoder");
+ sink = gst_element_factory_make("appsink", "output");
+
+ if (!pipeline || !src || !cutter || !resample || !conv ||
+ !capsfilter || !encoder || !sink) {
+ g_printerr("failed to initialize pipeline\n");
+ return -1;
+ }
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
+ gst_bus_add_watch(bus, bus_call, ctx);
+ gst_object_unref(bus);
+
+ gst_bin_add_many(GST_BIN(pipeline),
+ src, cutter, resample, conv, capsfilter, encoder, sink, NULL);
+ gst_element_link_many(src, cutter, resample, conv, capsfilter, encoder, sink, NULL);
+
+ ctx->sink = GST_APP_SINK(sink);
+ ctx->record_pipeline = pipeline;
+
+ caps = gst_caps_new_simple("audio/x-raw-int",
+ "channels", G_TYPE_INT, 1,
+ "depth", G_TYPE_INT, 16,
+ "rate", G_TYPE_INT, 48000,
+ "width", G_TYPE_INT, 16,
+ "signed", G_TYPE_BOOLEAN, TRUE,
+ NULL);
+ g_object_set(G_OBJECT(capsfilter), "caps", caps, NULL);
+ gst_caps_unref(caps);
+
+ g_object_set(G_OBJECT(cutter),
+ "threshold_dB", -45.0, "leaky", TRUE,
+ "runlength", 0.5, "prelength", 1.0, NULL);
+
+ gst_app_sink_set_emit_signals(ctx->sink, TRUE);
+ gst_app_sink_set_drop(ctx->sink, FALSE);;
+ g_signal_connect(sink, "new-buffer", G_CALLBACK(pull_buffer), ctx);
+
+ caps = gst_caps_new_simple("audio/x-celt",
+ "rate", G_TYPE_INT, SAMPLERATE,
+ "channels", G_TYPE_INT, 1,
+ "frame-size", G_TYPE_INT, SAMPLERATE/100,
+ NULL);
+ gst_app_sink_set_caps(ctx->sink, caps);
+ gst_caps_unref(caps);
+
+ gst_element_set_state(pipeline, GST_STATE_PLAYING);
+
+ ctx->sequence = 0;
+
+ return 0;
+}
+
int main(int argc, char **argv)
{
#if 1
@@ -406,7 +564,7 @@ int main(int argc, char **argv)
ssl_set_dbg(&ctx.ssl, my_debug, NULL);
ssl_set_bio(&ctx.ssl, net_recv, &ctx.sock, net_send, &ctx.sock);
- //ssl_set_session(&ctx.ssl, 1, 600, &ssn);
+ /* ssl_set_session(&ctx.ssl, 1, 600, &ssn); */
ssl_set_session(&ctx.ssl, 0, 0, &ctx.ssn);
ssl_set_ciphers(&ctx.ssl, ssl_default_ciphers);
@@ -422,7 +580,7 @@ int main(int argc, char **argv)
{
MumbleProto__Authenticate authenticate;
mumble_proto__authenticate__init(&authenticate);
- authenticate.username = "ben2";
+ authenticate.username = argv[1];
authenticate.password = "";
authenticate.n_celt_versions = 1;
authenticate.celt_versions = (int32_t[]) { 0x8000000b };
@@ -431,70 +589,16 @@ int main(int argc, char **argv)
do_ping(&ctx);
- GstElement *pipeline, *src, *decoder, *conv, *sink;
-
g_type_init();
gst_init(&argc, &argv);
ctx.loop = g_main_loop_new(NULL, FALSE);
- pipeline = gst_pipeline_new("cmumble-output");
- src = gst_element_factory_make("appsrc", "input");
- decoder = gst_element_factory_make("celtdec", "celt-decoder");
- conv = gst_element_factory_make("audioconvert", "converter");
- sink = gst_element_factory_make("autoaudiosink", "audio-output");
- //sink = gst_element_factory_make("filesink", "output");
- g_object_set(G_OBJECT(sink), "location", "foo.raw", NULL);
- //sink = gst_element_factory_make("alsasink", "audio-output");
-
-
- if (!pipeline || !src || !decoder || !conv || !sink) {
- g_printerr("failed to initialize pipeline\n");
+ if (setup_playback_gst_pipeline(&ctx) < 0)
return 1;
- }
- {
- GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
- gst_bus_add_watch(bus, bus_call, ctx.loop);
- gst_object_unref(bus);
- }
-
- gst_bin_add_many(GST_BIN(pipeline),
- src, decoder, conv, sink, NULL);
- gst_element_link_many(src, decoder, conv, sink, NULL);
-
- ctx.src = GST_APP_SRC(src);
- ctx.pipeline = pipeline;
-
- /* Important! */
- gst_base_src_set_live(GST_BASE_SRC(ctx.src), TRUE);
- gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx.src), TRUE);
- gst_base_src_set_format(GST_BASE_SRC(ctx.src), GST_FORMAT_TIME);
-
- gst_app_src_set_stream_type(ctx.src, GST_APP_STREAM_TYPE_STREAM);
- gst_app_src_set_callbacks(ctx.src, &app_callbacks, &ctx, NULL);
-
- gst_element_set_state(pipeline, GST_STATE_PLAYING);
-
- { /* Setup Celt Decoder */
-#define SAMPLERATE 48000
-#define CHANNELS 1
- GstBuffer *gst_buf;
- uint8_t celt_header_packet[sizeof(CELTHeader)];
-
- ctx.celt_mode = celt_mode_create(SAMPLERATE,
- SAMPLERATE / 100, NULL);
- celt_header_init(&ctx.celt_header, ctx.celt_mode, CHANNELS);
- celt_header_to_packet(&ctx.celt_header,
- celt_header_packet, sizeof(CELTHeader));
-
- gst_buf = gst_app_buffer_new(celt_header_packet,
- sizeof(CELTHeader), NULL, &ctx);
- gst_app_src_push_buffer(ctx.src, gst_buf);
- /* fake vorbiscomment buffer */
- gst_buf = gst_app_buffer_new(NULL, 0, NULL, &ctx);
- gst_app_src_push_buffer(ctx.src, gst_buf);
- }
+ if (setup_recording_gst_pipeline(&ctx) < 0)
+ return 1;
ctx.sock_channel = g_io_channel_unix_new(ctx.sock);
g_io_add_watch(ctx.sock_channel, G_IO_IN | G_IO_ERR, _recv, &ctx);
@@ -507,4 +611,5 @@ int main(int argc, char **argv)
ssl_free(&ctx.ssl);
memset(&ctx.ssl, 0, sizeof(ctx.ssl));
+ return 0;
}