summaryrefslogtreecommitdiff
path: root/source4/wrepl_server
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2005-10-14 14:02:47 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 13:44:43 -0500
commitcffd522b5c806508dfacfb10234e4c0a115c0a98 (patch)
treeec5271d813d2cb547757732815a434ba27a89582 /source4/wrepl_server
parentd1e6c228692ff8b06d6eecd6be22fe0727e170ac (diff)
downloadsamba-cffd522b5c806508dfacfb10234e4c0a115c0a98.tar.gz
samba-cffd522b5c806508dfacfb10234e4c0a115c0a98.tar.bz2
samba-cffd522b5c806508dfacfb10234e4c0a115c0a98.zip
r11052: bring samba4 uptodate with the samba4-winsrepl branch,
before the bad merge metze (This used to be commit 471c0ca4abb17fb5f73c0efed195c67628c1c06e)
Diffstat (limited to 'source4/wrepl_server')
-rw-r--r--source4/wrepl_server/config.mk4
-rw-r--r--source4/wrepl_server/wrepl_in_call.c258
-rw-r--r--source4/wrepl_server/wrepl_in_connection.c44
-rw-r--r--source4/wrepl_server/wrepl_out_connection.c212
-rw-r--r--source4/wrepl_server/wrepl_server.c77
-rw-r--r--source4/wrepl_server/wrepl_server.h64
6 files changed, 572 insertions, 87 deletions
diff --git a/source4/wrepl_server/config.mk b/source4/wrepl_server/config.mk
index 18fdcad5e9..8c1202ae94 100644
--- a/source4/wrepl_server/config.mk
+++ b/source4/wrepl_server/config.mk
@@ -7,7 +7,9 @@ INIT_OBJ_FILES = \
wrepl_server/wrepl_server.o \
wrepl_server/wrepl_in_connection.o \
wrepl_server/wrepl_in_call.o \
- wrepl_server/wrepl_out_connection.o
+ wrepl_server/wrepl_out_connection.o \
+ wrepl_server/wrepl_out_helpers.o \
+ wrepl_server/wrepl_apply_records.o
REQUIRED_SUBSYSTEMS = \
LIBCLI_WREPL WINSDB
# End SUBSYSTEM WREPL_SRV
diff --git a/source4/wrepl_server/wrepl_in_call.c b/source4/wrepl_server/wrepl_in_call.c
index 4472a0fede..7ccb52cc20 100644
--- a/source4/wrepl_server/wrepl_in_call.c
+++ b/source4/wrepl_server/wrepl_in_call.c
@@ -29,7 +29,10 @@
#include "lib/messaging/irpc.h"
#include "librpc/gen_ndr/ndr_winsrepl.h"
#include "librpc/gen_ndr/ndr_nbt.h"
+#include "libcli/wrepl/winsrepl.h"
#include "wrepl_server/wrepl_server.h"
+#include "wrepl_server/wrepl_out_helpers.h"
+#include "libcli/composite/composite.h"
#include "nbt_server/wins/winsdb.h"
#include "lib/ldb/include/ldb.h"
@@ -111,42 +114,12 @@ static NTSTATUS wreplsrv_in_table_query(struct wreplsrv_in_call *call)
struct wreplsrv_service *service = call->wreplconn->service;
struct wrepl_replication *repl_out = &call->rep_packet.message.replication;
struct wrepl_table *table_out = &call->rep_packet.message.replication.info.table;
- struct wreplsrv_owner *cur;
- uint64_t local_max_version;
- uint32_t i = 0;
+ const char *our_ip = call->wreplconn->our_ip;
repl_out->command = WREPL_REPL_TABLE_REPLY;
- table_out->partner_count = 0;
- table_out->partners = NULL;
- table_out->initiator = WINSDB_OWNER_LOCAL;
-
- local_max_version = wreplsrv_local_max_version(service);
- if (local_max_version > 0) {
- table_out->partner_count++;
- }
-
- for (cur = service->table; cur; cur = cur->next) {
- table_out->partner_count++;
- }
-
- table_out->partners = talloc_array(call, struct wrepl_wins_owner, table_out->partner_count);
- NT_STATUS_HAVE_NO_MEMORY(table_out->partners);
-
- if (local_max_version > 0) {
- table_out->partners[i].address = call->wreplconn->our_ip;
- table_out->partners[i].min_version = 0;
- table_out->partners[i].max_version = local_max_version;
- table_out->partners[i].type = 1;
- i++;
- }
-
- for (cur = service->table; cur; cur = cur->next) {
- table_out->partners[i] = cur->owner;
- i++;
- }
-
- return NT_STATUS_OK;
+ return wreplsrv_fill_wrepl_table(service, call, table_out,
+ our_ip, our_ip, True);
}
static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1,
@@ -157,42 +130,21 @@ static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1,
return 0;
}
-static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx, struct wrepl_wins_name *name, struct winsdb_record *rec)
+static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx,
+ const char *our_address,
+ struct wrepl_wins_name *name,
+ struct winsdb_record *rec)
{
- uint8_t *namebuf;
- uint32_t namebuf_len;
- uint32_t name_len;
-
- name_len = strlen(rec->name->name);
- if (name_len > 15) {
- return NT_STATUS_INVALID_PARAMETER_MIX;
- }
-
- namebuf = (uint8_t *)talloc_asprintf(mem_ctx, "%-15s%c%s",
- rec->name->name, 'X',
- (rec->name->scope?rec->name->scope:""));
- NT_STATUS_HAVE_NO_MEMORY(namebuf);
- namebuf_len = strlen((char *)namebuf) + 1;
-
- /*
- * we need to set the type here, and use a place-holder in the talloc_asprintf()
- * as the type can be 0x00, and then the namebuf_len = strlen(namebuf); would give wrong results
- */
- namebuf[15] = rec->name->type;
+ uint32_t num_ips, i;
+ struct wrepl_ip *ips;
- /* oh wow, what a nasty bug in windows ... */
- if (rec->name->type == 0x1b) {
- namebuf[15] = namebuf[0];
- namebuf[0] = 0x1b;
- }
+ name->name = rec->name;
+ talloc_steal(mem_ctx, rec->name);
- name->name_len = namebuf_len;
- name->name = namebuf;
name->id = rec->version;
name->unknown = WINSDB_GROUP_ADDRESS;
- name->flags = rec->nb_flags;
- name->group_flag = 0;
+ name->flags = WREPL_NAME_FLAGS(rec->type, rec->state, rec->node, rec->is_static);
switch (name->flags & 2) {
case 0:
@@ -200,8 +152,24 @@ static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx, struct wrepl_wins
talloc_steal(mem_ctx, rec->addresses[0]->address);
break;
case 2:
- name->addresses.addresses.num_ips = 0;
- name->addresses.addresses.ips = NULL;
+ num_ips = winsdb_addr_list_length(rec->addresses);
+ ips = talloc_array(mem_ctx, struct wrepl_ip, num_ips);
+ NT_STATUS_HAVE_NO_MEMORY(ips);
+
+ for (i = 0; i < num_ips; i++) {
+ if (strcasecmp(WINSDB_OWNER_LOCAL, rec->addresses[i]->wins_owner) == 0) {
+ ips[i].owner = talloc_strdup(ips, our_address);
+ NT_STATUS_HAVE_NO_MEMORY(ips[i].owner);
+ } else {
+ ips[i].owner = rec->addresses[i]->wins_owner;
+ talloc_steal(ips, rec->addresses[i]->wins_owner);
+ }
+ ips[i].ip = rec->addresses[i]->address;
+ talloc_steal(ips, rec->addresses[i]->address);
+ }
+
+ name->addresses.addresses.num_ips = num_ips;
+ name->addresses.addresses.ips = ips;
break;
}
@@ -251,7 +219,7 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
* if the partner ask for nothing, or give invalid ranges,
* return an empty list.
*/
- if (owner_in->min_version >= owner_in->max_version) {
+ if (owner_in->min_version > owner_in->max_version) {
return NT_STATUS_OK;
}
@@ -259,19 +227,29 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
* if the partner has already all records for nothing, or give invalid ranges,
* return an empty list.
*/
- if (owner_in->min_version >= owner->owner.max_version) {
+ if (owner_in->min_version > owner->owner.max_version) {
return NT_STATUS_OK;
}
- filter = talloc_asprintf(call, "(&(winsOwner=%s)(objectClass=winsRecord)(state>=%u)(versionID>=%llu)(versionID<=%llu))",
- owner->owner.address, WINS_REC_ACTIVE, owner_in->min_version, owner_in->max_version);
+ filter = talloc_asprintf(call,
+ "(&(winsOwner=%s)(objectClass=winsRecord)"
+ "(|(recordState=%u)(recordState=%u))"
+ "(versionID>=%llu)(versionID<=%llu))",
+ owner->owner.address,
+ WREPL_STATE_ACTIVE, WREPL_STATE_TOMBSTONE,
+ owner_in->min_version, owner_in->max_version);
NT_STATUS_HAVE_NO_MEMORY(filter);
ret = ldb_search(service->wins_db, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res);
if (res != NULL) {
talloc_steal(call, res);
}
if (ret < 0) return NT_STATUS_INTERNAL_DB_CORRUPTION;
- if (ret == 0) return NT_STATUS_OK;
+ if (ret == 0) {
+ DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
+ ret, owner_in->address, owner_in->min_version, owner_in->max_version,
+ call->wreplconn->partner->address));
+ return NT_STATUS_OK;
+ }
names = talloc_array(call, struct wrepl_wins_name, ret);
NT_STATUS_HAVE_NO_MEMORY(names);
@@ -280,7 +258,7 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
status = winsdb_record(res[i], NULL, call, &rec);
NT_STATUS_NOT_OK_RETURN(status);
- status = wreplsrv_record2wins_name(names, &names[i], rec);
+ status = wreplsrv_record2wins_name(names, call->wreplconn->our_ip, &names[i], rec);
NT_STATUS_NOT_OK_RETURN(status);
talloc_free(rec);
talloc_free(res[i]);
@@ -289,12 +267,113 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
/* sort the names before we send them */
qsort(names, ret, sizeof(struct wrepl_wins_name), (comparison_fn_t)wreplsrv_in_sort_wins_name);
+ DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
+ ret, owner_in->address, owner_in->min_version, owner_in->max_version,
+ call->wreplconn->partner->address));
+
reply_out->num_names = ret;
reply_out->names = names;
return NT_STATUS_OK;
}
+struct wreplsrv_in_update_state {
+ struct wreplsrv_in_connection *wrepl_in;
+ struct wreplsrv_out_connection *wrepl_out;
+ struct composite_context *creq;
+ struct wreplsrv_pull_cycle_io cycle_io;
+};
+
+static void wreplsrv_in_update_handler(struct composite_context *creq)
+{
+ struct wreplsrv_in_update_state *update_state = talloc_get_type(creq->async.private_data,
+ struct wreplsrv_in_update_state);
+ NTSTATUS status;
+
+ status = wreplsrv_pull_cycle_recv(creq);
+
+ talloc_free(update_state->wrepl_out);
+
+ wreplsrv_terminate_in_connection(update_state->wrepl_in, nt_errstr(status));
+}
+
+static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
+{
+ struct wreplsrv_in_connection *wrepl_in = call->wreplconn;
+ struct wreplsrv_out_connection *wrepl_out;
+ struct wrepl_table *update_in = &call->req_packet.message.replication.info.table;
+ struct wreplsrv_in_update_state *update_state;
+
+ DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n",
+ call->wreplconn->partner->address,
+ update_in->initiator, update_in->partner_count));
+
+ /*
+ * we need to flip the connection into a client connection
+ * and do a WREPL_REPL_SEND_REQUEST's on the that connection
+ * and then stop this connection
+ */
+ talloc_free(wrepl_in->conn->event.fde);
+ wrepl_in->conn->event.fde = NULL;
+
+ update_state = talloc(wrepl_in, struct wreplsrv_in_update_state);
+ NT_STATUS_HAVE_NO_MEMORY(update_state);
+
+ wrepl_out = talloc(update_state, struct wreplsrv_out_connection);
+ NT_STATUS_HAVE_NO_MEMORY(wrepl_out);
+ wrepl_out->service = wrepl_in->service;
+ wrepl_out->partner = wrepl_in->partner;
+ wrepl_out->assoc_ctx.our_ctx = wrepl_in->assoc_ctx.our_ctx;
+ wrepl_out->assoc_ctx.peer_ctx = wrepl_in->assoc_ctx.peer_ctx;
+ wrepl_out->sock = wrepl_socket_merge(wrepl_out,
+ wrepl_in->conn->event.ctx,
+ wrepl_in->conn->socket);
+ NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock);
+
+ update_state->wrepl_in = wrepl_in;
+ update_state->wrepl_out = wrepl_out;
+ update_state->cycle_io.in.partner = wrepl_out->partner;
+ update_state->cycle_io.in.num_owners = update_in->partner_count;
+ update_state->cycle_io.in.owners = update_in->partners;
+ talloc_steal(update_state, update_in->partners);
+ update_state->cycle_io.in.wreplconn = wrepl_out;
+ update_state->creq = wreplsrv_pull_cycle_send(update_state, &update_state->cycle_io);
+ if (!update_state->creq) {
+ return NT_STATUS_INTERNAL_ERROR;
+ }
+
+ update_state->creq->async.fn = wreplsrv_in_update_handler;
+ update_state->creq->async.private_data = update_state;
+
+ return ERROR_INVALID_PARAMETER;
+}
+
+static NTSTATUS wreplsrv_in_update2(struct wreplsrv_in_call *call)
+{
+ return wreplsrv_in_update(call);
+}
+
+static NTSTATUS wreplsrv_in_inform(struct wreplsrv_in_call *call)
+{
+ struct wrepl_table *inform_in = &call->req_packet.message.replication.info.table;
+ NTSTATUS status;
+
+ DEBUG(2,("WREPL_REPL_INFORM: partner[%s] initiator[%s] num_owners[%u]\n",
+ call->wreplconn->partner->address,
+ inform_in->initiator, inform_in->partner_count));
+
+ status = wreplsrv_sched_inform_action(call->wreplconn->partner, inform_in);
+ NT_STATUS_NOT_OK_RETURN(status);
+
+ /* we don't reply to WREPL_REPL_INFORM messages */
+ return ERROR_INVALID_PARAMETER;
+}
+
+static NTSTATUS wreplsrv_in_inform2(struct wreplsrv_in_call *call)
+{
+ return wreplsrv_in_inform(call);
+}
+
static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
{
struct wrepl_replication *repl_in = &call->req_packet.message.replication;
@@ -318,6 +397,9 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
switch (repl_in->command) {
case WREPL_REPL_TABLE_QUERY:
+ if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) {
+ return wreplsrv_in_stop_assoc_ctx(call);
+ }
status = wreplsrv_in_table_query(call);
break;
@@ -325,6 +407,9 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
return ERROR_INVALID_PARAMETER;
case WREPL_REPL_SEND_REQUEST:
+ if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) {
+ return wreplsrv_in_stop_assoc_ctx(call);
+ }
status = wreplsrv_in_send_request(call);
break;
@@ -332,16 +417,32 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
return ERROR_INVALID_PARAMETER;
case WREPL_REPL_UPDATE:
- return ERROR_INVALID_PARAMETER;
+ if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+ return wreplsrv_in_stop_assoc_ctx(call);
+ }
+ status = wreplsrv_in_update(call);
+ break;
- case WREPL_REPL_5:
- return ERROR_INVALID_PARAMETER;
+ case WREPL_REPL_UPDATE2:
+ if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+ return wreplsrv_in_stop_assoc_ctx(call);
+ }
+ status = wreplsrv_in_update2(call);
+ break;
case WREPL_REPL_INFORM:
- return ERROR_INVALID_PARAMETER;
+ if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+ return wreplsrv_in_stop_assoc_ctx(call);
+ }
+ status = wreplsrv_in_inform(call);
+ break;
- case WREPL_REPL_9:
- return ERROR_INVALID_PARAMETER;
+ case WREPL_REPL_INFORM2:
+ if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
+ return wreplsrv_in_stop_assoc_ctx(call);
+ }
+ status = wreplsrv_in_inform2(call);
+ break;
default:
return ERROR_INVALID_PARAMETER;
@@ -406,7 +507,8 @@ NTSTATUS wreplsrv_in_call(struct wreplsrv_in_call *call)
}
if (NT_STATUS_IS_OK(status)) {
- call->rep_packet.opcode = WREPL_OPCODE_BITS;
+ /* let the backend to set some of the opcode bits, but always add the standards */
+ call->rep_packet.opcode |= WREPL_OPCODE_BITS;
call->rep_packet.assoc_ctx = call->wreplconn->assoc_ctx.peer_ctx;
}
diff --git a/source4/wrepl_server/wrepl_in_connection.c b/source4/wrepl_server/wrepl_in_connection.c
index ed9c9998d7..5ba36a5051 100644
--- a/source4/wrepl_server/wrepl_in_connection.c
+++ b/source4/wrepl_server/wrepl_in_connection.c
@@ -32,7 +32,7 @@
#include "nbt_server/wins/winsdb.h"
#include "ldb/include/ldb.h"
-static void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
+void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
{
stream_terminate_connection(wreplconn->conn, reason);
}
@@ -267,6 +267,48 @@ static const struct stream_server_ops wreplsrv_stream_ops = {
};
/*
+ called when we get a new connection
+*/
+NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner,
+ struct socket_context *sock,
+ struct wreplsrv_in_connection **_wrepl_in)
+{
+ struct wreplsrv_service *service = partner->service;
+ struct wreplsrv_in_connection *wrepl_in;
+ const struct model_ops *model_ops;
+ struct stream_connection *conn;
+ NTSTATUS status;
+
+ /* within the wrepl task we want to be a single process, so
+ ask for the single process model ops and pass these to the
+ stream_setup_socket() call. */
+ model_ops = process_model_byname("single");
+ if (!model_ops) {
+ DEBUG(0,("Can't find 'single' process model_ops"));
+ return NT_STATUS_INTERNAL_ERROR;
+ }
+
+ wrepl_in = talloc_zero(partner, struct wreplsrv_in_connection);
+ NT_STATUS_HAVE_NO_MEMORY(wrepl_in);
+
+ wrepl_in->service = service;
+ wrepl_in->partner = partner;
+ wrepl_in->our_ip = socket_get_my_addr(sock, wrepl_in);
+ NT_STATUS_HAVE_NO_MEMORY(wrepl_in->our_ip);
+
+ status = stream_new_connection_merge(service->task->event_ctx, model_ops,
+ sock, &wreplsrv_stream_ops, service->task->msg_ctx,
+ wrepl_in, &conn);
+ NT_STATUS_NOT_OK_RETURN(status);
+
+ wrepl_in->conn = conn;
+ talloc_steal(conn, wrepl_in);
+
+ *_wrepl_in = wrepl_in;
+ return NT_STATUS_OK;
+}
+
+/*
startup the wrepl port 42 server sockets
*/
NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service)
diff --git a/source4/wrepl_server/wrepl_out_connection.c b/source4/wrepl_server/wrepl_out_connection.c
index 39406c7e2a..0d5bfda185 100644
--- a/source4/wrepl_server/wrepl_out_connection.c
+++ b/source4/wrepl_server/wrepl_out_connection.c
@@ -31,15 +31,221 @@
#include "wrepl_server/wrepl_server.h"
#include "nbt_server/wins/winsdb.h"
#include "ldb/include/ldb.h"
+#include "libcli/composite/composite.h"
+#include "libcli/wrepl/winsrepl.h"
+#include "wrepl_server/wrepl_out_helpers.h"
+
+static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *ptr);
+
+static void wreplsrv_pull_handler_creq(struct composite_context *creq)
+{
+ struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner);
+ uint32_t interval;
+
+ partner->pull.last_status = wreplsrv_pull_cycle_recv(partner->pull.creq);
+ partner->pull.creq = NULL;
+ talloc_free(partner->pull.cycle_io);
+ partner->pull.cycle_io = NULL;
+
+ if (!NT_STATUS_IS_OK(partner->pull.last_status)) {
+ interval = partner->pull.error_count * partner->pull.retry_interval;
+ interval = MIN(interval, partner->pull.interval);
+ partner->pull.error_count++;
+
+ DEBUG(1,("wreplsrv_pull_cycle(%s): %s: next: %us\n",
+ partner->address, nt_errstr(partner->pull.last_status),
+ interval));
+ } else {
+ interval = partner->pull.interval;
+ partner->pull.error_count = 0;
+
+ DEBUG(2,("wreplsrv_pull_cycle(%s): %s: next: %us\n",
+ partner->address, nt_errstr(partner->pull.last_status),
+ interval));
+ }
+
+ partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner,
+ timeval_current_ofs(interval, 0),
+ wreplsrv_pull_handler_te, partner);
+ if (!partner->pull.te) {
+ DEBUG(0,("wreplsrv_pull_handler_creq: event_add_timed() failed! no memory!\n"));
+ }
+}
+
+static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *ptr)
+{
+ struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner);
+
+ partner->pull.te = NULL;
+
+ partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io);
+ if (!partner->pull.cycle_io) {
+ goto requeue;
+ }
+
+ partner->pull.cycle_io->in.partner = partner;
+ partner->pull.cycle_io->in.num_owners = 0;
+ partner->pull.cycle_io->in.owners = NULL;
+ partner->pull.cycle_io->in.wreplconn = NULL;
+ partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io);
+ if (!partner->pull.creq) {
+ DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n",
+ partner->address));
+ goto requeue;
+ }
+
+ partner->pull.creq->async.fn = wreplsrv_pull_handler_creq;
+ partner->pull.creq->async.private_data = partner;
+
+ return;
+requeue:
+ talloc_free(partner->pull.cycle_io);
+ partner->pull.cycle_io = NULL;
+ /* retry later */
+ partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner,
+ timeval_add(&t, partner->pull.retry_interval, 0),
+ wreplsrv_pull_handler_te, partner);
+ if (!partner->pull.te) {
+ DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n"));
+ }
+}
+
+NTSTATUS wreplsrv_sched_inform_action(struct wreplsrv_partner *partner, struct wrepl_table *inform_in)
+{
+ if (partner->pull.creq) {
+ /* there's already a pull in progress, so we're done */
+ return NT_STATUS_OK;
+ }
+
+ /* remove the scheduled pull */
+ talloc_free(partner->pull.te);
+ partner->pull.te = NULL;
+
+ partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io);
+ if (!partner->pull.cycle_io) {
+ goto requeue;
+ }
+
+ partner->pull.cycle_io->in.partner = partner;
+ partner->pull.cycle_io->in.num_owners = inform_in->partner_count;
+ partner->pull.cycle_io->in.owners = inform_in->partners;
+ talloc_steal(partner->pull.cycle_io, inform_in->partners);
+ partner->pull.cycle_io->in.wreplconn = NULL;
+ partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io);
+ if (!partner->pull.creq) {
+ DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n",
+ partner->address));
+ goto requeue;
+ }
+
+ partner->pull.creq->async.fn = wreplsrv_pull_handler_creq;
+ partner->pull.creq->async.private_data = partner;
+
+ return NT_STATUS_OK;
+requeue:
+ talloc_free(partner->pull.cycle_io);
+ partner->pull.cycle_io = NULL;
+ /* retry later */
+ partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner,
+ timeval_current_ofs(partner->pull.retry_interval, 0),
+ wreplsrv_pull_handler_te, partner);
+ if (!partner->pull.te) {
+ DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n"));
+ }
+
+ return NT_STATUS_OK;
+}
+
+static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *ptr);
+
+static void wreplsrv_push_handler_creq(struct composite_context *creq)
+{
+ struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner);
+ uint32_t interval;
+
+ partner->push.last_status = wreplsrv_push_notify_recv(partner->push.creq);
+ partner->push.creq = NULL;
+ talloc_free(partner->push.notify_io);
+ partner->push.notify_io = NULL;
+
+ if (!NT_STATUS_IS_OK(partner->push.last_status)) {
+ interval = 15;
+
+ DEBUG(1,("wreplsrv_push_notify(%s): %s: next: %us\n",
+ partner->address, nt_errstr(partner->push.last_status),
+ interval));
+ } else {
+ interval = 100;
+
+ DEBUG(2,("wreplsrv_push_notify(%s): %s: next: %us\n",
+ partner->address, nt_errstr(partner->push.last_status),
+ interval));
+ }
+
+ partner->push.te = event_add_timed(partner->service->task->event_ctx, partner,
+ timeval_current_ofs(interval, 0),
+ wreplsrv_push_handler_te, partner);
+ if (!partner->push.te) {
+ DEBUG(0,("wreplsrv_push_handler_creq: event_add_timed() failed! no memory!\n"));
+ }
+}
+
+static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *ptr)
+{
+ struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner);
+
+ partner->push.te = NULL;
+
+ partner->push.notify_io = talloc(partner, struct wreplsrv_push_notify_io);
+ if (!partner->push.notify_io) {
+ goto requeue;
+ }
+
+ partner->push.notify_io->in.partner = partner;
+ partner->push.notify_io->in.inform = False;
+ partner->push.notify_io->in.propagate = False;
+ partner->push.creq = wreplsrv_push_notify_send(partner->push.notify_io, partner->push.notify_io);
+ if (!partner->push.creq) {
+ DEBUG(1,("wreplsrv_push_notify_send(%s) failed\n",
+ partner->address));
+ goto requeue;
+ }
+
+ partner->push.creq->async.fn = wreplsrv_push_handler_creq;
+ partner->push.creq->async.private_data = partner;
+
+ return;
+requeue:
+ talloc_free(partner->push.notify_io);
+ partner->push.notify_io = NULL;
+ /* retry later */
+ partner->push.te = event_add_timed(partner->service->task->event_ctx, partner,
+ timeval_add(&t, 5, 0),
+ wreplsrv_push_handler_te, partner);
+ if (!partner->push.te) {
+ DEBUG(0,("wreplsrv_push_handler_te: event_add_timed() failed! no memory!\n"));
+ }
+}
NTSTATUS wreplsrv_setup_out_connections(struct wreplsrv_service *service)
{
struct wreplsrv_partner *cur;
for (cur = service->partners; cur; cur = cur->next) {
- if (!(cur->type & WINSREPL_PARTNER_PULL)) continue;
-
- DEBUG(0,("TODO: pull from: %s\n", cur->address));
+ if (cur->type & WINSREPL_PARTNER_PULL) {
+ cur->pull.te = event_add_timed(service->task->event_ctx, cur,
+ timeval_zero(), wreplsrv_pull_handler_te, cur);
+ NT_STATUS_HAVE_NO_MEMORY(cur->pull.te);
+ }
+ if (cur->type & WINSREPL_PARTNER_PUSH) {
+ cur->push.te = event_add_timed(service->task->event_ctx, cur,
+ timeval_zero(), wreplsrv_push_handler_te, cur);
+ NT_STATUS_HAVE_NO_MEMORY(cur->push.te);
+ }
}
return NT_STATUS_OK;
diff --git a/source4/wrepl_server/wrepl_server.c b/source4/wrepl_server/wrepl_server.c
index 97fa23cdf1..b044ef8296 100644
--- a/source4/wrepl_server/wrepl_server.c
+++ b/source4/wrepl_server/wrepl_server.c
@@ -80,14 +80,17 @@ static NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service)
for (i=0; i < ret; i++) {
struct wreplsrv_partner *partner;
- partner = talloc(service, struct wreplsrv_partner);
+ partner = talloc_zero(service, struct wreplsrv_partner);
if (partner == NULL) goto failed;
+ partner->service = service;
partner->address = ldb_msg_find_string(res[i], "address", NULL);
if (!partner->address) goto failed;
partner->name = ldb_msg_find_string(res[i], "name", partner->address);
partner->type = ldb_msg_find_int(res[i], "type", WINSREPL_PARTNER_BOTH);
partner->pull.interval = ldb_msg_find_int(res[i], "pullInterval", WINSREPL_DEFAULT_PULL_INTERVAL);
+ partner->pull.retry_interval = ldb_msg_find_int(res[i], "pullRetryInterval",
+ WINSREPL_DEFAULT_PULL_RETRY_INTERVAL);
partner->our_address = ldb_msg_find_string(res[i], "ourAddress", NULL);
talloc_steal(partner, partner->address);
@@ -104,6 +107,29 @@ failed:
return NT_STATUS_FOOBAR;
}
+BOOL wreplsrv_is_our_address(struct wreplsrv_service *service, const char *address)
+{
+ const char *our_address;
+
+ if (lp_interfaces() && lp_bind_interfaces_only()) {
+ int num_interfaces = iface_count();
+ int i;
+ for(i = 0; i < num_interfaces; i++) {
+ our_address = iface_n_ip(i);
+ if (strcasecmp(our_address, address) == 0) {
+ return True;
+ }
+ }
+ } else {
+ our_address = lp_socket_address();
+ if (strcasecmp(our_address, address) == 0) {
+ return True;
+ }
+ }
+
+ return False;
+}
+
uint64_t wreplsrv_local_max_version(struct wreplsrv_service *service)
{
int ret;
@@ -134,6 +160,49 @@ failed:
return maxVersion;
}
+NTSTATUS wreplsrv_fill_wrepl_table(struct wreplsrv_service *service,
+ TALLOC_CTX *mem_ctx,
+ struct wrepl_table *table_out,
+ const char *our_ip,
+ const char *initiator,
+ BOOL full_table)
+{
+ struct wreplsrv_owner *cur;
+ uint64_t local_max_version;
+ uint32_t i = 0;
+
+ table_out->partner_count = 0;
+ table_out->partners = NULL;
+ table_out->initiator = initiator;
+
+ local_max_version = wreplsrv_local_max_version(service);
+ if (local_max_version > 0) {
+ table_out->partner_count++;
+ }
+
+ for (cur = service->table; full_table && cur; cur = cur->next) {
+ table_out->partner_count++;
+ }
+
+ table_out->partners = talloc_array(mem_ctx, struct wrepl_wins_owner, table_out->partner_count);
+ NT_STATUS_HAVE_NO_MEMORY(table_out->partners);
+
+ if (local_max_version > 0) {
+ table_out->partners[i].address = our_ip;
+ table_out->partners[i].min_version = 0;
+ table_out->partners[i].max_version = local_max_version;
+ table_out->partners[i].type = 1;
+ i++;
+ }
+
+ for (cur = service->table; full_table && cur; cur = cur->next) {
+ table_out->partners[i] = cur->owner;
+ i++;
+ }
+
+ return NT_STATUS_OK;
+}
+
struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const char *wins_owner)
{
struct wreplsrv_owner *cur;
@@ -151,9 +220,9 @@ struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const c
update the wins_owner_table max_version, if the given version is the highest version
if no entry for the wins_owner exists yet, create one
*/
-static NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
- TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table,
- const char *wins_owner, uint64_t version)
+NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
+ TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table,
+ const char *wins_owner, uint64_t version)
{
struct wreplsrv_owner *table = *_table;
struct wreplsrv_owner *cur;
diff --git a/source4/wrepl_server/wrepl_server.h b/source4/wrepl_server/wrepl_server.h
index 278b15fbb7..63603c823d 100644
--- a/source4/wrepl_server/wrepl_server.h
+++ b/source4/wrepl_server/wrepl_server.h
@@ -92,16 +92,38 @@ struct wreplsrv_in_connection {
state of an outcoming wrepl connection
*/
struct wreplsrv_out_connection {
+ /* our global service context */
+ struct wreplsrv_service *service;
+
+ /*
+ * the partner that connects us,
+ * can be NULL, when we got a connection
+ * from an unknown address
+ */
struct wreplsrv_partner *partner;
+
+ /* keep track of the assoc_ctx's */
+ struct {
+ uint32_t our_ctx;
+ uint32_t peer_ctx;
+ } assoc_ctx;
+
+ /*
+ * the client socket to the partner,
+ * NULL if not yet connected
+ */
+ struct wrepl_socket *sock;
};
enum winsrepl_partner_type {
+ WINSREPL_PARTNER_NONE = 0x0,
WINSREPL_PARTNER_PULL = 0x1,
WINSREPL_PARTNER_PUSH = 0x2,
WINSREPL_PARTNER_BOTH = (WINSREPL_PARTNER_PULL | WINSREPL_PARTNER_PUSH)
};
#define WINSREPL_DEFAULT_PULL_INTERVAL (30*60)
+#define WINSREPL_DEFAULT_PULL_RETRY_INTERVAL (30)
/*
this represents one of our configured partners
@@ -109,6 +131,9 @@ enum winsrepl_partner_type {
struct wreplsrv_partner {
struct wreplsrv_partner *prev,*next;
+ /* our global service context */
+ struct wreplsrv_service *service;
+
/* the netbios name of the partner, mostly just for debugging */
const char *name;
@@ -129,12 +154,51 @@ struct wreplsrv_partner {
/* the interval between 2 pull replications to the partner */
uint32_t interval;
+ /* the retry_interval if a pull cycle failed to the partner */
+ uint32_t retry_interval;
+
+ /* the error count till the last success */
+ uint32_t error_count;
+
+ /* the status of the last pull cycle */
+ NTSTATUS last_status;
+
/* this is a list of each wins_owner the partner knows about */
struct wreplsrv_owner *table;
/* the outgoing connection to the partner */
struct wreplsrv_out_connection *wreplconn;
+
+ /* the current pending pull cycle request */
+ struct composite_context *creq;
+
+ /* the pull cycle io params */
+ struct wreplsrv_pull_cycle_io *cycle_io;
+
+ /* the current timed_event to the next pull cycle */
+ struct timed_event *te;
} pull;
+
+ /* push specific options */
+ struct {
+ /* change count till push notification */
+ uint32_t change_count;
+
+ /* the status of the last push cycle */
+ NTSTATUS last_status;
+
+ /* the outgoing connection to the partner */
+ struct wreplsrv_out_connection *wreplconn;
+
+ /* the current push notification */
+ struct composite_context *creq;
+
+ /* the pull cycle io params */
+ struct wreplsrv_push_notify_io *notify_io;
+
+ /* the current timed_event to the next push notify */
+ struct timed_event *te;
+ } push;
};
struct wreplsrv_owner {