diff options
author | Stefan Metzmacher <metze@samba.org> | 2005-10-14 14:02:47 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 13:44:43 -0500 |
commit | cffd522b5c806508dfacfb10234e4c0a115c0a98 (patch) | |
tree | ec5271d813d2cb547757732815a434ba27a89582 /source4/wrepl_server | |
parent | d1e6c228692ff8b06d6eecd6be22fe0727e170ac (diff) | |
download | samba-cffd522b5c806508dfacfb10234e4c0a115c0a98.tar.gz samba-cffd522b5c806508dfacfb10234e4c0a115c0a98.tar.bz2 samba-cffd522b5c806508dfacfb10234e4c0a115c0a98.zip |
r11052: bring samba4 uptodate with the samba4-winsrepl branch,
before the bad merge
metze
(This used to be commit 471c0ca4abb17fb5f73c0efed195c67628c1c06e)
Diffstat (limited to 'source4/wrepl_server')
-rw-r--r-- | source4/wrepl_server/config.mk | 4 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_in_call.c | 258 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_in_connection.c | 44 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_out_connection.c | 212 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_server.c | 77 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_server.h | 64 |
6 files changed, 572 insertions, 87 deletions
diff --git a/source4/wrepl_server/config.mk b/source4/wrepl_server/config.mk index 18fdcad5e9..8c1202ae94 100644 --- a/source4/wrepl_server/config.mk +++ b/source4/wrepl_server/config.mk @@ -7,7 +7,9 @@ INIT_OBJ_FILES = \ wrepl_server/wrepl_server.o \ wrepl_server/wrepl_in_connection.o \ wrepl_server/wrepl_in_call.o \ - wrepl_server/wrepl_out_connection.o + wrepl_server/wrepl_out_connection.o \ + wrepl_server/wrepl_out_helpers.o \ + wrepl_server/wrepl_apply_records.o REQUIRED_SUBSYSTEMS = \ LIBCLI_WREPL WINSDB # End SUBSYSTEM WREPL_SRV diff --git a/source4/wrepl_server/wrepl_in_call.c b/source4/wrepl_server/wrepl_in_call.c index 4472a0fede..7ccb52cc20 100644 --- a/source4/wrepl_server/wrepl_in_call.c +++ b/source4/wrepl_server/wrepl_in_call.c @@ -29,7 +29,10 @@ #include "lib/messaging/irpc.h" #include "librpc/gen_ndr/ndr_winsrepl.h" #include "librpc/gen_ndr/ndr_nbt.h" +#include "libcli/wrepl/winsrepl.h" #include "wrepl_server/wrepl_server.h" +#include "wrepl_server/wrepl_out_helpers.h" +#include "libcli/composite/composite.h" #include "nbt_server/wins/winsdb.h" #include "lib/ldb/include/ldb.h" @@ -111,42 +114,12 @@ static NTSTATUS wreplsrv_in_table_query(struct wreplsrv_in_call *call) struct wreplsrv_service *service = call->wreplconn->service; struct wrepl_replication *repl_out = &call->rep_packet.message.replication; struct wrepl_table *table_out = &call->rep_packet.message.replication.info.table; - struct wreplsrv_owner *cur; - uint64_t local_max_version; - uint32_t i = 0; + const char *our_ip = call->wreplconn->our_ip; repl_out->command = WREPL_REPL_TABLE_REPLY; - table_out->partner_count = 0; - table_out->partners = NULL; - table_out->initiator = WINSDB_OWNER_LOCAL; - - local_max_version = wreplsrv_local_max_version(service); - if (local_max_version > 0) { - table_out->partner_count++; - } - - for (cur = service->table; cur; cur = cur->next) { - table_out->partner_count++; - } - - table_out->partners = talloc_array(call, struct wrepl_wins_owner, table_out->partner_count); - NT_STATUS_HAVE_NO_MEMORY(table_out->partners); - - if (local_max_version > 0) { - table_out->partners[i].address = call->wreplconn->our_ip; - table_out->partners[i].min_version = 0; - table_out->partners[i].max_version = local_max_version; - table_out->partners[i].type = 1; - i++; - } - - for (cur = service->table; cur; cur = cur->next) { - table_out->partners[i] = cur->owner; - i++; - } - - return NT_STATUS_OK; + return wreplsrv_fill_wrepl_table(service, call, table_out, + our_ip, our_ip, True); } static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1, @@ -157,42 +130,21 @@ static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1, return 0; } -static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx, struct wrepl_wins_name *name, struct winsdb_record *rec) +static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx, + const char *our_address, + struct wrepl_wins_name *name, + struct winsdb_record *rec) { - uint8_t *namebuf; - uint32_t namebuf_len; - uint32_t name_len; - - name_len = strlen(rec->name->name); - if (name_len > 15) { - return NT_STATUS_INVALID_PARAMETER_MIX; - } - - namebuf = (uint8_t *)talloc_asprintf(mem_ctx, "%-15s%c%s", - rec->name->name, 'X', - (rec->name->scope?rec->name->scope:"")); - NT_STATUS_HAVE_NO_MEMORY(namebuf); - namebuf_len = strlen((char *)namebuf) + 1; - - /* - * we need to set the type here, and use a place-holder in the talloc_asprintf() - * as the type can be 0x00, and then the namebuf_len = strlen(namebuf); would give wrong results - */ - namebuf[15] = rec->name->type; + uint32_t num_ips, i; + struct wrepl_ip *ips; - /* oh wow, what a nasty bug in windows ... */ - if (rec->name->type == 0x1b) { - namebuf[15] = namebuf[0]; - namebuf[0] = 0x1b; - } + name->name = rec->name; + talloc_steal(mem_ctx, rec->name); - name->name_len = namebuf_len; - name->name = namebuf; name->id = rec->version; name->unknown = WINSDB_GROUP_ADDRESS; - name->flags = rec->nb_flags; - name->group_flag = 0; + name->flags = WREPL_NAME_FLAGS(rec->type, rec->state, rec->node, rec->is_static); switch (name->flags & 2) { case 0: @@ -200,8 +152,24 @@ static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx, struct wrepl_wins talloc_steal(mem_ctx, rec->addresses[0]->address); break; case 2: - name->addresses.addresses.num_ips = 0; - name->addresses.addresses.ips = NULL; + num_ips = winsdb_addr_list_length(rec->addresses); + ips = talloc_array(mem_ctx, struct wrepl_ip, num_ips); + NT_STATUS_HAVE_NO_MEMORY(ips); + + for (i = 0; i < num_ips; i++) { + if (strcasecmp(WINSDB_OWNER_LOCAL, rec->addresses[i]->wins_owner) == 0) { + ips[i].owner = talloc_strdup(ips, our_address); + NT_STATUS_HAVE_NO_MEMORY(ips[i].owner); + } else { + ips[i].owner = rec->addresses[i]->wins_owner; + talloc_steal(ips, rec->addresses[i]->wins_owner); + } + ips[i].ip = rec->addresses[i]->address; + talloc_steal(ips, rec->addresses[i]->address); + } + + name->addresses.addresses.num_ips = num_ips; + name->addresses.addresses.ips = ips; break; } @@ -251,7 +219,7 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call) * if the partner ask for nothing, or give invalid ranges, * return an empty list. */ - if (owner_in->min_version >= owner_in->max_version) { + if (owner_in->min_version > owner_in->max_version) { return NT_STATUS_OK; } @@ -259,19 +227,29 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call) * if the partner has already all records for nothing, or give invalid ranges, * return an empty list. */ - if (owner_in->min_version >= owner->owner.max_version) { + if (owner_in->min_version > owner->owner.max_version) { return NT_STATUS_OK; } - filter = talloc_asprintf(call, "(&(winsOwner=%s)(objectClass=winsRecord)(state>=%u)(versionID>=%llu)(versionID<=%llu))", - owner->owner.address, WINS_REC_ACTIVE, owner_in->min_version, owner_in->max_version); + filter = talloc_asprintf(call, + "(&(winsOwner=%s)(objectClass=winsRecord)" + "(|(recordState=%u)(recordState=%u))" + "(versionID>=%llu)(versionID<=%llu))", + owner->owner.address, + WREPL_STATE_ACTIVE, WREPL_STATE_TOMBSTONE, + owner_in->min_version, owner_in->max_version); NT_STATUS_HAVE_NO_MEMORY(filter); ret = ldb_search(service->wins_db, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res); if (res != NULL) { talloc_steal(call, res); } if (ret < 0) return NT_STATUS_INTERNAL_DB_CORRUPTION; - if (ret == 0) return NT_STATUS_OK; + if (ret == 0) { + DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n", + ret, owner_in->address, owner_in->min_version, owner_in->max_version, + call->wreplconn->partner->address)); + return NT_STATUS_OK; + } names = talloc_array(call, struct wrepl_wins_name, ret); NT_STATUS_HAVE_NO_MEMORY(names); @@ -280,7 +258,7 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call) status = winsdb_record(res[i], NULL, call, &rec); NT_STATUS_NOT_OK_RETURN(status); - status = wreplsrv_record2wins_name(names, &names[i], rec); + status = wreplsrv_record2wins_name(names, call->wreplconn->our_ip, &names[i], rec); NT_STATUS_NOT_OK_RETURN(status); talloc_free(rec); talloc_free(res[i]); @@ -289,12 +267,113 @@ static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call) /* sort the names before we send them */ qsort(names, ret, sizeof(struct wrepl_wins_name), (comparison_fn_t)wreplsrv_in_sort_wins_name); + DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n", + ret, owner_in->address, owner_in->min_version, owner_in->max_version, + call->wreplconn->partner->address)); + reply_out->num_names = ret; reply_out->names = names; return NT_STATUS_OK; } +struct wreplsrv_in_update_state { + struct wreplsrv_in_connection *wrepl_in; + struct wreplsrv_out_connection *wrepl_out; + struct composite_context *creq; + struct wreplsrv_pull_cycle_io cycle_io; +}; + +static void wreplsrv_in_update_handler(struct composite_context *creq) +{ + struct wreplsrv_in_update_state *update_state = talloc_get_type(creq->async.private_data, + struct wreplsrv_in_update_state); + NTSTATUS status; + + status = wreplsrv_pull_cycle_recv(creq); + + talloc_free(update_state->wrepl_out); + + wreplsrv_terminate_in_connection(update_state->wrepl_in, nt_errstr(status)); +} + +static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) +{ + struct wreplsrv_in_connection *wrepl_in = call->wreplconn; + struct wreplsrv_out_connection *wrepl_out; + struct wrepl_table *update_in = &call->req_packet.message.replication.info.table; + struct wreplsrv_in_update_state *update_state; + + DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n", + call->wreplconn->partner->address, + update_in->initiator, update_in->partner_count)); + + /* + * we need to flip the connection into a client connection + * and do a WREPL_REPL_SEND_REQUEST's on the that connection + * and then stop this connection + */ + talloc_free(wrepl_in->conn->event.fde); + wrepl_in->conn->event.fde = NULL; + + update_state = talloc(wrepl_in, struct wreplsrv_in_update_state); + NT_STATUS_HAVE_NO_MEMORY(update_state); + + wrepl_out = talloc(update_state, struct wreplsrv_out_connection); + NT_STATUS_HAVE_NO_MEMORY(wrepl_out); + wrepl_out->service = wrepl_in->service; + wrepl_out->partner = wrepl_in->partner; + wrepl_out->assoc_ctx.our_ctx = wrepl_in->assoc_ctx.our_ctx; + wrepl_out->assoc_ctx.peer_ctx = wrepl_in->assoc_ctx.peer_ctx; + wrepl_out->sock = wrepl_socket_merge(wrepl_out, + wrepl_in->conn->event.ctx, + wrepl_in->conn->socket); + NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock); + + update_state->wrepl_in = wrepl_in; + update_state->wrepl_out = wrepl_out; + update_state->cycle_io.in.partner = wrepl_out->partner; + update_state->cycle_io.in.num_owners = update_in->partner_count; + update_state->cycle_io.in.owners = update_in->partners; + talloc_steal(update_state, update_in->partners); + update_state->cycle_io.in.wreplconn = wrepl_out; + update_state->creq = wreplsrv_pull_cycle_send(update_state, &update_state->cycle_io); + if (!update_state->creq) { + return NT_STATUS_INTERNAL_ERROR; + } + + update_state->creq->async.fn = wreplsrv_in_update_handler; + update_state->creq->async.private_data = update_state; + + return ERROR_INVALID_PARAMETER; +} + +static NTSTATUS wreplsrv_in_update2(struct wreplsrv_in_call *call) +{ + return wreplsrv_in_update(call); +} + +static NTSTATUS wreplsrv_in_inform(struct wreplsrv_in_call *call) +{ + struct wrepl_table *inform_in = &call->req_packet.message.replication.info.table; + NTSTATUS status; + + DEBUG(2,("WREPL_REPL_INFORM: partner[%s] initiator[%s] num_owners[%u]\n", + call->wreplconn->partner->address, + inform_in->initiator, inform_in->partner_count)); + + status = wreplsrv_sched_inform_action(call->wreplconn->partner, inform_in); + NT_STATUS_NOT_OK_RETURN(status); + + /* we don't reply to WREPL_REPL_INFORM messages */ + return ERROR_INVALID_PARAMETER; +} + +static NTSTATUS wreplsrv_in_inform2(struct wreplsrv_in_call *call) +{ + return wreplsrv_in_inform(call); +} + static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call) { struct wrepl_replication *repl_in = &call->req_packet.message.replication; @@ -318,6 +397,9 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call) switch (repl_in->command) { case WREPL_REPL_TABLE_QUERY: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) { + return wreplsrv_in_stop_assoc_ctx(call); + } status = wreplsrv_in_table_query(call); break; @@ -325,6 +407,9 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call) return ERROR_INVALID_PARAMETER; case WREPL_REPL_SEND_REQUEST: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) { + return wreplsrv_in_stop_assoc_ctx(call); + } status = wreplsrv_in_send_request(call); break; @@ -332,16 +417,32 @@ static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call) return ERROR_INVALID_PARAMETER; case WREPL_REPL_UPDATE: - return ERROR_INVALID_PARAMETER; + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_update(call); + break; - case WREPL_REPL_5: - return ERROR_INVALID_PARAMETER; + case WREPL_REPL_UPDATE2: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_update2(call); + break; case WREPL_REPL_INFORM: - return ERROR_INVALID_PARAMETER; + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_inform(call); + break; - case WREPL_REPL_9: - return ERROR_INVALID_PARAMETER; + case WREPL_REPL_INFORM2: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_inform2(call); + break; default: return ERROR_INVALID_PARAMETER; @@ -406,7 +507,8 @@ NTSTATUS wreplsrv_in_call(struct wreplsrv_in_call *call) } if (NT_STATUS_IS_OK(status)) { - call->rep_packet.opcode = WREPL_OPCODE_BITS; + /* let the backend to set some of the opcode bits, but always add the standards */ + call->rep_packet.opcode |= WREPL_OPCODE_BITS; call->rep_packet.assoc_ctx = call->wreplconn->assoc_ctx.peer_ctx; } diff --git a/source4/wrepl_server/wrepl_in_connection.c b/source4/wrepl_server/wrepl_in_connection.c index ed9c9998d7..5ba36a5051 100644 --- a/source4/wrepl_server/wrepl_in_connection.c +++ b/source4/wrepl_server/wrepl_in_connection.c @@ -32,7 +32,7 @@ #include "nbt_server/wins/winsdb.h" #include "ldb/include/ldb.h" -static void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason) +void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason) { stream_terminate_connection(wreplconn->conn, reason); } @@ -267,6 +267,48 @@ static const struct stream_server_ops wreplsrv_stream_ops = { }; /* + called when we get a new connection +*/ +NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, + struct socket_context *sock, + struct wreplsrv_in_connection **_wrepl_in) +{ + struct wreplsrv_service *service = partner->service; + struct wreplsrv_in_connection *wrepl_in; + const struct model_ops *model_ops; + struct stream_connection *conn; + NTSTATUS status; + + /* within the wrepl task we want to be a single process, so + ask for the single process model ops and pass these to the + stream_setup_socket() call. */ + model_ops = process_model_byname("single"); + if (!model_ops) { + DEBUG(0,("Can't find 'single' process model_ops")); + return NT_STATUS_INTERNAL_ERROR; + } + + wrepl_in = talloc_zero(partner, struct wreplsrv_in_connection); + NT_STATUS_HAVE_NO_MEMORY(wrepl_in); + + wrepl_in->service = service; + wrepl_in->partner = partner; + wrepl_in->our_ip = socket_get_my_addr(sock, wrepl_in); + NT_STATUS_HAVE_NO_MEMORY(wrepl_in->our_ip); + + status = stream_new_connection_merge(service->task->event_ctx, model_ops, + sock, &wreplsrv_stream_ops, service->task->msg_ctx, + wrepl_in, &conn); + NT_STATUS_NOT_OK_RETURN(status); + + wrepl_in->conn = conn; + talloc_steal(conn, wrepl_in); + + *_wrepl_in = wrepl_in; + return NT_STATUS_OK; +} + +/* startup the wrepl port 42 server sockets */ NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service) diff --git a/source4/wrepl_server/wrepl_out_connection.c b/source4/wrepl_server/wrepl_out_connection.c index 39406c7e2a..0d5bfda185 100644 --- a/source4/wrepl_server/wrepl_out_connection.c +++ b/source4/wrepl_server/wrepl_out_connection.c @@ -31,15 +31,221 @@ #include "wrepl_server/wrepl_server.h" #include "nbt_server/wins/winsdb.h" #include "ldb/include/ldb.h" +#include "libcli/composite/composite.h" +#include "libcli/wrepl/winsrepl.h" +#include "wrepl_server/wrepl_out_helpers.h" + +static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr); + +static void wreplsrv_pull_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner); + uint32_t interval; + + partner->pull.last_status = wreplsrv_pull_cycle_recv(partner->pull.creq); + partner->pull.creq = NULL; + talloc_free(partner->pull.cycle_io); + partner->pull.cycle_io = NULL; + + if (!NT_STATUS_IS_OK(partner->pull.last_status)) { + interval = partner->pull.error_count * partner->pull.retry_interval; + interval = MIN(interval, partner->pull.interval); + partner->pull.error_count++; + + DEBUG(1,("wreplsrv_pull_cycle(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->pull.last_status), + interval)); + } else { + interval = partner->pull.interval; + partner->pull.error_count = 0; + + DEBUG(2,("wreplsrv_pull_cycle(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->pull.last_status), + interval)); + } + + partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_current_ofs(interval, 0), + wreplsrv_pull_handler_te, partner); + if (!partner->pull.te) { + DEBUG(0,("wreplsrv_pull_handler_creq: event_add_timed() failed! no memory!\n")); + } +} + +static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr) +{ + struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner); + + partner->pull.te = NULL; + + partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io); + if (!partner->pull.cycle_io) { + goto requeue; + } + + partner->pull.cycle_io->in.partner = partner; + partner->pull.cycle_io->in.num_owners = 0; + partner->pull.cycle_io->in.owners = NULL; + partner->pull.cycle_io->in.wreplconn = NULL; + partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io); + if (!partner->pull.creq) { + DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n", + partner->address)); + goto requeue; + } + + partner->pull.creq->async.fn = wreplsrv_pull_handler_creq; + partner->pull.creq->async.private_data = partner; + + return; +requeue: + talloc_free(partner->pull.cycle_io); + partner->pull.cycle_io = NULL; + /* retry later */ + partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_add(&t, partner->pull.retry_interval, 0), + wreplsrv_pull_handler_te, partner); + if (!partner->pull.te) { + DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n")); + } +} + +NTSTATUS wreplsrv_sched_inform_action(struct wreplsrv_partner *partner, struct wrepl_table *inform_in) +{ + if (partner->pull.creq) { + /* there's already a pull in progress, so we're done */ + return NT_STATUS_OK; + } + + /* remove the scheduled pull */ + talloc_free(partner->pull.te); + partner->pull.te = NULL; + + partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io); + if (!partner->pull.cycle_io) { + goto requeue; + } + + partner->pull.cycle_io->in.partner = partner; + partner->pull.cycle_io->in.num_owners = inform_in->partner_count; + partner->pull.cycle_io->in.owners = inform_in->partners; + talloc_steal(partner->pull.cycle_io, inform_in->partners); + partner->pull.cycle_io->in.wreplconn = NULL; + partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io); + if (!partner->pull.creq) { + DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n", + partner->address)); + goto requeue; + } + + partner->pull.creq->async.fn = wreplsrv_pull_handler_creq; + partner->pull.creq->async.private_data = partner; + + return NT_STATUS_OK; +requeue: + talloc_free(partner->pull.cycle_io); + partner->pull.cycle_io = NULL; + /* retry later */ + partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_current_ofs(partner->pull.retry_interval, 0), + wreplsrv_pull_handler_te, partner); + if (!partner->pull.te) { + DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n")); + } + + return NT_STATUS_OK; +} + +static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr); + +static void wreplsrv_push_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner); + uint32_t interval; + + partner->push.last_status = wreplsrv_push_notify_recv(partner->push.creq); + partner->push.creq = NULL; + talloc_free(partner->push.notify_io); + partner->push.notify_io = NULL; + + if (!NT_STATUS_IS_OK(partner->push.last_status)) { + interval = 15; + + DEBUG(1,("wreplsrv_push_notify(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->push.last_status), + interval)); + } else { + interval = 100; + + DEBUG(2,("wreplsrv_push_notify(%s): %s: next: %us\n", + partner->address, nt_errstr(partner->push.last_status), + interval)); + } + + partner->push.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_current_ofs(interval, 0), + wreplsrv_push_handler_te, partner); + if (!partner->push.te) { + DEBUG(0,("wreplsrv_push_handler_creq: event_add_timed() failed! no memory!\n")); + } +} + +static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr) +{ + struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner); + + partner->push.te = NULL; + + partner->push.notify_io = talloc(partner, struct wreplsrv_push_notify_io); + if (!partner->push.notify_io) { + goto requeue; + } + + partner->push.notify_io->in.partner = partner; + partner->push.notify_io->in.inform = False; + partner->push.notify_io->in.propagate = False; + partner->push.creq = wreplsrv_push_notify_send(partner->push.notify_io, partner->push.notify_io); + if (!partner->push.creq) { + DEBUG(1,("wreplsrv_push_notify_send(%s) failed\n", + partner->address)); + goto requeue; + } + + partner->push.creq->async.fn = wreplsrv_push_handler_creq; + partner->push.creq->async.private_data = partner; + + return; +requeue: + talloc_free(partner->push.notify_io); + partner->push.notify_io = NULL; + /* retry later */ + partner->push.te = event_add_timed(partner->service->task->event_ctx, partner, + timeval_add(&t, 5, 0), + wreplsrv_push_handler_te, partner); + if (!partner->push.te) { + DEBUG(0,("wreplsrv_push_handler_te: event_add_timed() failed! no memory!\n")); + } +} NTSTATUS wreplsrv_setup_out_connections(struct wreplsrv_service *service) { struct wreplsrv_partner *cur; for (cur = service->partners; cur; cur = cur->next) { - if (!(cur->type & WINSREPL_PARTNER_PULL)) continue; - - DEBUG(0,("TODO: pull from: %s\n", cur->address)); + if (cur->type & WINSREPL_PARTNER_PULL) { + cur->pull.te = event_add_timed(service->task->event_ctx, cur, + timeval_zero(), wreplsrv_pull_handler_te, cur); + NT_STATUS_HAVE_NO_MEMORY(cur->pull.te); + } + if (cur->type & WINSREPL_PARTNER_PUSH) { + cur->push.te = event_add_timed(service->task->event_ctx, cur, + timeval_zero(), wreplsrv_push_handler_te, cur); + NT_STATUS_HAVE_NO_MEMORY(cur->push.te); + } } return NT_STATUS_OK; diff --git a/source4/wrepl_server/wrepl_server.c b/source4/wrepl_server/wrepl_server.c index 97fa23cdf1..b044ef8296 100644 --- a/source4/wrepl_server/wrepl_server.c +++ b/source4/wrepl_server/wrepl_server.c @@ -80,14 +80,17 @@ static NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service) for (i=0; i < ret; i++) { struct wreplsrv_partner *partner; - partner = talloc(service, struct wreplsrv_partner); + partner = talloc_zero(service, struct wreplsrv_partner); if (partner == NULL) goto failed; + partner->service = service; partner->address = ldb_msg_find_string(res[i], "address", NULL); if (!partner->address) goto failed; partner->name = ldb_msg_find_string(res[i], "name", partner->address); partner->type = ldb_msg_find_int(res[i], "type", WINSREPL_PARTNER_BOTH); partner->pull.interval = ldb_msg_find_int(res[i], "pullInterval", WINSREPL_DEFAULT_PULL_INTERVAL); + partner->pull.retry_interval = ldb_msg_find_int(res[i], "pullRetryInterval", + WINSREPL_DEFAULT_PULL_RETRY_INTERVAL); partner->our_address = ldb_msg_find_string(res[i], "ourAddress", NULL); talloc_steal(partner, partner->address); @@ -104,6 +107,29 @@ failed: return NT_STATUS_FOOBAR; } +BOOL wreplsrv_is_our_address(struct wreplsrv_service *service, const char *address) +{ + const char *our_address; + + if (lp_interfaces() && lp_bind_interfaces_only()) { + int num_interfaces = iface_count(); + int i; + for(i = 0; i < num_interfaces; i++) { + our_address = iface_n_ip(i); + if (strcasecmp(our_address, address) == 0) { + return True; + } + } + } else { + our_address = lp_socket_address(); + if (strcasecmp(our_address, address) == 0) { + return True; + } + } + + return False; +} + uint64_t wreplsrv_local_max_version(struct wreplsrv_service *service) { int ret; @@ -134,6 +160,49 @@ failed: return maxVersion; } +NTSTATUS wreplsrv_fill_wrepl_table(struct wreplsrv_service *service, + TALLOC_CTX *mem_ctx, + struct wrepl_table *table_out, + const char *our_ip, + const char *initiator, + BOOL full_table) +{ + struct wreplsrv_owner *cur; + uint64_t local_max_version; + uint32_t i = 0; + + table_out->partner_count = 0; + table_out->partners = NULL; + table_out->initiator = initiator; + + local_max_version = wreplsrv_local_max_version(service); + if (local_max_version > 0) { + table_out->partner_count++; + } + + for (cur = service->table; full_table && cur; cur = cur->next) { + table_out->partner_count++; + } + + table_out->partners = talloc_array(mem_ctx, struct wrepl_wins_owner, table_out->partner_count); + NT_STATUS_HAVE_NO_MEMORY(table_out->partners); + + if (local_max_version > 0) { + table_out->partners[i].address = our_ip; + table_out->partners[i].min_version = 0; + table_out->partners[i].max_version = local_max_version; + table_out->partners[i].type = 1; + i++; + } + + for (cur = service->table; full_table && cur; cur = cur->next) { + table_out->partners[i] = cur->owner; + i++; + } + + return NT_STATUS_OK; +} + struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const char *wins_owner) { struct wreplsrv_owner *cur; @@ -151,9 +220,9 @@ struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const c update the wins_owner_table max_version, if the given version is the highest version if no entry for the wins_owner exists yet, create one */ -static NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service, - TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table, - const char *wins_owner, uint64_t version) +NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service, + TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table, + const char *wins_owner, uint64_t version) { struct wreplsrv_owner *table = *_table; struct wreplsrv_owner *cur; diff --git a/source4/wrepl_server/wrepl_server.h b/source4/wrepl_server/wrepl_server.h index 278b15fbb7..63603c823d 100644 --- a/source4/wrepl_server/wrepl_server.h +++ b/source4/wrepl_server/wrepl_server.h @@ -92,16 +92,38 @@ struct wreplsrv_in_connection { state of an outcoming wrepl connection */ struct wreplsrv_out_connection { + /* our global service context */ + struct wreplsrv_service *service; + + /* + * the partner that connects us, + * can be NULL, when we got a connection + * from an unknown address + */ struct wreplsrv_partner *partner; + + /* keep track of the assoc_ctx's */ + struct { + uint32_t our_ctx; + uint32_t peer_ctx; + } assoc_ctx; + + /* + * the client socket to the partner, + * NULL if not yet connected + */ + struct wrepl_socket *sock; }; enum winsrepl_partner_type { + WINSREPL_PARTNER_NONE = 0x0, WINSREPL_PARTNER_PULL = 0x1, WINSREPL_PARTNER_PUSH = 0x2, WINSREPL_PARTNER_BOTH = (WINSREPL_PARTNER_PULL | WINSREPL_PARTNER_PUSH) }; #define WINSREPL_DEFAULT_PULL_INTERVAL (30*60) +#define WINSREPL_DEFAULT_PULL_RETRY_INTERVAL (30) /* this represents one of our configured partners @@ -109,6 +131,9 @@ enum winsrepl_partner_type { struct wreplsrv_partner { struct wreplsrv_partner *prev,*next; + /* our global service context */ + struct wreplsrv_service *service; + /* the netbios name of the partner, mostly just for debugging */ const char *name; @@ -129,12 +154,51 @@ struct wreplsrv_partner { /* the interval between 2 pull replications to the partner */ uint32_t interval; + /* the retry_interval if a pull cycle failed to the partner */ + uint32_t retry_interval; + + /* the error count till the last success */ + uint32_t error_count; + + /* the status of the last pull cycle */ + NTSTATUS last_status; + /* this is a list of each wins_owner the partner knows about */ struct wreplsrv_owner *table; /* the outgoing connection to the partner */ struct wreplsrv_out_connection *wreplconn; + + /* the current pending pull cycle request */ + struct composite_context *creq; + + /* the pull cycle io params */ + struct wreplsrv_pull_cycle_io *cycle_io; + + /* the current timed_event to the next pull cycle */ + struct timed_event *te; } pull; + + /* push specific options */ + struct { + /* change count till push notification */ + uint32_t change_count; + + /* the status of the last push cycle */ + NTSTATUS last_status; + + /* the outgoing connection to the partner */ + struct wreplsrv_out_connection *wreplconn; + + /* the current push notification */ + struct composite_context *creq; + + /* the pull cycle io params */ + struct wreplsrv_push_notify_io *notify_io; + + /* the current timed_event to the next push notify */ + struct timed_event *te; + } push; }; struct wreplsrv_owner { |