From 8df311c1052dd137a223a0839b3a64a3ec3d7869 Mon Sep 17 00:00:00 2001 From: Benjamin Franzke Date: Wed, 4 Dec 2013 11:46:58 +0100 Subject: record: Queue buffers and send out multiple frames at once Also restart sequence on discontiuity. FIXME: Research whether we can always just ask for the GST_BUFFER_DISCONT flag. Sending audio works quite reasonable with a server that has 72kbit/s now. :) --- src/audio.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++---------- src/audio.h | 10 ++++ 2 files changed, 152 insertions(+), 26 deletions(-) diff --git a/src/audio.c b/src/audio.c index e40b1c2..b04ea39 100644 --- a/src/audio.c +++ b/src/audio.c @@ -8,6 +8,7 @@ #define SAMPLERATE 48000 #define FRAMESIZE 480 /* SAMPLERATE/100 */ #define CHANNELS 1 +#define BUFFER_TIME (gst_util_uint64_scale_int(1, GST_SECOND, 100)) #define CELT_CAPS "audio/x-celt,channels=" G_STRINGIFY(CHANNELS) "," \ "rate=" G_STRINGIFY(SAMPLERATE) ",frame-size=" G_STRINGIFY(FRAMESIZE) #define AUDIO_CAPS "audio/x-raw,format=S16LE,channels=" \ @@ -47,7 +48,7 @@ cmumble_audio_push(struct cmumble *cm, struct cmumble_user *user, * been received out of order at the server? */ if (user->last_sequence < 0 || sequence == 0 || - sequence < (user->last_sequence + 1)) { + sequence < (user->last_sequence + 1)) { GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_DISCONT); GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_RESYNC); time = now - base; @@ -77,7 +78,7 @@ cmumble_audio_push(struct cmumble *cm, struct cmumble_user *user, GST_BUFFER_DTS(gstbuf) = now - base; GST_BUFFER_PTS(gstbuf) = time; - GST_BUFFER_DURATION(gstbuf) = gst_util_uint64_scale_int (1, GST_SECOND, 100); + GST_BUFFER_DURATION(gstbuf) = BUFFER_TIME; user->last_time_end = time + GST_BUFFER_DURATION(gstbuf); user->last_sequence = sequence; @@ -85,21 +86,54 @@ cmumble_audio_push(struct cmumble *cm, struct cmumble_user *user, gst_app_src_push_buffer(user->src, gstbuf); } +static GstFlowReturn +send_queued_celt_buffers(struct cmumble *cm) +{ + uint8_t data[1024]; + uint32_t written = 0, pos = 0; + mumble_udptunnel_t tunnel; + GstSample *sample; + GstBuffer *buf; + int i; + + if (g_queue_is_empty(cm->audio.buffer_queue)) + return GST_FLOW_ERROR; + + data[pos++] = (udp_voice_celt_alpha << 5) | (udp_normal_talking); + encode_varint(&data[pos], &written, cm->sequence, sizeof(data)-pos); + pos += written; + + for (i = 0; !g_queue_is_empty(cm->audio.buffer_queue); ++i) { + sample = g_queue_pop_head(cm->audio.buffer_queue); + buf = gst_sample_get_buffer(sample); + + data[pos] = gst_buffer_get_size(buf) & 0x7F; + if (!g_queue_is_empty(cm->audio.buffer_queue)) + data[pos] |= 0x80; + pos += 1; + gst_buffer_extract(buf, 0, &data[pos], gst_buffer_get_size(buf)); + pos += gst_buffer_get_size(buf); + gst_sample_unref(sample); + } + + cm->sequence += i; + + cmumble_init_udptunnel(&tunnel); + tunnel.packet.data = data; + tunnel.packet.len = pos; + cmumble_send_udptunnel(cm, &tunnel); + + return GST_FLOW_OK; +} + static GstFlowReturn pull_buffer(GstAppSink *sink, gpointer user_data) { struct cmumble *cm = user_data; GstSample *sample; GstBuffer *buf; - uint8_t data[1024]; - uint32_t write = 0, pos = 0; - mumble_udptunnel_t tunnel; + GstClockTime *silence; - /* FIXME: Make this more generic/disable pulling - * the pipeline completely if not connected? - */ - if (cm->con.conn == NULL) - return GST_FLOW_OK; sample = gst_app_sink_pull_sample(cm->audio.sink); if (sample == NULL) @@ -112,27 +146,42 @@ pull_buffer(GstAppSink *sink, gpointer user_data) return GST_FLOW_OK; } + /* FIXME: Make this more generic/disable pulling + * the pipeline completely if not connected? + */ + if (cm->con.conn == NULL) { + gst_sample_unref(sample); + return GST_FLOW_OK; + } + if (gst_buffer_get_size(buf) > 127) { g_printerr("error: unexpected buffer size\n"); gst_sample_unref(sample); return GST_FLOW_ERROR; } - data[pos++] = (udp_voice_celt_alpha << 5) | (udp_normal_talking); + if (cm->audio.last_time < GST_BUFFER_PTS(buf) - BUFFER_TIME) { + if (!g_queue_is_empty(cm->audio.buffer_queue)) + send_queued_celt_buffers(cm); + cm->sequence = 0; + } + cm->audio.last_time = GST_BUFFER_PTS(buf); - encode_varint(&data[pos], &write, ++cm->sequence, sizeof(data)-pos); - pos += write; + silence = g_queue_peek_head(cm->audio.silence_timestamps); + while (silence && GST_BUFFER_PTS(buf) > *silence) { + g_queue_remove(cm->audio.silence_timestamps, silence); + g_free(silence); + silence = g_queue_peek_head(cm->audio.silence_timestamps); + } - data[pos++] = 0x00 /*: 0x80 */ | (gst_buffer_get_size(buf) & 0x7F); - gst_buffer_extract(buf, 0, &data[pos], gst_buffer_get_size(buf)); - pos += gst_buffer_get_size(buf); + g_queue_push_tail(cm->audio.buffer_queue, sample); - gst_sample_unref(sample); + if (silence && *silence == (GST_BUFFER_PTS(buf) + BUFFER_TIME)) + return send_queued_celt_buffers(cm); - cmumble_init_udptunnel(&tunnel); - tunnel.packet.data = data; - tunnel.packet.len = pos; - cmumble_send_udptunnel(cm, &tunnel); + /* FIXME: This should not be hardcoded, but derived from bitrate */ + if (g_queue_get_length(cm->audio.buffer_queue) == 4) + return send_queued_celt_buffers(cm); return GST_FLOW_OK; } @@ -164,14 +213,60 @@ GstAppSinkCallbacks sink_cbs = { .new_sample = new_sample }; +static void +handle_cutter_message(struct cmumble *cm, GstMessage *message) +{ + const GstStructure *s; + gboolean above; + GstClockTime *time; + + s = gst_message_get_structure(message); + if (!gst_structure_get_boolean(s, "above", &above)) + return; + + /* We are only intrested in below state */ + if (above) + return; + + time = g_new(GstClockTime, 1); + if (!time) + return; + + if (!gst_structure_get_clock_time(s, "timestamp", time)) + return; + + if (*time == cm->audio.last_time + BUFFER_TIME) + send_queued_celt_buffers(cm); + else + g_queue_push_tail(cm->audio.silence_timestamps, time); +} + +static gboolean +record_pipe_bus_message(GstBus *bus, GstMessage *message, gpointer data) +{ + struct cmumble *cm = data; + + switch (GST_MESSAGE_TYPE(message)) { + case GST_MESSAGE_ELEMENT: + if (GST_MESSAGE_SRC(message) == GST_OBJECT(cm->audio.cutter)) + handle_cutter_message(cm, message); + default: + break; + } + + return TRUE; +} + static int setup_recording_gst_pipeline(struct cmumble *cm) { - GstElement *pipeline, *cutter, *sink; + GstElement *pipeline, *sink; GError *error = NULL; + GstBus *bus; char *desc = "autoaudiosrc name=src ! cutter name=cutter ! " "audioresample ! audioconvert ! "AUDIO_CAPS" ! " - "celtenc ! appsink name=sink caps="CELT_CAPS; + "celtenc name=enc perfect-timestamp=true hard-resync=true" " ! " + "appsink name=sink caps="CELT_CAPS; pipeline = gst_parse_launch(desc, &error); if (error) { @@ -182,16 +277,33 @@ setup_recording_gst_pipeline(struct cmumble *cm) cm->audio.sink = GST_APP_SINK(sink); cm->audio.record_pipeline = pipeline; - cutter = gst_bin_get_by_name(GST_BIN(pipeline), "cutter"); + cm->audio.src = gst_bin_get_by_name(GST_BIN(pipeline), "src"); + + cm->audio.cutter = gst_bin_get_by_name(GST_BIN(pipeline), "cutter"); /* FIXME: The threshold should be configurable. */ - g_object_set(G_OBJECT(cutter), + g_object_set(G_OBJECT(cm->audio.cutter), "threshold_dB", -45.0, "leaky", TRUE, NULL); gst_app_sink_set_callbacks(cm->audio.sink, &sink_cbs, cm, NULL); - gst_app_sink_set_drop(cm->audio.sink, FALSE);; + gst_app_sink_set_drop(cm->audio.sink, TRUE); + + bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + cm->audio.bus_watch_id = + gst_bus_add_watch(bus, record_pipe_bus_message, cm); + g_object_unref(bus); + + cm->audio.buffer_queue = g_queue_new(); + if (!cm->audio.buffer_queue) + return -1; + cm->audio.silence_timestamps = g_queue_new(); + if (!cm->audio.silence_timestamps) + return -1; gst_element_set_state(pipeline, GST_STATE_PLAYING); + + cm->audio.enc = gst_bin_get_by_name(GST_BIN(pipeline), "enc"); + cm->sequence = 0; return 0; @@ -337,6 +449,10 @@ int cmumble_audio_fini(struct cmumble *cm) { + g_source_remove(cm->audio.bus_watch_id); + g_queue_free_full(cm->audio.silence_timestamps, g_free); + g_queue_free_full(cm->audio.buffer_queue, (GDestroyNotify) gst_sample_unref); + return 0; } diff --git a/src/audio.h b/src/audio.h index 8e47ca9..a54f269 100644 --- a/src/audio.h +++ b/src/audio.h @@ -13,11 +13,21 @@ struct cmumble_audio { GstElement *record_pipeline; GstAppSink *sink; + GstElement *src; + GstElement *cutter; + GstElement *enc; + + guint bus_watch_id; + + GstClockTime last_time; guint8 celt_header_packet[sizeof(CELTHeader)]; CELTHeader celt_header; CELTMode *celt_mode; + GQueue *buffer_queue; + GQueue *silence_timestamps; + gint32 celt_bitstream_version; }; -- cgit