summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Franzke <benjaminfranzke@googlemail.com>2011-09-22 22:16:38 +0200
committerBenjamin Franzke <benjaminfranzke@googlemail.com>2011-09-22 22:22:59 +0200
commit6437a68ae0a92bcee64d6fc4586f345128fcb7c3 (patch)
tree7cde2fb2a19a9c8c6964260e729759e7445ddc0c
parentb396ee26febb003374f877af841814940fefd034 (diff)
downloadcmumble-6437a68ae0a92bcee64d6fc4586f345128fcb7c3.tar.gz
cmumble-6437a68ae0a92bcee64d6fc4586f345128fcb7c3.tar.bz2
cmumble-6437a68ae0a92bcee64d6fc4586f345128fcb7c3.zip
Fixup async read by using GPollableInputStream instead of GSocket
With the GSource created from GSocket hacks were needed to get all events. GPollableInputStream is the interface implemented directly by GTlsInputStream.
-rw-r--r--src/cmumble.c32
-rw-r--r--src/cmumble.h6
-rw-r--r--src/messages.c21
3 files changed, 34 insertions, 25 deletions
diff --git a/src/cmumble.c b/src/cmumble.c
index dea9a55..d849e05 100644
--- a/src/cmumble.c
+++ b/src/cmumble.c
@@ -36,7 +36,6 @@ pull_buffer(GstAppSink *sink, gpointer user_data)
GstBuffer *buf;
uint8_t data[1024];
uint32_t write = 0, pos = 0;
- GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream);
MumbleProto__UDPTunnel tunnel;
static int seq = 0;
@@ -247,19 +246,15 @@ do_ping(struct context *ctx)
}
static gboolean
-read_cb(GSocket *socket, GIOCondition condition, gpointer data)
+read_cb(GObject *pollable_stream, gpointer data)
{
+ GPollableInputStream *input = G_POLLABLE_INPUT_STREAM(pollable_stream);
struct context *ctx = data;
- GInputStream *input = g_io_stream_get_input_stream(ctx->iostream);
+ gint count;
do {
- recv_msg(ctx, &callbacks);
- } while (g_input_stream_has_pending(input));
-
- /* FIXME */
- static int i = 0;
- if (i++ < 2)
- do_ping(ctx);
+ count = recv_msg(ctx, &callbacks);
+ } while (count && g_pollable_input_stream_is_readable(input));
return TRUE;
}
@@ -426,7 +421,15 @@ int main(int argc, char **argv)
return 1;
}
- g_object_get(G_OBJECT(ctx.conn), "base-io-stream", &ctx.iostream, NULL);
+ g_object_get(G_OBJECT(ctx.conn),
+ "input-stream", &ctx.input,
+ "output-stream", &ctx.output, NULL);
+
+ if (!G_IS_POLLABLE_INPUT_STREAM(ctx.input) ||
+ !g_pollable_input_stream_can_poll(ctx.input)) {
+ g_printerr("Error: GSocketConnection is not pollable\n");
+ return 1;
+ }
{
MumbleProto__Version version;
@@ -457,14 +460,13 @@ int main(int argc, char **argv)
if (setup_recording_gst_pipeline(&ctx) < 0)
return 1;
- ctx.sock = g_socket_connection_get_socket(ctx.conn);
- source = g_socket_create_source(ctx.sock, G_IO_IN | G_IO_ERR, NULL);
- g_source_set_callback(source, (GSourceFunc)read_cb, &ctx, NULL);
+ source = g_pollable_input_stream_create_source(ctx.input, NULL);
+ g_source_set_callback(source, (GSourceFunc) read_cb, &ctx, NULL);
g_source_attach(source, NULL);
g_source_unref(source);
source = g_timeout_source_new_seconds(5);
- g_source_set_callback(source, (GSourceFunc)do_ping, &ctx, NULL);
+ g_source_set_callback(source, (GSourceFunc) do_ping, &ctx, NULL);
g_source_attach(source, NULL);
g_source_unref(source);
diff --git a/src/cmumble.h b/src/cmumble.h
index a6cc7f2..af8396a 100644
--- a/src/cmumble.h
+++ b/src/cmumble.h
@@ -25,7 +25,9 @@ struct context {
GSocketClient *sock_client;
GSocketConnection *conn;
GSocket *sock;
- GIOStream *iostream;
+
+ GPollableInputStream *input;
+ GOutputStream *output;
uint8_t celt_header_packet[sizeof(CELTHeader)];
CELTHeader celt_header;
@@ -72,7 +74,7 @@ typedef void (*callback_t)(ProtobufCMessage *msg, struct context *);
void
send_msg(struct context *ctx, ProtobufCMessage *msg);
-void
+int
recv_msg(struct context *ctx, const struct mumble_callbacks *callbacks);
#endif
diff --git a/src/messages.c b/src/messages.c
index 87f204e..cdc0c98 100644
--- a/src/messages.c
+++ b/src/messages.c
@@ -45,7 +45,6 @@ send_msg(struct context *ctx, ProtobufCMessage *msg)
int type = -1;
int i;
ProtobufCBufferSimple buffer = PROTOBUF_C_BUFFER_SIMPLE_INIT(pad);
- GOutputStream *output = g_io_stream_get_output_stream(ctx->iostream);
for (i = 0; i < G_N_ELEMENTS(messages); ++i)
if (messages[i].descriptor == msg->descriptor)
@@ -64,14 +63,14 @@ send_msg(struct context *ctx, ProtobufCMessage *msg)
add_preamble(preamble, type, buffer.len);
g_static_mutex_lock(&write_mutex);
- g_output_stream_write(output, preamble, PREAMBLE_SIZE, NULL, NULL);
- g_output_stream_write(output, buffer.data, buffer.len, NULL, NULL);
+ g_output_stream_write(ctx->output, preamble, PREAMBLE_SIZE, NULL, NULL);
+ g_output_stream_write(ctx->output, buffer.data, buffer.len, NULL, NULL);
g_static_mutex_unlock(&write_mutex);
PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&buffer);
}
-void
+int
recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
{
uint8_t preamble[PREAMBLE_SIZE];
@@ -79,14 +78,18 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
gchar *data;
int type, len;
gssize ret;
- GInputStream *input = g_io_stream_get_input_stream(ctx->iostream);
const callback_t *callbacks = (const callback_t *) cbs;
+ GError *error = NULL;
- ret = g_input_stream_read(input, preamble, PREAMBLE_SIZE, NULL, NULL);
+ ret = g_pollable_input_stream_read_nonblocking(ctx->input,
+ preamble, PREAMBLE_SIZE,
+ NULL, &error);
if (ret <= 0) {
+ if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ return 0;
g_printerr("read failed: %ld\n", ret);
- return;
+ return 0;
}
get_preamble(preamble, &type, &len);
@@ -106,7 +109,7 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
g_printerr("out of mem\n");
g_main_loop_quit (ctx->loop);
}
- ret = g_input_stream_read(input, data, len, NULL, NULL);
+ ret = g_input_stream_read(G_INPUT_STREAM(ctx->input), data, len, NULL, NULL);
/* tunneled udp data - not a regular protobuf message
* create dummy ProtobufCMessage */
@@ -137,4 +140,6 @@ recv_msg(struct context *ctx, const struct mumble_callbacks *cbs)
protobuf_c_message_free_unpacked(msg, NULL);
g_free(data);
+
+ return 1;
}