#include "audio.h"
#include "varint.h"
#include "cmumble.h"

/* NOTE(review): the two angle-bracket include targets were lost in the copy
 * of this file that was reviewed. <gst/tag/tag.h> is presumably one of them,
 * since this file calls gst_tag_list_to_vorbiscomment_buffer(); <string.h> is
 * a guess. Confirm both against the repository before merging. */
#include <string.h>
#include <gst/tag/tag.h>

#define SAMPLERATE 48000
#define FRAMESIZE 480 /* SAMPLERATE/100 */
#define CHANNELS 1

#define CELT_CAPS "audio/x-celt,channels=" G_STRINGIFY(CHANNELS) "," \
	"rate=" G_STRINGIFY(SAMPLERATE) ",frame-size=" G_STRINGIFY(FRAMESIZE)
#define AUDIO_CAPS "audio/x-raw,format=S16LE,channels=" \
	G_STRINGIFY(CHANNELS) ",rate=" G_STRINGIFY(SAMPLERATE)

/*
 * Parse a gst-launch style description into a pipeline.
 *
 * Returns the new pipeline, or NULL after printing the parser error.
 * On failure the GError and any partially constructed pipeline are released
 * here, so callers only have to check for NULL.
 */
static GstElement *
launch_pipeline(const char *desc)
{
	GError *error = NULL;
	GstElement *pipeline = gst_parse_launch(desc, &error);

	if (error != NULL) {
		g_printerr("Failed to create pipeline: %s\n", error->message);
		g_error_free(error);
		if (pipeline != NULL)
			gst_object_unref(pipeline);
		return NULL;
	}

	return pipeline;
}

/*
 * Queue one received audio frame on the playback appsrc of @user.
 *
 * @cm:       client state; only cm->verbose is read here.
 * @user:     sender; user->src receives the buffer, and user->last_sequence
 *            and user->last_time_end are read and updated.
 * @data:     encoded frame; it is copied, the caller keeps ownership.
 * @size:     number of bytes in @data.
 * @sequence: sequence number of the frame; one step is treated as 1/100 s.
 *
 * The buffer is stamped with a PTS derived from the sequence number when the
 * stream continues, or with the current running time when the stream
 * (re)starts or the derived time already lies in the past.
 */
void
cmumble_audio_push(struct cmumble *cm, struct cmumble_user *user,
		   const guint8 *data, gsize size, gint64 sequence)
{
	GstBuffer *gstbuf;
	GstClock *clock;
	GstClockTime pts = 0;
	GstClockTime base, now = 0;
	GstClockTime running;
	gpointer payload;

	if (cm->verbose)
		g_print("%s: sequence: %" G_GINT64_FORMAT "\n",
			__func__, sequence);

	base = gst_element_get_base_time(GST_ELEMENT(user->src));
	clock = gst_element_get_clock(GST_ELEMENT(user->src));
	if (clock) {
		now = gst_clock_get_time(clock);
		g_object_unref(clock);
	}
	running = now - base;

	/* FIXME: What to do when sequence is a bad value?
	 *        e.g. too small a value to be up to date?
	 *         - just drop?
	 *         - enqueue as now?
	 */

	/* g_memdup() takes a guint length and would silently truncate a gsize
	 * argument; prefer g_memdup2() where GLib provides it. */
#if GLIB_CHECK_VERSION(2, 68, 0)
	payload = g_memdup2(data, size);
#else
	payload = g_memdup(data, size);
#endif
	gstbuf = gst_buffer_new_wrapped(payload, size);
	GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_LIVE);

	/* Assume packets are in order, since we're using the tcp tunnel only atm.
	 * FIXME: This assumption is probably wrong, since the packets may have
	 * been received out of order at the server?
	 */
	if (user->last_sequence < 0 || sequence == 0 ||
	    sequence < (user->last_sequence + 1)) {
		/* First packet, restarted counter or stale packet: resync. */
		GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_DISCONT);
		GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_RESYNC);
		pts = running;
		if (cm->verbose)
			g_print("%s: set time to now\n", __func__);
	} else {
		/* Here sequence >= last_sequence + 1; num counts skipped frames. */
		gint64 num = sequence - (user->last_sequence + 1);

		pts = user->last_time_end;
		if (num > 0) {
			pts += gst_util_uint64_scale_int(num, GST_SECOND, 100);
			GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_DISCONT);
		}

		if (cm->verbose)
			g_print("%s: set time by sequence: %" G_GUINT64_FORMAT
				", now: %" G_GUINT64_FORMAT "\n",
				__func__, pts, running);
	}

	if (pts < running) {
		GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_DISCONT);
		GST_BUFFER_FLAG_SET(gstbuf, GST_BUFFER_FLAG_RESYNC);
		pts = running;
		if (cm->verbose)
			g_print("%s: time is in the past, setting to now\n",
				__func__);
	}

	GST_BUFFER_DTS(gstbuf) = running;
	GST_BUFFER_PTS(gstbuf) = pts;
	GST_BUFFER_DURATION(gstbuf) =
		gst_util_uint64_scale_int(1, GST_SECOND, 100);

	user->last_time_end = pts + GST_BUFFER_DURATION(gstbuf);
	user->last_sequence = sequence;

	gst_app_src_push_buffer(user->src, gstbuf);
}

/*
 * Pull one encoded sample from the recording appsink and send it through
 * the UDP tunnel.
 *
 * @sink is not used; the sample is always pulled from cm->audio.sink.
 * @user_data is the struct cmumble.
 *
 * Packet layout as assembled below: one header byte, the varint-encoded
 * sequence number, one length byte (low 7 bits), then the payload.
 *
 * Returns GST_FLOW_OK when there is no connection (nothing is pulled in that
 * case), when a header buffer is skipped, or after sending; GST_FLOW_ERROR
 * when no sample is available or the payload exceeds 127 bytes.
 */
static GstFlowReturn
pull_buffer(GstAppSink *sink, gpointer user_data)
{
	struct cmumble *cm = user_data;
	GstSample *sample;
	GstBuffer *buf;
	gsize len;
	uint8_t data[1024];
	uint32_t write = 0, pos = 0;
	mumble_udptunnel_t tunnel;

	/* FIXME: Make this more generic/disable pulling
	 *        the pipeline completely if not connected?
	 */
	if (cm->con.conn == NULL)
		return GST_FLOW_OK;

	sample = gst_app_sink_pull_sample(cm->audio.sink);
	if (sample == NULL)
		return GST_FLOW_ERROR;

	buf = gst_sample_get_buffer(sample);

	if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_HEADER)) {
		gst_sample_unref(sample);
		return GST_FLOW_OK;
	}

	/* The length is stored in 7 bits of a single byte below. */
	len = gst_buffer_get_size(buf);
	if (len > 127) {
		g_printerr("error: unexpected buffer size\n");
		gst_sample_unref(sample);
		return GST_FLOW_ERROR;
	}

	data[pos++] = (udp_voice_celt_alpha << 5) | (udp_normal_talking);

	encode_varint(&data[pos], &write, ++cm->sequence, sizeof(data) - pos);
	pos += write;

	data[pos++] = 0x00 /*: 0x80 */ | (len & 0x7F);
	gst_buffer_extract(buf, 0, &data[pos], len);
	pos += len;

	gst_sample_unref(sample);

	cmumble_init_udptunnel(&tunnel);
	tunnel.packet.data = data;
	tunnel.packet.len = pos;
	cmumble_send_udptunnel(cm, &tunnel);

	return GST_FLOW_OK;
}

/*
 * Idle callback: handle every appsink queued by new_sample().
 *
 * Returns FALSE so the idle source is removed after a single run;
 * new_sample() installs a fresh one for each sample.
 */
static gboolean
idle(gpointer user_data)
{
	struct cmumble *cm = user_data;
	GstAppSink *sink;

	while ((sink = g_async_queue_try_pop(cm->async_queue)) != NULL)
		pull_buffer(sink, cm);

	return FALSE;
}

/*
 * appsink "new_sample" callback: queue the sink on cm->async_queue and
 * schedule idle() to process it instead of pulling the sample right here.
 */
static GstFlowReturn
new_sample(GstAppSink *sink, gpointer user_data)
{
	struct cmumble *cm = user_data;

	g_async_queue_push(cm->async_queue, sink);
	g_idle_add(idle, cm);

	return GST_FLOW_OK;
}

/* Callback table installed on the recording appsink. */
GstAppSinkCallbacks sink_cbs = {
	.new_sample = new_sample
};

/*
 * Build and start the recording pipeline:
 * autoaudiosrc ! cutter ! audioresample ! audioconvert ! celtenc ! appsink.
 *
 * Stores the pipeline and its appsink in cm->audio and resets cm->sequence.
 * Returns 0 on success, -1 if the pipeline could not be created.
 */
static int
setup_recording_gst_pipeline(struct cmumble *cm)
{
	GstElement *pipeline, *cutter, *sink;
	const char *desc = "autoaudiosrc name=src ! cutter name=cutter ! "
		"audioresample ! audioconvert ! "AUDIO_CAPS" ! "
		"celtenc ! appsink name=sink caps="CELT_CAPS;

	pipeline = launch_pipeline(desc);
	if (pipeline == NULL)
		return -1;

	/* The reference returned here is kept in cm->audio.sink. */
	sink = gst_bin_get_by_name(GST_BIN(pipeline), "sink");
	cm->audio.sink = GST_APP_SINK(sink);
	cm->audio.record_pipeline = pipeline;

	cutter = gst_bin_get_by_name(GST_BIN(pipeline), "cutter");
	/* FIXME: The threshold should be configurable. */
	g_object_set(G_OBJECT(cutter),
		     "threshold_dB", -45.0, "leaky", TRUE, NULL);
	/* gst_bin_get_by_name() returns a new reference; drop it once the
	 * properties are set. */
	gst_object_unref(cutter);

	gst_app_sink_set_callbacks(cm->audio.sink, &sink_cbs, cm, NULL);
	gst_app_sink_set_drop(cm->audio.sink, FALSE);

	gst_element_set_state(pipeline, GST_STATE_PLAYING);

	cm->sequence = 0;

	return 0;
}

/*
 * GstIterator foreach callback: on a GstPulseSink that exposes a
 * "stream-properties" property, set application.name to
 * "cmumble [<user name>]" and media.role to "phone".
 *
 * @item holds the element, @user_data is the struct cmumble_user.
 */
static void
set_pulse_states(const GValue *item, gpointer user_data)
{
	GstElement *elm = g_value_get_object(item);
	struct cmumble_user *user = user_data;
	GstStructure *props;
	gchar *name;

	/* g_value_get_object() does not add a reference; the iterator's GValue
	 * keeps owning the element, so it must not be unreffed in here. */
	if (g_strcmp0(G_OBJECT_TYPE_NAME(elm), "GstPulseSink") != 0 ||
	    g_object_class_find_property(G_OBJECT_GET_CLASS(elm),
					 "stream-properties") == NULL)
		return;

	/* FIXME: Move this into a man-page or so:
	 * Dear User: Add the following to the pulseaudio configuration:
	 * load-module module-device-manager "do_routing=1"
	 * This is to let newly joined users default to e.g. a headset output.
	 * Also consider setting device.intended_roles = "phone" for your
	 * output to be marked as headset (if you don't have a usb headset dev).
	 */

	name = g_strdup_printf("cmumble [%s]", user->name);

	props = gst_structure_new("props",
				  "application.name", G_TYPE_STRING, name,
				  "media.role", G_TYPE_STRING, "phone",
				  NULL);

	g_object_set(elm, "stream-properties", props, NULL);
	gst_structure_free(props);
	g_free(name);
}

/*
 * Attach a "streamheader" field to the caps of @src.
 *
 * The field is an array of two buffers, both flagged as HEADER: a copy of
 * cm->audio.celt_header_packet and a vorbiscomment buffer built from an
 * empty tag list.
 */
static void
add_celt_streamheader(struct cmumble *cm, GstAppSrc *src)
{
	GValue streamheader = { 0, }, val = { 0, };
	GstTagList *tags;
	GstBuffer *buf[2];
	GstStructure *s;
	GstCaps *caps;
	guint i;

	buf[0] = gst_buffer_new_allocate(NULL, sizeof(CELTHeader), NULL);
	gst_buffer_fill(buf[0], 0, cm->audio.celt_header_packet,
			sizeof(CELTHeader));

	tags = gst_tag_list_new_empty();
	buf[1] = gst_tag_list_to_vorbiscomment_buffer(tags, NULL, 0, "mumble");
	gst_tag_list_unref(tags);

	g_value_init(&streamheader, GST_TYPE_ARRAY);
	for (i = 0; i < G_N_ELEMENTS(buf); ++i) {
		GST_BUFFER_FLAG_SET(buf[i], GST_BUFFER_FLAG_HEADER);
		GST_BUFFER_OFFSET(buf[i]) = 0;
		GST_BUFFER_OFFSET_END(buf[i]) = 0;

		g_value_init(&val, GST_TYPE_BUFFER);
		gst_value_take_buffer(&val, buf[i]);
		gst_value_array_append_value(&streamheader, &val);
		g_value_unset(&val);
	}

	caps = gst_app_src_get_caps(src);
	caps = gst_caps_make_writable(caps);
	s = gst_caps_get_structure(caps, 0);
	gst_structure_set_value(s, "streamheader", &streamheader);
	g_value_unset(&streamheader);

	gst_app_src_set_caps(src, caps);
	gst_caps_unref(caps);
}

/*
 * Create and start the playback pipeline for @user:
 * appsrc ! celtdec ! audioresample ! audioconvert ! autoaudiosink.
 *
 * Stores the pipeline and its appsrc in @user and resets
 * user->last_sequence to -1.
 * Returns 0 on success, -1 if the pipeline could not be created.
 */
int
cmumble_audio_create_playback_pipeline(struct cmumble *cm,
				       struct cmumble_user *user)
{
	GstElement *pipeline, *sink_bin;
	const char *desc = "appsrc name=src format=GST_FORMAT_TIME caps="CELT_CAPS" "
		"! celtdec name=dec "
		"! audioresample ! audioconvert ! autoaudiosink name=sink";
	GstIterator *it;

	pipeline = launch_pipeline(desc);
	if (pipeline == NULL)
		return -1;

	user->pipeline = pipeline;
	/* The reference returned here is kept in user->src. */
	user->src = GST_APP_SRC(gst_bin_get_by_name(GST_BIN(pipeline), "src"));

	add_celt_streamheader(cm, user->src);

	gst_element_set_state(pipeline, GST_STATE_PLAYING);

	/* FIXME: Use a recursive name for sink-actual-sink-pulse instead? like:
	 * gst_bin_get_by_name(GST_BIN(pipeline), "sink-actual-sink-pulse"); */
	sink_bin = gst_bin_get_by_name(GST_BIN(pipeline), "sink");
	if (sink_bin != NULL) {
		it = gst_bin_iterate_sinks(GST_BIN(sink_bin));
		gst_iterator_foreach(it, set_pulse_states, user);
		gst_iterator_free(it);
		/* gst_bin_get_by_name() returned a new reference. */
		gst_object_unref(sink_bin);
	}

	user->last_sequence = -1;

	return 0;
}

/*
 * Prepare the CELT state shared by all playback pipelines: create the mode,
 * build the CELT header and its packet form, and query the bitstream
 * version. Results are stored in cm->audio.
 *
 * Returns 0 on success, -1 if the CELT mode could not be created.
 */
static int
setup_playback_gst_pipeline(struct cmumble *cm)
{
	cm->audio.celt_mode = celt_mode_create(SAMPLERATE, FRAMESIZE, NULL);
	if (cm->audio.celt_mode == NULL) {
		g_printerr("Failed to create CELT mode\n");
		return -1;
	}

	celt_header_init(&cm->audio.celt_header, cm->audio.celt_mode, CHANNELS);
	celt_header_to_packet(&cm->audio.celt_header,
			      cm->audio.celt_header_packet, sizeof(CELTHeader));

	celt_mode_info(cm->audio.celt_mode, CELT_GET_BITSTREAM_VERSION,
		       &cm->audio.celt_bitstream_version);

	return 0;
}

/*
 * Initialise audio: shared playback (CELT) state first, then the recording
 * pipeline. Returns 0 on success, -1 as soon as either step fails.
 */
int
cmumble_audio_init(struct cmumble *cm)
{
	if (setup_playback_gst_pipeline(cm) < 0)
		return -1;

	if (setup_recording_gst_pipeline(cm) < 0)
		return -1;

	return 0;
}

/*
 * Counterpart of cmumble_audio_init(). Currently a stub: it releases nothing
 * and always returns 0.
 */
int
cmumble_audio_fini(struct cmumble *cm)
{
	return 0;
}