diff options
Diffstat (limited to 'source4/wrepl_server')
-rw-r--r-- | source4/wrepl_server/config.mk | 24 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_apply_records.c | 1432 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_in_call.c | 576 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_in_connection.c | 323 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_out_helpers.c | 1103 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_out_helpers.h | 37 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_out_pull.c | 142 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_out_push.c | 144 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_periodic.c | 118 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_scavenging.c | 526 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_server.c | 515 | ||||
-rw-r--r-- | source4/wrepl_server/wrepl_server.h | 313 |
12 files changed, 5253 insertions, 0 deletions
diff --git a/source4/wrepl_server/config.mk b/source4/wrepl_server/config.mk new file mode 100644 index 0000000000..235a897503 --- /dev/null +++ b/source4/wrepl_server/config.mk @@ -0,0 +1,24 @@ +# WREPL server subsystem + +####################### +# Start SUBSYSTEM WREPL_SRV +[MODULE::WREPL_SRV] +INIT_FUNCTION = server_service_wrepl_init +SUBSYSTEM = smbd +PRIVATE_DEPENDENCIES = \ + LIBCLI_WREPL WINSDB process_model +# End SUBSYSTEM WREPL_SRV +####################### + +WREPL_SRV_OBJ_FILES = $(addprefix $(wrepl_serversrcdir)/, \ + wrepl_server.o \ + wrepl_in_connection.o \ + wrepl_in_call.o \ + wrepl_apply_records.o \ + wrepl_periodic.o \ + wrepl_scavenging.o \ + wrepl_out_pull.o \ + wrepl_out_push.o \ + wrepl_out_helpers.o) + +$(eval $(call proto_header_template,$(wrepl_serversrcdir)/wrepl_server_proto.h,$(WREPL_SRV_OBJ_FILES:.o=.c))) diff --git a/source4/wrepl_server/wrepl_apply_records.c b/source4/wrepl_server/wrepl_apply_records.c new file mode 100644 index 0000000000..380b77517f --- /dev/null +++ b/source4/wrepl_server/wrepl_apply_records.c @@ -0,0 +1,1432 @@ +/* + Unix SMB/CIFS implementation. + + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "includes.h" +#include "smbd/service_task.h" +#include "lib/messaging/irpc.h" +#include "librpc/gen_ndr/ndr_irpc.h" +#include "librpc/gen_ndr/ndr_winsrepl.h" +#include "wrepl_server/wrepl_server.h" +#include "nbt_server/wins/winsdb.h" +#include "libcli/wrepl/winsrepl.h" +#include "system/time.h" +#include "librpc/gen_ndr/ndr_nbt.h" + +enum _R_ACTION { + R_INVALID, + R_DO_REPLACE, + R_NOT_REPLACE, + R_DO_PROPAGATE, + R_DO_CHALLENGE, + R_DO_RELEASE_DEMAND, + R_DO_SGROUP_MERGE +}; + +static const char *_R_ACTION_enum_string(enum _R_ACTION action) +{ + switch (action) { + case R_INVALID: return "INVALID"; + case R_DO_REPLACE: return "REPLACE"; + case R_NOT_REPLACE: return "NOT_REPLACE"; + case R_DO_PROPAGATE: return "PROPAGATE"; + case R_DO_CHALLENGE: return "CHALLEGNE"; + case R_DO_RELEASE_DEMAND: return "RELEASE_DEMAND"; + case R_DO_SGROUP_MERGE: return "SGROUP_MERGE"; + } + + return "enum _R_ACTION unknown"; +} + +#define R_IS_ACTIVE(r) ((r)->state == WREPL_STATE_ACTIVE) +#if 0 /* unused */ +#define R_IS_RELEASED(r) ((r)->state == WREPL_STATE_RELEASED) +#endif +#define R_IS_TOMBSTONE(r) ((r)->state == WREPL_STATE_TOMBSTONE) + +#define R_IS_UNIQUE(r) ((r)->type == WREPL_TYPE_UNIQUE) +#define R_IS_GROUP(r) ((r)->type == WREPL_TYPE_GROUP) +#define R_IS_SGROUP(r) ((r)->type == WREPL_TYPE_SGROUP) +#if 0 /* unused */ +#define R_IS_MHOMED(r) ((r)->type == WREPL_TYPE_MHOMED) +#endif + +/* blindly overwrite records from the same owner in all cases */ +static enum _R_ACTION replace_same_owner(struct winsdb_record *r1, struct wrepl_name *r2) +{ + /* REPLACE */ + return R_DO_REPLACE; +} + +static bool r_1_is_subset_of_2_address_list(struct winsdb_record *r1, struct wrepl_name *r2, bool check_owners) +{ + uint32_t i,j; + size_t len = winsdb_addr_list_length(r1->addresses); + + for (i=0; i < len; i++) { + bool found = false; + for (j=0; j < r2->num_addresses; j++) { + if (strcmp(r1->addresses[i]->address, r2->addresses[j].address) != 0) { + continue; + } + + if 
(check_owners && strcmp(r1->addresses[i]->wins_owner, r2->addresses[j].owner) != 0) { + return false; + } + found = true; + break; + } + if (!found) return false; + } + + return true; +} + +static bool r_1_is_superset_of_2_address_list(struct winsdb_record *r1, struct wrepl_name *r2, bool check_owners) +{ + uint32_t i,j; + size_t len = winsdb_addr_list_length(r1->addresses); + + for (i=0; i < r2->num_addresses; i++) { + bool found = false; + for (j=0; j < len; j++) { + if (strcmp(r2->addresses[i].address, r1->addresses[j]->address) != 0) { + continue; + } + + if (check_owners && strcmp(r2->addresses[i].owner, r1->addresses[j]->wins_owner) != 0) { + return false; + } + found = true; + break; + } + if (!found) return false; + } + + return true; +} + +static bool r_1_is_same_as_2_address_list(struct winsdb_record *r1, struct wrepl_name *r2, bool check_owners) +{ + size_t len = winsdb_addr_list_length(r1->addresses); + + if (len != r2->num_addresses) { + return false; + } + + return r_1_is_superset_of_2_address_list(r1, r2, check_owners); +} + +static bool r_contains_addrs_from_owner(struct winsdb_record *r1, const char *owner) +{ + uint32_t i; + size_t len = winsdb_addr_list_length(r1->addresses); + + for (i=0; i < len; i++) { + if (strcmp(r1->addresses[i]->wins_owner, owner) == 0) { + return true; + } + } + + return false; +} + +/* +UNIQUE,ACTIVE vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +UNIQUE,ACTIVE vs. UNIQUE,TOMBSTONE with different ip(s) => NOT REPLACE +UNIQUE,RELEASED vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +UNIQUE,RELEASED vs. UNIQUE,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. UNIQUE,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,ACTIVE vs. GROUP,ACTIVE with different ip(s) => REPLACE +UNIQUE,ACTIVE vs. GROUP,TOMBSTONE with same ip(s) => NOT REPLACE +UNIQUE,RELEASED vs. GROUP,ACTIVE with different ip(s) => REPLACE +UNIQUE,RELEASED vs. 
GROUP,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. GROUP,ACTIVE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. GROUP,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,ACTIVE vs. SGROUP,ACTIVE with same ip(s) => NOT REPLACE +UNIQUE,ACTIVE vs. SGROUP,TOMBSTONE with same ip(s) => NOT REPLACE +UNIQUE,RELEASED vs. SGROUP,ACTIVE with different ip(s) => REPLACE +UNIQUE,RELEASED vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. SGROUP,ACTIVE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,ACTIVE vs. MHOMED,ACTIVE with different ip(s) => REPLACE +UNIQUE,ACTIVE vs. MHOMED,TOMBSTONE with same ip(s) => NOT REPLACE +UNIQUE,RELEASED vs. MHOMED,ACTIVE with different ip(s) => REPLACE +UNIQUE,RELEASED vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. MHOMED,ACTIVE with different ip(s) => REPLACE +UNIQUE,TOMBSTONE vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE +*/ +static enum _R_ACTION replace_unique_replica_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (!R_IS_SGROUP(r2) && R_IS_ACTIVE(r2)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + /* NOT REPLACE */ + return R_NOT_REPLACE; +} + +/* +GROUP,ACTIVE vs. UNIQUE,ACTIVE with same ip(s) => NOT REPLACE +GROUP,ACTIVE vs. UNIQUE,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,RELEASED vs. UNIQUE,ACTIVE with same ip(s) => NOT REPLACE +GROUP,RELEASED vs. UNIQUE,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,TOMBSTONE vs. UNIQUE,ACTIVE with same ip(s) => NOT REPLACE +GROUP,TOMBSTONE vs. UNIQUE,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,ACTIVE vs. GROUP,ACTIVE with same ip(s) => NOT REPLACE +GROUP,ACTIVE vs. GROUP,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,RELEASED vs. GROUP,ACTIVE with different ip(s) => REPLACE +GROUP,RELEASED vs. 
GROUP,TOMBSTONE with different ip(s) => REPLACE +GROUP,TOMBSTONE vs. GROUP,ACTIVE with different ip(s) => REPLACE +GROUP,TOMBSTONE vs. GROUP,TOMBSTONE with different ip(s) => REPLACE +GROUP,ACTIVE vs. SGROUP,ACTIVE with same ip(s) => NOT REPLACE +GROUP,ACTIVE vs. SGROUP,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,RELEASED vs. SGROUP,ACTIVE with different ip(s) => REPLACE +GROUP,RELEASED vs. SGROUP,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,TOMBSTONE vs. SGROUP,ACTIVE with different ip(s) => REPLACE +GROUP,TOMBSTONE vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +GROUP,ACTIVE vs. MHOMED,ACTIVE with same ip(s) => NOT REPLACE +GROUP,ACTIVE vs. MHOMED,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,RELEASED vs. MHOMED,ACTIVE with same ip(s) => NOT REPLACE +GROUP,RELEASED vs. MHOMED,TOMBSTONE with same ip(s) => NOT REPLACE +GROUP,TOMBSTONE vs. MHOMED,ACTIVE with different ip(s) => REPLACE +GROUP,TOMBSTONE vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE +*/ +static enum _R_ACTION replace_group_replica_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1) && R_IS_GROUP(r2)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (R_IS_TOMBSTONE(r1) && !R_IS_UNIQUE(r2)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + /* NOT REPLACE */ + return R_NOT_REPLACE; +} + +/* +SGROUP,ACTIVE vs. UNIQUE,ACTIVE with same ip(s) => NOT REPLACE +SGROUP,ACTIVE vs. UNIQUE,TOMBSTONE with same ip(s) => NOT REPLACE +SGROUP,RELEASED vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +SGROUP,RELEASED vs. UNIQUE,TOMBSTONE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. UNIQUE,TOMBSTONE with different ip(s) => REPLACE +SGROUP,ACTIVE vs. GROUP,ACTIVE with same ip(s) => NOT REPLACE +SGROUP,ACTIVE vs. GROUP,TOMBSTONE with same ip(s) => NOT REPLACE +SGROUP,RELEASED vs. GROUP,ACTIVE with different ip(s) => REPLACE +SGROUP,RELEASED vs. 
GROUP,TOMBSTONE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. GROUP,ACTIVE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. GROUP,TOMBSTONE with different ip(s) => REPLACE +SGROUP,RELEASED vs. SGROUP,ACTIVE with different ip(s) => REPLACE +SGROUP,RELEASED vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. SGROUP,ACTIVE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +SGROUP,ACTIVE vs. MHOMED,ACTIVE with same ip(s) => NOT REPLACE +SGROUP,ACTIVE vs. MHOMED,TOMBSTONE with same ip(s) => NOT REPLACE +SGROUP,RELEASED vs. MHOMED,ACTIVE with different ip(s) => REPLACE +SGROUP,RELEASED vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. MHOMED,ACTIVE with different ip(s) => REPLACE +SGROUP,TOMBSTONE vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE + +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:A_3_4 => NOT REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:NULL => NOT REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4_X_3_4 vs. B:A_3_4 => NOT REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4 vs. B:A_3_4 => REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:A_3_4_OWNER_B => REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4_OWNER_B vs. B:A_3_4 => REPLACE + +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:B_3_4 => C:A_3_4_B_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4_X_3_4 vs. B:A_3_4 => B:A_3_4_X_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:X_3_4 vs. B:A_3_4 => C:A_3_4_X_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4_X_3_4 vs. B:A_3_4_OWNER_B => B:A_3_4_OWNER_B_X_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4_X_3_4 vs. B:B_3_4_X_1_2 => C:B_3_4_X_1_2_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4_X_3_4 vs. B:NULL => B:X_3_4 => SGROUP_MERGE + + +this is a bit strange, incoming tombstone replicas always replace old replicas: + +SGROUP,ACTIVE vs. SGROUP,TOMBSTONE A:B_3_4_X_3_4 vs. 
B:NULL => B:NULL => REPLACE +SGROUP,ACTIVE vs. SGROUP,TOMBSTONE A:B_3_4_X_3_4 vs. B:A_3_4 => B:A_3_4 => REPLACE +SGROUP,ACTIVE vs. SGROUP,TOMBSTONE A:B_3_4_X_3_4 vs. B:B_3_4 => B:B_3_4 => REPLACE +SGROUP,ACTIVE vs. SGROUP,TOMBSTONE A:B_3_4_X_3_4 vs. B:B_3_4_X_3_4 => B:B_3_4_X_3_4 => REPLACE +*/ +static enum _R_ACTION replace_sgroup_replica_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (!R_IS_SGROUP(r2)) { + /* NOT REPLACE */ + return R_NOT_REPLACE; + } + + /* + * this is strange, but correct + * the incoming tombstone replace the current active + * record + */ + if (!R_IS_ACTIVE(r2)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (r2->num_addresses == 0) { + if (r_contains_addrs_from_owner(r1, r2->owner)) { + /* not handled here: MERGE */ + return R_DO_SGROUP_MERGE; + } + + /* NOT REPLACE */ + return R_NOT_REPLACE; + } + + if (r_1_is_superset_of_2_address_list(r1, r2, true)) { + /* NOT REPLACE */ + return R_NOT_REPLACE; + } + + if (r_1_is_same_as_2_address_list(r1, r2, false)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + /* not handled here: MERGE */ + return R_DO_SGROUP_MERGE; +} + +/* +MHOMED,ACTIVE vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +MHOMED,ACTIVE vs. UNIQUE,TOMBSTONE with same ip(s) => NOT REPLACE +MHOMED,RELEASED vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +MHOMED,RELEASED vs. UNIQUE,TOMBSTONE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. UNIQUE,ACTIVE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. UNIQUE,TOMBSTONE with different ip(s) => REPLACE +MHOMED,ACTIVE vs. GROUP,ACTIVE with different ip(s) => REPLACE +MHOMED,ACTIVE vs. GROUP,TOMBSTONE with same ip(s) => NOT REPLACE +MHOMED,RELEASED vs. GROUP,ACTIVE with different ip(s) => REPLACE +MHOMED,RELEASED vs. GROUP,TOMBSTONE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. GROUP,ACTIVE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. 
GROUP,TOMBSTONE with different ip(s) => REPLACE +MHOMED,ACTIVE vs. SGROUP,ACTIVE with same ip(s) => NOT REPLACE +MHOMED,ACTIVE vs. SGROUP,TOMBSTONE with same ip(s) => NOT REPLACE +MHOMED,RELEASED vs. SGROUP,ACTIVE with different ip(s) => REPLACE +MHOMED,RELEASED vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. SGROUP,ACTIVE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. SGROUP,TOMBSTONE with different ip(s) => REPLACE +MHOMED,ACTIVE vs. MHOMED,ACTIVE with different ip(s) => REPLACE +MHOMED,ACTIVE vs. MHOMED,TOMBSTONE with same ip(s) => NOT REPLACE +MHOMED,RELEASED vs. MHOMED,ACTIVE with different ip(s) => REPLACE +MHOMED,RELEASED vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. MHOMED,ACTIVE with different ip(s) => REPLACE +MHOMED,TOMBSTONE vs. MHOMED,TOMBSTONE with different ip(s) => REPLACE +*/ +static enum _R_ACTION replace_mhomed_replica_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (!R_IS_SGROUP(r2) && R_IS_ACTIVE(r2)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + /* NOT REPLACE */ + return R_NOT_REPLACE; +} + +/* +active: +_UA_UA_SI_U<00> => REPLACE +_UA_UA_DI_P<00> => NOT REPLACE +_UA_UA_DI_O<00> => NOT REPLACE +_UA_UA_DI_N<00> => REPLACE +_UA_UT_SI_U<00> => NOT REPLACE +_UA_UT_DI_U<00> => NOT REPLACE +_UA_GA_SI_R<00> => REPLACE +_UA_GA_DI_R<00> => REPLACE +_UA_GT_SI_U<00> => NOT REPLACE +_UA_GT_DI_U<00> => NOT REPLACE +_UA_SA_SI_R<00> => REPLACE +_UA_SA_DI_R<00> => REPLACE +_UA_ST_SI_U<00> => NOT REPLACE +_UA_ST_DI_U<00> => NOT REPLACE +_UA_MA_SI_U<00> => REPLACE +_UA_MA_SP_U<00> => REPLACE +_UA_MA_DI_P<00> => NOT REPLACE +_UA_MA_DI_O<00> => NOT REPLACE +_UA_MA_DI_N<00> => REPLACE +_UA_MT_SI_U<00> => NOT REPLACE +_UA_MT_DI_U<00> => NOT REPLACE +Test Replica vs. 
owned active: some more UNIQUE,MHOMED combinations +_UA_UA_DI_A<00> => MHOMED_MERGE +_UA_MA_DI_A<00> => MHOMED_MERGE + +released: +_UR_UA_SI<00> => REPLACE +_UR_UA_DI<00> => REPLACE +_UR_UT_SI<00> => REPLACE +_UR_UT_DI<00> => REPLACE +_UR_GA_SI<00> => REPLACE +_UR_GA_DI<00> => REPLACE +_UR_GT_SI<00> => REPLACE +_UR_GT_DI<00> => REPLACE +_UR_SA_SI<00> => REPLACE +_UR_SA_DI<00> => REPLACE +_UR_ST_SI<00> => REPLACE +_UR_ST_DI<00> => REPLACE +_UR_MA_SI<00> => REPLACE +_UR_MA_DI<00> => REPLACE +_UR_MT_SI<00> => REPLACE +_UR_MT_DI<00> => REPLACE +*/ +static enum _R_ACTION replace_unique_owned_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (!R_IS_ACTIVE(r2)) { + /* NOT REPLACE, and PROPAGATE */ + return R_DO_PROPAGATE; + } + + if (R_IS_GROUP(r2) || R_IS_SGROUP(r2)) { + /* REPLACE and send a release demand to the old name owner */ + return R_DO_RELEASE_DEMAND; + } + + /* + * here we only have unique,active,owned vs. + * is unique,active,replica or mhomed,active,replica + */ + + if (r_1_is_subset_of_2_address_list(r1, r2, false)) { + /* + * if r1 has a subset(or same) of the addresses of r2 + * <=> + * if r2 has a superset(or same) of the addresses of r1 + * + * then replace the record + */ + return R_DO_REPLACE; + } + + /* + * in any other case, we need to do + * a name request to the old name holder + * to see if it's still there... 
+ */ + return R_DO_CHALLENGE; +} + +/* +active: +_GA_UA_SI_U<00> => NOT REPLACE +_GA_UA_DI_U<00> => NOT REPLACE +_GA_UT_SI_U<00> => NOT REPLACE +_GA_UT_DI_U<00> => NOT REPLACE +_GA_GA_SI_U<00> => REPLACE +_GA_GA_DI_U<00> => REPLACE +_GA_GT_SI_U<00> => NOT REPLACE +_GA_GT_DI_U<00> => NOT REPLACE +_GA_SA_SI_U<00> => NOT REPLACE +_GA_SA_DI_U<00> => NOT REPLACE +_GA_ST_SI_U<00> => NOT REPLACE +_GA_ST_DI_U<00> => NOT REPLACE +_GA_MA_SI_U<00> => NOT REPLACE +_GA_MA_DI_U<00> => NOT REPLACE +_GA_MT_SI_U<00> => NOT REPLACE +_GA_MT_DI_U<00> => NOT REPLACE + +released: +_GR_UA_SI<00> => NOT REPLACE +_GR_UA_DI<00> => NOT REPLACE +_GR_UT_SI<00> => NOT REPLACE +_GR_UT_DI<00> => NOT REPLACE +_GR_GA_SI<00> => REPLACE +_GR_GA_DI<00> => REPLACE +_GR_GT_SI<00> => REPLACE +_GR_GT_DI<00> => REPLACE +_GR_SA_SI<00> => NOT REPLACE +_GR_SA_DI<00> => NOT REPLACE +_GR_ST_SI<00> => NOT REPLACE +_GR_ST_DI<00> => NOT REPLACE +_GR_MA_SI<00> => NOT REPLACE +_GR_MA_DI<00> => NOT REPLACE +_GR_MT_SI<00> => NOT REPLACE +_GR_MT_DI<00> => NOT REPLACE +*/ +static enum _R_ACTION replace_group_owned_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (R_IS_GROUP(r1) && R_IS_GROUP(r2)) { + if (!R_IS_ACTIVE(r1) || R_IS_ACTIVE(r2)) { + /* REPLACE */ + return R_DO_REPLACE; + } + } + + /* NOT REPLACE, but PROPAGATE */ + return R_DO_PROPAGATE; +} + +/* +active (not sgroup vs. sgroup yet!): +_SA_UA_SI_U<1c> => NOT REPLACE +_SA_UA_DI_U<1c> => NOT REPLACE +_SA_UT_SI_U<1c> => NOT REPLACE +_SA_UT_DI_U<1c> => NOT REPLACE +_SA_GA_SI_U<1c> => NOT REPLACE +_SA_GA_DI_U<1c> => NOT REPLACE +_SA_GT_SI_U<1c> => NOT REPLACE +_SA_GT_DI_U<1c> => NOT REPLACE +_SA_MA_SI_U<1c> => NOT REPLACE +_SA_MA_DI_U<1c> => NOT REPLACE +_SA_MT_SI_U<1c> => NOT REPLACE +_SA_MT_DI_U<1c> => NOT REPLACE + +Test Replica vs. owned active: SGROUP vs. 
SGROUP tests +_SA_SA_DI_U<1c> => SGROUP_MERGE +_SA_SA_SI_U<1c> => SGROUP_MERGE +_SA_SA_SP_U<1c> => SGROUP_MERGE +_SA_SA_SB_U<1c> => SGROUP_MERGE +_SA_ST_DI_U<1c> => NOT REPLACE +_SA_ST_SI_U<1c> => NOT REPLACE +_SA_ST_SP_U<1c> => NOT REPLACE +_SA_ST_SB_U<1c> => NOT REPLACE + +SGROUP,ACTIVE vs. SGROUP,* is not handled here! + +released: +_SR_UA_SI<1c> => REPLACE +_SR_UA_DI<1c> => REPLACE +_SR_UT_SI<1c> => REPLACE +_SR_UT_DI<1c> => REPLACE +_SR_GA_SI<1c> => REPLACE +_SR_GA_DI<1c> => REPLACE +_SR_GT_SI<1c> => REPLACE +_SR_GT_DI<1c> => REPLACE +_SR_SA_SI<1c> => REPLACE +_SR_SA_DI<1c> => REPLACE +_SR_ST_SI<1c> => REPLACE +_SR_ST_DI<1c> => REPLACE +_SR_MA_SI<1c> => REPLACE +_SR_MA_DI<1c> => REPLACE +_SR_MT_SI<1c> => REPLACE +_SR_MT_DI<1c> => REPLACE +*/ +static enum _R_ACTION replace_sgroup_owned_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (!R_IS_SGROUP(r2) || !R_IS_ACTIVE(r2)) { + /* NOT REPLACE, but PROPAGATE */ + return R_DO_PROPAGATE; + } + + if (r_1_is_same_as_2_address_list(r1, r2, true)) { + /* + * as we're the old owner and the addresses and their + * owners are identical + */ + return R_NOT_REPLACE; + } + + /* not handled here: MERGE */ + return R_DO_SGROUP_MERGE; +} + +/* +active: +_MA_UA_SI_U<00> => REPLACE +_MA_UA_DI_P<00> => NOT REPLACE +_MA_UA_DI_O<00> => NOT REPLACE +_MA_UA_DI_N<00> => REPLACE +_MA_UT_SI_U<00> => NOT REPLACE +_MA_UT_DI_U<00> => NOT REPLACE +_MA_GA_SI_R<00> => REPLACE +_MA_GA_DI_R<00> => REPLACE +_MA_GT_SI_U<00> => NOT REPLACE +_MA_GT_DI_U<00> => NOT REPLACE +_MA_SA_SI_R<00> => REPLACE +_MA_SA_DI_R<00> => REPLACE +_MA_ST_SI_U<00> => NOT REPLACE +_MA_ST_DI_U<00> => NOT REPLACE +_MA_MA_SI_U<00> => REPLACE +_MA_MA_SP_U<00> => REPLACE +_MA_MA_DI_P<00> => NOT REPLACE +_MA_MA_DI_O<00> => NOT REPLACE +_MA_MA_DI_N<00> => REPLACE +_MA_MT_SI_U<00> => NOT REPLACE +_MA_MT_DI_U<00> => NOT REPLACE +Test Replica vs. 
owned active: some more MHOMED combinations +_MA_MA_SP_U<00> => REPLACE +_MA_MA_SM_U<00> => REPLACE +_MA_MA_SB_P<00> => MHOMED_MERGE +_MA_MA_SB_A<00> => MHOMED_MERGE +_MA_MA_SB_PRA<00> => NOT REPLACE +_MA_MA_SB_O<00> => NOT REPLACE +_MA_MA_SB_N<00> => REPLACE +Test Replica vs. owned active: some more UNIQUE,MHOMED combinations +_MA_UA_SB_P<00> => MHOMED_MERGE + +released: +_MR_UA_SI<00> => REPLACE +_MR_UA_DI<00> => REPLACE +_MR_UT_SI<00> => REPLACE +_MR_UT_DI<00> => REPLACE +_MR_GA_SI<00> => REPLACE +_MR_GA_DI<00> => REPLACE +_MR_GT_SI<00> => REPLACE +_MR_GT_DI<00> => REPLACE +_MR_SA_SI<00> => REPLACE +_MR_SA_DI<00> => REPLACE +_MR_ST_SI<00> => REPLACE +_MR_ST_DI<00> => REPLACE +_MR_MA_SI<00> => REPLACE +_MR_MA_DI<00> => REPLACE +_MR_MT_SI<00> => REPLACE +_MR_MT_DI<00> => REPLACE +*/ +static enum _R_ACTION replace_mhomed_owned_vs_X_replica(struct winsdb_record *r1, struct wrepl_name *r2) +{ + if (!R_IS_ACTIVE(r1)) { + /* REPLACE */ + return R_DO_REPLACE; + } + + if (!R_IS_ACTIVE(r2)) { + /* NOT REPLACE, but PROPAGATE */ + return R_DO_PROPAGATE; + } + + if (R_IS_GROUP(r2) || R_IS_SGROUP(r2)) { + /* REPLACE and send a release demand to the old name owner */ + return R_DO_RELEASE_DEMAND; + } + + /* + * here we only have mhomed,active,owned vs. + * is unique,active,replica or mhomed,active,replica + */ + + if (r_1_is_subset_of_2_address_list(r1, r2, false)) { + /* + * if r1 has a subset(or same) of the addresses of r2 + * <=> + * if r2 has a superset(or same) of the addresses of r1 + * + * then replace the record + */ + return R_DO_REPLACE; + } + + /* + * in any other case, we need to do + * a name request to the old name holder + * to see if it's still there... 
+ */ + return R_DO_CHALLENGE; +} + +static NTSTATUS r_do_add(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + struct winsdb_record *rec; + uint32_t i; + uint8_t ret; + + rec = talloc(mem_ctx, struct winsdb_record); + NT_STATUS_HAVE_NO_MEMORY(rec); + + rec->name = &replica->name; + rec->type = replica->type; + rec->state = replica->state; + rec->node = replica->node; + rec->is_static = replica->is_static; + rec->expire_time= time(NULL) + partner->service->config.verify_interval; + rec->version = replica->version_id; + rec->wins_owner = replica->owner; + rec->addresses = winsdb_addr_list_make(rec); + NT_STATUS_HAVE_NO_MEMORY(rec->addresses); + rec->registered_by = NULL; + + for (i=0; i < replica->num_addresses; i++) { + /* TODO: find out if rec->expire_time is correct here */ + rec->addresses = winsdb_addr_list_add(partner->service->wins_db, + rec, rec->addresses, + replica->addresses[i].address, + replica->addresses[i].owner, + rec->expire_time, + false); + NT_STATUS_HAVE_NO_MEMORY(rec->addresses); + } + + ret = winsdb_add(partner->service->wins_db, rec, 0); + if (ret != NBT_RCODE_OK) { + DEBUG(0,("Failed to add record %s: %u\n", + nbt_name_string(mem_ctx, &replica->name), ret)); + return NT_STATUS_FOOBAR; + } + + DEBUG(4,("added record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + return NT_STATUS_OK; +} + +static NTSTATUS r_do_replace(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + uint32_t i; + uint8_t ret; + + rec->name = &replica->name; + rec->type = replica->type; + rec->state = replica->state; + rec->node = replica->node; + rec->is_static = replica->is_static; + rec->expire_time= time(NULL) + partner->service->config.verify_interval; + rec->version = replica->version_id; + rec->wins_owner = replica->owner; + rec->addresses = winsdb_addr_list_make(rec); + 
NT_STATUS_HAVE_NO_MEMORY(rec->addresses); + rec->registered_by = NULL; + + for (i=0; i < replica->num_addresses; i++) { + /* TODO: find out if rec->expire_time is correct here */ + rec->addresses = winsdb_addr_list_add(partner->service->wins_db, + rec, rec->addresses, + replica->addresses[i].address, + replica->addresses[i].owner, + rec->expire_time, + false); + NT_STATUS_HAVE_NO_MEMORY(rec->addresses); + } + + ret = winsdb_modify(partner->service->wins_db, rec, 0); + if (ret != NBT_RCODE_OK) { + DEBUG(0,("Failed to replace record %s: %u\n", + nbt_name_string(mem_ctx, &replica->name), ret)); + return NT_STATUS_FOOBAR; + } + + DEBUG(4,("replaced record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + return NT_STATUS_OK; +} + +static NTSTATUS r_not_replace(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + DEBUG(4,("not replace record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + return NT_STATUS_OK; +} + +static NTSTATUS r_do_propagate(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + uint8_t ret; + uint32_t modify_flags; + + /* + * allocate a new version id for the record to that it'll be replicated + */ + modify_flags = WINSDB_FLAG_ALLOC_VERSION | WINSDB_FLAG_TAKE_OWNERSHIP; + + ret = winsdb_modify(partner->service->wins_db, rec, modify_flags); + if (ret != NBT_RCODE_OK) { + DEBUG(0,("Failed to replace record %s: %u\n", + nbt_name_string(mem_ctx, &replica->name), ret)); + return NT_STATUS_FOOBAR; + } + + DEBUG(4,("propagated record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + return NT_STATUS_OK; +} + +/* +Test Replica vs. owned active: some more MHOMED combinations +_MA_MA_SP_U<00>: C:MHOMED vs. B:ALL => B:ALL => REPLACE +_MA_MA_SM_U<00>: C:MHOMED vs. B:MHOMED => B:MHOMED => REPLACE +_MA_MA_SB_P<00>: C:MHOMED vs. 
B:BEST (C:MHOMED) => B:MHOMED => MHOMED_MERGE +_MA_MA_SB_A<00>: C:MHOMED vs. B:BEST (C:ALL) => B:MHOMED => MHOMED_MERGE +_MA_MA_SB_PRA<00>: C:MHOMED vs. B:BEST (C:BEST) => C:MHOMED => NOT REPLACE +_MA_MA_SB_O<00>: C:MHOMED vs. B:BEST (B:B_3_4) =>C:MHOMED => NOT REPLACE +_MA_MA_SB_N<00>: C:MHOMED vs. B:BEST (NEGATIVE) => B:BEST => REPLACE +Test Replica vs. owned active: some more UNIQUE,MHOMED combinations +_MA_UA_SB_P<00>: C:MHOMED vs. B:UNIQUE,BEST (C:MHOMED) => B:MHOMED => MHOMED_MERGE +_UA_UA_DI_PRA<00>: C:BEST vs. B:BEST2 (C:BEST2,LR:BEST2) => C:BEST => NOT REPLACE +_UA_UA_DI_A<00>: C:BEST vs. B:BEST2 (C:ALL) => B:MHOMED => MHOMED_MERGE +_UA_MA_DI_A<00>: C:BEST vs. B:BEST2 (C:ALL) => B:MHOMED => MHOMED_MERGE +*/ +static NTSTATUS r_do_mhomed_merge(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + struct winsdb_record *merge; + uint32_t i,j; + uint8_t ret; + size_t len; + + merge = talloc(mem_ctx, struct winsdb_record); + NT_STATUS_HAVE_NO_MEMORY(merge); + + merge->name = &replica->name; + merge->type = WREPL_TYPE_MHOMED; + merge->state = replica->state; + merge->node = replica->node; + merge->is_static = replica->is_static; + merge->expire_time = time(NULL) + partner->service->config.verify_interval; + merge->version = replica->version_id; + merge->wins_owner = replica->owner; + merge->addresses = winsdb_addr_list_make(merge); + NT_STATUS_HAVE_NO_MEMORY(merge->addresses); + merge->registered_by = NULL; + + for (i=0; i < replica->num_addresses; i++) { + merge->addresses = winsdb_addr_list_add(partner->service->wins_db, + merge, merge->addresses, + replica->addresses[i].address, + replica->addresses[i].owner, + merge->expire_time, + false); + NT_STATUS_HAVE_NO_MEMORY(merge->addresses); + } + + len = winsdb_addr_list_length(rec->addresses); + + for (i=0; i < len; i++) { + bool found = false; + for (j=0; j < replica->num_addresses; j++) { + if 
(strcmp(replica->addresses[j].address, rec->addresses[i]->address) == 0) { + found = true; + break; + } + } + if (found) continue; + + merge->addresses = winsdb_addr_list_add(partner->service->wins_db, + merge, merge->addresses, + rec->addresses[i]->address, + rec->addresses[i]->wins_owner, + rec->addresses[i]->expire_time, + false); + NT_STATUS_HAVE_NO_MEMORY(merge->addresses); + } + + ret = winsdb_modify(partner->service->wins_db, merge, 0); + if (ret != NBT_RCODE_OK) { + DEBUG(0,("Failed to modify mhomed merge record %s: %u\n", + nbt_name_string(mem_ctx, &replica->name), ret)); + return NT_STATUS_FOOBAR; + } + + DEBUG(4,("mhomed merge record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + return NT_STATUS_OK; +} + +struct r_do_challenge_state { + struct messaging_context *msg_ctx; + struct wreplsrv_partner *partner; + struct winsdb_record *rec; + struct wrepl_wins_owner owner; + struct wrepl_name replica; + struct nbtd_proxy_wins_challenge r; +}; + +static void r_do_late_release_demand_handler(struct irpc_request *ireq) +{ + NTSTATUS status; + struct r_do_challenge_state *state = talloc_get_type(ireq->async.private, + struct r_do_challenge_state); + + status = irpc_call_recv(ireq); + /* don't care about the result */ + talloc_free(state); +} + +static NTSTATUS r_do_late_release_demand(struct r_do_challenge_state *state) +{ + struct irpc_request *ireq; + struct server_id *nbt_servers; + struct nbtd_proxy_wins_release_demand r; + uint32_t i; + + DEBUG(4,("late release demand record %s\n", + nbt_name_string(state, &state->replica.name))); + + nbt_servers = irpc_servers_byname(state->msg_ctx, state, "nbt_server"); + if ((nbt_servers == NULL) || (nbt_servers[0].id == 0)) { + return NT_STATUS_INTERNAL_ERROR; + } + + r.in.name = state->replica.name; + r.in.num_addrs = state->r.out.num_addrs; + r.in.addrs = talloc_array(state, struct nbtd_proxy_wins_addr, r.in.num_addrs); + NT_STATUS_HAVE_NO_MEMORY(r.in.addrs); + /* TODO: fix pidl to handle inline ipv4address 
arrays */ + for (i=0; i < r.in.num_addrs; i++) { + r.in.addrs[i].addr = state->r.out.addrs[i].addr; + } + + ireq = IRPC_CALL_SEND(state->msg_ctx, nbt_servers[0], + irpc, NBTD_PROXY_WINS_RELEASE_DEMAND, + &r, state); + NT_STATUS_HAVE_NO_MEMORY(ireq); + + ireq->async.fn = r_do_late_release_demand_handler; + ireq->async.private = state; + + return NT_STATUS_OK; +} + +/* +Test Replica vs. owned active: some more MHOMED combinations +_MA_MA_SP_U<00>: C:MHOMED vs. B:ALL => B:ALL => REPLACE +_MA_MA_SM_U<00>: C:MHOMED vs. B:MHOMED => B:MHOMED => REPLACE +_MA_MA_SB_P<00>: C:MHOMED vs. B:BEST (C:MHOMED) => B:MHOMED => MHOMED_MERGE +_MA_MA_SB_A<00>: C:MHOMED vs. B:BEST (C:ALL) => B:MHOMED => MHOMED_MERGE +_MA_MA_SB_PRA<00>: C:MHOMED vs. B:BEST (C:BEST) => C:MHOMED => NOT REPLACE +_MA_MA_SB_O<00>: C:MHOMED vs. B:BEST (B:B_3_4) =>C:MHOMED => NOT REPLACE +_MA_MA_SB_N<00>: C:MHOMED vs. B:BEST (NEGATIVE) => B:BEST => REPLACE +Test Replica vs. owned active: some more UNIQUE,MHOMED combinations +_MA_UA_SB_P<00>: C:MHOMED vs. B:UNIQUE,BEST (C:MHOMED) => B:MHOMED => MHOMED_MERGE +_UA_UA_DI_PRA<00>: C:BEST vs. B:BEST2 (C:BEST2,LR:BEST2) => C:BEST => NOT REPLACE +_UA_UA_DI_A<00>: C:BEST vs. B:BEST2 (C:ALL) => B:MHOMED => MHOMED_MERGE +_UA_MA_DI_A<00>: C:BEST vs. 
B:BEST2 (C:ALL) => B:MHOMED => MHOMED_MERGE +*/ +static void r_do_challenge_handler(struct irpc_request *ireq) +{ + NTSTATUS status; + struct r_do_challenge_state *state = talloc_get_type(ireq->async.private, + struct r_do_challenge_state); + bool old_is_subset = false; + bool new_is_subset = false; + bool found = false; + uint32_t i,j; + uint32_t num_rec_addrs; + + status = irpc_call_recv(ireq); + + DEBUG(4,("r_do_challenge_handler: %s: %s\n", + nbt_name_string(state, &state->replica.name), nt_errstr(status))); + + if (NT_STATUS_EQUAL(NT_STATUS_IO_TIMEOUT, status) || + NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) { + r_do_replace(state->partner, state, state->rec, &state->owner, &state->replica); + talloc_free(state); + return; + } + + for (i=0; i < state->replica.num_addresses; i++) { + found = false; + new_is_subset = true; + for (j=0; j < state->r.out.num_addrs; j++) { + if (strcmp(state->replica.addresses[i].address, state->r.out.addrs[j].addr) == 0) { + found = true; + break; + } + } + if (found) continue; + + new_is_subset = false; + break; + } + + if (!new_is_subset) { + r_not_replace(state->partner, state, state->rec, &state->owner, &state->replica); + talloc_free(state); + return; + } + + num_rec_addrs = winsdb_addr_list_length(state->rec->addresses); + for (i=0; i < num_rec_addrs; i++) { + found = false; + old_is_subset = true; + for (j=0; j < state->r.out.num_addrs; j++) { + if (strcmp(state->rec->addresses[i]->address, state->r.out.addrs[j].addr) == 0) { + found = true; + break; + } + } + if (found) continue; + + old_is_subset = false; + break; + } + + if (!old_is_subset) { + status = r_do_late_release_demand(state); + /* + * only free state on error, because we pass it down, + * and r_do_late_release_demand() will free it + */ + if (!NT_STATUS_IS_OK(status)) { + talloc_free(state); + } + return; + } + + r_do_mhomed_merge(state->partner, state, state->rec, &state->owner, &state->replica); + talloc_free(state); +} + +static NTSTATUS 
r_do_challenge(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + struct irpc_request *ireq; + struct r_do_challenge_state *state; + struct server_id *nbt_servers; + const char **addrs; + uint32_t i; + + DEBUG(4,("challenge record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + state = talloc_zero(mem_ctx, struct r_do_challenge_state); + NT_STATUS_HAVE_NO_MEMORY(state); + state->msg_ctx = partner->service->task->msg_ctx; + state->partner = partner; + state->rec = talloc_steal(state, rec); + state->owner = *owner; + state->replica = *replica; + /* some stuff to have valid memory pointers in the async complete function */ + state->replica.name = *state->rec->name; + talloc_steal(state, replica->owner); + talloc_steal(state, replica->addresses); + + nbt_servers = irpc_servers_byname(state->msg_ctx, state, "nbt_server"); + if ((nbt_servers == NULL) || (nbt_servers[0].id == 0)) { + return NT_STATUS_INTERNAL_ERROR; + } + + state->r.in.name = *rec->name; + state->r.in.num_addrs = winsdb_addr_list_length(rec->addresses); + state->r.in.addrs = talloc_array(state, struct nbtd_proxy_wins_addr, state->r.in.num_addrs); + NT_STATUS_HAVE_NO_MEMORY(state->r.in.addrs); + /* TODO: fix pidl to handle inline ipv4address arrays */ + addrs = winsdb_addr_string_list(state->r.in.addrs, rec->addresses); + NT_STATUS_HAVE_NO_MEMORY(addrs); + for (i=0; i < state->r.in.num_addrs; i++) { + state->r.in.addrs[i].addr = addrs[i]; + } + + ireq = IRPC_CALL_SEND(state->msg_ctx, nbt_servers[0], + irpc, NBTD_PROXY_WINS_CHALLENGE, + &state->r, state); + NT_STATUS_HAVE_NO_MEMORY(ireq); + + ireq->async.fn = r_do_challenge_handler; + ireq->async.private = state; + + talloc_steal(partner, state); + return NT_STATUS_OK; +} + +struct r_do_release_demand_state { + struct messaging_context *msg_ctx; + struct nbtd_proxy_wins_release_demand r; +}; + +static void r_do_release_demand_handler(struct 
irpc_request *ireq) +{ + NTSTATUS status; + struct r_do_release_demand_state *state = talloc_get_type(ireq->async.private, + struct r_do_release_demand_state); + + status = irpc_call_recv(ireq); + /* don't care about the result */ + talloc_free(state); +} + +static NTSTATUS r_do_release_demand(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + NTSTATUS status; + struct irpc_request *ireq; + struct server_id *nbt_servers; + const char **addrs; + struct winsdb_addr **addresses; + struct r_do_release_demand_state *state; + uint32_t i; + + /* + * we need to get a reference to the old addresses, + * as we need to send a release demand to them after replacing the record + * and r_do_replace() will modify rec->addresses + */ + addresses = rec->addresses; + + status = r_do_replace(partner, mem_ctx, rec, owner, replica); + NT_STATUS_NOT_OK_RETURN(status); + + DEBUG(4,("release demand record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + state = talloc_zero(mem_ctx, struct r_do_release_demand_state); + NT_STATUS_HAVE_NO_MEMORY(state); + state->msg_ctx = partner->service->task->msg_ctx; + + nbt_servers = irpc_servers_byname(state->msg_ctx, state, "nbt_server"); + if ((nbt_servers == NULL) || (nbt_servers[0].id == 0)) { + return NT_STATUS_INTERNAL_ERROR; + } + + state->r.in.name = *rec->name; + state->r.in.num_addrs = winsdb_addr_list_length(addresses); + state->r.in.addrs = talloc_array(state, struct nbtd_proxy_wins_addr, + state->r.in.num_addrs); + NT_STATUS_HAVE_NO_MEMORY(state->r.in.addrs); + /* TODO: fix pidl to handle inline ipv4address arrays */ + addrs = winsdb_addr_string_list(state->r.in.addrs, addresses); + NT_STATUS_HAVE_NO_MEMORY(addrs); + for (i=0; i < state->r.in.num_addrs; i++) { + state->r.in.addrs[i].addr = addrs[i]; + } + + ireq = IRPC_CALL_SEND(state->msg_ctx, nbt_servers[0], + irpc, NBTD_PROXY_WINS_RELEASE_DEMAND, + &state->r, state); + 
NT_STATUS_HAVE_NO_MEMORY(ireq); + + ireq->async.fn = r_do_release_demand_handler; + ireq->async.private = state; + + talloc_steal(partner, state); + return NT_STATUS_OK; +} + +/* +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:A_3_4 => NOT REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:NULL => NOT REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4_X_3_4 vs. B:A_3_4 => NOT REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4 vs. B:A_3_4 => REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:A_3_4_OWNER_B => REPLACE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4_OWNER_B vs. B:A_3_4 => REPLACE + +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4 vs. B:B_3_4 => C:A_3_4_B_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4_X_3_4 vs. B:A_3_4 => B:A_3_4_X_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:X_3_4 vs. B:A_3_4 => C:A_3_4_X_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:A_3_4_X_3_4 vs. B:A_3_4_OWNER_B => B:A_3_4_OWNER_B_X_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4_X_3_4 vs. B:B_3_4_X_1_2 => C:B_3_4_X_1_2_3_4 => SGROUP_MERGE +SGROUP,ACTIVE vs. SGROUP,ACTIVE A:B_3_4_X_3_4 vs. B:NULL => B:X_3_4 => SGROUP_MERGE + +Test Replica vs. owned active: SGROUP vs. 
SGROUP tests +_SA_SA_DI_U<1c> => SGROUP_MERGE +_SA_SA_SI_U<1c> => SGROUP_MERGE +_SA_SA_SP_U<1c> => SGROUP_MERGE +_SA_SA_SB_U<1c> => SGROUP_MERGE +*/ +static NTSTATUS r_do_sgroup_merge(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct winsdb_record *rec, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + struct winsdb_record *merge; + uint32_t modify_flags = 0; + uint32_t i,j; + uint8_t ret; + size_t len; + bool changed_old_addrs = false; + bool become_owner = true; + + merge = talloc(mem_ctx, struct winsdb_record); + NT_STATUS_HAVE_NO_MEMORY(merge); + + merge->name = &replica->name; + merge->type = replica->type; + merge->state = replica->state; + merge->node = replica->node; + merge->is_static = replica->is_static; + merge->expire_time = time(NULL) + partner->service->config.verify_interval; + merge->version = replica->version_id; + merge->wins_owner = replica->owner; + merge->addresses = winsdb_addr_list_make(merge); + NT_STATUS_HAVE_NO_MEMORY(merge->addresses); + merge->registered_by = NULL; + + len = winsdb_addr_list_length(rec->addresses); + + for (i=0; i < len; i++) { + bool found = false; + + for (j=0; j < replica->num_addresses; j++) { + if (strcmp(rec->addresses[i]->address, replica->addresses[j].address) != 0) { + continue; + } + + found = true; + + if (strcmp(rec->addresses[i]->wins_owner, replica->addresses[j].owner) != 0) { + changed_old_addrs = true; + break; + } + break; + } + + /* + * if the address isn't in the replica and is owned by replicas owner, + * it won't be added to the merged record + */ + if (!found && strcmp(rec->addresses[i]->wins_owner, owner->address) == 0) { + changed_old_addrs = true; + continue; + } + + /* + * add the address to the merge result, with the old owner and expire_time, + * the owner and expire_time will be overwritten later if the address is + * in the replica too + */ + merge->addresses = winsdb_addr_list_add(partner->service->wins_db, + merge, merge->addresses, + 
rec->addresses[i]->address, + rec->addresses[i]->wins_owner, + rec->addresses[i]->expire_time, + false); + NT_STATUS_HAVE_NO_MEMORY(merge->addresses); + } + + for (i=0; i < replica->num_addresses; i++) { + merge->addresses = winsdb_addr_list_add(partner->service->wins_db, + merge, merge->addresses, + replica->addresses[i].address, + replica->addresses[i].owner, + merge->expire_time, + false); + NT_STATUS_HAVE_NO_MEMORY(merge->addresses); + } + + /* we the old addresses change changed we don't become the owner */ + if (changed_old_addrs) { + become_owner = false; + } + + /* if we're the owner of the old record, we'll be the owner of the new one too */ + if (strcmp(rec->wins_owner, partner->service->wins_db->local_owner)==0) { + become_owner = true; + } + + /* + * if the result has no addresses we take the ownership + */ + len = winsdb_addr_list_length(merge->addresses); + if (len == 0) { + become_owner = true; + } + + /* + * if addresses of the old record will be changed the replica owner + * will be owner of the merge result, otherwise we take the ownership + */ + if (become_owner) { + modify_flags = WINSDB_FLAG_ALLOC_VERSION | WINSDB_FLAG_TAKE_OWNERSHIP; + } + + ret = winsdb_modify(partner->service->wins_db, merge, modify_flags); + if (ret != NBT_RCODE_OK) { + DEBUG(0,("Failed to modify sgroup merge record %s: %u\n", + nbt_name_string(mem_ctx, &replica->name), ret)); + return NT_STATUS_FOOBAR; + } + + DEBUG(4,("sgroup merge record %s\n", + nbt_name_string(mem_ctx, &replica->name))); + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_apply_one_record(struct wreplsrv_partner *partner, + TALLOC_CTX *mem_ctx, + struct wrepl_wins_owner *owner, + struct wrepl_name *replica) +{ + NTSTATUS status; + struct winsdb_record *rec = NULL; + enum _R_ACTION action = R_INVALID; + bool same_owner = false; + bool replica_vs_replica = false; + bool local_vs_replica = false; + + status = winsdb_lookup(partner->service->wins_db, + &replica->name, mem_ctx, &rec); + if 
(NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) { + return r_do_add(partner, mem_ctx, owner, replica); + } + NT_STATUS_NOT_OK_RETURN(status); + + if (strcmp(rec->wins_owner, partner->service->wins_db->local_owner)==0) { + local_vs_replica = true; + } else if (strcmp(rec->wins_owner, owner->address)==0) { + same_owner = true; + } else { + replica_vs_replica = true; + } + + if (rec->is_static && !same_owner) { + action = R_NOT_REPLACE; + + /* + * if we own the local record, then propagate it back to + * the other wins servers. + * to prevent ping-pong with other servers, we don't do this + * if the replica is static too. + * + * It seems that w2k3 doesn't do this, but I thing that's a bug + * and doing propagation helps to have consistent data on all servers + */ + if (local_vs_replica && !replica->is_static) { + action = R_DO_PROPAGATE; + } + } else if (replica->is_static && !rec->is_static && !same_owner) { + action = R_DO_REPLACE; + } else if (same_owner) { + action = replace_same_owner(rec, replica); + } else if (replica_vs_replica) { + switch (rec->type) { + case WREPL_TYPE_UNIQUE: + action = replace_unique_replica_vs_X_replica(rec, replica); + break; + case WREPL_TYPE_GROUP: + action = replace_group_replica_vs_X_replica(rec, replica); + break; + case WREPL_TYPE_SGROUP: + action = replace_sgroup_replica_vs_X_replica(rec, replica); + break; + case WREPL_TYPE_MHOMED: + action = replace_mhomed_replica_vs_X_replica(rec, replica); + break; + } + } else if (local_vs_replica) { + switch (rec->type) { + case WREPL_TYPE_UNIQUE: + action = replace_unique_owned_vs_X_replica(rec, replica); + break; + case WREPL_TYPE_GROUP: + action = replace_group_owned_vs_X_replica(rec, replica); + break; + case WREPL_TYPE_SGROUP: + action = replace_sgroup_owned_vs_X_replica(rec, replica); + break; + case WREPL_TYPE_MHOMED: + action = replace_mhomed_owned_vs_X_replica(rec, replica); + break; + } + } + + DEBUG(4,("apply record %s: %s\n", + nbt_name_string(mem_ctx, 
&replica->name), _R_ACTION_enum_string(action))); + + switch (action) { + case R_INVALID: break; + case R_DO_REPLACE: + return r_do_replace(partner, mem_ctx, rec, owner, replica); + case R_NOT_REPLACE: + return r_not_replace(partner, mem_ctx, rec, owner, replica); + case R_DO_PROPAGATE: + return r_do_propagate(partner, mem_ctx, rec, owner, replica); + case R_DO_CHALLENGE: + return r_do_challenge(partner, mem_ctx, rec, owner, replica); + case R_DO_RELEASE_DEMAND: + return r_do_release_demand(partner, mem_ctx, rec, owner, replica); + case R_DO_SGROUP_MERGE: + return r_do_sgroup_merge(partner, mem_ctx, rec, owner, replica); + } + + return NT_STATUS_INTERNAL_ERROR; +} + +NTSTATUS wreplsrv_apply_records(struct wreplsrv_partner *partner, + struct wrepl_wins_owner *owner, + uint32_t num_names, struct wrepl_name *names) +{ + NTSTATUS status; + uint32_t i; + + DEBUG(4,("apply records count[%u]:owner[%s]:min[%llu]:max[%llu]:partner[%s]\n", + num_names, owner->address, + (long long)owner->min_version, + (long long)owner->max_version, + partner->address)); + + for (i=0; i < num_names; i++) { + TALLOC_CTX *tmp_mem = talloc_new(partner); + NT_STATUS_HAVE_NO_MEMORY(tmp_mem); + + status = wreplsrv_apply_one_record(partner, tmp_mem, + owner, &names[i]); + talloc_free(tmp_mem); + NT_STATUS_NOT_OK_RETURN(status); + } + + status = wreplsrv_add_table(partner->service, + partner->service, + &partner->service->table, + owner->address, + owner->max_version); + NT_STATUS_NOT_OK_RETURN(status); + + return NT_STATUS_OK; +} diff --git a/source4/wrepl_server/wrepl_in_call.c b/source4/wrepl_server/wrepl_in_call.c new file mode 100644 index 0000000000..0508c306e0 --- /dev/null +++ b/source4/wrepl_server/wrepl_in_call.c @@ -0,0 +1,576 @@ +/* + Unix SMB/CIFS implementation. 
+ + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/socket/socket.h" +#include "smbd/service_stream.h" +#include "libcli/wrepl/winsrepl.h" +#include "wrepl_server/wrepl_server.h" +#include "libcli/composite/composite.h" +#include "nbt_server/wins/winsdb.h" +#include "lib/ldb/include/ldb.h" +#include "lib/ldb/include/ldb_errors.h" +#include "system/time.h" + +static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call) +{ + struct wrepl_start *start = &call->req_packet.message.start; + struct wrepl_start *start_reply = &call->rep_packet.message.start_reply; + + if (call->req_packet.opcode & WREPL_OPCODE_BITS) { + /* + *if the assoc_ctx doesn't match ignore the packet + */ + if ((call->req_packet.assoc_ctx != call->wreplconn->assoc_ctx.our_ctx) + && (call->req_packet.assoc_ctx != 0)) { + return ERROR_INVALID_PARAMETER; + } + } else { + call->wreplconn->assoc_ctx.our_ctx = WREPLSRV_INVALID_ASSOC_CTX; + return NT_STATUS_OK; + } + +/* + * it seems that we don't know all details about the start_association + * to support replication with NT4 (it sends 1.1 instead of 5.2) + * we ignore the version numbers until we know all details + */ +#if 0 + if (start->minor_version != 2 || start->major_version != 5) { + /* w2k terminate the connection if 
the versions doesn't match */ + return NT_STATUS_UNKNOWN_REVISION; + } +#endif + + call->wreplconn->assoc_ctx.stopped = false; + call->wreplconn->assoc_ctx.our_ctx = WREPLSRV_VALID_ASSOC_CTX; + call->wreplconn->assoc_ctx.peer_ctx = start->assoc_ctx; + + call->rep_packet.mess_type = WREPL_START_ASSOCIATION_REPLY; + start_reply->assoc_ctx = call->wreplconn->assoc_ctx.our_ctx; + start_reply->minor_version = 2; + start_reply->major_version = 5; + + /* + * nt4 uses 41 bytes for the start_association call + * so do it the same and as we don't know the meanings of this bytes + * we just send zeros and nt4, w2k and w2k3 seems to be happy with this + * + * if we don't do this nt4 uses an old version of the wins replication protocol + * and that would break nt4 <-> samba replication + */ + call->rep_packet.padding = data_blob_talloc(call, NULL, 21); + NT_STATUS_HAVE_NO_MEMORY(call->rep_packet.padding.data); + + memset(call->rep_packet.padding.data, 0, call->rep_packet.padding.length); + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_in_stop_assoc_ctx(struct wreplsrv_in_call *call) +{ + struct wrepl_stop *stop_out = &call->rep_packet.message.stop; + + call->wreplconn->assoc_ctx.stopped = true; + + call->rep_packet.mess_type = WREPL_STOP_ASSOCIATION; + stop_out->reason = 4; + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_in_stop_association(struct wreplsrv_in_call *call) +{ + /* + * w2k only check the assoc_ctx if the opcode has the 0x00007800 bits are set + */ + if (call->req_packet.opcode & WREPL_OPCODE_BITS) { + /* + *if the assoc_ctx doesn't match ignore the packet + */ + if (call->req_packet.assoc_ctx != call->wreplconn->assoc_ctx.our_ctx) { + return ERROR_INVALID_PARAMETER; + } + /* when the opcode bits are set the connection should be directly terminated */ + return NT_STATUS_CONNECTION_RESET; + } + + if (call->wreplconn->assoc_ctx.stopped) { + /* this causes the connection to be directly terminated */ + return NT_STATUS_CONNECTION_RESET; + } + + /* this 
will cause to not receive packets anymore and terminate the connection if the reply is send */ + call->terminate_after_send = true; + return wreplsrv_in_stop_assoc_ctx(call); +} + +static NTSTATUS wreplsrv_in_table_query(struct wreplsrv_in_call *call) +{ + struct wreplsrv_service *service = call->wreplconn->service; + struct wrepl_replication *repl_out = &call->rep_packet.message.replication; + struct wrepl_table *table_out = &call->rep_packet.message.replication.info.table; + + repl_out->command = WREPL_REPL_TABLE_REPLY; + + return wreplsrv_fill_wrepl_table(service, call, table_out, + service->wins_db->local_owner, true); +} + +static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1, + struct wrepl_wins_name *n2) +{ + if (n1->id < n2->id) return -1; + if (n1->id > n2->id) return 1; + return 0; +} + +static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx, + struct wrepl_wins_name *name, + struct winsdb_record *rec) +{ + uint32_t num_ips, i; + struct wrepl_ip *ips; + + name->name = rec->name; + talloc_steal(mem_ctx, rec->name); + + name->id = rec->version; + name->unknown = "255.255.255.255"; + + name->flags = WREPL_NAME_FLAGS(rec->type, rec->state, rec->node, rec->is_static); + + switch (name->flags & 2) { + case 0: + name->addresses.ip = rec->addresses[0]->address; + talloc_steal(mem_ctx, rec->addresses[0]->address); + break; + case 2: + num_ips = winsdb_addr_list_length(rec->addresses); + ips = talloc_array(mem_ctx, struct wrepl_ip, num_ips); + NT_STATUS_HAVE_NO_MEMORY(ips); + + for (i = 0; i < num_ips; i++) { + ips[i].owner = rec->addresses[i]->wins_owner; + talloc_steal(ips, rec->addresses[i]->wins_owner); + ips[i].ip = rec->addresses[i]->address; + talloc_steal(ips, rec->addresses[i]->address); + } + + name->addresses.addresses.num_ips = num_ips; + name->addresses.addresses.ips = ips; + break; + } + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call) +{ + struct wreplsrv_service *service = 
call->wreplconn->service; + struct wrepl_wins_owner *owner_in = &call->req_packet.message.replication.info.owner; + struct wrepl_replication *repl_out = &call->rep_packet.message.replication; + struct wrepl_send_reply *reply_out = &call->rep_packet.message.replication.info.reply; + struct wreplsrv_owner *owner; + const char *owner_filter; + const char *filter; + struct ldb_result *res = NULL; + int ret; + struct wrepl_wins_name *names; + struct winsdb_record *rec; + NTSTATUS status; + uint32_t i, j; + time_t now = time(NULL); + + owner = wreplsrv_find_owner(service, service->table, owner_in->address); + + repl_out->command = WREPL_REPL_SEND_REPLY; + reply_out->num_names = 0; + reply_out->names = NULL; + + /* + * if we didn't know this owner, must be a bug in the partners client code... + * return an empty list. + */ + if (!owner) { + DEBUG(2,("WINSREPL:reply [0] records unknown owner[%s] to partner[%s]\n", + owner_in->address, call->wreplconn->partner->address)); + return NT_STATUS_OK; + } + + /* + * the client sends a max_version of 0, interpret it as + * (uint64_t)-1 + */ + if (owner_in->max_version == 0) { + owner_in->max_version = (uint64_t)-1; + } + + /* + * if the partner ask for nothing, or give invalid ranges, + * return an empty list. + */ + if (owner_in->min_version > owner_in->max_version) { + DEBUG(2,("WINSREPL:reply [0] records owner[%s] min[%llu] max[%llu] to partner[%s]\n", + owner_in->address, + (long long)owner_in->min_version, + (long long)owner_in->max_version, + call->wreplconn->partner->address)); + return NT_STATUS_OK; + } + + /* + * if the partner has already all records for nothing, or give invalid ranges, + * return an empty list. 
+ */ + if (owner_in->min_version > owner->owner.max_version) { + DEBUG(2,("WINSREPL:reply [0] records owner[%s] min[%llu] max[%llu] to partner[%s]\n", + owner_in->address, + (long long)owner_in->min_version, + (long long)owner_in->max_version, + call->wreplconn->partner->address)); + return NT_STATUS_OK; + } + + owner_filter = wreplsrv_owner_filter(service, call, owner->owner.address); + NT_STATUS_HAVE_NO_MEMORY(owner_filter); + filter = talloc_asprintf(call, + "(&%s(objectClass=winsRecord)" + "(|(recordState=%u)(recordState=%u))" + "(versionID>=%llu)(versionID<=%llu))", + owner_filter, + WREPL_STATE_ACTIVE, WREPL_STATE_TOMBSTONE, + (long long)owner_in->min_version, + (long long)owner_in->max_version); + NT_STATUS_HAVE_NO_MEMORY(filter); + ret = ldb_search(service->wins_db->ldb, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res); + if (ret != LDB_SUCCESS) return NT_STATUS_INTERNAL_DB_CORRUPTION; + talloc_steal(call, res); + DEBUG(10,("WINSREPL: filter '%s' count %d\n", filter, res->count)); + + if (res->count == 0) { + DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n", + res->count, owner_in->address, + (long long)owner_in->min_version, + (long long)owner_in->max_version, + call->wreplconn->partner->address)); + return NT_STATUS_OK; + } + + names = talloc_array(call, struct wrepl_wins_name, res->count); + NT_STATUS_HAVE_NO_MEMORY(names); + + for (i=0, j=0; i < res->count; i++) { + status = winsdb_record(service->wins_db, res->msgs[i], call, now, &rec); + NT_STATUS_NOT_OK_RETURN(status); + + /* + * it's possible that winsdb_record() made the record RELEASED + * because it's expired, but in the database it's still stored + * as ACTIVE... 
+ * + * make sure we really only replicate ACTIVE and TOMBSTONE records + */ + if (rec->state == WREPL_STATE_ACTIVE || rec->state == WREPL_STATE_TOMBSTONE) { + status = wreplsrv_record2wins_name(names, &names[j], rec); + NT_STATUS_NOT_OK_RETURN(status); + j++; + } + + talloc_free(rec); + talloc_free(res->msgs[i]); + } + + /* sort the names before we send them */ + qsort(names, j, sizeof(struct wrepl_wins_name), (comparison_fn_t)wreplsrv_in_sort_wins_name); + + DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n", + j, owner_in->address, + (long long)owner_in->min_version, + (long long)owner_in->max_version, + call->wreplconn->partner->address)); + + reply_out->num_names = j; + reply_out->names = names; + + return NT_STATUS_OK; +} + +struct wreplsrv_in_update_state { + struct wreplsrv_in_connection *wrepl_in; + struct wreplsrv_out_connection *wrepl_out; + struct composite_context *creq; + struct wreplsrv_pull_cycle_io cycle_io; +}; + +static void wreplsrv_in_update_handler(struct composite_context *creq) +{ + struct wreplsrv_in_update_state *update_state = talloc_get_type(creq->async.private_data, + struct wreplsrv_in_update_state); + NTSTATUS status; + + status = wreplsrv_pull_cycle_recv(creq); + + talloc_free(update_state->wrepl_out); + + wreplsrv_terminate_in_connection(update_state->wrepl_in, nt_errstr(status)); +} + +static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call) +{ + struct wreplsrv_in_connection *wrepl_in = call->wreplconn; + struct wreplsrv_out_connection *wrepl_out; + struct wrepl_table *update_in = &call->req_packet.message.replication.info.table; + struct wreplsrv_in_update_state *update_state; + uint16_t fde_flags; + + DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n", + call->wreplconn->partner->address, + update_in->initiator, update_in->partner_count)); + + /* + * we need to flip the connection into a client connection + * and do a WREPL_REPL_SEND_REQUEST's on the that 
connection + * and then stop this connection + */ + fde_flags = event_get_fd_flags(wrepl_in->conn->event.fde); + talloc_free(wrepl_in->conn->event.fde); + wrepl_in->conn->event.fde = NULL; + + update_state = talloc(wrepl_in, struct wreplsrv_in_update_state); + NT_STATUS_HAVE_NO_MEMORY(update_state); + + wrepl_out = talloc(update_state, struct wreplsrv_out_connection); + NT_STATUS_HAVE_NO_MEMORY(wrepl_out); + wrepl_out->service = wrepl_in->service; + wrepl_out->partner = wrepl_in->partner; + wrepl_out->assoc_ctx.our_ctx = wrepl_in->assoc_ctx.our_ctx; + wrepl_out->assoc_ctx.peer_ctx = wrepl_in->assoc_ctx.peer_ctx; + wrepl_out->sock = wrepl_socket_merge(wrepl_out, + wrepl_in->conn->event.ctx, + wrepl_in->conn->socket, + wrepl_in->packet); + NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock); + + event_set_fd_flags(wrepl_out->sock->event.fde, fde_flags); + + update_state->wrepl_in = wrepl_in; + update_state->wrepl_out = wrepl_out; + update_state->cycle_io.in.partner = wrepl_out->partner; + update_state->cycle_io.in.num_owners = update_in->partner_count; + update_state->cycle_io.in.owners = update_in->partners; + talloc_steal(update_state, update_in->partners); + update_state->cycle_io.in.wreplconn = wrepl_out; + update_state->creq = wreplsrv_pull_cycle_send(update_state, &update_state->cycle_io); + if (!update_state->creq) { + return NT_STATUS_INTERNAL_ERROR; + } + + update_state->creq->async.fn = wreplsrv_in_update_handler; + update_state->creq->async.private_data = update_state; + + return ERROR_INVALID_PARAMETER; +} + +static NTSTATUS wreplsrv_in_update2(struct wreplsrv_in_call *call) +{ + return wreplsrv_in_update(call); +} + +static NTSTATUS wreplsrv_in_inform(struct wreplsrv_in_call *call) +{ + struct wrepl_table *inform_in = &call->req_packet.message.replication.info.table; + + DEBUG(2,("WREPL_REPL_INFORM: partner[%s] initiator[%s] num_owners[%u]\n", + call->wreplconn->partner->address, + inform_in->initiator, inform_in->partner_count)); + + 
wreplsrv_out_partner_pull(call->wreplconn->partner, inform_in); + + /* we don't reply to WREPL_REPL_INFORM messages */ + return ERROR_INVALID_PARAMETER; +} + +static NTSTATUS wreplsrv_in_inform2(struct wreplsrv_in_call *call) +{ + return wreplsrv_in_inform(call); +} + +static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call) +{ + struct wrepl_replication *repl_in = &call->req_packet.message.replication; + NTSTATUS status; + + /* + * w2k only check the assoc_ctx if the opcode has the 0x00007800 bits are set + */ + if (call->req_packet.opcode & WREPL_OPCODE_BITS) { + /* + *if the assoc_ctx doesn't match ignore the packet + */ + if (call->req_packet.assoc_ctx != call->wreplconn->assoc_ctx.our_ctx) { + return ERROR_INVALID_PARAMETER; + } + } + + if (!call->wreplconn->partner) { + struct socket_address *partner_ip = socket_get_peer_addr(call->wreplconn->conn->socket, call); + + call->wreplconn->partner = wreplsrv_find_partner(call->wreplconn->service, partner_ip->addr); + if (!call->wreplconn->partner) { + DEBUG(1,("Failing WINS replication from non-partner %s\n", + partner_ip ? 
partner_ip->addr : NULL)); + return wreplsrv_in_stop_assoc_ctx(call); + } + } + + switch (repl_in->command) { + case WREPL_REPL_TABLE_QUERY: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) { + DEBUG(2,("Failing WINS replication TABLE_QUERY from non-push-partner %s\n", + call->wreplconn->partner->address)); + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_table_query(call); + break; + + case WREPL_REPL_TABLE_REPLY: + return ERROR_INVALID_PARAMETER; + + case WREPL_REPL_SEND_REQUEST: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) { + DEBUG(2,("Failing WINS replication SEND_REQUESET from non-push-partner %s\n", + call->wreplconn->partner->address)); + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_send_request(call); + break; + + case WREPL_REPL_SEND_REPLY: + return ERROR_INVALID_PARAMETER; + + case WREPL_REPL_UPDATE: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + DEBUG(2,("Failing WINS replication UPDATE from non-pull-partner %s\n", + call->wreplconn->partner->address)); + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_update(call); + break; + + case WREPL_REPL_UPDATE2: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + DEBUG(2,("Failing WINS replication UPDATE2 from non-pull-partner %s\n", + call->wreplconn->partner->address)); + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_update2(call); + break; + + case WREPL_REPL_INFORM: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + DEBUG(2,("Failing WINS replication INFORM from non-pull-partner %s\n", + call->wreplconn->partner->address)); + return wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_inform(call); + break; + + case WREPL_REPL_INFORM2: + if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) { + DEBUG(2,("Failing WINS replication INFORM2 from non-pull-partner %s\n", + call->wreplconn->partner->address)); + return 
wreplsrv_in_stop_assoc_ctx(call); + } + status = wreplsrv_in_inform2(call); + break; + + default: + return ERROR_INVALID_PARAMETER; + } + + if (NT_STATUS_IS_OK(status)) { + call->rep_packet.mess_type = WREPL_REPLICATION; + } + + return status; +} + +static NTSTATUS wreplsrv_in_invalid_assoc_ctx(struct wreplsrv_in_call *call) +{ + struct wrepl_start *start = &call->rep_packet.message.start; + + call->rep_packet.opcode = 0x00008583; + call->rep_packet.assoc_ctx = 0; + call->rep_packet.mess_type = WREPL_START_ASSOCIATION; + + start->assoc_ctx = 0x0000000a; + start->minor_version = 0x0001; + start->major_version = 0x0000; + + call->rep_packet.padding = data_blob_talloc(call, NULL, 4); + memset(call->rep_packet.padding.data, '\0', call->rep_packet.padding.length); + + return NT_STATUS_OK; +} + +NTSTATUS wreplsrv_in_call(struct wreplsrv_in_call *call) +{ + NTSTATUS status; + + if (!(call->req_packet.opcode & WREPL_OPCODE_BITS) + && (call->wreplconn->assoc_ctx.our_ctx == WREPLSRV_INVALID_ASSOC_CTX)) { + return wreplsrv_in_invalid_assoc_ctx(call); + } + + switch (call->req_packet.mess_type) { + case WREPL_START_ASSOCIATION: + status = wreplsrv_in_start_association(call); + break; + case WREPL_START_ASSOCIATION_REPLY: + /* this is not valid here, so we ignore it */ + return ERROR_INVALID_PARAMETER; + + case WREPL_STOP_ASSOCIATION: + status = wreplsrv_in_stop_association(call); + break; + + case WREPL_REPLICATION: + status = wreplsrv_in_replication(call); + break; + default: + /* everythingelse is also not valid here, so we ignore it */ + return ERROR_INVALID_PARAMETER; + } + + if (call->wreplconn->assoc_ctx.our_ctx == WREPLSRV_INVALID_ASSOC_CTX) { + return wreplsrv_in_invalid_assoc_ctx(call); + } + + if (NT_STATUS_IS_OK(status)) { + /* let the backend to set some of the opcode bits, but always add the standards */ + call->rep_packet.opcode |= WREPL_OPCODE_BITS; + call->rep_packet.assoc_ctx = call->wreplconn->assoc_ctx.peer_ctx; + } + + return status; +} diff --git 
a/source4/wrepl_server/wrepl_in_connection.c b/source4/wrepl_server/wrepl_in_connection.c new file mode 100644 index 0000000000..25227481b8 --- /dev/null +++ b/source4/wrepl_server/wrepl_in_connection.c @@ -0,0 +1,323 @@ +/* + Unix SMB/CIFS implementation. + + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/socket/socket.h" +#include "lib/stream/packet.h" +#include "smbd/service_task.h" +#include "smbd/service_stream.h" +#include "smbd/service.h" +#include "lib/messaging/irpc.h" +#include "librpc/gen_ndr/ndr_winsrepl.h" +#include "wrepl_server/wrepl_server.h" +#include "smbd/process_model.h" +#include "system/network.h" +#include "lib/socket/netif.h" +#include "param/param.h" + +void wreplsrv_terminate_in_connection(struct wreplsrv_in_connection *wreplconn, const char *reason) +{ + stream_terminate_connection(wreplconn->conn, reason); +} + +static int terminate_after_send_destructor(struct wreplsrv_in_connection **tas) +{ + wreplsrv_terminate_in_connection(*tas, "wreplsrv_in_connection: terminate_after_send"); + return 0; +} + +/* + receive some data on a WREPL connection +*/ +static NTSTATUS wreplsrv_recv_request(void *private, DATA_BLOB blob) +{ + struct wreplsrv_in_connection *wreplconn = talloc_get_type(private, struct wreplsrv_in_connection); + struct wreplsrv_in_call *call; + 
DATA_BLOB packet_in_blob; + DATA_BLOB packet_out_blob; + struct wrepl_wrap packet_out_wrap; + NTSTATUS status; + enum ndr_err_code ndr_err; + + call = talloc_zero(wreplconn, struct wreplsrv_in_call); + NT_STATUS_HAVE_NO_MEMORY(call); + call->wreplconn = wreplconn; + talloc_steal(call, blob.data); + + packet_in_blob.data = blob.data + 4; + packet_in_blob.length = blob.length - 4; + + ndr_err = ndr_pull_struct_blob(&packet_in_blob, call, + lp_iconv_convenience(wreplconn->service->task->lp_ctx), + &call->req_packet, + (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + return ndr_map_error2ntstatus(ndr_err); + } + + if (DEBUGLVL(10)) { + DEBUG(10,("Received WINS-Replication packet of length %u\n", + (unsigned)packet_in_blob.length + 4)); + NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet); + } + + status = wreplsrv_in_call(call); + NT_STATUS_IS_ERR_RETURN(status); + if (!NT_STATUS_IS_OK(status)) { + /* w2k just ignores invalid packets, so we do */ + DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n")); + talloc_free(call); + return NT_STATUS_OK; + } + + /* and now encode the reply */ + packet_out_wrap.packet = call->rep_packet; + ndr_err = ndr_push_struct_blob(&packet_out_blob, call, + lp_iconv_convenience(wreplconn->service->task->lp_ctx), + &packet_out_wrap, + (ndr_push_flags_fn_t)ndr_push_wrepl_wrap); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + return ndr_map_error2ntstatus(ndr_err); + } + + if (DEBUGLVL(10)) { + DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length)); + NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet); + } + + if (call->terminate_after_send) { + struct wreplsrv_in_connection **tas; + tas = talloc(packet_out_blob.data, struct wreplsrv_in_connection *); + NT_STATUS_HAVE_NO_MEMORY(tas); + *tas = wreplconn; + talloc_set_destructor(tas, terminate_after_send_destructor); + } + + status = packet_send(wreplconn->packet, packet_out_blob); + 
NT_STATUS_NOT_OK_RETURN(status); + + talloc_free(call); + return NT_STATUS_OK; +} + +/* + called when the socket becomes readable +*/ +static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags) +{ + struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, + struct wreplsrv_in_connection); + + packet_recv(wreplconn->packet); +} + +/* + called when the socket becomes writable +*/ +static void wreplsrv_send(struct stream_connection *conn, uint16_t flags) +{ + struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, + struct wreplsrv_in_connection); + packet_queue_run(wreplconn->packet); +} + +/* + handle socket recv errors +*/ +static void wreplsrv_recv_error(void *private, NTSTATUS status) +{ + struct wreplsrv_in_connection *wreplconn = talloc_get_type(private, + struct wreplsrv_in_connection); + wreplsrv_terminate_in_connection(wreplconn, nt_errstr(status)); +} + +/* + called when we get a new connection +*/ +static void wreplsrv_accept(struct stream_connection *conn) +{ + struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service); + struct wreplsrv_in_connection *wreplconn; + struct socket_address *peer_ip; + + wreplconn = talloc_zero(conn, struct wreplsrv_in_connection); + if (!wreplconn) { + stream_terminate_connection(conn, "wreplsrv_accept: out of memory"); + return; + } + + wreplconn->packet = packet_init(wreplconn); + if (!wreplconn->packet) { + wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: out of memory"); + return; + } + packet_set_private(wreplconn->packet, wreplconn); + packet_set_socket(wreplconn->packet, conn->socket); + packet_set_callback(wreplconn->packet, wreplsrv_recv_request); + packet_set_full_request(wreplconn->packet, packet_full_request_u32); + packet_set_error_handler(wreplconn->packet, wreplsrv_recv_error); + packet_set_event_context(wreplconn->packet, conn->event.ctx); + packet_set_fde(wreplconn->packet, conn->event.fde); + 
packet_set_serialise(wreplconn->packet); + + wreplconn->conn = conn; + wreplconn->service = service; + + peer_ip = socket_get_peer_addr(conn->socket, wreplconn); + if (!peer_ip) { + wreplsrv_terminate_in_connection(wreplconn, "wreplsrv_accept: could not obtain peer IP from kernel"); + return; + } + + wreplconn->partner = wreplsrv_find_partner(service, peer_ip->addr); + + conn->private = wreplconn; + + irpc_add_name(conn->msg_ctx, "wreplsrv_connection"); +} + +static const struct stream_server_ops wreplsrv_stream_ops = { + .name = "wreplsrv", + .accept_connection = wreplsrv_accept, + .recv_handler = wreplsrv_recv, + .send_handler = wreplsrv_send, +}; + +/* + called when we get a new connection +*/ +NTSTATUS wreplsrv_in_connection_merge(struct wreplsrv_partner *partner, + struct socket_context *sock, + struct packet_context *packet, + struct wreplsrv_in_connection **_wrepl_in) +{ + struct wreplsrv_service *service = partner->service; + struct wreplsrv_in_connection *wrepl_in; + const struct model_ops *model_ops; + struct stream_connection *conn; + NTSTATUS status; + + /* within the wrepl task we want to be a single process, so + ask for the single process model ops and pass these to the + stream_setup_socket() call. 
*/ + model_ops = process_model_byname("single"); + if (!model_ops) { + DEBUG(0,("Can't find 'single' process model_ops")); + return NT_STATUS_INTERNAL_ERROR; + } + + wrepl_in = talloc_zero(partner, struct wreplsrv_in_connection); + NT_STATUS_HAVE_NO_MEMORY(wrepl_in); + + wrepl_in->service = service; + wrepl_in->partner = partner; + + status = stream_new_connection_merge(service->task->event_ctx, service->task->lp_ctx, model_ops, + sock, &wreplsrv_stream_ops, service->task->msg_ctx, + wrepl_in, &conn); + NT_STATUS_NOT_OK_RETURN(status); + + /* + * make the wreplsrv_in_connection structure a child of the + * stream_connection, to match the hierachie of wreplsrv_accept + */ + wrepl_in->conn = conn; + talloc_steal(conn, wrepl_in); + + /* + * now update the packet handling callback,... + */ + wrepl_in->packet = talloc_steal(wrepl_in, packet); + packet_set_private(wrepl_in->packet, wrepl_in); + packet_set_socket(wrepl_in->packet, conn->socket); + packet_set_callback(wrepl_in->packet, wreplsrv_recv_request); + packet_set_full_request(wrepl_in->packet, packet_full_request_u32); + packet_set_error_handler(wrepl_in->packet, wreplsrv_recv_error); + packet_set_event_context(wrepl_in->packet, conn->event.ctx); + packet_set_fde(wrepl_in->packet, conn->event.fde); + packet_set_serialise(wrepl_in->packet); + + *_wrepl_in = wrepl_in; + return NT_STATUS_OK; +} + +/* + startup the wrepl port 42 server sockets +*/ +NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service, struct loadparm_context *lp_ctx) +{ + NTSTATUS status; + struct task_server *task = service->task; + const struct model_ops *model_ops; + const char *address; + uint16_t port = WINS_REPLICATION_PORT; + + /* within the wrepl task we want to be a single process, so + ask for the single process model ops and pass these to the + stream_setup_socket() call. 
*/ + model_ops = process_model_byname("single"); + if (!model_ops) { + DEBUG(0,("Can't find 'single' process model_ops")); + return NT_STATUS_INTERNAL_ERROR; + } + + if (lp_interfaces(lp_ctx) && lp_bind_interfaces_only(lp_ctx)) { + int num_interfaces; + int i; + struct interface *ifaces; + + load_interfaces(task, lp_interfaces(lp_ctx), &ifaces); + + num_interfaces = iface_count(ifaces); + + /* We have been given an interfaces line, and been + told to only bind to those interfaces. Create a + socket per interface and bind to only these. + */ + for(i = 0; i < num_interfaces; i++) { + address = iface_n_ip(ifaces, i); + status = stream_setup_socket(task->event_ctx, + task->lp_ctx, model_ops, + &wreplsrv_stream_ops, + "ipv4", address, &port, + lp_socket_options(task->lp_ctx), + service); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n", + address, port, nt_errstr(status))); + return status; + } + } + } else { + address = lp_socket_address(lp_ctx); + status = stream_setup_socket(task->event_ctx, task->lp_ctx, + model_ops, &wreplsrv_stream_ops, + "ipv4", address, &port, lp_socket_options(task->lp_ctx), + service); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n", + address, port, nt_errstr(status))); + return status; + } + } + + return NT_STATUS_OK; +} diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c new file mode 100644 index 0000000000..12605196ab --- /dev/null +++ b/source4/wrepl_server/wrepl_out_helpers.c @@ -0,0 +1,1103 @@ +/* + Unix SMB/CIFS implementation. + + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "lib/socket/socket.h" +#include "smbd/service_task.h" +#include "smbd/service_stream.h" +#include "librpc/gen_ndr/winsrepl.h" +#include "wrepl_server/wrepl_server.h" +#include "nbt_server/wins/winsdb.h" +#include "libcli/composite/composite.h" +#include "libcli/wrepl/winsrepl.h" +#include "libcli/resolve/resolve.h" +#include "param/param.h" + +enum wreplsrv_out_connect_stage { + WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET, + WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX, + WREPLSRV_OUT_CONNECT_STAGE_DONE +}; + +struct wreplsrv_out_connect_state { + enum wreplsrv_out_connect_stage stage; + struct composite_context *c; + struct wrepl_request *req; + struct composite_context *c_req; + struct wrepl_associate assoc_io; + enum winsrepl_partner_type type; + struct wreplsrv_out_connection *wreplconn; +}; + +static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req); +static void wreplsrv_out_connect_handler_req(struct wrepl_request *req); + +static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state) +{ + NTSTATUS status; + + status = wrepl_connect_recv(state->c_req); + NT_STATUS_NOT_OK_RETURN(status); + + state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io); + NT_STATUS_HAVE_NO_MEMORY(state->req); + + state->req->async.fn = wreplsrv_out_connect_handler_req; + state->req->async.private = state; + + state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX; + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct 
wreplsrv_out_connect_state *state) +{ + NTSTATUS status; + + status = wrepl_associate_recv(state->req, &state->assoc_io); + NT_STATUS_NOT_OK_RETURN(status); + + state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx; + + if (state->type == WINSREPL_PARTNER_PUSH) { + state->wreplconn->partner->push.wreplconn = state->wreplconn; + talloc_steal(state->wreplconn->partner, state->wreplconn); + } else if (state->type == WINSREPL_PARTNER_PULL) { + state->wreplconn->partner->pull.wreplconn = state->wreplconn; + talloc_steal(state->wreplconn->partner, state->wreplconn); + } + + state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE; + + return NT_STATUS_OK; +} + +static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state) +{ + struct composite_context *c = state->c; + + switch (state->stage) { + case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET: + c->status = wreplsrv_out_connect_wait_socket(state); + break; + case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX: + c->status = wreplsrv_out_connect_wait_assoc_ctx(state); + c->state = COMPOSITE_STATE_DONE; + break; + case WREPLSRV_OUT_CONNECT_STAGE_DONE: + c->status = NT_STATUS_INTERNAL_ERROR; + } + + if (!NT_STATUS_IS_OK(c->status)) { + c->state = COMPOSITE_STATE_ERROR; + } + + if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) { + c->async.fn(c); + } +} + +static void wreplsrv_out_connect_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data, + struct wreplsrv_out_connect_state); + wreplsrv_out_connect_handler(state); + return; +} + +static void wreplsrv_out_connect_handler_req(struct wrepl_request *req) +{ + struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private, + struct wreplsrv_out_connect_state); + wreplsrv_out_connect_handler(state); + return; +} + +static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner, + enum winsrepl_partner_type type, + struct 
wreplsrv_out_connection *wreplconn) +{ + struct composite_context *c = NULL; + struct wreplsrv_service *service = partner->service; + struct wreplsrv_out_connect_state *state = NULL; + struct wreplsrv_out_connection **wreplconnp = &wreplconn; + bool cached_connection = false; + + c = talloc_zero(partner, struct composite_context); + if (!c) goto failed; + + state = talloc_zero(c, struct wreplsrv_out_connect_state); + if (!state) goto failed; + state->c = c; + state->type = type; + + c->state = COMPOSITE_STATE_IN_PROGRESS; + c->event_ctx = service->task->event_ctx; + c->private_data = state; + + if (type == WINSREPL_PARTNER_PUSH) { + cached_connection = true; + wreplconn = partner->push.wreplconn; + wreplconnp = &partner->push.wreplconn; + } else if (type == WINSREPL_PARTNER_PULL) { + cached_connection = true; + wreplconn = partner->pull.wreplconn; + wreplconnp = &partner->pull.wreplconn; + } + + /* we have a connection already, so use it */ + if (wreplconn) { + if (!wreplconn->sock->dead) { + state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE; + state->wreplconn= wreplconn; + composite_done(c); + return c; + } else if (!cached_connection) { + state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE; + state->wreplconn= NULL; + composite_done(c); + return c; + } else { + talloc_free(wreplconn); + *wreplconnp = NULL; + } + } + + wreplconn = talloc_zero(state, struct wreplsrv_out_connection); + if (!wreplconn) goto failed; + + wreplconn->service = service; + wreplconn->partner = partner; + wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx)); + if (!wreplconn->sock) goto failed; + + state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET; + state->wreplconn= wreplconn; + state->c_req = wrepl_connect_send(wreplconn->sock, + lp_resolve_context(service->task->lp_ctx), + partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address), + partner->address); + if (!state->c_req) goto failed; + 
+ state->c_req->async.fn = wreplsrv_out_connect_handler_creq; + state->c_req->async.private_data = state; + + return c; +failed: + talloc_free(c); + return NULL; +} + +static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx, + struct wreplsrv_out_connection **wreplconn) +{ + NTSTATUS status; + + status = composite_wait(c); + + if (NT_STATUS_IS_OK(status)) { + struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data, + struct wreplsrv_out_connect_state); + if (state->wreplconn) { + *wreplconn = talloc_reference(mem_ctx, state->wreplconn); + if (!*wreplconn) status = NT_STATUS_NO_MEMORY; + } else { + status = NT_STATUS_INVALID_CONNECTION; + } + } + + talloc_free(c); + return status; + +} + +struct wreplsrv_pull_table_io { + struct { + struct wreplsrv_partner *partner; + uint32_t num_owners; + struct wrepl_wins_owner *owners; + } in; + struct { + uint32_t num_owners; + struct wrepl_wins_owner *owners; + } out; +}; + +enum wreplsrv_pull_table_stage { + WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION, + WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY, + WREPLSRV_PULL_TABLE_STAGE_DONE +}; + +struct wreplsrv_pull_table_state { + enum wreplsrv_pull_table_stage stage; + struct composite_context *c; + struct wrepl_request *req; + struct wrepl_pull_table table_io; + struct wreplsrv_pull_table_io *io; + struct composite_context *creq; + struct wreplsrv_out_connection *wreplconn; +}; + +static void wreplsrv_pull_table_handler_req(struct wrepl_request *req); + +static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state) +{ + NTSTATUS status; + + status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn); + NT_STATUS_NOT_OK_RETURN(status); + + state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx; + state->req = wrepl_pull_table_send(state->wreplconn->sock, &state->table_io); + NT_STATUS_HAVE_NO_MEMORY(state->req); + + state->req->async.fn = wreplsrv_pull_table_handler_req; + 
state->req->async.private = state; + + state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY; + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state) +{ + NTSTATUS status; + + status = wrepl_pull_table_recv(state->req, state, &state->table_io); + NT_STATUS_NOT_OK_RETURN(status); + + state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE; + + return NT_STATUS_OK; +} + +static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state) +{ + struct composite_context *c = state->c; + + switch (state->stage) { + case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION: + c->status = wreplsrv_pull_table_wait_connection(state); + break; + case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY: + c->status = wreplsrv_pull_table_wait_table_reply(state); + c->state = COMPOSITE_STATE_DONE; + break; + case WREPLSRV_PULL_TABLE_STAGE_DONE: + c->status = NT_STATUS_INTERNAL_ERROR; + } + + if (!NT_STATUS_IS_OK(c->status)) { + c->state = COMPOSITE_STATE_ERROR; + } + + if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) { + c->async.fn(c); + } +} + +static void wreplsrv_pull_table_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data, + struct wreplsrv_pull_table_state); + wreplsrv_pull_table_handler(state); + return; +} + +static void wreplsrv_pull_table_handler_req(struct wrepl_request *req) +{ + struct wreplsrv_pull_table_state *state = talloc_get_type(req->async.private, + struct wreplsrv_pull_table_state); + wreplsrv_pull_table_handler(state); + return; +} + +static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io) +{ + struct composite_context *c = NULL; + struct wreplsrv_service *service = io->in.partner->service; + struct wreplsrv_pull_table_state *state = NULL; + + c = talloc_zero(mem_ctx, struct composite_context); + if (!c) goto failed; + + state = talloc_zero(c, struct 
wreplsrv_pull_table_state); + if (!state) goto failed; + state->c = c; + state->io = io; + + c->state = COMPOSITE_STATE_IN_PROGRESS; + c->event_ctx = service->task->event_ctx; + c->private_data = state; + + if (io->in.num_owners) { + state->table_io.out.num_partners = io->in.num_owners; + state->table_io.out.partners = io->in.owners; + state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE; + composite_done(c); + return c; + } + + state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION; + state->creq = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL); + if (!state->creq) goto failed; + + state->creq->async.fn = wreplsrv_pull_table_handler_creq; + state->creq->async.private_data = state; + + return c; +failed: + talloc_free(c); + return NULL; +} + +static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx, + struct wreplsrv_pull_table_io *io) +{ + NTSTATUS status; + + status = composite_wait(c); + + if (NT_STATUS_IS_OK(status)) { + struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data, + struct wreplsrv_pull_table_state); + io->out.num_owners = state->table_io.out.num_partners; + io->out.owners = talloc_reference(mem_ctx, state->table_io.out.partners); + } + + talloc_free(c); + return status; +} + +struct wreplsrv_pull_names_io { + struct { + struct wreplsrv_partner *partner; + struct wreplsrv_out_connection *wreplconn; + struct wrepl_wins_owner owner; + } in; + struct { + uint32_t num_names; + struct wrepl_name *names; + } out; +}; + +enum wreplsrv_pull_names_stage { + WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION, + WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY, + WREPLSRV_PULL_NAMES_STAGE_DONE +}; + +struct wreplsrv_pull_names_state { + enum wreplsrv_pull_names_stage stage; + struct composite_context *c; + struct wrepl_request *req; + struct wrepl_pull_names pull_io; + struct wreplsrv_pull_names_io *io; + struct composite_context *creq; + struct wreplsrv_out_connection *wreplconn; +}; + +static void 
wreplsrv_pull_names_handler_req(struct wrepl_request *req); + +static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state) +{ + NTSTATUS status; + + status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn); + NT_STATUS_NOT_OK_RETURN(status); + + state->pull_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx; + state->pull_io.in.partner = state->io->in.owner; + state->req = wrepl_pull_names_send(state->wreplconn->sock, &state->pull_io); + NT_STATUS_HAVE_NO_MEMORY(state->req); + + state->req->async.fn = wreplsrv_pull_names_handler_req; + state->req->async.private = state; + + state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY; + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state) +{ + NTSTATUS status; + + status = wrepl_pull_names_recv(state->req, state, &state->pull_io); + NT_STATUS_NOT_OK_RETURN(status); + + state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE; + + return NT_STATUS_OK; +} + +static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state) +{ + struct composite_context *c = state->c; + + switch (state->stage) { + case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION: + c->status = wreplsrv_pull_names_wait_connection(state); + break; + case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY: + c->status = wreplsrv_pull_names_wait_send_reply(state); + c->state = COMPOSITE_STATE_DONE; + break; + case WREPLSRV_PULL_NAMES_STAGE_DONE: + c->status = NT_STATUS_INTERNAL_ERROR; + } + + if (!NT_STATUS_IS_OK(c->status)) { + c->state = COMPOSITE_STATE_ERROR; + } + + if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) { + c->async.fn(c); + } +} + +static void wreplsrv_pull_names_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data, + struct wreplsrv_pull_names_state); + wreplsrv_pull_names_handler(state); + return; +} + +static void 
wreplsrv_pull_names_handler_req(struct wrepl_request *req) +{ + struct wreplsrv_pull_names_state *state = talloc_get_type(req->async.private, + struct wreplsrv_pull_names_state); + wreplsrv_pull_names_handler(state); + return; +} + +static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io) +{ + struct composite_context *c = NULL; + struct wreplsrv_service *service = io->in.partner->service; + struct wreplsrv_pull_names_state *state = NULL; + enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL; + + if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE; + + c = talloc_zero(mem_ctx, struct composite_context); + if (!c) goto failed; + + state = talloc_zero(c, struct wreplsrv_pull_names_state); + if (!state) goto failed; + state->c = c; + state->io = io; + + c->state = COMPOSITE_STATE_IN_PROGRESS; + c->event_ctx = service->task->event_ctx; + c->private_data = state; + + state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION; + state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn); + if (!state->creq) goto failed; + + state->creq->async.fn = wreplsrv_pull_names_handler_creq; + state->creq->async.private_data = state; + + return c; +failed: + talloc_free(c); + return NULL; +} + +static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx, + struct wreplsrv_pull_names_io *io) +{ + NTSTATUS status; + + status = composite_wait(c); + + if (NT_STATUS_IS_OK(status)) { + struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data, + struct wreplsrv_pull_names_state); + io->out.num_names = state->pull_io.out.num_names; + io->out.names = talloc_reference(mem_ctx, state->pull_io.out.names); + } + + talloc_free(c); + return status; + +} + +enum wreplsrv_pull_cycle_stage { + WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY, + WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES, + WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC, + 
WREPLSRV_PULL_CYCLE_STAGE_DONE +}; + +struct wreplsrv_pull_cycle_state { + enum wreplsrv_pull_cycle_stage stage; + struct composite_context *c; + struct wreplsrv_pull_cycle_io *io; + struct wreplsrv_pull_table_io table_io; + uint32_t current; + struct wreplsrv_pull_names_io names_io; + struct composite_context *creq; + struct wrepl_associate_stop assoc_stop_io; + struct wrepl_request *req; +}; + +static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq); +static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req); + +static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state) +{ + struct wreplsrv_owner *current_owner=NULL; + struct wreplsrv_owner *local_owner; + uint32_t i; + uint64_t old_max_version = 0; + bool do_pull = false; + + for (i=state->current; i < state->table_io.out.num_owners; i++) { + current_owner = wreplsrv_find_owner(state->io->in.partner->service, + state->io->in.partner->pull.table, + state->table_io.out.owners[i].address); + + local_owner = wreplsrv_find_owner(state->io->in.partner->service, + state->io->in.partner->service->table, + state->table_io.out.owners[i].address); + /* + * this means we are ourself the current owner, + * and we don't want replicate ourself + */ + if (!current_owner) continue; + + /* + * this means we don't have any records of this owner + * so fetch them + */ + if (!local_owner) { + do_pull = true; + + break; + } + + /* + * this means the remote partner has some new records of this owner + * fetch them + */ + if (current_owner->owner.max_version > local_owner->owner.max_version) { + do_pull = true; + old_max_version = local_owner->owner.max_version; + break; + } + } + state->current = i; + + if (do_pull) { + state->names_io.in.partner = state->io->in.partner; + state->names_io.in.wreplconn = state->io->in.wreplconn; + state->names_io.in.owner = current_owner->owner; + state->names_io.in.owner.min_version = old_max_version + 1; + state->creq = 
wreplsrv_pull_names_send(state, &state->names_io); + NT_STATUS_HAVE_NO_MEMORY(state->creq); + + state->creq->async.fn = wreplsrv_pull_cycle_handler_creq; + state->creq->async.private_data = state; + + return STATUS_MORE_ENTRIES; + } + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state) +{ + NTSTATUS status; + + status = wreplsrv_pull_cycle_next_owner_do_work(state); + if (NT_STATUS_IS_OK(status)) { + state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE; + } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) { + state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES; + status = NT_STATUS_OK; + } + + if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) { + state->assoc_stop_io.in.assoc_ctx = state->io->in.wreplconn->assoc_ctx.peer_ctx; + state->assoc_stop_io.in.reason = 0; + state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io); + NT_STATUS_HAVE_NO_MEMORY(state->req); + + state->req->async.fn = wreplsrv_pull_cycle_handler_req; + state->req->async.private = state; + + state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC; + } + + return status; +} + +static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state) +{ + NTSTATUS status; + uint32_t i; + + status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io); + NT_STATUS_NOT_OK_RETURN(status); + + /* update partner table */ + for (i=0; i < state->table_io.out.num_owners; i++) { + status = wreplsrv_add_table(state->io->in.partner->service, + state->io->in.partner, + &state->io->in.partner->pull.table, + state->table_io.out.owners[i].address, + state->table_io.out.owners[i].max_version); + NT_STATUS_NOT_OK_RETURN(status); + } + + status = wreplsrv_pull_cycle_next_owner_wrapper(state); + NT_STATUS_NOT_OK_RETURN(status); + + return status; +} + +static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state) +{ 
+ NTSTATUS status; + + status = wreplsrv_apply_records(state->io->in.partner, + &state->names_io.in.owner, + state->names_io.out.num_names, + state->names_io.out.names); + NT_STATUS_NOT_OK_RETURN(status); + + talloc_free(state->names_io.out.names); + ZERO_STRUCT(state->names_io); + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state) +{ + NTSTATUS status; + + status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io); + NT_STATUS_NOT_OK_RETURN(status); + + /* + * TODO: this should maybe an async call, + * because we may need some network access + * for conflict resolving + */ + status = wreplsrv_pull_cycle_apply_records(state); + NT_STATUS_NOT_OK_RETURN(status); + + status = wreplsrv_pull_cycle_next_owner_wrapper(state); + NT_STATUS_NOT_OK_RETURN(status); + + return status; +} + +static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state) +{ + NTSTATUS status; + + status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io); + NT_STATUS_NOT_OK_RETURN(status); + + state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE; + + return status; +} + +static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state) +{ + struct composite_context *c = state->c; + + switch (state->stage) { + case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY: + c->status = wreplsrv_pull_cycle_wait_table_reply(state); + break; + case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES: + c->status = wreplsrv_pull_cycle_wait_send_replies(state); + break; + case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC: + c->status = wreplsrv_pull_cycle_wait_stop_assoc(state); + break; + case WREPLSRV_PULL_CYCLE_STAGE_DONE: + c->status = NT_STATUS_INTERNAL_ERROR; + } + + if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) { + c->state = COMPOSITE_STATE_DONE; + } + + if (!NT_STATUS_IS_OK(c->status)) { + c->state = COMPOSITE_STATE_ERROR; + } + + if (c->state >= COMPOSITE_STATE_DONE && 
c->async.fn) { + c->async.fn(c); + } +} + +static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data, + struct wreplsrv_pull_cycle_state); + wreplsrv_pull_cycle_handler(state); + return; +} + +static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req) +{ + struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private, + struct wreplsrv_pull_cycle_state); + wreplsrv_pull_cycle_handler(state); + return; +} + +struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io) +{ + struct composite_context *c = NULL; + struct wreplsrv_service *service = io->in.partner->service; + struct wreplsrv_pull_cycle_state *state = NULL; + + c = talloc_zero(mem_ctx, struct composite_context); + if (!c) goto failed; + + state = talloc_zero(c, struct wreplsrv_pull_cycle_state); + if (!state) goto failed; + state->c = c; + state->io = io; + + c->state = COMPOSITE_STATE_IN_PROGRESS; + c->event_ctx = service->task->event_ctx; + c->private_data = state; + + state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY; + state->table_io.in.partner = io->in.partner; + state->table_io.in.num_owners = io->in.num_owners; + state->table_io.in.owners = io->in.owners; + state->creq = wreplsrv_pull_table_send(state, &state->table_io); + if (!state->creq) goto failed; + + state->creq->async.fn = wreplsrv_pull_cycle_handler_creq; + state->creq->async.private_data = state; + + return c; +failed: + talloc_free(c); + return NULL; +} + +NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c) +{ + NTSTATUS status; + + status = composite_wait(c); + + talloc_free(c); + return status; +} + +enum wreplsrv_push_notify_stage { + WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT, + WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM, + WREPLSRV_PUSH_NOTIFY_STAGE_DONE +}; + +struct wreplsrv_push_notify_state { + enum wreplsrv_push_notify_stage stage; + 
struct composite_context *c; + struct wreplsrv_push_notify_io *io; + enum wrepl_replication_cmd command; + bool full_table; + struct wrepl_send_ctrl ctrl; + struct wrepl_request *req; + struct wrepl_packet req_packet; + struct wrepl_packet *rep_packet; + struct composite_context *creq; + struct wreplsrv_out_connection *wreplconn; +}; + +static void wreplsrv_push_notify_handler_creq(struct composite_context *creq); +static void wreplsrv_push_notify_handler_req(struct wrepl_request *req); + +static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state) +{ + struct wreplsrv_service *service = state->io->in.partner->service; + struct wrepl_packet *req = &state->req_packet; + struct wrepl_replication *repl_out = &state->req_packet.message.replication; + struct wrepl_table *table_out = &state->req_packet.message.replication.info.table; + struct wreplsrv_in_connection *wrepl_in; + NTSTATUS status; + struct socket_context *sock; + struct packet_context *packet; + uint16_t fde_flags; + + /* prepare the outgoing request */ + req->opcode = WREPL_OPCODE_BITS; + req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx; + req->mess_type = WREPL_REPLICATION; + + repl_out->command = state->command; + + status = wreplsrv_fill_wrepl_table(service, state, table_out, + service->wins_db->local_owner, state->full_table); + NT_STATUS_NOT_OK_RETURN(status); + + /* queue the request */ + state->req = wrepl_request_send(state->wreplconn->sock, req, NULL); + NT_STATUS_HAVE_NO_MEMORY(state->req); + + /* + * now we need to convert the wrepl_socket (client connection) + * into a wreplsrv_in_connection (server connection), because + * we'll act as a server on this connection after the WREPL_REPL_UPDATE* + * message is received by the peer. 
+ */ + + /* steal the socket_context */ + sock = state->wreplconn->sock->sock; + state->wreplconn->sock->sock = NULL; + talloc_steal(state, sock); + + /* + * steal the packet_context + * note the request DATA_BLOB we just send on the + * wrepl_socket (client connection) is still unter the + * packet context and will be send to the wire + */ + packet = state->wreplconn->sock->packet; + state->wreplconn->sock->packet = NULL; + talloc_steal(state, packet); + + /* + * get the fde_flags of the old fde event, + * so that we can later set the same flags to the new one + */ + fde_flags = event_get_fd_flags(state->wreplconn->sock->event.fde); + + /* + * free the wrepl_socket (client connection) + */ + talloc_free(state->wreplconn->sock); + state->wreplconn->sock = NULL; + + /* + * now create a wreplsrv_in_connection, + * on which we act as server + * + * NOTE: sock and packet will be stolen by + * wreplsrv_in_connection_merge() + */ + status = wreplsrv_in_connection_merge(state->io->in.partner, + sock, packet, &wrepl_in); + NT_STATUS_NOT_OK_RETURN(status); + + event_set_fd_flags(wrepl_in->conn->event.fde, fde_flags); + + wrepl_in->assoc_ctx.peer_ctx = state->wreplconn->assoc_ctx.peer_ctx; + wrepl_in->assoc_ctx.our_ctx = 0; + + /* now we can free the wreplsrv_out_connection */ + talloc_free(state->wreplconn); + state->wreplconn = NULL; + + state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE; + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state) +{ + struct wreplsrv_service *service = state->io->in.partner->service; + struct wrepl_packet *req = &state->req_packet; + struct wrepl_replication *repl_out = &state->req_packet.message.replication; + struct wrepl_table *table_out = &state->req_packet.message.replication.info.table; + NTSTATUS status; + + req->opcode = WREPL_OPCODE_BITS; + req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx; + req->mess_type = WREPL_REPLICATION; + + repl_out->command = state->command; + + 
status = wreplsrv_fill_wrepl_table(service, state, table_out, + service->wins_db->local_owner, state->full_table); + NT_STATUS_NOT_OK_RETURN(status); + + /* we won't get a reply to a inform message */ + state->ctrl.send_only = true; + + state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl); + NT_STATUS_HAVE_NO_MEMORY(state->req); + + state->req->async.fn = wreplsrv_push_notify_handler_req; + state->req->async.private = state; + + state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM; + + return NT_STATUS_OK; +} + +static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state) +{ + NTSTATUS status; + + status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn); + NT_STATUS_NOT_OK_RETURN(status); + + switch (state->command) { + case WREPL_REPL_UPDATE: + state->full_table = true; + return wreplsrv_push_notify_update(state); + case WREPL_REPL_UPDATE2: + state->full_table = false; + return wreplsrv_push_notify_update(state); + case WREPL_REPL_INFORM: + state->full_table = true; + return wreplsrv_push_notify_inform(state); + case WREPL_REPL_INFORM2: + state->full_table = false; + return wreplsrv_push_notify_inform(state); + default: + return NT_STATUS_INTERNAL_ERROR; + } + + return NT_STATUS_INTERNAL_ERROR; +} + +static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state) +{ + NTSTATUS status; + + status = wrepl_request_recv(state->req, state, NULL); + NT_STATUS_NOT_OK_RETURN(status); + + state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE; + return status; +} + +static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state) +{ + struct composite_context *c = state->c; + + switch (state->stage) { + case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT: + c->status = wreplsrv_push_notify_wait_connect(state); + break; + case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM: + c->status = wreplsrv_push_notify_wait_inform(state); + break; + case WREPLSRV_PUSH_NOTIFY_STAGE_DONE: + 
c->status = NT_STATUS_INTERNAL_ERROR;
+	}
+
+	/* once the final stage is reached the composite request is complete */
+	if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
+		c->state = COMPOSITE_STATE_DONE;
+	}
+
+	if (!NT_STATUS_IS_OK(c->status)) {
+		c->state = COMPOSITE_STATE_ERROR;
+	}
+
+	/* report completion (or error) to the caller's callback, if any */
+	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
+		c->async.fn(c);
+	}
+}
+
+/* composite_context glue: dispatch a child composite completion into the state machine */
+static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
+{
+	struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
+						   struct wreplsrv_push_notify_state);
+	wreplsrv_push_notify_handler(state);
+	return;
+}
+
+/* wrepl_request glue: dispatch a wire-request completion into the state machine */
+static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
+{
+	struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private,
+						   struct wreplsrv_push_notify_state);
+	wreplsrv_push_notify_handler(state);
+	return;
+}
+
+/*
+ * Start an async push notification to a partner.
+ * io->in.inform selects WREPL_REPL_INFORM* (connection can be cached on the
+ * partner) vs WREPL_REPL_UPDATE* (connection can NOT be cached);
+ * io->in.propagate selects the "2" variants of each command.
+ * Returns a composite_context, or NULL on allocation failure.
+ */
+struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
+{
+	struct composite_context *c = NULL;
+	struct wreplsrv_service *service = io->in.partner->service;
+	struct wreplsrv_push_notify_state *state = NULL;
+	enum winsrepl_partner_type partner_type;
+
+	c = talloc_zero(mem_ctx, struct composite_context);
+	if (!c) goto failed;
+
+	state = talloc_zero(c, struct wreplsrv_push_notify_state);
+	if (!state) goto failed;
+	state->c	= c;
+	state->io	= io;
+
+	if (io->in.inform) {
+		/* we can cache the connection in partner->push->wreplconn */
+		partner_type = WINSREPL_PARTNER_PUSH;
+		if (io->in.propagate) {
+			state->command	= WREPL_REPL_INFORM2;
+		} else {
+			state->command	= WREPL_REPL_INFORM;
+		}
+	} else {
+		/* we can NOT cache the connection */
+		partner_type = WINSREPL_PARTNER_NONE;
+		if (io->in.propagate) {
+			state->command	= WREPL_REPL_UPDATE2;
+		} else {
+			state->command	= WREPL_REPL_UPDATE;
+		}
+	}
+
+	c->state	= COMPOSITE_STATE_IN_PROGRESS;
+	c->event_ctx	= service->task->event_ctx;
+	c->private_data	= state;
+
+	/* first stage: connect to the partner */
+	state->stage	= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
+	state->creq	= wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
+	if (!state->creq) goto failed;
+
+	state->creq->async.fn		= wreplsrv_push_notify_handler_creq;
+	state->creq->async.private_data	= state;
+
+	return c;
+failed:
+	talloc_free(c);
+	return NULL;
+}
+
+/*
+ * Wait for a wreplsrv_push_notify_send() request to finish.
+ * Frees the composite_context and returns the final status.
+ */
+NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
+{
+	NTSTATUS status;
+
+	status = composite_wait(c);
+
+	talloc_free(c);
+	return status;
+}
diff --git a/source4/wrepl_server/wrepl_out_helpers.h b/source4/wrepl_server/wrepl_out_helpers.h
new file mode 100644
index 0000000000..92bbe561af
--- /dev/null
+++ b/source4/wrepl_server/wrepl_out_helpers.h
@@ -0,0 +1,37 @@
+/*
+   Unix SMB/CIFS implementation.
+
+   WINS Replication server
+
+   Copyright (C) Stefan Metzmacher	2005
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/* input for one pull replication cycle against a partner */
+struct wreplsrv_pull_cycle_io {
+	struct {
+		struct wreplsrv_partner *partner;
+		uint32_t num_owners;
+		struct wrepl_wins_owner *owners;
+		struct wreplsrv_out_connection *wreplconn;
+	} in;
+};
+
+/* input for one push notification to a partner */
+struct wreplsrv_push_notify_io {
+	struct {
+		struct wreplsrv_partner *partner;
+		bool inform;
+		bool propagate;
+	} in;
+};
diff --git a/source4/wrepl_server/wrepl_out_pull.c b/source4/wrepl_server/wrepl_out_pull.c
new file mode 100644
index 0000000000..d508cc094b
--- /dev/null
+++ b/source4/wrepl_server/wrepl_out_pull.c
@@ -0,0 +1,142 @@
+/*
+   Unix SMB/CIFS implementation.
+
+   WINS Replication server
+
+   Copyright (C) Stefan Metzmacher	2005
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "librpc/gen_ndr/winsrepl.h"
+#include "wrepl_server/wrepl_server.h"
+#include "libcli/composite/composite.h"
+
+/*
+ * Schedule the next pull run for this partner 'interval' seconds from now
+ * and make sure the service's periodic timer fires by then.
+ */
+static void wreplsrv_out_pull_reschedule(struct wreplsrv_partner *partner, uint32_t interval)
+{
+	NTSTATUS status;
+
+	partner->pull.next_run = timeval_current_ofs(interval, 0);
+	status = wreplsrv_periodic_schedule(partner->service, interval);
+	if (!NT_STATUS_IS_OK(status)) {
+		DEBUG(0,("wreplsrv_periodic_schedule() failed\n"));
+	}
+}
+
+/*
+ * Completion handler for a pull cycle.
+ * On success the error count is reset; on the first error the cycle is
+ * retried immediately (reusing the old owner table as an inform table);
+ * on repeated errors the retry is backed off (error_count * retry_interval,
+ * capped at the normal pull interval).
+ */
+static void wreplsrv_pull_handler_creq(struct composite_context *creq)
+{
+	struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner);
+	struct wreplsrv_pull_cycle_io *old_cycle_io;
+	struct wrepl_table inform_in;
+
+	partner->pull.last_status = wreplsrv_pull_cycle_recv(partner->pull.creq);
+	partner->pull.creq = NULL;
+
+	/* take ownership of the finished cycle's io; freed at 'done' */
+	old_cycle_io = partner->pull.cycle_io;
+	partner->pull.cycle_io = NULL;
+
+	if (NT_STATUS_IS_OK(partner->pull.last_status)) {
+		partner->pull.error_count = 0;
+		DEBUG(2,("wreplsrv_pull_cycle(%s): %s\n",
+			 partner->address, nt_errstr(partner->pull.last_status)));
+		goto done;
+	}
+
+	partner->pull.error_count++;
+
+	if (partner->pull.error_count > 1) {
+		uint32_t retry_interval;
+
+		/* back off linearly with the error count, capped at the pull interval */
+		retry_interval = partner->pull.error_count * partner->pull.retry_interval;
+		retry_interval = MIN(retry_interval, partner->pull.interval);
+
+		DEBUG(1,("wreplsrv_pull_cycle(%s): %s: error_count: %u: reschedule(%u)\n",
+			 partner->address, nt_errstr(partner->pull.last_status),
+			 partner->pull.error_count, retry_interval));
+
+		wreplsrv_out_pull_reschedule(partner, retry_interval);
+		goto done;
+	}
+
+	/* first failure: retry at once with the previous owner table */
+	DEBUG(1,("wreplsrv_pull_cycle(%s): %s: error_count:%u retry\n",
+		 partner->address, nt_errstr(partner->pull.last_status),
+		 partner->pull.error_count));
+	inform_in.partner_count	= old_cycle_io->in.num_owners;
+	inform_in.partners	= old_cycle_io->in.owners;
+	wreplsrv_out_partner_pull(partner, &inform_in);
+done:
+	talloc_free(old_cycle_io);
+}
+
+/*
+ * Kick off a pull replication cycle against one partner.
+ * inform_in may be NULL (normal scheduled pull) or carry the owner table
+ * from an inform/retry; its 'partners' array is stolen onto the cycle io.
+ * A no-op if a pull for this partner is already in flight.
+ */
+void wreplsrv_out_partner_pull(struct wreplsrv_partner *partner, struct wrepl_table *inform_in)
+{
+	/* there's already a pull in progress, so we're done */
+	if (partner->pull.creq) return;
+
+	partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io);
+	if (!partner->pull.cycle_io) {
+		goto nomem;
+	}
+
+	partner->pull.cycle_io->in.partner	= partner;
+	partner->pull.cycle_io->in.wreplconn	= NULL;
+	if (inform_in) {
+		partner->pull.cycle_io->in.num_owners	= inform_in->partner_count;
+		partner->pull.cycle_io->in.owners	= inform_in->partners;
+		talloc_steal(partner->pull.cycle_io, inform_in->partners);
+	} else {
+		partner->pull.cycle_io->in.num_owners	= 0;
+		partner->pull.cycle_io->in.owners	= NULL;
+	}
+	partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io);
+	if (!partner->pull.creq) {
+		DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n",
+			 partner->address));
+		goto nomem;
+	}
+
+	partner->pull.creq->async.fn		= wreplsrv_pull_handler_creq;
+	partner->pull.creq->async.private_data	= partner;
+
+	return;
+nomem:
+	talloc_free(partner->pull.cycle_io);
+	partner->pull.cycle_io = NULL;
+	DEBUG(0,("wreplsrv_out_partner_pull(%s): no memory? (ignoring)\n",
+		 partner->address));
+	return;
+}
+
+/*
+ * Periodic entry point: start a pull cycle for every pull partner whose
+ * interval is enabled and whose next_run time has been reached, and
+ * reschedule the partner's next run.
+ */
+NTSTATUS wreplsrv_out_pull_run(struct wreplsrv_service *service)
+{
+	struct wreplsrv_partner *partner;
+
+	for (partner = service->partners; partner; partner = partner->next) {
+		/* if it's not a pull partner, go to the next partner */
+		if (!(partner->type & WINSREPL_PARTNER_PULL)) continue;
+
+		/* if pulling is disabled for the partner, go to the next one */
+		if (partner->pull.interval == 0) continue;
+
+		/* if the next timer isn't reached, go to the next partner */
+		if (!timeval_expired(&partner->pull.next_run)) continue;
+
+		wreplsrv_out_pull_reschedule(partner, partner->pull.interval);
+
+		wreplsrv_out_partner_pull(partner, NULL);
+	}
+
+	return NT_STATUS_OK;
+}
diff --git a/source4/wrepl_server/wrepl_out_push.c b/source4/wrepl_server/wrepl_out_push.c
new file mode 100644
index 0000000000..8f0c409662
--- /dev/null
+++ b/source4/wrepl_server/wrepl_out_push.c
@@ -0,0 +1,144 @@
+/*
+   Unix SMB/CIFS implementation.
+
+   WINS Replication server
+
+   Copyright (C) Stefan Metzmacher	2005
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "librpc/gen_ndr/winsrepl.h"
+#include "wrepl_server/wrepl_server.h"
+#include "libcli/composite/composite.h"
+#include "nbt_server/wins/winsdb.h"
+
+static void wreplsrv_out_partner_push(struct wreplsrv_partner *partner, bool propagate);
+
+/*
+ * Completion handler for a push notification.
+ * On success the error count is reset; on the first error the push is
+ * retried once (with the same propagate flag); after that we give up and
+ * wait for the next change-count trigger.
+ */
+static void wreplsrv_push_handler_creq(struct composite_context *creq)
+{
+	struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner);
+	struct wreplsrv_push_notify_io *old_notify_io;
+
+	partner->push.last_status = wreplsrv_push_notify_recv(partner->push.creq);
+	partner->push.creq = NULL;
+
+	/* take ownership of the finished notify's io; freed at 'done' */
+	old_notify_io = partner->push.notify_io;
+	partner->push.notify_io = NULL;
+
+	if (NT_STATUS_IS_OK(partner->push.last_status)) {
+		partner->push.error_count = 0;
+		DEBUG(2,("wreplsrv_push_notify(%s): %s\n",
+			 partner->address, nt_errstr(partner->push.last_status)));
+		goto done;
+	}
+
+	partner->push.error_count++;
+
+	if (partner->push.error_count > 1) {
+		DEBUG(1,("wreplsrv_push_notify(%s): %s: error_count: %u: giving up\n",
+			 partner->address, nt_errstr(partner->push.last_status),
+			 partner->push.error_count));
+		goto done;
+	}
+
+	DEBUG(1,("wreplsrv_push_notify(%s): %s: error_count: %u: retry\n",
+		 partner->address, nt_errstr(partner->push.last_status),
+		 partner->push.error_count));
+	wreplsrv_out_partner_push(partner, old_notify_io->in.propagate);
+done:
+	talloc_free(old_notify_io);
+}
+
+/*
+ * Start an async push notification to one partner.
+ * 'propagate' selects the INFORM2/UPDATE2 command variants; whether
+ * inform or update is used comes from partner->push.use_inform.
+ * A no-op if a push for this partner is already in flight.
+ */
+static void wreplsrv_out_partner_push(struct wreplsrv_partner *partner, bool propagate)
+{
+	/* a push for this partner is currently in progress, so we're done */
+	if (partner->push.creq) return;
+
+	/* now prepare the push notify */
+	partner->push.notify_io = talloc(partner, struct wreplsrv_push_notify_io);
+	if (!partner->push.notify_io) {
+		goto nomem;
+	}
+
+	partner->push.notify_io->in.partner	= partner;
+	partner->push.notify_io->in.inform	= partner->push.use_inform;
+	partner->push.notify_io->in.propagate	= propagate;
+	partner->push.creq = wreplsrv_push_notify_send(partner->push.notify_io, partner->push.notify_io);
+	if (!partner->push.creq) {
+		DEBUG(1,("wreplsrv_push_notify_send(%s) failed nomem?\n",
+			 partner->address));
+		goto nomem;
+	}
+
+	partner->push.creq->async.fn		= wreplsrv_push_handler_creq;
+	partner->push.creq->async.private_data	= partner;
+
+	return;
+nomem:
+	talloc_free(partner->push.notify_io);
+	partner->push.notify_io = NULL;
+	DEBUG(1,("wreplsrv_out_partner_push(%s,%u) failed nomem? (ignoring)\n",
+		 partner->address, propagate));
+	return;
+}
+
+/*
+ * Return how many versions this partner is behind the given maxVersionID
+ * and remember maxVersionID as the partner's new baseline.
+ * The difference is clamped to UINT32_MAX; if the baseline is ahead of
+ * maxVersionID (version counter went backwards) UINT32_MAX is returned.
+ */
+static uint32_t wreplsrv_calc_change_count(struct wreplsrv_partner *partner, uint64_t maxVersionID)
+{
+	uint64_t tmp_diff = UINT32_MAX;
+
+	/* catch an overflow */
+	if (partner->push.maxVersionID > maxVersionID) {
+		goto done;
+	}
+
+	tmp_diff = maxVersionID - partner->push.maxVersionID;
+
+	if (tmp_diff > UINT32_MAX) {
+		tmp_diff = UINT32_MAX;
+		goto done;
+	}
+
+done:
+	partner->push.maxVersionID = maxVersionID;
+	return (uint32_t)(tmp_diff & UINT32_MAX);
+}
+
+/*
+ * Periodic entry point: for every push partner with push notification
+ * enabled, push if the database has advanced by at least the partner's
+ * configured change_count since the last push.
+ */
+NTSTATUS wreplsrv_out_push_run(struct wreplsrv_service *service)
+{
+	struct wreplsrv_partner *partner;
+	uint64_t seqnumber;
+	uint32_t change_count;
+
+	seqnumber = winsdb_get_maxVersion(service->wins_db);
+
+	for (partner = service->partners; partner; partner = partner->next) {
+		/* if it's not a push partner, go to the next partner */
+		if (!(partner->type & WINSREPL_PARTNER_PUSH)) continue;
+
+		/* if push notifies are disabled for this partner, go to the next partner */
+		if (partner->push.change_count == 0) continue;
+
+		/* get the actual change count for the partner */
+		change_count = wreplsrv_calc_change_count(partner, seqnumber);
+
+		/* if the configured change count isn't reached, go to the next partner */
+		if (change_count < partner->push.change_count) continue;
+
+		wreplsrv_out_partner_push(partner, false);
+	}
+
+	return NT_STATUS_OK;
+}
diff --git a/source4/wrepl_server/wrepl_periodic.c b/source4/wrepl_server/wrepl_periodic.c
new file mode 100644
index
0000000000..4559a7f786 --- /dev/null +++ b/source4/wrepl_server/wrepl_periodic.c @@ -0,0 +1,118 @@ +/* + Unix SMB/CIFS implementation. + + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "includes.h" +#include "lib/events/events.h" +#include "smbd/service_task.h" +#include "smbd/service.h" +#include "librpc/gen_ndr/winsrepl.h" +#include "wrepl_server/wrepl_server.h" + +static NTSTATUS wreplsrv_periodic_run(struct wreplsrv_service *service) +{ + NTSTATUS status; + + status = wreplsrv_load_partners(service); + NT_STATUS_NOT_OK_RETURN(status); + + status = wreplsrv_scavenging_run(service); + NT_STATUS_NOT_OK_RETURN(status); + + status = wreplsrv_out_pull_run(service); + NT_STATUS_NOT_OK_RETURN(status); + + status = wreplsrv_out_push_run(service); + NT_STATUS_NOT_OK_RETURN(status); + + return NT_STATUS_OK; +} + +static void wreplsrv_periodic_handler_te(struct event_context *ev, struct timed_event *te, + struct timeval t, void *ptr) +{ + struct wreplsrv_service *service = talloc_get_type(ptr, struct wreplsrv_service); + NTSTATUS status; + + service->periodic.te = NULL; + + status = wreplsrv_periodic_schedule(service, service->config.periodic_interval); + if (!NT_STATUS_IS_OK(status)) { + task_server_terminate(service->task, nt_errstr(status)); + return; + } + + status = wreplsrv_periodic_run(service); + if 
(!NT_STATUS_IS_OK(status)) { + DEBUG(0,("wresrv_periodic_run() failed: %s\n", nt_errstr(status))); + } +} + +NTSTATUS wreplsrv_periodic_schedule(struct wreplsrv_service *service, uint32_t next_interval) +{ + TALLOC_CTX *tmp_mem; + struct timed_event *new_te; + struct timeval next_time; + + /* prevent looping */ + if (next_interval == 0) next_interval = 1; + + next_time = timeval_current_ofs(next_interval, 5000); + + if (service->periodic.te) { + /* + * if the timestamp of the new event is higher, + * as current next we don't need to reschedule + */ + if (timeval_compare(&next_time, &service->periodic.next_event) > 0) { + return NT_STATUS_OK; + } + } + + /* reset the next scheduled timestamp */ + service->periodic.next_event = next_time; + + new_te = event_add_timed(service->task->event_ctx, service, + service->periodic.next_event, + wreplsrv_periodic_handler_te, service); + NT_STATUS_HAVE_NO_MEMORY(new_te); + + tmp_mem = talloc_new(service); + DEBUG(6,("wreplsrv_periodic_schedule(%u) %sscheduled for: %s\n", + next_interval, + (service->periodic.te?"re":""), + nt_time_string(tmp_mem, timeval_to_nttime(&next_time)))); + talloc_free(tmp_mem); + + talloc_free(service->periodic.te); + service->periodic.te = new_te; + + return NT_STATUS_OK; +} + +NTSTATUS wreplsrv_setup_periodic(struct wreplsrv_service *service) +{ + NTSTATUS status; + + status = wreplsrv_periodic_schedule(service, 0); + NT_STATUS_NOT_OK_RETURN(status); + + return NT_STATUS_OK; +} diff --git a/source4/wrepl_server/wrepl_scavenging.c b/source4/wrepl_server/wrepl_scavenging.c new file mode 100644 index 0000000000..a5cd36797d --- /dev/null +++ b/source4/wrepl_server/wrepl_scavenging.c @@ -0,0 +1,526 @@ +/* + Unix SMB/CIFS implementation. 
+
+   WINS Replication server
+
+   Copyright (C) Stefan Metzmacher	2005
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "librpc/gen_ndr/ndr_winsrepl.h"
+#include "wrepl_server/wrepl_server.h"
+#include "nbt_server/wins/winsdb.h"
+#include "ldb/include/ldb.h"
+#include "ldb/include/ldb_errors.h"
+#include "system/time.h"
+#include "smbd/service_task.h"
+#include "lib/messaging/irpc.h"
+#include "librpc/gen_ndr/ndr_irpc.h"
+#include "librpc/gen_ndr/ndr_nbt.h"
+
+/*
+ * Build an LDB filter for records owned by 'wins_owner'.
+ * For the local owner the legacy owner address "0.0.0.0" is also matched;
+ * for any other owner "0.0.0.0" records are explicitly excluded.
+ */
+const char *wreplsrv_owner_filter(struct wreplsrv_service *service,
+				  TALLOC_CTX *mem_ctx,
+				  const char *wins_owner)
+{
+	if (strcmp(wins_owner, service->wins_db->local_owner) == 0) {
+		return talloc_asprintf(mem_ctx, "(|(winsOwner=%s)(winsOwner=0.0.0.0))",
+				       wins_owner);
+	}
+
+	return talloc_asprintf(mem_ctx, "(&(winsOwner=%s)(!(winsOwner=0.0.0.0)))",
+			       wins_owner);
+}
+
+/*
+ * Scavenge expired records owned by the local server:
+ * active -> released (non-static only), released -> tombstone (taking a
+ * new version and ownership), tombstone -> deleted (only once the server
+ * has been up longer than tombstone_extra_timeout). A RESERVED record in
+ * the database is treated as corruption.
+ */
+static NTSTATUS wreplsrv_scavenging_owned_records(struct wreplsrv_service *service, TALLOC_CTX *tmp_mem)
+{
+	NTSTATUS status;
+	struct winsdb_record *rec = NULL;
+	struct ldb_result *res = NULL;
+	const char *owner_filter;
+	const char *filter;
+	uint32_t i;
+	int ret;
+	time_t now = time(NULL);
+	const char *now_timestr;
+	const char *action;
+	const char *old_state=NULL;
+	const char *new_state=NULL;
+	uint32_t modify_flags;
+	bool modify_record;
+	bool delete_record;
+	bool delete_tombstones;
+	struct timeval tombstone_extra_time;
+
+	now_timestr = ldb_timestring(tmp_mem, now);
+	NT_STATUS_HAVE_NO_MEMORY(now_timestr);
+	owner_filter = wreplsrv_owner_filter(service, tmp_mem,
+					     service->wins_db->local_owner);
+	NT_STATUS_HAVE_NO_MEMORY(owner_filter);
+	/* all locally owned records whose expireTime has passed */
+	filter = talloc_asprintf(tmp_mem,
+				 "(&%s(objectClass=winsRecord)"
+				 "(expireTime<=%s))",
+				 owner_filter, now_timestr);
+	NT_STATUS_HAVE_NO_MEMORY(filter);
+	ret = ldb_search(service->wins_db->ldb, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res);
+	if (ret != LDB_SUCCESS) return NT_STATUS_INTERNAL_DB_CORRUPTION;
+	talloc_steal(tmp_mem, res);
+	DEBUG(10,("WINS scavenging: filter '%s' count %d\n", filter, res->count));
+
+	/* only delete tombstones after the server has been up for a while */
+	tombstone_extra_time = timeval_add(&service->startup_time,
+					   service->config.tombstone_extra_timeout,
+					   0);
+	delete_tombstones = timeval_expired(&tombstone_extra_time);
+
+	for (i=0; i < res->count; i++) {
+		/*
+		 * we pass '0' as 'now' here,
+		 * because we want to get the raw timestamps which are in the DB
+		 */
+		status = winsdb_record(service->wins_db, res->msgs[i], tmp_mem, 0, &rec);
+		NT_STATUS_NOT_OK_RETURN(status);
+		talloc_free(res->msgs[i]);
+
+		modify_flags	= 0;
+		modify_record	= false;
+		delete_record	= false;
+
+		switch (rec->state) {
+		case WREPL_STATE_ACTIVE:
+			old_state	= "active";
+			new_state	= "active";
+			/* static records stay active; their timestamps are just refreshed */
+			if (!rec->is_static) {
+				new_state	= "released";
+				rec->state	= WREPL_STATE_RELEASED;
+				rec->expire_time= service->config.tombstone_interval + now;
+			}
+			modify_flags	= 0;
+			modify_record	= true;
+			break;
+
+		case WREPL_STATE_RELEASED:
+			old_state	= "released";
+			new_state	= "tombstone";
+			rec->state	= WREPL_STATE_TOMBSTONE;
+			rec->expire_time= service->config.tombstone_timeout + now;
+			/* tombstoning allocates a new version and takes ownership */
+			modify_flags	= WINSDB_FLAG_ALLOC_VERSION | WINSDB_FLAG_TAKE_OWNERSHIP;
+			modify_record	= true;
+			break;
+
+		case WREPL_STATE_TOMBSTONE:
+			old_state	= "tombstone";
+			new_state	= "tombstone";
+			if (!delete_tombstones) break;
+			new_state	= "deleted";
+			delete_record	= true;
+			break;
+
+		case WREPL_STATE_RESERVED:
+			DEBUG(0,("%s: corrupted record: %s\n",
+				 __location__, nbt_name_string(rec, rec->name)));
+			return NT_STATUS_INTERNAL_DB_CORRUPTION;
+		}
+
+		if (modify_record) {
+			action = "modify";
+			ret = winsdb_modify(service->wins_db, rec, modify_flags);
+		} else if (delete_record) {
+			action = "delete";
+			ret = winsdb_delete(service->wins_db, rec);
+		} else {
+			action = "skip";
+			ret = NBT_RCODE_OK;
+		}
+
+		if (ret != NBT_RCODE_OK) {
+			DEBUG(1,("WINS scavenging: failed to %s name %s (owned:%s -> owned:%s): error:%u\n",
+				 action, nbt_name_string(rec, rec->name), old_state, new_state, ret));
+		} else {
+			DEBUG(4,("WINS scavenging: %s name: %s (owned:%s -> owned:%s)\n",
+				 action, nbt_name_string(rec, rec->name), old_state, new_state));
+		}
+
+		talloc_free(rec);
+	}
+
+	return NT_STATUS_OK;
+}
+
+/*
+ * Scavenge expired non-active replica records (owned by other servers):
+ * released -> tombstone (keeping the remote ownership, no version bump),
+ * tombstone -> deleted (after tombstone_extra_timeout since startup).
+ * ACTIVE records cannot match the filter and RESERVED must not exist,
+ * so both are treated as corruption.
+ */
+static NTSTATUS wreplsrv_scavenging_replica_non_active_records(struct wreplsrv_service *service, TALLOC_CTX *tmp_mem)
+{
+	NTSTATUS status;
+	struct winsdb_record *rec = NULL;
+	struct ldb_result *res = NULL;
+	const char *owner_filter;
+	const char *filter;
+	uint32_t i;
+	int ret;
+	time_t now = time(NULL);
+	const char *now_timestr;
+	const char *action;
+	const char *old_state=NULL;
+	const char *new_state=NULL;
+	uint32_t modify_flags;
+	bool modify_record;
+	bool delete_record;
+	bool delete_tombstones;
+	struct timeval tombstone_extra_time;
+
+	now_timestr = ldb_timestring(tmp_mem, now);
+	NT_STATUS_HAVE_NO_MEMORY(now_timestr);
+	owner_filter = wreplsrv_owner_filter(service, tmp_mem,
+					     service->wins_db->local_owner);
+	NT_STATUS_HAVE_NO_MEMORY(owner_filter);
+	/* expired records NOT owned locally and NOT in the active state */
+	filter = talloc_asprintf(tmp_mem,
+				 "(&(!%s)(objectClass=winsRecord)"
+				 "(!(recordState=%u))(expireTime<=%s))",
+				 owner_filter, WREPL_STATE_ACTIVE, now_timestr);
+	NT_STATUS_HAVE_NO_MEMORY(filter);
+	ret = ldb_search(service->wins_db->ldb, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res);
+	if (ret != LDB_SUCCESS) return NT_STATUS_INTERNAL_DB_CORRUPTION;
+	talloc_steal(tmp_mem, res);
+	DEBUG(10,("WINS scavenging: filter '%s' count %d\n", filter, res->count));
+
+	/* only delete tombstones after the server has been up for a while */
+	tombstone_extra_time = timeval_add(&service->startup_time,
+					   service->config.tombstone_extra_timeout,
+					   0);
+	delete_tombstones = timeval_expired(&tombstone_extra_time);
+
+	for (i=0; i < res->count; i++) {
+		/*
+		 * we pass '0' as 'now' here,
+		 * because we want to get the raw timestamps which are in the DB
+		 */
+		status = winsdb_record(service->wins_db, res->msgs[i], tmp_mem, 0, &rec);
+		NT_STATUS_NOT_OK_RETURN(status);
+		talloc_free(res->msgs[i]);
+
+		modify_flags	= 0;
+		modify_record	= false;
+		delete_record	= false;
+
+		switch (rec->state) {
+		case WREPL_STATE_ACTIVE:
+			DEBUG(0,("%s: corrupted record: %s\n",
+				 __location__, nbt_name_string(rec, rec->name)));
+			return NT_STATUS_INTERNAL_DB_CORRUPTION;
+
+		case WREPL_STATE_RELEASED:
+			old_state	= "released";
+			new_state	= "tombstone";
+			rec->state	= WREPL_STATE_TOMBSTONE;
+			rec->expire_time= service->config.tombstone_timeout + now;
+			/* unlike owned records: no new version, ownership stays remote */
+			modify_flags	= 0;
+			modify_record	= true;
+			break;
+
+		case WREPL_STATE_TOMBSTONE:
+			old_state	= "tombstone";
+			new_state	= "tombstone";
+			if (!delete_tombstones) break;
+			new_state	= "deleted";
+			delete_record	= true;
+			break;
+
+		case WREPL_STATE_RESERVED:
+			DEBUG(0,("%s: corrupted record: %s\n",
+				 __location__, nbt_name_string(rec, rec->name)));
+			return NT_STATUS_INTERNAL_DB_CORRUPTION;
+		}
+
+		if (modify_record) {
+			action = "modify";
+			ret = winsdb_modify(service->wins_db, rec, modify_flags);
+		} else if (delete_record) {
+			action = "delete";
+			ret = winsdb_delete(service->wins_db, rec);
+		} else {
+			action = "skip";
+			ret = NBT_RCODE_OK;
+		}
+
+		if (ret != NBT_RCODE_OK) {
+			DEBUG(1,("WINS scavenging: failed to %s name %s (replica:%s -> replica:%s): error:%u\n",
+				 action, nbt_name_string(rec, rec->name), old_state, new_state, ret));
+		} else {
+			DEBUG(4,("WINS scavenging: %s name: %s (replica:%s -> replica:%s)\n",
+				 action, nbt_name_string(rec, rec->name), old_state, new_state));
+		}
+
+		talloc_free(rec);
+	}
+
+	return NT_STATUS_OK;
+}
+
+struct verify_state { + struct messaging_context *msg_ctx; + struct wreplsrv_service *service; + struct winsdb_record *rec; + struct nbtd_proxy_wins_challenge r; +}; + +static void verify_handler(struct irpc_request *ireq) +{ + struct verify_state *s = talloc_get_type(ireq->async.private, + struct verify_state); + struct winsdb_record *rec = s->rec; + const char *action; + const char *old_state = "active"; + const char *new_state = "active"; + const char *new_owner = "replica"; + uint32_t modify_flags = 0; + bool modify_record = false; + bool delete_record = false; + bool different = false; + int ret; + NTSTATUS status; + uint32_t i, j; + + /* + * - if the name isn't present anymore remove our record + * - if the name is found and not a normal group check if the addresses match, + * - if they don't match remove the record + * - if they match do nothing + * - if an error happens do nothing + */ + status = irpc_call_recv(ireq); + if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) { + delete_record = true; + new_state = "deleted"; + } else if (NT_STATUS_IS_OK(status) && rec->type != WREPL_TYPE_GROUP) { + for (i=0; i < s->r.out.num_addrs; i++) { + bool found = false; + for (j=0; rec->addresses[j]; j++) { + if (strcmp(s->r.out.addrs[i].addr, rec->addresses[j]->address) == 0) { + found = true; + break; + } + } + if (!found) { + different = true; + break; + } + } + } else if (NT_STATUS_IS_OK(status) && rec->type == WREPL_TYPE_GROUP) { + if (s->r.out.num_addrs != 1 || strcmp(s->r.out.addrs[0].addr, "255.255.255.255") != 0) { + different = true; + } + } + + if (different) { + /* + * if the reply from the owning wins server has different addresses + * then take the ownership of the record and make it a tombstone + * this will then hopefully replicated to the original owner of the record + * which will then propagate it's own record, so that the current record will + * be replicated to to us + */ + DEBUG(0,("WINS scavenging: replica %s verify got different 
addresses from winsserver: %s: tombstoning record\n", + nbt_name_string(rec, rec->name), rec->wins_owner)); + + rec->state = WREPL_STATE_TOMBSTONE; + rec->expire_time= time(NULL) + s->service->config.tombstone_timeout; + for (i=0; rec->addresses[i]; i++) { + rec->addresses[i]->expire_time = rec->expire_time; + } + modify_record = true; + modify_flags = WINSDB_FLAG_ALLOC_VERSION | WINSDB_FLAG_TAKE_OWNERSHIP; + new_state = "tombstone"; + new_owner = "owned"; + } else if (NT_STATUS_IS_OK(status)) { + /* if the addresses are the same, just update the timestamps */ + rec->expire_time = time(NULL) + s->service->config.verify_interval; + for (i=0; rec->addresses[i]; i++) { + rec->addresses[i]->expire_time = rec->expire_time; + } + modify_record = true; + modify_flags = 0; + new_state = "active"; + } + + if (modify_record) { + action = "modify"; + ret = winsdb_modify(s->service->wins_db, rec, modify_flags); + } else if (delete_record) { + action = "delete"; + ret = winsdb_delete(s->service->wins_db, rec); + } else { + action = "skip"; + ret = NBT_RCODE_OK; + } + + if (ret != NBT_RCODE_OK) { + DEBUG(1,("WINS scavenging: failed to %s name %s (replica:%s -> %s:%s): error:%u\n", + action, nbt_name_string(rec, rec->name), old_state, new_owner, new_state, ret)); + } else { + DEBUG(4,("WINS scavenging: %s name: %s (replica:%s -> %s:%s): %s: %s\n", + action, nbt_name_string(rec, rec->name), old_state, new_owner, new_state, + rec->wins_owner, nt_errstr(status))); + } + + talloc_free(s); +} + +static NTSTATUS wreplsrv_scavenging_replica_active_records(struct wreplsrv_service *service, TALLOC_CTX *tmp_mem) +{ + NTSTATUS status; + struct winsdb_record *rec = NULL; + struct ldb_result *res = NULL; + const char *owner_filter; + const char *filter; + uint32_t i; + int ret; + time_t now = time(NULL); + const char *now_timestr; + struct irpc_request *ireq; + struct verify_state *s; + struct server_id *nbt_servers; + + nbt_servers = irpc_servers_byname(service->task->msg_ctx, tmp_mem, 
"nbt_server"); + if ((nbt_servers == NULL) || (nbt_servers[0].id == 0)) { + return NT_STATUS_INTERNAL_ERROR; + } + + now_timestr = ldb_timestring(tmp_mem, now); + NT_STATUS_HAVE_NO_MEMORY(now_timestr); + owner_filter = wreplsrv_owner_filter(service, tmp_mem, + service->wins_db->local_owner); + NT_STATUS_HAVE_NO_MEMORY(owner_filter); + filter = talloc_asprintf(tmp_mem, + "(&(!%s)(objectClass=winsRecord)" + "(recordState=%u)(expireTime<=%s))", + owner_filter, WREPL_STATE_ACTIVE, now_timestr); + NT_STATUS_HAVE_NO_MEMORY(filter); + ret = ldb_search(service->wins_db->ldb, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res); + if (ret != LDB_SUCCESS) return NT_STATUS_INTERNAL_DB_CORRUPTION; + talloc_steal(tmp_mem, res); + DEBUG(10,("WINS scavenging: filter '%s' count %d\n", filter, res->count)); + + for (i=0; i < res->count; i++) { + /* + * we pass '0' as 'now' here, + * because we want to get the raw timestamps which are in the DB + */ + status = winsdb_record(service->wins_db, res->msgs[i], tmp_mem, 0, &rec); + NT_STATUS_NOT_OK_RETURN(status); + talloc_free(res->msgs[i]); + + if (rec->state != WREPL_STATE_ACTIVE) { + DEBUG(0,("%s: corrupted record: %s\n", + __location__, nbt_name_string(rec, rec->name))); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + + /* + * ask the owning wins server if the record still exists, + * if not delete the record + * + * TODO: NOTE: this is a simpliefied version, to verify that + * a record still exist, I assume that w2k3 uses + * DCERPC calls or some WINSREPL packets for this, + * but we use a wins name query + */ + DEBUG(0,("ask wins server '%s' if '%s' with version_id:%llu still exists\n", + rec->wins_owner, nbt_name_string(rec, rec->name), + (unsigned long long)rec->version)); + + s = talloc_zero(tmp_mem, struct verify_state); + NT_STATUS_HAVE_NO_MEMORY(s); + s->msg_ctx = service->task->msg_ctx; + s->service = service; + s->rec = talloc_steal(s, rec); + + s->r.in.name = *rec->name; + s->r.in.num_addrs = 1; + s->r.in.addrs = talloc_array(s, 
struct nbtd_proxy_wins_addr, s->r.in.num_addrs);
+		NT_STATUS_HAVE_NO_MEMORY(s->r.in.addrs);
+		/* TODO: fix pidl to handle inline ipv4address arrays */
+		s->r.in.addrs[0].addr	= rec->wins_owner;
+
+		ireq = IRPC_CALL_SEND(s->msg_ctx, nbt_servers[0],
+				      irpc, NBTD_PROXY_WINS_CHALLENGE,
+				      &s->r, s);
+		NT_STATUS_HAVE_NO_MEMORY(ireq);
+
+		ireq->async.fn		= verify_handler;
+		ireq->async.private	= s;
+
+		talloc_steal(service, s);
+	}
+
+	return NT_STATUS_OK;
+}
+
+NTSTATUS wreplsrv_scavenging_run(struct wreplsrv_service *service)
+{
+	NTSTATUS status;
+	TALLOC_CTX *tmp_mem;
+	bool skip_first_run = false;
+
+	if (!timeval_expired(&service->scavenging.next_run)) {
+		return NT_STATUS_OK;
+	}
+
+	if (timeval_is_zero(&service->scavenging.next_run)) {
+		skip_first_run = true;
+	}
+
+	service->scavenging.next_run = timeval_current_ofs(service->config.scavenging_interval, 0);
+	status = wreplsrv_periodic_schedule(service, service->config.scavenging_interval);
+	NT_STATUS_NOT_OK_RETURN(status);
+
+	/*
+	 * if it's the first time this function is called (startup)
+	 * the next_run is zero, in this case we should not do scavenging
+	 */
+	if (skip_first_run) {
+		return NT_STATUS_OK;
+	}
+
+	if (service->scavenging.processing) {
+		return NT_STATUS_OK;
+	}
+
+	DEBUG(4,("wreplsrv_scavenging_run(): start\n"));
+
+	tmp_mem = talloc_new(service);
+	NT_STATUS_HAVE_NO_MEMORY(tmp_mem);
+	service->scavenging.processing = true;
+	status = wreplsrv_scavenging_owned_records(service,tmp_mem);
+	service->scavenging.processing = false;
+	talloc_free(tmp_mem);
+	NT_STATUS_NOT_OK_RETURN(status);
+
+	tmp_mem = talloc_new(service);
+	NT_STATUS_HAVE_NO_MEMORY(tmp_mem);
+	service->scavenging.processing = true;
+	status = wreplsrv_scavenging_replica_non_active_records(service, tmp_mem);
+	service->scavenging.processing = false;
+	talloc_free(tmp_mem);
+	NT_STATUS_NOT_OK_RETURN(status);
+
+	tmp_mem = talloc_new(service);
+	NT_STATUS_HAVE_NO_MEMORY(tmp_mem);
+	service->scavenging.processing = true;
+	status = 
wreplsrv_scavenging_replica_active_records(service, tmp_mem); + service->scavenging.processing = false; + talloc_free(tmp_mem); + NT_STATUS_NOT_OK_RETURN(status); + + DEBUG(4,("wreplsrv_scavenging_run(): end\n")); + + return NT_STATUS_OK; +} diff --git a/source4/wrepl_server/wrepl_server.c b/source4/wrepl_server/wrepl_server.c new file mode 100644 index 0000000000..b703066986 --- /dev/null +++ b/source4/wrepl_server/wrepl_server.c @@ -0,0 +1,515 @@ +/* + Unix SMB/CIFS implementation. + + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "includes.h" +#include "lib/util/dlinklist.h" +#include "smbd/service_task.h" +#include "smbd/service.h" +#include "lib/messaging/irpc.h" +#include "librpc/gen_ndr/winsrepl.h" +#include "wrepl_server/wrepl_server.h" +#include "nbt_server/wins/winsdb.h" +#include "ldb/include/ldb.h" +#include "ldb/include/ldb_errors.h" +#include "auth/auth.h" +#include "ldb_wrap.h" +#include "param/param.h" +#include "lib/socket/netif.h" + +static struct ldb_context *wins_config_db_connect(TALLOC_CTX *mem_ctx, + struct event_context *ev_ctx, + struct loadparm_context *lp_ctx) +{ + return ldb_wrap_connect(mem_ctx, ev_ctx, lp_ctx, private_path(mem_ctx, + lp_ctx, lp_wins_config_url(lp_ctx)), + system_session(mem_ctx, lp_ctx), NULL, 0, NULL); +} + +static uint64_t wins_config_db_get_seqnumber(struct ldb_context *ldb) +{ + int ret; + struct ldb_dn *dn; + struct ldb_result *res = NULL; + TALLOC_CTX *tmp_ctx = talloc_new(ldb); + uint64_t seqnumber = 0; + + dn = ldb_dn_new(tmp_ctx, ldb, "@BASEINFO"); + if (!dn) goto failed; + + /* find the record in the WINS database */ + ret = ldb_search(ldb, dn, LDB_SCOPE_BASE, + NULL, NULL, &res); + if (ret != LDB_SUCCESS) goto failed; + talloc_steal(tmp_ctx, res); + if (res->count > 1) goto failed; + + if (res->count == 1) { + seqnumber = ldb_msg_find_attr_as_uint64(res->msgs[0], "sequenceNumber", 0); + } + +failed: + talloc_free(tmp_ctx); + return seqnumber; +} + +/* + open winsdb +*/ +static NTSTATUS wreplsrv_open_winsdb(struct wreplsrv_service *service, + struct loadparm_context *lp_ctx) +{ + const char *owner = lp_parm_string(lp_ctx, NULL, "winsdb", "local_owner"); + + if (owner == NULL) { + struct interface *ifaces; + load_interfaces(service, lp_interfaces(lp_ctx), &ifaces); + owner = iface_n_ip(ifaces, 0); + } + + service->wins_db = winsdb_connect(service, service->task->event_ctx, lp_ctx, owner, WINSDB_HANDLE_CALLER_WREPL); + if (!service->wins_db) { + return NT_STATUS_INTERNAL_DB_ERROR; + } + + service->config.ldb = 
wins_config_db_connect(service, service->task->event_ctx, lp_ctx);
+	if (!service->config.ldb) {
+		return NT_STATUS_INTERNAL_DB_ERROR;
+	}
+
+	/* the default renew interval is 6 days */
+	service->config.renew_interval	  = lp_parm_int(lp_ctx, NULL,"wreplsrv","renew_interval", 6*24*60*60);
+
+	/* the default tombstone (extinction) interval is 6 days */
+	service->config.tombstone_interval= lp_parm_int(lp_ctx, NULL,"wreplsrv","tombstone_interval", 6*24*60*60);
+
+	/* the default tombstone (extinction) timeout is 1 day */
+	service->config.tombstone_timeout = lp_parm_int(lp_ctx, NULL,"wreplsrv","tombstone_timeout", 1*24*60*60);
+
+	/* the default tombstone extra timeout is 3 days */
+	service->config.tombstone_extra_timeout = lp_parm_int(lp_ctx, NULL,"wreplsrv","tombstone_extra_timeout", 3*24*60*60);
+
+	/* the default verify interval is 24 days */
+	service->config.verify_interval	  = lp_parm_int(lp_ctx, NULL,"wreplsrv","verify_interval", 24*24*60*60);
+
+	/* the default scavenging interval is 'renew_interval/2' */
+	service->config.scavenging_interval=lp_parm_int(lp_ctx, NULL,"wreplsrv","scavenging_interval",
+							service->config.renew_interval/2);
+
+	/* the maximum interval to the next periodic processing event */
+	service->config.periodic_interval = lp_parm_int(lp_ctx, NULL,"wreplsrv","periodic_interval", 15);
+
+	return NT_STATUS_OK;
+}
+
+struct wreplsrv_partner *wreplsrv_find_partner(struct wreplsrv_service *service, const char *peer_addr)
+{
+	struct wreplsrv_partner *cur;
+
+	for (cur = service->partners; cur; cur = cur->next) {
+		if (strcmp(cur->address, peer_addr) == 0) {
+			return cur;
+		}
+	}
+
+	return NULL;
+}
+
+/*
+  load our replication partners
+*/
+NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service)
+{
+	struct wreplsrv_partner *partner;
+	struct ldb_result *res = NULL;
+	int ret;
+	TALLOC_CTX *tmp_ctx;
+	int i;
+	uint64_t new_seqnumber;
+
+	new_seqnumber = wins_config_db_get_seqnumber(service->config.ldb);
+
+	/* if it's not the first run 
and nothing changed we're done */ + if (service->config.seqnumber != 0 && service->config.seqnumber == new_seqnumber) { + return NT_STATUS_OK; + } + + tmp_ctx = talloc_new(service); + NT_STATUS_HAVE_NO_MEMORY(tmp_ctx); + + service->config.seqnumber = new_seqnumber; + + /* find the record in the WINS database */ + ret = ldb_search(service->config.ldb, ldb_dn_new(tmp_ctx, service->config.ldb, "CN=PARTNERS"), LDB_SCOPE_SUBTREE, + "(objectClass=wreplPartner)", NULL, &res); + if (ret != LDB_SUCCESS) goto failed; + talloc_steal(tmp_ctx, res); + + /* first disable all existing partners */ + for (partner=service->partners; partner; partner = partner->next) { + partner->type = WINSREPL_PARTNER_NONE; + } + + for (i=0; i < res->count; i++) { + const char *address; + + address = ldb_msg_find_attr_as_string(res->msgs[i], "address", NULL); + if (!address) { + goto failed; + } + + partner = wreplsrv_find_partner(service, address); + if (partner) { + if (partner->name != partner->address) { + talloc_free(discard_const(partner->name)); + } + partner->name = NULL; + talloc_free(discard_const(partner->our_address)); + partner->our_address = NULL; + + /* force rescheduling of pulling */ + partner->pull.next_run = timeval_zero(); + } else { + partner = talloc_zero(service, struct wreplsrv_partner); + if (partner == NULL) goto failed; + + partner->service = service; + partner->address = address; + talloc_steal(partner, partner->address); + + DLIST_ADD_END(service->partners, partner, struct wreplsrv_partner *); + } + + partner->name = ldb_msg_find_attr_as_string(res->msgs[i], "name", partner->address); + talloc_steal(partner, partner->name); + partner->our_address = ldb_msg_find_attr_as_string(res->msgs[i], "ourAddress", NULL); + talloc_steal(partner, partner->our_address); + + partner->type = ldb_msg_find_attr_as_uint(res->msgs[i], "type", WINSREPL_PARTNER_BOTH); + partner->pull.interval = ldb_msg_find_attr_as_uint(res->msgs[i], "pullInterval", + WINSREPL_DEFAULT_PULL_INTERVAL); + 
partner->pull.retry_interval = ldb_msg_find_attr_as_uint(res->msgs[i], "pullRetryInterval", + WINSREPL_DEFAULT_PULL_RETRY_INTERVAL); + partner->push.change_count = ldb_msg_find_attr_as_uint(res->msgs[i], "pushChangeCount", + WINSREPL_DEFAULT_PUSH_CHANGE_COUNT); + partner->push.use_inform = ldb_msg_find_attr_as_uint(res->msgs[i], "pushUseInform", false); + + DEBUG(3,("wreplsrv_load_partners: found partner: %s type: 0x%X\n", + partner->address, partner->type)); + } + + DEBUG(2,("wreplsrv_load_partners: %u partners found: wins_config_db seqnumber %llu\n", + res->count, (unsigned long long)service->config.seqnumber)); + + talloc_free(tmp_ctx); + return NT_STATUS_OK; +failed: + talloc_free(tmp_ctx); + return NT_STATUS_FOOBAR; +} + +NTSTATUS wreplsrv_fill_wrepl_table(struct wreplsrv_service *service, + TALLOC_CTX *mem_ctx, + struct wrepl_table *table_out, + const char *initiator, + bool full_table) +{ + struct wreplsrv_owner *cur; + uint32_t i = 0; + + table_out->partner_count = 0; + table_out->partners = NULL; + table_out->initiator = initiator; + + for (cur = service->table; cur; cur = cur->next) { + if (full_table) { + table_out->partner_count++; + continue; + } + + if (strcmp(initiator, cur->owner.address) != 0) continue; + + table_out->partner_count++; + break; + } + + table_out->partners = talloc_array(mem_ctx, struct wrepl_wins_owner, table_out->partner_count); + NT_STATUS_HAVE_NO_MEMORY(table_out->partners); + + for (cur = service->table; cur && i < table_out->partner_count; cur = cur->next) { + /* + * if it's our local entry + * update the max version + */ + if (cur == service->owner) { + cur->owner.max_version = winsdb_get_maxVersion(service->wins_db); + } + + if (full_table) { + table_out->partners[i] = cur->owner; + i++; + continue; + } + + if (strcmp(initiator, cur->owner.address) != 0) continue; + + table_out->partners[i] = cur->owner; + i++; + break; + } + + return NT_STATUS_OK; +} + +struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_service 
*service,
+					   struct wreplsrv_owner *table,
+					   const char *wins_owner)
+{
+	struct wreplsrv_owner *cur;
+
+	for (cur = table; cur; cur = cur->next) {
+		if (strcmp(cur->owner.address, wins_owner) == 0) {
+			/*
+			 * if it's our local entry
+			 * update the max version
+			 */
+			if (cur == service->owner) {
+				cur->owner.max_version = winsdb_get_maxVersion(service->wins_db);
+			}
+			return cur;
+		}
+	}
+
+	return NULL;
+}
+
+/*
+  update the wins_owner_table max_version, if the given version is the highest version
+  if no entry for the wins_owner exists yet, create one
+*/
+NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
+			    TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table,
+			    const char *wins_owner, uint64_t version)
+{
+	struct wreplsrv_owner *table = *_table;
+	struct wreplsrv_owner *cur;
+
+	if (!wins_owner || strcmp(wins_owner, "0.0.0.0") == 0) {
+		wins_owner = service->wins_db->local_owner;
+	}
+
+	cur = wreplsrv_find_owner(service, table, wins_owner);
+
+	/* if it doesn't exist yet, create one */
+	if (!cur) {
+		cur = talloc_zero(mem_ctx, struct wreplsrv_owner);
+		NT_STATUS_HAVE_NO_MEMORY(cur);
+
+		cur->owner.address	= talloc_strdup(cur, wins_owner);
+		NT_STATUS_HAVE_NO_MEMORY(cur->owner.address);
+		cur->owner.min_version	= 0;
+		cur->owner.max_version	= 0;
+		cur->owner.type		= 1; /* don't know why this is always 1 */
+
+		cur->partner = wreplsrv_find_partner(service, wins_owner);
+
+		DLIST_ADD_END(table, cur, struct wreplsrv_owner *);
+		*_table = table;
+	}
+
+	/* the min_version is always 0 here, and won't be updated */
+
+	/* if the given version is higher than the current max_version, update */
+	if (cur->owner.max_version < version) {
+		cur->owner.max_version = version;
+		/* if it's for our local db, we need to update the wins.ldb too */
+		if (cur == service->owner) {
+			uint64_t ret;
+			ret = winsdb_set_maxVersion(service->wins_db, cur->owner.max_version);
+			if (ret != cur->owner.max_version) {
+				DEBUG(0,("winsdb_set_maxVersion(%llu) failed: %llu\n",
+					 
(unsigned long long)cur->owner.max_version, + (unsigned long long)ret)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + } + } + + return NT_STATUS_OK; +} + +/* + load the partner table +*/ +static NTSTATUS wreplsrv_load_table(struct wreplsrv_service *service) +{ + struct ldb_result *res = NULL; + int ret; + NTSTATUS status; + TALLOC_CTX *tmp_ctx = talloc_new(service); + struct ldb_context *ldb = service->wins_db->ldb; + int i; + struct wreplsrv_owner *local_owner; + const char *wins_owner; + uint64_t version; + const char * const attrs[] = { + "winsOwner", + "versionID", + NULL + }; + + /* + * make sure we have our local entry in the list, + * but we set service->owner when we're done + * to avoid to many calls to wreplsrv_local_max_version() + */ + status = wreplsrv_add_table(service, + service, &service->table, + service->wins_db->local_owner, 0); + if (!NT_STATUS_IS_OK(status)) goto failed; + local_owner = wreplsrv_find_owner(service, service->table, service->wins_db->local_owner); + if (!local_owner) { + status = NT_STATUS_INTERNAL_ERROR; + goto failed; + } + + /* find the record in the WINS database */ + ret = ldb_search(ldb, NULL, LDB_SCOPE_SUBTREE, + "(objectClass=winsRecord)", attrs, &res); + status = NT_STATUS_INTERNAL_DB_CORRUPTION; + if (ret != LDB_SUCCESS) goto failed; + talloc_steal(tmp_ctx, res); + + for (i=0; i < res->count; i++) { + wins_owner = ldb_msg_find_attr_as_string(res->msgs[i], "winsOwner", NULL); + version = ldb_msg_find_attr_as_uint64(res->msgs[i], "versionID", 0); + + status = wreplsrv_add_table(service, + service, &service->table, + wins_owner, version); + if (!NT_STATUS_IS_OK(status)) goto failed; + talloc_free(res->msgs[i]); + } + + /* + * this makes sure we call wreplsrv_local_max_version() before returning in + * wreplsrv_find_owner() + */ + service->owner = local_owner; + + /* + * this makes sure the maxVersion in the database is updated, + * with the highest version we found, if this is higher than the current stored one + */ + 
status = wreplsrv_add_table(service, + service, &service->table, + service->wins_db->local_owner, local_owner->owner.max_version); + if (!NT_STATUS_IS_OK(status)) goto failed; + + talloc_free(tmp_ctx); + return NT_STATUS_OK; +failed: + talloc_free(tmp_ctx); + return status; +} + +/* + setup our replication partners +*/ +static NTSTATUS wreplsrv_setup_partners(struct wreplsrv_service *service) +{ + NTSTATUS status; + + status = wreplsrv_load_partners(service); + NT_STATUS_NOT_OK_RETURN(status); + + status = wreplsrv_load_table(service); + NT_STATUS_NOT_OK_RETURN(status); + + return NT_STATUS_OK; +} + +/* + startup the wrepl task +*/ +static void wreplsrv_task_init(struct task_server *task) +{ + NTSTATUS status; + struct wreplsrv_service *service; + + if (!lp_wins_support(task->lp_ctx)) { + return; + } + + task_server_set_title(task, "task[wreplsrv]"); + + service = talloc_zero(task, struct wreplsrv_service); + if (!service) { + task_server_terminate(task, "wreplsrv_task_init: out of memory"); + return; + } + service->task = task; + service->startup_time = timeval_current(); + task->private = service; + + /* + * setup up all partners, and open the winsdb + */ + status = wreplsrv_open_winsdb(service, task->lp_ctx); + if (!NT_STATUS_IS_OK(status)) { + task_server_terminate(task, "wreplsrv_task_init: wreplsrv_open_winsdb() failed"); + return; + } + + /* + * setup timed events for each partner we want to pull from + */ + status = wreplsrv_setup_partners(service); + if (!NT_STATUS_IS_OK(status)) { + task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_partners() failed"); + return; + } + + /* + * setup listen sockets, so we can anwser requests from our partners, + * which pull from us + */ + status = wreplsrv_setup_sockets(service, task->lp_ctx); + if (!NT_STATUS_IS_OK(status)) { + task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_sockets() failed"); + return; + } + + status = wreplsrv_setup_periodic(service); + if (!NT_STATUS_IS_OK(status)) { + 
task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_periodic() failed");
+		return;
+	}
+
+	irpc_add_name(task->msg_ctx, "wrepl_server");
+}
+
+/*
+  register ourselves as an available server
+*/
+NTSTATUS server_service_wrepl_init(void)
+{
+	return register_server_service("wrepl", wreplsrv_task_init);
+}
diff --git a/source4/wrepl_server/wrepl_server.h b/source4/wrepl_server/wrepl_server.h
new file mode 100644
index 0000000000..a001c6b3ae
--- /dev/null
+++ b/source4/wrepl_server/wrepl_server.h
@@ -0,0 +1,313 @@
+/*
+   Unix SMB/CIFS implementation.
+
+   WINS Replication server
+
+   Copyright (C) Stefan Metzmacher	2005
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. 
+*/ + +struct wreplsrv_service; +struct wreplsrv_in_connection; +struct wreplsrv_out_connection; +struct wreplsrv_partner; + +#define WREPLSRV_VALID_ASSOC_CTX 0x12345678 +#define WREPLSRV_INVALID_ASSOC_CTX 0x0000000a + +/* + state of an incoming wrepl call +*/ +struct wreplsrv_in_call { + struct wreplsrv_in_connection *wreplconn; + struct wrepl_packet req_packet; + struct wrepl_packet rep_packet; + bool terminate_after_send; +}; + +/* + state of an incoming wrepl connection +*/ +struct wreplsrv_in_connection { + struct wreplsrv_in_connection *prev,*next; + struct stream_connection *conn; + struct packet_context *packet; + + /* our global service context */ + struct wreplsrv_service *service; + + /* + * the partner that connects us, + * can be NULL, when we got a connection + * from an unknown address + */ + struct wreplsrv_partner *partner; + + /* keep track of the assoc_ctx's */ + struct { + bool stopped; + uint32_t our_ctx; + uint32_t peer_ctx; + } assoc_ctx; +}; + +/* + state of an outgoing wrepl connection +*/ +struct wreplsrv_out_connection { + /* our global service context */ + struct wreplsrv_service *service; + + /* + * the partner we connect + */ + struct wreplsrv_partner *partner; + + /* keep track of the assoc_ctx's */ + struct { + uint32_t our_ctx; + uint32_t peer_ctx; + } assoc_ctx; + + /* + * the client socket to the partner, + * NULL if not yet connected + */ + struct wrepl_socket *sock; +}; + +enum winsrepl_partner_type { + WINSREPL_PARTNER_NONE = 0x0, + WINSREPL_PARTNER_PULL = 0x1, + WINSREPL_PARTNER_PUSH = 0x2, + WINSREPL_PARTNER_BOTH = (WINSREPL_PARTNER_PULL | WINSREPL_PARTNER_PUSH) +}; + +#define WINSREPL_DEFAULT_PULL_INTERVAL (30*60) +#define WINSREPL_DEFAULT_PULL_RETRY_INTERVAL (30) + +#define WINSREPL_DEFAULT_PUSH_CHANGE_COUNT (0) + +/* + this represents one of our configured partners +*/ +struct wreplsrv_partner { + struct wreplsrv_partner *prev,*next; + + /* our global service context */ + struct wreplsrv_service *service; + + /* the 
netbios name of the partner, mostly just for debugging */ + const char *name; + + /* the ip-address of the partner */ + const char *address; + + /* + * as wins partners identified by ip-address, we need to use a specific source-ip + * when we want to connect to the partner + */ + const char *our_address; + + /* the type of the partner, pull, push or both */ + enum winsrepl_partner_type type; + + /* pull specific options */ + struct { + /* the interval between 2 pull replications to the partner */ + uint32_t interval; + + /* the retry_interval if a pull cycle failed to the partner */ + uint32_t retry_interval; + + /* the error count till the last success */ + uint32_t error_count; + + /* the status of the last pull cycle */ + NTSTATUS last_status; + + /* the timestamp of the next pull try */ + struct timeval next_run; + + /* this is a list of each wins_owner the partner knows about */ + struct wreplsrv_owner *table; + + /* the outgoing connection to the partner */ + struct wreplsrv_out_connection *wreplconn; + + /* the current pending pull cycle request */ + struct composite_context *creq; + + /* the pull cycle io params */ + struct wreplsrv_pull_cycle_io *cycle_io; + + /* the current timed_event to the next pull cycle */ + struct timed_event *te; + } pull; + + /* push specific options */ + struct { + /* change count till push notification */ + uint32_t change_count; + + /* the last wins db maxVersion have reported to the partner */ + uint64_t maxVersionID; + + /* we should use WREPL_REPL_INFORM* messages to this partner */ + bool use_inform; + + /* the error count till the last success */ + uint32_t error_count; + + /* the status of the last push cycle */ + NTSTATUS last_status; + + /* the outgoing connection to the partner */ + struct wreplsrv_out_connection *wreplconn; + + /* the current push notification */ + struct composite_context *creq; + + /* the pull cycle io params */ + struct wreplsrv_push_notify_io *notify_io; + } push; +}; + +struct wreplsrv_owner { + 
struct wreplsrv_owner *prev,*next; + + /* this hold the owner_id (address), min_version, max_version and partner_type */ + struct wrepl_wins_owner owner; + + /* can be NULL if this owner isn't a configure partner */ + struct wreplsrv_partner *partner; +}; + +/* + state of the whole wrepl service +*/ +struct wreplsrv_service { + /* the whole wrepl service is in one task */ + struct task_server *task; + + /* the time the service was started */ + struct timeval startup_time; + + /* the winsdb handle */ + struct winsdb_handle *wins_db; + + /* some configuration */ + struct { + /* the wins config db handle */ + struct ldb_context *ldb; + + /* the last wins config db seqnumber we know about */ + uint64_t seqnumber; + + /* + * the interval (in secs) till an active record will be marked as RELEASED + */ + uint32_t renew_interval; + + /* + * the interval (in secs) a record remains in RELEASED state, + * before it will be marked as TOMBSTONE + * (also known as extinction interval) + */ + uint32_t tombstone_interval; + + /* + * the interval (in secs) a record remains in TOMBSTONE state, + * before it will be removed from the database. + * See also 'tombstone_extra_timeout'. + * (also known as extinction timeout) + */ + uint32_t tombstone_timeout; + + /* + * the interval (in secs) a record remains in TOMBSTONE state, + * even after 'tombstone_timeout' passes the current timestamp. + * this is the minimum uptime of the wrepl service, before + * we start delete tombstones. This is to prevent deletion of + * tombstones, without replacte them. 
+ */ + uint32_t tombstone_extra_timeout; + + /* + * the interval (in secs) till a replica record will be verified + * with the owning wins server + */ + uint32_t verify_interval; + + /* + * the interval (in secs) till a do a database cleanup + */ + uint32_t scavenging_interval; + + /* + * the interval (in secs) to the next periodic processing + * (this is the maximun interval) + */ + uint32_t periodic_interval; + } config; + + /* all incoming connections */ + struct wreplsrv_in_connection *in_connections; + + /* all partners (pull and push) */ + struct wreplsrv_partner *partners; + + /* + * this is our local wins_owner entry, this is also in the table list + * but we need a pointer to it, because we need to update it on each + * query to wreplsrv_find_owner(), as the local records can be added + * to the wins.ldb from external tools and the winsserver + */ + struct wreplsrv_owner *owner; + + /* this is a list of each wins_owner we know about in our database */ + struct wreplsrv_owner *table; + + /* some stuff for periodic processing */ + struct { + /* + * the timestamp for the next event, + * this is the timstamp passed to event_add_timed() + */ + struct timeval next_event; + + /* here we have a reference to the timed event the schedules the periodic stuff */ + struct timed_event *te; + } periodic; + + /* some stuff for scavenging processing */ + struct { + /* + * the timestamp for the next scavenging run, + * this is the timstamp passed to event_add_timed() + */ + struct timeval next_run; + + /* + * are we currently inside a scavenging run + */ + bool processing; + } scavenging; +}; + +struct socket_context; +struct wrepl_name; +#include "wrepl_server/wrepl_out_helpers.h" +#include "wrepl_server/wrepl_server_proto.h" |