summaryrefslogtreecommitdiff
path: root/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket.c')
-rw-r--r--src/socket.c168
1 files changed, 123 insertions, 45 deletions
diff --git a/src/socket.c b/src/socket.c
index 690b94f..332c60d 100644
--- a/src/socket.c
+++ b/src/socket.c
@@ -14,6 +14,10 @@
#include <celt/celt_header.h>
#include <speex/speex_jitter.h>
+#include <gst/gst.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappbuffer.h>
+
#include <glib.h>
#include <glib-object.h>
@@ -37,6 +41,9 @@ struct context {
CELTHeader celt_header;
CELTMode *celt_mode;
+
+ GstElement *pipeline;
+ GstAppSrc *src;
};
enum udp_message_type {
@@ -111,14 +118,7 @@ handle_udp(struct context *ctx, uint8_t *data, uint32_t len)
int pos = 1;
int read = 0;
-#define PCM_SIZE (48000/100 * 1)
- int16_t pcm[PCM_SIZE];
- uint8_t buf[BUFSIZ];
- FILE *f;
int frame_len, term;
- CELTDecoder *dec_state;
- JitterBuffer *jitter;
- CELTMode *mode;
static int iseq = 0;
session = decode_varint(&data[pos], &read, len-pos);
@@ -127,15 +127,6 @@ handle_udp(struct context *ctx, uint8_t *data, uint32_t len)
pos += read;
printf("session: %ld, sequence: %ld\n", session, sequence);
- f = fopen("foo", "a+");
-
- dec_state = celt_decoder_create(ctx->celt_mode,
- ctx->celt_header.nb_channels, NULL);
-
- jitter = jitter_buffer_init(ctx->celt_header.frame_size);
- jitter_buffer_ctl(jitter, JITTER_BUFFER_SET_MARGIN,
- &ctx->celt_header.frame_size);
-
do {
frame_len = (data[pos] & 0x7F);
term = (data[pos] & 0x80) == 0x80;
@@ -145,32 +136,15 @@ handle_udp(struct context *ctx, uint8_t *data, uint32_t len)
if (frame_len == 0)
break;
- JitterBufferPacket packet;
- packet.data = &data[pos];
- packet.len = frame_len;
- packet.timestamp = ctx->celt_header.frame_size * iseq++;
- packet.span = ctx->celt_header.frame_size;
- packet.sequence = 0;
-
- jitter_buffer_put(jitter, &packet);
-
- packet.data = buf;
- packet.len = BUFSIZ;
- jitter_buffer_tick(jitter);
- jitter_buffer_get(jitter, &packet, ctx->celt_header.frame_size, NULL);
+ void *src = malloc(frame_len);
+ memcpy(src, &data[pos], frame_len);
- if (packet.len == 0)
- packet.data = NULL;
-
- celt_decode(dec_state, packet.data, packet.len, pcm);
- fwrite(pcm, sizeof(int16_t), PCM_SIZE, f);
+ GstBuffer *gstbuf = gst_app_buffer_new(src, frame_len, free, NULL);
+ gst_app_src_push_buffer(ctx->src, gstbuf);
pos += frame_len;
sequence++;
} while (term);
-
- fclose(f);
- celt_decoder_destroy(dec_state);
}
static void
@@ -305,7 +279,7 @@ recv_msg(struct context *ctx, const callback_t *callbacks, uint32_t callback_siz
len, data);
if (msg == NULL) {
printf("message unpack failure\n");
- return ;
+ return;
}
printf("debug: received message: %s type:%d, len:%d\n", messages[type].name, type, len);
@@ -348,6 +322,7 @@ static gboolean
_recv(GIOChannel *source, GIOCondition condition, gpointer data)
{
struct context *ctx = data;
+
do {
recv_msg(ctx, callbacks, ARRAY_SIZE(callbacks));
} while (ssl_get_bytes_avail(&ctx->ssl) > 0);
@@ -356,6 +331,55 @@ _recv(GIOChannel *source, GIOCondition condition, gpointer data)
return TRUE;
}
+static gboolean
+bus_call(GstBus *bus, GstMessage *msg, gpointer data)
+{
+ GMainLoop *loop = (GMainLoop *) data;
+
+ switch (GST_MESSAGE_TYPE (msg)) {
+
+ case GST_MESSAGE_EOS:
+ g_print ("End of stream\n");
+ //g_main_loop_quit (loop);
+ break;
+
+ case GST_MESSAGE_ERROR:
+ {
+ char *debug;
+ GError *error;
+
+ gst_message_parse_error (msg, &error, &debug);
+ g_free (debug);
+
+ g_printerr ("Error: %s\n", error->message);
+ g_error_free (error);
+
+ g_main_loop_quit (loop);
+ break;
+ }
+ default:
+ g_print("unhandled message: %d\n", GST_MESSAGE_TYPE(msg));
+ break;
+ }
+}
+
+static void
+app_need_data(GstAppSrc *src, guint length, gpointer user_data)
+{
+ struct context *ctx = user_data;
+}
+
+static void
+app_enough_data(GstAppSrc *src, gpointer user_data)
+{
+ struct context *ctx = user_data;
+}
+
+static GstAppSrcCallbacks app_callbacks = {
+ app_need_data,
+ app_enough_data,
+ NULL
+};
int main(int argc, char **argv)
{
@@ -407,17 +431,71 @@ int main(int argc, char **argv)
do_ping(&ctx);
-#define SAMPLERATE 48000
-#define CHANNELS 1
- ctx.celt_mode = celt_mode_create(SAMPLERATE, SAMPLERATE / 100, NULL);
- celt_header_init(&ctx.celt_header, ctx.celt_mode, CHANNELS);
- uint8_t celt_header_packet[sizeof(CELTHeader)];
- printf("extra headers: %d\n", ctx.celt_header.extra_headers);
- celt_header_to_packet(&ctx.celt_header, celt_header_packet, sizeof(CELTHeader));
+ GstElement *pipeline, *src, *decoder, *conv, *sink;
g_type_init();
+ gst_init(&argc, &argv);
+
ctx.loop = g_main_loop_new(NULL, FALSE);
+ pipeline = gst_pipeline_new("cmumble-output");
+ src = gst_element_factory_make("appsrc", "input");
+ decoder = gst_element_factory_make("celtdec", "celt-decoder");
+ conv = gst_element_factory_make("audioconvert", "converter");
+ sink = gst_element_factory_make("autoaudiosink", "audio-output");
+ //sink = gst_element_factory_make("filesink", "output");
+ g_object_set(G_OBJECT(sink), "location", "foo.raw", NULL);
+ //sink = gst_element_factory_make("alsasink", "audio-output");
+
+
+ if (!pipeline || !src || !decoder || !conv || !sink) {
+ g_printerr("failed to initialize pipeline\n");
+ return 1;
+ }
+
+ {
+ GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
+ gst_bus_add_watch(bus, bus_call, ctx.loop);
+ gst_object_unref(bus);
+ }
+
+ gst_bin_add_many(GST_BIN(pipeline),
+ src, decoder, conv, sink, NULL);
+ gst_element_link_many(src, decoder, conv, sink, NULL);
+
+ ctx.src = GST_APP_SRC(src);
+ ctx.pipeline = pipeline;
+
+ /* Important! */
+ gst_base_src_set_live(GST_BASE_SRC(ctx.src), TRUE);
+ gst_base_src_set_do_timestamp(GST_BASE_SRC(ctx.src), TRUE);
+ gst_base_src_set_format(GST_BASE_SRC(ctx.src), GST_FORMAT_TIME);
+
+ gst_app_src_set_stream_type(ctx.src, GST_APP_STREAM_TYPE_STREAM);
+ gst_app_src_set_callbacks(ctx.src, &app_callbacks, &ctx, NULL);
+
+ gst_element_set_state(pipeline, GST_STATE_PLAYING);
+
+ { /* Setup Celt Decoder */
+#define SAMPLERATE 48000
+#define CHANNELS 1
+ GstBuffer *gst_buf;
+ uint8_t celt_header_packet[sizeof(CELTHeader)];
+
+ ctx.celt_mode = celt_mode_create(SAMPLERATE,
+ SAMPLERATE / 100, NULL);
+ celt_header_init(&ctx.celt_header, ctx.celt_mode, CHANNELS);
+ celt_header_to_packet(&ctx.celt_header,
+ celt_header_packet, sizeof(CELTHeader));
+
+ gst_buf = gst_app_buffer_new(celt_header_packet,
+ sizeof(CELTHeader), NULL, &ctx);
+ gst_app_src_push_buffer(ctx.src, gst_buf);
+ /* fake vorbiscomment buffer */
+ gst_buf = gst_app_buffer_new(NULL, 0, NULL, &ctx);
+ gst_app_src_push_buffer(ctx.src, gst_buf);
+ }
+
ctx.sock_channel = g_io_channel_unix_new(ctx.sock);
g_io_add_watch(ctx.sock_channel, G_IO_IN | G_IO_ERR, _recv, &ctx);