summaryrefslogtreecommitdiff
path: root/source4/lib/messaging
diff options
context:
space:
mode:
authorJelmer Vernooij <jelmer@samba.org>2008-05-26 03:07:18 +0200
committerJelmer Vernooij <jelmer@samba.org>2008-05-26 03:07:18 +0200
commitd60d8e57d83acfc94fa36c59fcfb9c6e03ee02b6 (patch)
tree179df95c443cc5e543c94c859487abab057cfc4e /source4/lib/messaging
parentfd712bb878dcfbe8a50e48361c96cc04a3ddacf5 (diff)
downloadsamba-d60d8e57d83acfc94fa36c59fcfb9c6e03ee02b6.tar.gz
samba-d60d8e57d83acfc94fa36c59fcfb9c6e03ee02b6.tar.bz2
samba-d60d8e57d83acfc94fa36c59fcfb9c6e03ee02b6.zip
Implement IRPC calls over the internal messaging bus.
(This used to be commit 777dc3a2c7b5bf855344ba3ae8c8b564c48fc0c6)
Diffstat (limited to 'source4/lib/messaging')
-rw-r--r--source4/lib/messaging/pyirpc.c189
1 files changed, 119 insertions, 70 deletions
diff --git a/source4/lib/messaging/pyirpc.c b/source4/lib/messaging/pyirpc.c
index 5ef940817c..41475daaff 100644
--- a/source4/lib/messaging/pyirpc.c
+++ b/source4/lib/messaging/pyirpc.c
@@ -22,11 +22,13 @@
#include "includes.h"
#include <Python.h>
#include "libcli/util/pyerrors.h"
+#include "librpc/rpc/pyrpc.h"
#include "lib/messaging/irpc.h"
#include "lib/messaging/messaging.h"
#include "lib/events/events.h"
#include "cluster/cluster.h"
#include "param/param.h"
+#include "librpc/gen_ndr/py_irpc.h"
PyAPI_DATA(PyTypeObject) messaging_Type;
PyAPI_DATA(PyTypeObject) irpc_InterfaceType;
@@ -360,69 +362,88 @@ PyObject *py_irpc_connect(PyTypeObject *self, PyObject *args, PyObject *kwargs)
}
}
-static void py_irpc_dealloc(PyObject *self)
+typedef struct {
+ PyObject_HEAD
+ struct irpc_request **reqs;
+ int count;
+ int current;
+ TALLOC_CTX *mem_ctx;
+ py_data_unpack_fn unpack_fn;
+} irpc_ResultObject;
+
+
+static PyObject *irpc_result_next(irpc_ResultObject *iterator)
{
- irpc_InterfaceObject *iface = (irpc_InterfaceObject *)self;
- talloc_free(iface->mem_ctx);
+ NTSTATUS status;
+
+ if (iterator->current >= iterator->count) {
+ PyErr_SetString(PyExc_StopIteration, "No more results");
+ return NULL;
+ }
+
+ status = irpc_call_recv(iterator->reqs[iterator->current]);
+ iterator->current++;
+ if (!NT_STATUS_IS_OK(status)) {
+ PyErr_SetNTSTATUS(status);
+ return NULL;
+ }
+
+ return iterator->unpack_fn(iterator->reqs[iterator->current-1]->r);
+}
+
+static PyObject *irpc_result_len(irpc_ResultObject *self)
+{
+ return PyLong_FromLong(self->count);
+}
+
+static PyMethodDef irpc_result_methods[] = {
+ { "__len__", (PyCFunction)irpc_result_len, METH_NOARGS,
+ "Number of elements returned"},
+ { NULL }
+};
+
+static void irpc_result_dealloc(PyObject *self)
+{
+ talloc_free(((irpc_ResultObject *)self)->mem_ctx);
PyObject_Del(self);
}
-PyTypeObject irpc_InterfaceType = {
+PyTypeObject irpc_ResultIteratorType = {
PyObject_HEAD_INIT(NULL) 0,
- .tp_name = "irpc.ClientConnection",
- .tp_basicsize = sizeof(irpc_InterfaceObject),
+ .tp_name = "irpc.ResultIterator",
+ .tp_basicsize = sizeof(irpc_ResultObject),
.tp_flags = Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE,
- .tp_new = py_irpc_connect,
- .tp_dealloc = py_irpc_dealloc,
+ .tp_iter = (iternextfunc)irpc_result_next,
+ .tp_methods = irpc_result_methods,
+ .tp_dealloc = irpc_result_dealloc,
};
-#if 0
-/*
- make an irpc call - called via the same interface as rpc
-*/
-static int ejs_irpc_call(int eid, struct MprVar *io,
- const struct ndr_interface_table *iface, int callnum,
- ejs_pull_function_t ejs_pull, ejs_push_function_t ejs_push)
+static PyObject *py_irpc_call(irpc_InterfaceObject *p, struct PyNdrRpcMethodDef *method_def, PyObject *args, PyObject *kwargs)
{
- NTSTATUS status;
void *ptr;
- struct ejs_rpc *ejs;
- const struct ndr_interface_call *call;
- struct ejs_irpc_connection *p;
struct irpc_request **reqs;
int i, count;
- struct MprVar *results;
-
- p = (struct ejs_irpc_connection *)mprGetThisPtr(eid, "irpc");
-
- ejs = talloc(mprMemCtx(), struct ejs_rpc);
- if (ejs == NULL) {
- status = NT_STATUS_NO_MEMORY;
- goto done;
- }
-
- call = &iface->calls[callnum];
-
- ejs->eid = eid;
- ejs->callname = call->name;
+ NTSTATUS status;
+ TALLOC_CTX *mem_ctx = talloc_new(NULL);
+ irpc_ResultObject *ret;
/* allocate the C structure */
- ptr = talloc_zero_size(ejs, call->struct_size);
+ ptr = talloc_zero_size(mem_ctx, method_def->table->calls[method_def->opnum].struct_size);
if (ptr == NULL) {
status = NT_STATUS_NO_MEMORY;
goto done;
}
/* convert the mpr object into a C structure */
- status = ejs_pull(ejs, io, ptr);
- if (!NT_STATUS_IS_OK(status)) {
- goto done;
+ if (!method_def->pack_in_data(args, kwargs, ptr)) {
+ talloc_free(mem_ctx);
+ return NULL;
}
for (count=0;p->dest_ids[count].id;count++) /* noop */ ;
/* we need to make a call per server */
- reqs = talloc_array(ejs, struct irpc_request *, count);
+ reqs = talloc_array(mem_ctx, struct irpc_request *, count);
if (reqs == NULL) {
status = NT_STATUS_NO_MEMORY;
goto done;
@@ -431,50 +452,72 @@ static int ejs_irpc_call(int eid, struct MprVar *io,
/* make the actual calls */
for (i=0;i<count;i++) {
reqs[i] = irpc_call_send(p->msg_ctx, p->dest_ids[i],
- iface, callnum, ptr, ptr);
+ method_def->table, method_def->opnum, ptr, ptr);
if (reqs[i] == NULL) {
status = NT_STATUS_NO_MEMORY;
goto done;
}
talloc_steal(reqs, reqs[i]);
}
-
- mprSetVar(io, "results", mprObject("results"));
- results = mprGetProperty(io, "results", NULL);
- /* and receive the results, placing them in io.results[i] */
- for (i=0;i<count;i++) {
- struct MprVar *output;
+ ret = PyObject_New(irpc_ResultObject, &irpc_ResultIteratorType);
+ ret->mem_ctx = mem_ctx;
+ ret->reqs = reqs;
+ ret->count = count;
+ ret->current = 0;
+ ret->unpack_fn = method_def->unpack_out_data;
- status = irpc_call_recv(reqs[i]);
- if (!NT_STATUS_IS_OK(status)) {
- goto done;
- }
- status = ejs_push(ejs, io, ptr);
- if (!NT_STATUS_IS_OK(status)) {
- goto done;
- }
+ return (PyObject *)ret;
+done:
+ talloc_free(mem_ctx);
+ PyErr_SetNTSTATUS(status);
+ return NULL;
+}
- /* add to the results array */
- output = mprGetProperty(io, "output", NULL);
- if (output) {
- char idx[16];
- mprItoa(i, idx, sizeof(idx));
- mprSetProperty(results, idx, output);
- mprDeleteProperty(io, "output");
- }
- }
- mprSetVar(results, "length", mprCreateIntegerVar(i));
+static PyObject *py_irpc_call_wrapper(PyObject *self, PyObject *args, void *wrapped, PyObject *kwargs)
+{
+ irpc_InterfaceObject *iface = (irpc_InterfaceObject *)self;
+ struct PyNdrRpcMethodDef *md = wrapped;
-done:
- talloc_free(ejs);
- mpr_Return(eid, mprNTSTATUS(status));
- if (NT_STATUS_EQUAL(status, NT_STATUS_INTERNAL_ERROR)) {
- return -1;
+ return py_irpc_call(iface, md, args, kwargs);
+}
+
+static void py_irpc_dealloc(PyObject *self)
+{
+ irpc_InterfaceObject *iface = (irpc_InterfaceObject *)self;
+ talloc_free(iface->mem_ctx);
+ PyObject_Del(self);
+}
+
+PyTypeObject irpc_InterfaceType = {
+ PyObject_HEAD_INIT(NULL) 0,
+ .tp_name = "irpc.ClientConnection",
+ .tp_basicsize = sizeof(irpc_InterfaceObject),
+ .tp_flags = Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE,
+ .tp_new = py_irpc_connect,
+ .tp_dealloc = py_irpc_dealloc,
+};
+
+static bool irpc_AddNdrRpcMethods(PyTypeObject *ifacetype, struct PyNdrRpcMethodDef *mds)
+{
+ int i;
+ for (i = 0; mds[i].name; i++) {
+ PyObject *ret;
+ struct wrapperbase *wb = calloc(sizeof(struct wrapperbase), 1);
+
+ wb->name = discard_const_p(char, mds[i].name);
+ wb->flags = PyWrapperFlag_KEYWORDS;
+ wb->wrapper = (wrapperfunc)py_irpc_call_wrapper;
+ wb->doc = discard_const_p(char, mds[i].doc);
+
+ ret = PyDescr_NewWrapper(ifacetype, wb, &mds[i]);
+
+ PyDict_SetItemString(ifacetype->tp_dict, mds[i].name,
+ (PyObject *)ret);
}
- return 0;
+
+ return true;
}
-#endif
void initirpc(void)
{
@@ -486,6 +529,12 @@ void initirpc(void)
if (PyType_Ready(&messaging_Type) < 0)
return;
+ if (PyType_Ready(&irpc_ResultIteratorType) < 0)
+ return;
+
+ if (!irpc_AddNdrRpcMethods(&irpc_InterfaceType, py_ndr_irpc_methods))
+ return;
+
mod = Py_InitModule3("irpc", NULL, "Internal RPC");
if (mod == NULL)
return;