diff options
Diffstat (limited to 'src/socket.c')
-rw-r--r-- | src/socket.c | 168 |
1 files changed, 123 insertions, 45 deletions
diff --git a/src/socket.c b/src/socket.c index 690b94f..332c60d 100644 --- a/src/socket.c +++ b/src/socket.c @@ -14,6 +14,10 @@ #include <celt/celt_header.h> #include <speex/speex_jitter.h> +#include <gst/gst.h> +#include <gst/app/gstappsrc.h> +#include <gst/app/gstappbuffer.h> + #include <glib.h> #include <glib-object.h> @@ -37,6 +41,9 @@ struct context { CELTHeader celt_header; CELTMode *celt_mode; + + GstElement *pipeline; + GstAppSrc *src; }; enum udp_message_type { @@ -111,14 +118,7 @@ handle_udp(struct context *ctx, uint8_t *data, uint32_t len) int pos = 1; int read = 0; -#define PCM_SIZE (48000/100 * 1) - int16_t pcm[PCM_SIZE]; - uint8_t buf[BUFSIZ]; - FILE *f; int frame_len, term; - CELTDecoder *dec_state; - JitterBuffer *jitter; - CELTMode *mode; static int iseq = 0; session = decode_varint(&data[pos], &read, len-pos); @@ -127,15 +127,6 @@ handle_udp(struct context *ctx, uint8_t *data, uint32_t len) pos += read; printf("session: %ld, sequence: %ld\n", session, sequence); - f = fopen("foo", "a+"); - - dec_state = celt_decoder_create(ctx->celt_mode, - ctx->celt_header.nb_channels, NULL); - - jitter = jitter_buffer_init(ctx->celt_header.frame_size); - jitter_buffer_ctl(jitter, JITTER_BUFFER_SET_MARGIN, - &ctx->celt_header.frame_size); - do { frame_len = (data[pos] & 0x7F); term = (data[pos] & 0x80) == 0x80; @@ -145,32 +136,15 @@ handle_udp(struct context *ctx, uint8_t *data, uint32_t len) if (frame_len == 0) break; - JitterBufferPacket packet; - packet.data = &data[pos]; - packet.len = frame_len; - packet.timestamp = ctx->celt_header.frame_size * iseq++; - packet.span = ctx->celt_header.frame_size; - packet.sequence = 0; - - jitter_buffer_put(jitter, &packet); - - packet.data = buf; - packet.len = BUFSIZ; - jitter_buffer_tick(jitter); - jitter_buffer_get(jitter, &packet, ctx->celt_header.frame_size, NULL); + void *src = malloc(frame_len); + memcpy(src, &data[pos], frame_len); - if (packet.len == 0) - packet.data = NULL; - - celt_decode(dec_state, packet.data, packet.len, pcm); - fwrite(pcm, sizeof(int16_t), PCM_SIZE, f); + GstBuffer *gstbuf = gst_app_buffer_new(src, frame_len, free, NULL); + gst_app_src_push_buffer(ctx->src, gstbuf); pos += frame_len; sequence++; } while (term); - - fclose(f); - celt_decoder_destroy(dec_state); } static void @@ -305,7 +279,7 @@ recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_siz len, data); if (msg == NULL) { printf("message unpack failure\n"); - return ; + return; } printf("debug: received message: %s type:%d, len:%d\n", messages[type].name, type, len); @@ -348,6 +322,7 @@ static gboolean _recv(GIOChannel *source, GIOCondition condition, gpointer data) { struct context *ctx = data; + do { recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks)); } while (ssl_get_bytes_avail(&ctx->ssl) > 0); @@ -356,6 +331,55 @@ _recv(GIOChannel *source, GIOCondition condition, gpointer data) return TRUE; } +static gboolean +bus_call(GstBus *bus, GstMessage *msg, gpointer data) +{ + GMainLoop *loop = (GMainLoop *) data; + + switch (GST_MESSAGE_TYPE (msg)) { + + case GST_MESSAGE_EOS: + g_print ("End of stream\n"); + //g_main_loop_quit (loop); + break; + + case GST_MESSAGE_ERROR: + { + char *debug; + GError *error; + + gst_message_parse_error (msg, &error, &debug); + g_free (debug); + + g_printerr ("Error: %s\n", error->message); + g_error_free (error); + + g_main_loop_quit (loop); + break; + } + default: + g_print("unhandled message: %d\n", GST_MESSAGE_TYPE(msg)); + break; + } +} + +static void +app_need_data(GstAppSrc *src, guint length, gpointer user_data) +{ + struct context *ctx = user_data; +} + +static void +app_enough_data(GstAppSrc *src, gpointer user_data) +{ + struct context *ctx = user_data; +} + +static GstAppSrcCallbacks app_callbacks = { + app_need_data, + app_enough_data, + NULL +}; int main(int argc, char **argv) { @@ -407,17 +431,71 @@ int main(int argc, char **argv) do_ping(&ctx); -#define SAMPLERATE 48000 -#define CHANNELS 1 - ctx.celt_mode = celt_mode_create(SAMPLERATE, SAMPLERATE / 100, NULL); - celt_header_init(&ctx.celt_header, ctx.celt_mode, CHANNELS); - uint8_t celt_header_packet[sizeof(CELTHeader)]; - printf("extra headers: %d\n", ctx.celt_header.extra_headers); - celt_header_to_packet(&ctx.celt_header, celt_header_packet, sizeof(CELTHeader)); + GstElement *pipeline, *src, *decoder, *conv, *sink; g_type_init(); + gst_init(&argc, &argv); + ctx.loop = g_main_loop_new(NULL, FALSE); + pipeline = gst_pipeline_new("cmumble-output"); + src = gst_element_factory_make("appsrc", "input"); + decoder = gst_element_factory_make("celtdec", "celt-decoder"); + conv = gst_element_factory_make("audioconvert", "converter"); + sink = gst_element_factory_make("autoaudiosink", "audio-output"); + //sink = gst_element_factory_make("filesink", "output"); + g_object_set(G_OBJECT(sink), "location", "foo.raw", NULL); + //sink = gst_element_factory_make("alsasink", "audio-output"); + + + if (!pipeline || !src || !decoder || !conv || !sink) { + g_printerr("failed to initialize pipeline\n"); + return 1; + } + + { + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, bus_call, ctx.loop); + gst_object_unref(bus); + } + + gst_bin_add_many(GST_BIN(pipeline), + src, decoder, conv, sink, NULL); + gst_element_link_many(src, decoder, conv, sink, NULL); + + ctx.src = GST_APP_SRC(src); + ctx.pipeline = pipeline; + + /* Important! */ + gst_base_src_set_live(GST_BASE_SRC(ctx.src), TRUE); + gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx.src), TRUE); + gst_base_src_set_format(GST_BASE_SRC(ctx.src), GST_FORMAT_TIME); + + gst_app_src_set_stream_type(ctx.src, GST_APP_STREAM_TYPE_STREAM); + gst_app_src_set_callbacks(ctx.src, &app_callbacks, &ctx, NULL); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + { /* Setup Celt Decoder */ +#define SAMPLERATE 48000 +#define CHANNELS 1 + GstBuffer *gst_buf; + uint8_t celt_header_packet[sizeof(CELTHeader)]; + + ctx.celt_mode = celt_mode_create(SAMPLERATE, + SAMPLERATE / 100, NULL); + celt_header_init(&ctx.celt_header, ctx.celt_mode, CHANNELS); + celt_header_to_packet(&ctx.celt_header, + celt_header_packet, sizeof(CELTHeader)); + + gst_buf = gst_app_buffer_new(celt_header_packet, + sizeof(CELTHeader), NULL, &ctx); + gst_app_src_push_buffer(ctx.src, gst_buf); + /* fake vorbiscomment buffer */ + gst_buf = gst_app_buffer_new(NULL, 0, NULL, &ctx); + gst_app_src_push_buffer(ctx.src, gst_buf); + } + ctx.sock_channel = g_io_channel_unix_new(ctx.sock); g_io_add_watch(ctx.sock_channel, G_IO_IN | G_IO_ERR, _recv, &ctx); |