summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cmumble.c32
-rw-r--r--src/cmumble.h6
-rw-r--r--src/messages.c21
3 files changed, 34 insertions, 25 deletions
diff --git a/src/cmumble.c b/src/cmumble.c
index dea9a55..d849e05 100644
--- a/src/cmumble.c
+++ b/src/cmumble.c
@@ -36,7 +36,6 @@ pull_buffer(GstAppSink *sink, gpointer user_data)
GstBuffer *buf;
uint8_t data[1024];
uint32_t write = 0, pos = 0;
- GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream);
MumbleProto__UDPTunnel tunnel;
static int seq = 0;
@@ -247,19 +246,15 @@ do_ping(struct context *ctx)
}
static gboolean
-read_cb(GSocket *socket, GIOCondition condition, gpointer data)
+read_cb(GObject *pollable_stream, gpointer data)
{
+ GPollableInputStream *input = G_POLLABLE_INPUT_STREAM(pollable_stream);
struct context *ctx = data;
- GInputStream *input = g_io_stream_get_input_stream(ctx->iostream);
+ gint count;
do {
- recv_msg(ctx, &callbacks);
- } while (g_input_stream_has_pending(input));
-
- /* FIXME */
- static int i = 0;
- if (i++ < 2)
- do_ping(ctx);
+ count = recv_msg(ctx, &callbacks);
+ } while (count && g_pollable_input_stream_is_readable(input));
return TRUE;
}
@@ -426,7 +421,15 @@ int main(int argc, char **argv)
return 1;
}
- g_object_get(G_OBJECT(ctx.conn), "base-io-stream", &ctx.iostream, NULL);
+ g_object_get(G_OBJECT(ctx.conn),
+ "input-stream", &ctx.input,
+ "output-stream", &ctx.output, NULL);
+
+ if (!G_IS_POLLABLE_INPUT_STREAM(ctx.input) ||
+ !g_pollable_input_stream_can_poll(ctx.input)) {
+ g_printerr("Error: GSocketConnection is not pollable\n");
+ return 1;
+ }
{
MumbleProto__Version version;
@@ -457,14 +460,13 @@ int main(int argc, char **argv)
if (setup_recording_gst_pipeline(&ctx) < 0)
return 1;
- ctx.sock = g_socket_connection_get_socket(ctx.conn);
- source = g_socket_create_source(ctx.sock, G_IO_IN | G_IO_ERR, NULL);
- g_source_set_callback(source, (GSourceFunc)read_cb, &ctx, NULL);
+ source = g_pollable_input_stream_create_source(ctx.input, NULL);
+ g_source_set_callback(source, (GSourceFunc) read_cb, &ctx, NULL);
g_source_attach(source, NULL);
g_source_unref(source);
source = g_timeout_source_new_seconds(5);
- g_source_set_callback(source, (GSourceFunc)do_ping, &ctx, NULL);
+ g_source_set_callback(source, (GSourceFunc) do_ping, &ctx, NULL);
g_source_attach(source, NULL);
g_source_unref(source);
diff --git a/src/cmumble.h b/src/cmumble.h
index a6cc7f2..af8396a 100644
--- a/src/cmumble.h
+++ b/src/cmumble.h
@@ -25,7 +25,9 @@ struct context {
GSocketClient *sock_client;
GSocketConnection *conn;
GSocket *sock;
- GIOStream *iostream;
+
+ GPollableInputStream *input;
+ GOutputStream *output;
uint8_t celt_header_packet[sizeof(CELTHeader)];
CELTHeader celt_header;
@@ -72,7 +74,7 @@ typedef void (*callback_t)(ProtobufCMessage *msg, struct context *);
void
send_msg(struct context *ctx, ProtobufCMessage *msg);
-void
+int
recv_msg(struct context *ctx, const struct mumble_callbacks *callbacks);
#endif
diff --git a/src/messages.c b/src/messages.c
index 87f204e..cdc0c98 100644
--- a/src/messages.c
+++ b/src/messages.c
@@ -45,7 +45,6 @@ send_msg(struct context *ctx, ProtobufCMessage *msg)
int type = -1;
int i;
ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad);
- GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream);
for (i = 0; i < G_N_ELEMENTS(messages); ++i)
if (messages[i].descriptor == msg->descriptor)
@@ -64,14 +63,14 @@ send_msg(struct context *ctx, ProtobufCMessage *msg)
add_preamble(preamble, type, buffer.len);
g_static_mutex_lock(&write_mutex);
- g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL);
- g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL);
+ g_output_stream_write(ctx->output, preamble, PREAMBLE_SIZE, NULL, NULL);
+ g_output_stream_write(ctx->output, buffer.data, buffer.len, NULL, NULL);
g_static_mutex_unlock(&write_mutex);
PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer);
}
-void
+int
recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
{
uint8_t preamble[PREAMBLE_SIZE];
@@ -79,14 +78,18 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
gchar *data;
int type, len;
gssize ret;
- GInputStream *input = g_io_stream_get_input_stream(ctx->iostream);
const callback_t *callbacks = (const callback_t *) cbs;
+ GError *error = NULL;
- ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL);
+ ret = g_pollable_input_stream_read_nonblocking(ctx->input,
+ preamble, PREAMBLE_SIZE,
+ NULL, &error);
if (ret <= 0) {
+ if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ return 0;
g_printerr("read failed: %ld\n", ret);
- return;
+ return 0;
}
get_preamble(preamble, &type, &len);
@@ -106,7 +109,7 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
g_printerr("out of mem\n");
g_main_loop_quit (ctx->loop);
}
- ret = g_input_stream_read(input, data, len, NULL, NULL);
+ ret = g_input_stream_read(G_INPUT_STREAM(ctx->input), data, len, NULL, NULL);
/* tunneled udp data - not a regular protobuf message
* create dummy ProtobufCMessage */
@@ -137,4 +140,6 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
protobuf_c_message_free_unpacked(msg, NULL);
g_free(data);
+
+ return 1;
}