From 6437a68ae0a92bcee64d6fc4586f345128fcb7c3 Mon Sep 17 00:00:00 2001 From: Benjamin Franzke Date: Thu, 22 Sep 2011 22:16:38 +0200 Subject: Fixup async read by using GPollableInputStream instead of GSocket With the GSource created from GSocket hacks were needed to get all events. GPollableInputStream is the interface implemented directly by GTlsInputStream. --- src/cmumble.c | 32 +++++++++++++++++--------------- src/cmumble.h | 6 ++++-- src/messages.c | 21 +++++++++++++-------- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/cmumble.c b/src/cmumble.c index dea9a55..d849e05 100644 --- a/src/cmumble.c +++ b/src/cmumble.c @@ -36,7 +36,6 @@ pull_buffer(GstAppSink *sink, gpointer user_data) GstBuffer *buf; uint8_t data[1024]; uint32_t write = 0, pos = 0; - GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); MumbleProto__UDPTunnel tunnel; static int seq = 0; @@ -247,19 +246,15 @@ do_ping(struct context *ctx) } static gboolean -read_cb(GSocket *socket, GIOCondition condition, gpointer data) +read_cb(GObject *pollable_stream, gpointer data) { + GPollableInputStream *input = G_POLLABLE_INPUT_STREAM(pollable_stream); struct context *ctx = data; - GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); + gint count; do { - recv_msg(ctx, &callbacks); - } while (g_input_stream_has_pending(input)); - - /* FIXME */ - static int i = 0; - if (i++ < 2) - do_ping(ctx); + count = recv_msg(ctx, &callbacks); + } while (count && g_pollable_input_stream_is_readable(input)); return TRUE; } @@ -426,7 +421,15 @@ int main(int argc, char **argv) return 1; } - g_object_get(G_OBJECT(ctx.conn), "base-io-stream", &ctx.iostream, NULL); + g_object_get(G_OBJECT(ctx.conn), + "input-stream", &ctx.input, + "output-stream", &ctx.output, NULL); + + if (!G_IS_POLLABLE_INPUT_STREAM(ctx.input) || + !g_pollable_input_stream_can_poll(ctx.input)) { + g_printerr("Error: GSocketConnection is not pollable\n"); + return 1; + } { MumbleProto__Version version; @@ -457,14 +460,13 @@ int main(int argc, char **argv) if (setup_recording_gst_pipeline(&ctx) < 0) return 1; - ctx.sock = g_socket_connection_get_socket(ctx.conn); - source = g_socket_create_source(ctx.sock, G_IO_IN | G_IO_ERR, NULL); - g_source_set_callback(source, (GSourceFunc)read_cb, &ctx, NULL); + source = g_pollable_input_stream_create_source(ctx.input, NULL); + g_source_set_callback(source, (GSourceFunc) read_cb, &ctx, NULL); g_source_attach(source, NULL); g_source_unref(source); source = g_timeout_source_new_seconds(5); - g_source_set_callback(source, (GSourceFunc)do_ping, &ctx, NULL); + g_source_set_callback(source, (GSourceFunc) do_ping, &ctx, NULL); g_source_attach(source, NULL); g_source_unref(source); diff --git a/src/cmumble.h b/src/cmumble.h index a6cc7f2..af8396a 100644 --- a/src/cmumble.h +++ b/src/cmumble.h @@ -25,7 +25,9 @@ struct context { GSocketClient *sock_client; GSocketConnection *conn; GSocket *sock; - GIOStream *iostream; + + GPollableInputStream *input; + GOutputStream *output; uint8_t celt_header_packet[sizeof(CELTHeader)]; CELTHeader celt_header; @@ -72,7 +74,7 @@ typedef void (*callback_t)(ProtobufCMessage *msg, struct context *); void send_msg(struct context *ctx, ProtobufCMessage *msg); -void +int recv_msg(struct context *ctx, const struct mumble_callbacks *callbacks); #endif diff --git a/src/messages.c b/src/messages.c index 87f204e..cdc0c98 100644 --- a/src/messages.c +++ b/src/messages.c @@ -45,7 +45,6 @@ send_msg(struct context *ctx, ProtobufCMessage *msg) int type = -1; int i; ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad); - GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream); for (i = 0; i < G_N_ELEMENTS(messages); ++i) if (messages[i].descriptor == msg->descriptor) @@ -64,14 +63,14 @@ send_msg(struct context *ctx, ProtobufCMessage *msg) add_preamble(preamble, type, buffer.len); g_static_mutex_lock(&write_mutex); - g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL); - g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL); + g_output_stream_write(ctx->output, preamble, PREAMBLE_SIZE, NULL, NULL); + g_output_stream_write(ctx->output, buffer.data, buffer.len, NULL, NULL); g_static_mutex_unlock(&write_mutex); PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer); } -void +int recv_msg(struct context *ctx, const struct mumble_callbacks *cbs) { uint8_t preamble[PREAMBLE_SIZE]; @@ -79,14 +78,18 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs) gchar *data; int type, len; gssize ret; - GInputStream *input = g_io_stream_get_input_stream(ctx->iostream); const callback_t *callbacks = (const callback_t *) cbs; + GError *error = NULL; - ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL); + ret = g_pollable_input_stream_read_nonblocking(ctx->input, + preamble, PREAMBLE_SIZE, + NULL, &error); if (ret <= 0) { + if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + return 0; g_printerr("read failed: %ld\n", ret); - return; + return 0; } get_preamble(preamble, &type, &len); @@ -106,7 +109,7 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs) g_printerr("out of mem\n"); g_main_loop_quit (ctx->loop); } - ret = g_input_stream_read(input, data, len, NULL, NULL); + ret = g_input_stream_read(G_INPUT_STREAM(ctx->input), data, len, NULL, NULL); /* tunneled udp data - not a regular protobuf message * create dummy ProtobufCMessage */ @@ -137,4 +140,6 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs) protobuf_c_message_free_unpacked(msg, NULL); g_free(data); + + return 1; } -- cgit