From 843432d56f9114a4d0d3021a772953dc5a1193dd Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Wed, 4 Apr 2012 14:51:43 +0200 Subject: s3: New notify implementation From notify_internal.c: /* * The notify database is split up into two databases: One * relatively static index db and the real notify db with the * volatile entries. */ This change is necessary to make notify scale better in a cluster --- source3/smbd/files.c | 4 - source3/smbd/notify.c | 36 +- source3/smbd/notify_internal.c | 1666 +++++++++++++++++++++++----------------- source3/smbd/proto.h | 27 +- source3/smbd/server.c | 100 ++- 5 files changed, 1076 insertions(+), 757 deletions(-) (limited to 'source3/smbd') diff --git a/source3/smbd/files.c b/source3/smbd/files.c index c71e864648..edcd98cd85 100644 --- a/source3/smbd/files.c +++ b/source3/smbd/files.c @@ -449,10 +449,6 @@ void file_free(struct smb_request *req, files_struct *fsp) if (fsp->notify) { struct notify_context *notify_ctx = fsp->conn->sconn->notify_ctx; - if (fsp->is_directory) { - notify_remove_onelevel(notify_ctx, - &fsp->file_id, fsp); - } notify_remove(notify_ctx, fsp); TALLOC_FREE(fsp->notify); } diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c index fd9e5524a7..0401b65a6a 100644 --- a/source3/smbd/notify.c +++ b/source3/smbd/notify.c @@ -188,7 +188,7 @@ NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter, { char *fullpath; size_t len; - struct notify_entry e; + uint32_t subdir_filter; NTSTATUS status = NT_STATUS_NOT_IMPLEMENTED; if (fsp->notify != NULL) { @@ -220,22 +220,14 @@ NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter, fullpath[len-2] = '\0'; } - ZERO_STRUCT(e); - e.path = fullpath; - e.dir_fd = fsp->fh->fd; - e.dir_id = fsp->file_id; - e.filter = filter; - e.subdir_filter = 0; - if (recursive) { - e.subdir_filter = filter; - } + subdir_filter = recursive ? filter : 0; if (fsp->conn->sconn->sys_notify_ctx != NULL) { void *sys_notify_handle = NULL; status = SMB_VFS_NOTIFY_WATCH( fsp->conn, fsp->conn->sconn->sys_notify_ctx, - e.path, &e.filter, &e.subdir_filter, + fullpath, &filter, &subdir_filter, sys_notify_callback, fsp, &sys_notify_handle); if (NT_STATUS_IS_OK(status)) { @@ -243,9 +235,10 @@ NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter, } } - if ((e.filter != 0) || (e.subdir_filter != 0)) { - status = notify_add(fsp->conn->sconn->notify_ctx, fsp->conn, - &e, notify_callback, fsp); + if ((filter != 0) || (subdir_filter != 0)) { + status = notify_add(fsp->conn->sconn->notify_ctx, + fullpath, filter, subdir_filter, + notify_callback, fsp); } TALLOC_FREE(fullpath); return status; @@ -389,25 +382,10 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter, { struct notify_context *notify_ctx = conn->sconn->notify_ctx; char *fullpath; - char *parent; - const char *name; if (path[0] == '.' && path[1] == '/') { path += 2; } - if (parent_dirname(talloc_tos(), path, &parent, &name)) { - struct smb_filename smb_fname_parent; - - ZERO_STRUCT(smb_fname_parent); - smb_fname_parent.base_name = parent; - - if (SMB_VFS_STAT(conn, &smb_fname_parent) != -1) { - notify_onelevel(notify_ctx, action, filter, - SMB_VFS_FILE_ID_CREATE(conn, &smb_fname_parent.st), - name); - } - } - fullpath = talloc_asprintf(talloc_tos(), "%s/%s", conn->connectpath, path); if (fullpath == NULL) { diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c index c036e8a000..6e6bdf7b03 100644 --- a/source3/smbd/notify_internal.c +++ b/source3/smbd/notify_internal.c @@ -2,6 +2,7 @@ Unix SMB/CIFS implementation. Copyright (C) Andrew Tridgell 2006 + Copyright (C) Volker Lendecke 2012 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -28,958 +29,1193 @@ #include "librpc/gen_ndr/ndr_notify.h" #include "dbwrap/dbwrap.h" #include "dbwrap/dbwrap_open.h" +#include "dbwrap/dbwrap_tdb.h" #include "smbd/smbd.h" #include "messages.h" #include "lib/tdb_wrap/tdb_wrap.h" #include "util_tdb.h" #include "lib/param/param.h" - -struct notify_context { - struct db_context *db_recursive; - struct db_context *db_onelevel; - struct server_id server; - struct messaging_context *messaging_ctx; - struct notify_list *list; - struct notify_array *array; - int seqnum; - TDB_DATA key; -}; - +#include "lib/dbwrap/dbwrap_cache.h" +#include "ctdb_srvids.h" +#include "ctdbd_conn.h" +#include "ctdb_conn.h" +#include "lib/util/tevent_unix.h" struct notify_list { struct notify_list *next, *prev; - void *private_data; + const char *path; void (*callback)(void *, const struct notify_event *); - int depth; + void *private_data; }; -#define NOTIFY_KEY "notify array" - -#define NOTIFY_ENABLE "notify:enable" -#define NOTIFY_ENABLE_DEFAULT True +struct notify_context { + struct messaging_context *msg; + struct notify_list *list; -static NTSTATUS notify_remove_all(struct notify_context *notify, - const struct server_id *server); -static void notify_handler(struct messaging_context *msg_ctx, void *private_data, - uint32_t msg_type, struct server_id server_id, DATA_BLOB *data); + /* + * The notify database is split up into two databases: One + * relatively static index db and the real notify db with the + * volatile entries. + */ -/* - destroy the notify context -*/ -static int notify_destructor(struct notify_context *notify) -{ - messaging_deregister(notify->messaging_ctx, MSG_PVFS_NOTIFY, notify); + /* + * "db_notify" is indexed by pathname. Per record it stores an + * array of notify_db_entry structs. These represent the + * notify records as requested by the smb client. This + * database is always held locally, it is never clustered. + */ + struct db_context *db_notify; - if (notify->list != NULL) { - notify_remove_all(notify, ¬ify->server); - } + /* + * "db_index" is indexed by pathname. The records are an array + * of VNNs which have any interest in notifies for this path + * name. + * + * In the non-clustered case this database is cached in RAM by + * means of db_cache_open, which maintains a cache per + * process. Cache consistency is maintained by the tdb + * sequence number. + * + * In the clustered case right now we can not use the tdb + * sequence number, but by means of read only records we + * should be able to avoid a lot of full migrations. + * + * In both cases, it is important to keep the update + * operations to db_index to a minimum. This is achieved by + * delayed deletion. When a db_notify is initially created, + * the db_index record is also created. When more notifies are + * add for a path, then only the db_notify record needs to be + * modified, the db_index record is not touched. When the last + * entry from the db_notify record is deleted, the db_index + * record is not immediately deleted. Instead, the db_notify + * record is replaced with a current timestamp. A regular + * cleanup process will delete all db_index records that are + * older than a minute. + */ + struct db_context *db_index; +}; - return 0; -} +static void notify_trigger_local(struct notify_context *notify, + uint32_t action, uint32_t filter, + const char *path, size_t path_len, + bool recursive); +static NTSTATUS notify_send(struct notify_context *notify, + struct server_id *pid, + const char *path, uint32_t action, + void *private_data); +static NTSTATUS notify_add_entry(struct db_record *rec, + const struct notify_db_entry *e, + bool *p_add_idx); +static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn); + +static NTSTATUS notify_del_entry(struct db_record *rec, + const struct server_id *pid, + void *private_data); +static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn); + +static int notify_context_destructor(struct notify_context *notify); + +static void notify_handler(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id server_id, DATA_BLOB *data); -/* - Open up the notify.tdb database. You should close it down using - talloc_free(). We need the messaging_ctx to allow for notifications - via internal messages -*/ struct notify_context *notify_init(TALLOC_CTX *mem_ctx, - struct messaging_context *messaging_ctx, + struct messaging_context *msg, struct event_context *ev) { struct notify_context *notify; notify = talloc(mem_ctx, struct notify_context); if (notify == NULL) { - return NULL; - } - - notify->db_recursive = db_open(notify, lock_path("notify.tdb"), - 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, - O_RDWR|O_CREAT, 0644, - DBWRAP_LOCK_ORDER_2); - if (notify->db_recursive == NULL) { - talloc_free(notify); - return NULL; + goto fail; } + notify->msg = msg; + notify->list = NULL; - notify->db_onelevel = db_open(notify, lock_path("notify_onelevel.tdb"), - 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, - O_RDWR|O_CREAT, 0644, - DBWRAP_LOCK_ORDER_2); - if (notify->db_onelevel == NULL) { - talloc_free(notify); - return NULL; + notify->db_notify = db_open_tdb( + notify, lock_path("notify.tdb"), + 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, + O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2); + if (notify->db_notify == NULL) { + goto fail; + } + notify->db_index = db_open( + notify, lock_path("notify_index.tdb"), + 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, + O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3); + if (notify->db_index == NULL) { + goto fail; + } + if (!lp_clustering()) { + notify->db_index = db_open_cache(notify, notify->db_index); + if (notify->db_index == NULL) { + goto fail; + } } - notify->server = messaging_server_id(messaging_ctx); - notify->messaging_ctx = messaging_ctx; - notify->list = NULL; - notify->array = NULL; - notify->seqnum = dbwrap_get_seqnum(notify->db_recursive); - notify->key = string_term_tdb_data(NOTIFY_KEY); + if (notify->msg != NULL) { + NTSTATUS status; - talloc_set_destructor(notify, notify_destructor); + status = messaging_register(notify->msg, notify, + MSG_PVFS_NOTIFY, notify_handler); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1, ("messaging_register returned %s\n", + nt_errstr(status))); + goto fail; + } + } - /* register with the messaging subsystem for the notify - message type */ - messaging_register(notify->messaging_ctx, notify, - MSG_PVFS_NOTIFY, notify_handler); + talloc_set_destructor(notify, notify_context_destructor); return notify; +fail: + TALLOC_FREE(notify); + return NULL; } -bool notify_internal_parent_init(TALLOC_CTX *mem_ctx) +static int notify_context_destructor(struct notify_context *notify) { - struct tdb_wrap *db1, *db2; - struct loadparm_context *lp_ctx; - - if (lp_clustering()) { - return true; - } + DEBUG(10, ("notify_context_destructor called\n")); - lp_ctx = loadparm_init_s3(mem_ctx, loadparm_s3_context()); - if (lp_ctx == NULL) { - DEBUG(0, ("loadparm_init_s3 failed\n")); - return false; - } - /* - * Open the tdbs in the parent process (smbd) so that our - * CLEAR_IF_FIRST optimization in tdb_reopen_all can properly - * work. - */ - - db1 = tdb_wrap_open(mem_ctx, lock_path("notify.tdb"), - 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, - O_RDWR|O_CREAT, 0644, lp_ctx); - if (db1 == NULL) { - talloc_unlink(mem_ctx, lp_ctx); - DEBUG(1, ("could not open notify.tdb: %s\n", strerror(errno))); - return false; - } - db2 = tdb_wrap_open(mem_ctx, lock_path("notify_onelevel.tdb"), - 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, O_RDWR|O_CREAT, 0644, lp_ctx); - talloc_unlink(mem_ctx, lp_ctx); - if (db2 == NULL) { - DEBUG(1, ("could not open notify_onelevel.tdb: %s\n", - strerror(errno))); - TALLOC_FREE(db1); - return false; + if (notify->msg != NULL) { + messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify); } - return true; -} -/* - lock and fetch the record -*/ -static NTSTATUS notify_fetch_locked(struct notify_context *notify, struct db_record **rec) -{ - *rec = dbwrap_fetch_locked(notify->db_recursive, notify, notify->key); - if (*rec == NULL) { - return NT_STATUS_INTERNAL_DB_CORRUPTION; + while (notify->list != NULL) { + DEBUG(10, ("Removing private_data=%p\n", + notify->list->private_data)); + notify_remove(notify, notify->list->private_data); } - return NT_STATUS_OK; + return 0; } -/* - load the notify array -*/ -static NTSTATUS notify_load(struct notify_context *notify, struct db_record *rec) +NTSTATUS notify_add(struct notify_context *notify, + const char *path, uint32_t filter, uint32_t subdir_filter, + void (*callback)(void *, const struct notify_event *), + void *private_data) { - TDB_DATA dbuf; - DATA_BLOB blob; + struct notify_db_entry e; + struct notify_list *listel; + struct db_record *notify_rec, *idx_rec; + bool add_idx; NTSTATUS status; - int seqnum; + TDB_DATA key, notify_copy; + + if (notify == NULL) { + return NT_STATUS_NOT_IMPLEMENTED; + } - seqnum = dbwrap_get_seqnum(notify->db_recursive); + DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path, + private_data)); - if (seqnum == notify->seqnum && notify->array != NULL) { - return NT_STATUS_OK; + listel = talloc(notify, struct notify_list); + if (listel == NULL) { + return NT_STATUS_NO_MEMORY; + } + listel->callback = callback; + listel->private_data = private_data; + listel->path = talloc_strdup(listel, path); + if (listel->path == NULL) { + TALLOC_FREE(listel); + return NT_STATUS_NO_MEMORY; } + DLIST_ADD(notify->list, listel); - notify->seqnum = seqnum; + ZERO_STRUCT(e); + e.filter = filter; + e.subdir_filter = subdir_filter; + e.server = messaging_server_id(notify->msg); + e.private_data = private_data; - talloc_free(notify->array); - notify->array = talloc_zero(notify, struct notify_array); - NT_STATUS_HAVE_NO_MEMORY(notify->array); + key = string_tdb_data(path); - if (!rec) { - status = dbwrap_fetch(notify->db_recursive, notify, - notify->key, &dbuf); - if (!NT_STATUS_IS_OK(status)) { - return NT_STATUS_INTERNAL_DB_CORRUPTION; - } - } else { - dbuf = dbwrap_record_get_value(rec); - } - - blob.data = (uint8 *)dbuf.dptr; - blob.length = dbuf.dsize; - - status = NT_STATUS_OK; - if (blob.length > 0) { - enum ndr_err_code ndr_err; - ndr_err = ndr_pull_struct_blob(&blob, notify->array, notify->array, - (ndr_pull_flags_fn_t)ndr_pull_notify_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - /* 1. log that we got a corrupt notify_array - * 2. clear the variable the garbage was stored into to not trip - * over it next time this method is entered with the same seqnum - * 3. delete it from the database */ - DEBUG(2, ("notify_array is corrupt, discarding it\n")); - - ZERO_STRUCTP(notify->array); - if (rec != NULL) { - dbwrap_record_delete(rec); - } + notify_rec = dbwrap_fetch_locked(notify->db_notify, + talloc_tos(), key); + if (notify_rec == NULL) { + status = NT_STATUS_INTERNAL_DB_CORRUPTION; + goto fail; + } - } else { - if (DEBUGLEVEL >= 10) { - DEBUG(10, ("notify_load:\n")); - NDR_PRINT_DEBUG(notify_array, notify->array); - } + /* + * Make a copy of the notify_rec for easy restore in case + * updating the index_rec fails; + */ + notify_copy = dbwrap_record_get_value(notify_rec); + if (notify_copy.dsize != 0) { + notify_copy.dptr = (uint8_t *)talloc_memdup( + notify_rec, notify_copy.dptr, + notify_copy.dsize); + if (notify_copy.dptr == NULL) { + TALLOC_FREE(notify_rec); + status = NT_STATUS_NO_MEMORY; + goto fail; } } + if (DEBUGLEVEL >= 10) { + NDR_PRINT_DEBUG(notify_db_entry, &e); + } - if (!rec) { - talloc_free(dbuf.dptr); + status = notify_add_entry(notify_rec, &e, &add_idx); + if (!NT_STATUS_IS_OK(status)) { + goto fail; + } + if (!add_idx) { + /* + * Someone else has added the idx entry already + */ + TALLOC_FREE(notify_rec); + return NT_STATUS_OK; } - return status; -} + idx_rec = dbwrap_fetch_locked(notify->db_index, + talloc_tos(), key); + if (idx_rec == NULL) { + status = NT_STATUS_INTERNAL_DB_CORRUPTION; + goto restore_notify; + } + status = notify_add_idx(idx_rec, get_my_vnn()); + if (!NT_STATUS_IS_OK(status)) { + goto restore_notify; + } -/* - compare notify entries for sorting -*/ -static int notify_compare(const struct notify_entry *e1, const struct notify_entry *e2) -{ - return strcmp(e1->path, e2->path); + TALLOC_FREE(idx_rec); + TALLOC_FREE(notify_rec); + return NT_STATUS_OK; + +restore_notify: + if (notify_copy.dsize != 0) { + dbwrap_record_store(notify_rec, notify_copy, 0); + } else { + dbwrap_record_delete(notify_rec); + } + TALLOC_FREE(notify_rec); +fail: + DLIST_REMOVE(notify->list, listel); + TALLOC_FREE(listel); + return status; } -/* - save the notify array -*/ -static NTSTATUS notify_save(struct notify_context *notify, struct db_record *rec) +static NTSTATUS notify_add_entry(struct db_record *rec, + const struct notify_db_entry *e, + bool *p_add_idx) { - TDB_DATA dbuf; - DATA_BLOB blob; + TDB_DATA value = dbwrap_record_get_value(rec); + struct notify_db_entry *entries; + size_t num_entries; + bool add_idx = true; NTSTATUS status; - enum ndr_err_code ndr_err; - TALLOC_CTX *tmp_ctx; - /* if possible, remove some depth arrays */ - while (notify->array->num_depths > 0 && - notify->array->depth[notify->array->num_depths-1].num_entries == 0) { - notify->array->num_depths--; + if (value.dsize == sizeof(time_t)) { + DEBUG(10, ("Re-using deleted entry\n")); + value.dsize = 0; + add_idx = false; } - /* we might just be able to delete the record */ - if (notify->array->num_depths == 0) { - return dbwrap_record_delete(rec); + if ((value.dsize % sizeof(struct notify_db_entry)) != 0) { + DEBUG(1, ("Invalid value.dsize = %u\n", + (unsigned)value.dsize)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; } + num_entries = value.dsize / sizeof(struct notify_db_entry); - tmp_ctx = talloc_new(notify); - NT_STATUS_HAVE_NO_MEMORY(tmp_ctx); + if (num_entries != 0) { + add_idx = false; + } - ndr_err = ndr_push_struct_blob(&blob, tmp_ctx, notify->array, - (ndr_push_flags_fn_t)ndr_push_notify_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - talloc_free(tmp_ctx); - return ndr_map_error2ntstatus(ndr_err); + entries = talloc_array(rec, struct notify_db_entry, num_entries + 1); + if (entries == NULL) { + return NT_STATUS_NO_MEMORY; } + memcpy(entries, value.dptr, value.dsize); - if (DEBUGLEVEL >= 10) { - DEBUG(10, ("notify_save:\n")); - NDR_PRINT_DEBUG(notify_array, notify->array); + entries[num_entries] = *e; + value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries)); + status = dbwrap_record_store(rec, value, 0); + TALLOC_FREE(entries); + if (!NT_STATUS_IS_OK(status)) { + return status; } + *p_add_idx = add_idx; + return NT_STATUS_OK; +} - dbuf.dptr = blob.data; - dbuf.dsize = blob.length; +static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn) +{ + TDB_DATA value = dbwrap_record_get_value(rec); + uint32_t *vnns; + size_t i, num_vnns; + NTSTATUS status; - status = dbwrap_record_store(rec, dbuf, TDB_REPLACE); - talloc_free(tmp_ctx); + if ((value.dsize % sizeof(uint32_t)) != 0) { + DEBUG(1, ("Invalid value.dsize = %u\n", + (unsigned)value.dsize)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + num_vnns = value.dsize / sizeof(uint32_t); + vnns = (uint32_t *)value.dptr; - return status; -} + for (i=0; i vnn) { + break; + } + } + value.dptr = (uint8_t *)talloc_realloc( + rec, value.dptr, uint32_t, num_vnns + 1); + if (value.dptr == NULL) { + return NT_STATUS_NO_MEMORY; + } + value.dsize = talloc_get_size(value.dptr); -/* - handle incoming notify messages -*/ -static void notify_handler(struct messaging_context *msg_ctx, void *private_data, - uint32_t msg_type, struct server_id server_id, DATA_BLOB *data) + vnns = (uint32_t *)value.dptr; + + memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i)); + vnns[i] = vnn; + + status = dbwrap_record_store(rec, value, 0); + if (!NT_STATUS_IS_OK(status)) { + return status; + } + return NT_STATUS_OK; +} + +NTSTATUS notify_remove(struct notify_context *notify, void *private_data) { - struct notify_context *notify = talloc_get_type(private_data, struct notify_context); - enum ndr_err_code ndr_err; - struct notify_event ev; - TALLOC_CTX *tmp_ctx = talloc_new(notify); + struct server_id pid = messaging_server_id(notify->msg); struct notify_list *listel; + struct db_record *notify_rec; + NTSTATUS status; - if (tmp_ctx == NULL) { - return; + if (notify == NULL) { + return NT_STATUS_NOT_IMPLEMENTED; } - ndr_err = ndr_pull_struct_blob(data, tmp_ctx, &ev, - (ndr_pull_flags_fn_t)ndr_pull_notify_event); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - talloc_free(tmp_ctx); - return; - } + DEBUG(10, ("notify_remove: private_data=%p\n", private_data)); for (listel=notify->list;listel;listel=listel->next) { - if (listel->private_data == ev.private_data) { - listel->callback(listel->private_data, &ev); + if (listel->private_data == private_data) { + DLIST_REMOVE(notify->list, listel); break; } } - - talloc_free(tmp_ctx); + if (listel == NULL) { + DEBUG(10, ("%p not found\n", private_data)); + return NT_STATUS_NOT_FOUND; + } + notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), + string_tdb_data(listel->path)); + TALLOC_FREE(listel); + if (notify_rec == NULL) { + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + status = notify_del_entry(notify_rec, &pid, private_data); + DEBUG(10, ("del_entry returned %s\n", nt_errstr(status))); + TALLOC_FREE(notify_rec); + return status; } -/* - add an entry to the notify array -*/ -static NTSTATUS notify_add_array(struct notify_context *notify, struct db_record *rec, - struct notify_entry *e, - void *private_data, int depth) +static NTSTATUS notify_del_entry(struct db_record *rec, + const struct server_id *pid, + void *private_data) { - int i; - struct notify_depth *d; - struct notify_entry *ee; - - /* possibly expand the depths array */ - if (depth >= notify->array->num_depths) { - d = talloc_realloc(notify->array, notify->array->depth, - struct notify_depth, depth+1); - NT_STATUS_HAVE_NO_MEMORY(d); - for (i=notify->array->num_depths;i<=depth;i++) { - ZERO_STRUCT(d[i]); - } - notify->array->depth = d; - notify->array->num_depths = depth+1; - } - d = ¬ify->array->depth[depth]; + TDB_DATA value = dbwrap_record_get_value(rec); + struct notify_db_entry *entries; + size_t i, num_entries; + time_t now; - /* expand the entries array */ - ee = talloc_realloc(notify->array->depth, d->entries, struct notify_entry, - d->num_entries+1); - NT_STATUS_HAVE_NO_MEMORY(ee); - d->entries = ee; + DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid), + private_data)); - d->entries[d->num_entries] = *e; - d->entries[d->num_entries].private_data = private_data; - d->entries[d->num_entries].server = notify->server; - d->entries[d->num_entries].path_len = strlen(e->path); - d->num_entries++; - - d->max_mask |= e->filter; - d->max_mask_subdir |= e->subdir_filter; + if ((value.dsize % sizeof(struct notify_db_entry)) != 0) { + DEBUG(1, ("Invalid value.dsize = %u\n", + (unsigned)value.dsize)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + num_entries = value.dsize / sizeof(struct notify_db_entry); + entries = (struct notify_db_entry *)value.dptr; - TYPESAFE_QSORT(d->entries, d->num_entries, notify_compare); + for (i=0; imax_mask = 0; - d->max_mask_subdir = 0; + if (DEBUGLEVEL >= 10) { + NDR_PRINT_DEBUG(notify_db_entry, e); + } - for (i=0;inum_entries;i++) { - d->max_mask |= d->entries[i].filter; - d->max_mask_subdir |= d->entries[i].subdir_filter; + if (e->private_data != private_data) { + continue; + } + if (procid_equal(&e->server, pid)) { + break; + } + } + if (i == num_entries) { + return NT_STATUS_NOT_FOUND; } + entries[i] = entries[num_entries-1]; + value.dsize -= sizeof(struct notify_db_entry); - return notify_save(notify, rec); + if (value.dsize == 0) { + now = time(NULL); + value.dptr = (uint8_t *)&now; + value.dsize = sizeof(now); + } + return dbwrap_record_store(rec, value, 0); } -/* - Add a non-recursive watch -*/ +struct notify_trigger_index_state { + TALLOC_CTX *mem_ctx; + uint32_t *vnns; + uint32_t my_vnn; + bool found_my_vnn; +}; -static void notify_add_onelevel(struct notify_context *notify, - struct notify_entry *e, void *private_data) +static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data, + void *private_data) { - struct notify_entry_array *array; - struct db_record *rec; - DATA_BLOB blob; - TDB_DATA dbuf; - TDB_DATA value; - enum ndr_err_code ndr_err; - NTSTATUS status; - - array = talloc_zero(talloc_tos(), struct notify_entry_array); - if (array == NULL) { + struct notify_trigger_index_state *state = + (struct notify_trigger_index_state *)private_data; + uint32_t *new_vnns; + size_t i, num_vnns, num_new_vnns; + + if ((data.dsize % sizeof(uint32_t)) != 0) { + DEBUG(1, ("Invalid record size in notify index db: %u\n", + (unsigned)data.dsize)); return; } + new_vnns = (uint32_t *)data.dptr; + num_new_vnns = data.dsize / sizeof(uint32_t); - rec = dbwrap_fetch_locked(notify->db_onelevel, array, - make_tdb_data((uint8_t *)&e->dir_id, - sizeof(e->dir_id))); - if (rec == NULL) { - DEBUG(10, ("notify_add_onelevel: fetch_locked for %s failed" - "\n", file_id_string_tos(&e->dir_id))); - TALLOC_FREE(array); - return; - } + num_vnns = talloc_array_length(state->vnns); - value = dbwrap_record_get_value(rec); - blob.data = (uint8_t *)value.dptr; - blob.length = value.dsize; - - if (blob.length > 0) { - ndr_err = ndr_pull_struct_blob(&blob, array, array, - (ndr_pull_flags_fn_t)ndr_pull_notify_entry_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - DEBUG(10, ("ndr_pull_notify_entry_array failed: %s\n", - ndr_errstr(ndr_err))); - TALLOC_FREE(array); - return; - } - if (DEBUGLEVEL >= 10) { - DEBUG(10, ("notify_add_onelevel:\n")); - NDR_PRINT_DEBUG(notify_entry_array, array); + for (i=0; imy_vnn) { + state->found_my_vnn = true; } } - array->entries = talloc_realloc(array, array->entries, - struct notify_entry, - array->num_entries+1); - if (array->entries == NULL) { - TALLOC_FREE(array); + state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t, + num_vnns + num_new_vnns); + if ((num_vnns + num_new_vnns != 0) && (state->vnns == NULL)) { + DEBUG(1, ("talloc_realloc failed\n")); return; } - array->entries[array->num_entries] = *e; - array->entries[array->num_entries].private_data = private_data; - array->entries[array->num_entries].server = notify->server; - array->num_entries += 1; + memcpy(&state->vnns[num_vnns], data.dptr, data.dsize); +} - ndr_err = ndr_push_struct_blob(&blob, rec, array, - (ndr_push_flags_fn_t)ndr_push_notify_entry_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - DEBUG(10, ("ndr_push_notify_entry_array failed: %s\n", - ndr_errstr(ndr_err))); - TALLOC_FREE(array); - return; +static int vnn_cmp(const void *p1, const void *p2) +{ + uint32_t *vnn1 = (uint32_t *)p1; + uint32_t *vnn2 = (uint32_t *)p2; + + if (*vnn1 < *vnn2) { + return -1; } + if (*vnn1 == *vnn2) { + return 0; + } + return 1; +} + +static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action, + uint32_t filter, const char *path, + uint8_t **pblob, size_t *pblob_len) +{ + struct notify_remote_event ev; + DATA_BLOB data; + enum ndr_err_code ndr_err; + + ev.action = action; + ev.filter = filter; + ev.path = path; if (DEBUGLEVEL >= 10) { - DEBUG(10, ("notify_add_onelevel:\n")); - NDR_PRINT_DEBUG(notify_entry_array, array); + NDR_PRINT_DEBUG(notify_remote_event, &ev); } - dbuf.dptr = blob.data; - dbuf.dsize = blob.length; - - status = dbwrap_record_store(rec, dbuf, TDB_REPLACE); - TALLOC_FREE(array); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(10, ("notify_add_onelevel: store failed: %s\n", - nt_errstr(status))); - return; + ndr_err = ndr_push_struct_blob( + &data, mem_ctx, &ev, + (ndr_push_flags_fn_t)ndr_push_notify_remote_event); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + return false; } - e->filter = 0; - return; + *pblob = data.data; + *pblob_len = data.length; + return true; } +static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx, + const uint8_t *blob, size_t blob_len, + uint32_t *paction, uint32_t *pfilter, + char **path) +{ + struct notify_remote_event *ev; + enum ndr_err_code ndr_err; + DATA_BLOB data; -/* - add a notify watch. This is called when a notify is first setup on a open - directory handle. -*/ -NTSTATUS notify_add(struct notify_context *notify, connection_struct *conn, - struct notify_entry *e0, - void (*callback)(void *, const struct notify_event *), - void *private_data) + data.data = discard_const_p(uint8_t, blob); + data.length = blob_len; + + ev = talloc(mem_ctx, struct notify_remote_event); + if (ev == NULL) { + return false; + } + + ndr_err = ndr_pull_struct_blob( + &data, ev, ev, + (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + TALLOC_FREE(ev); + return false; + } + if (DEBUGLEVEL >= 10) { + NDR_PRINT_DEBUG(notify_remote_event, ev); + } + *paction = ev->action; + *pfilter = ev->filter; + *path = talloc_move(mem_ctx, (char **)&ev->path); + + TALLOC_FREE(ev); + return true; +} + +void notify_trigger(struct notify_context *notify, + uint32_t action, uint32_t filter, const char *path) { - struct notify_entry e = *e0; - NTSTATUS status; - struct notify_list *listel; - int depth; + struct ctdbd_connection *ctdbd_conn; + struct notify_trigger_index_state idx_state; + const char *p, *next_p; + size_t i, num_vnns; + uint32_t last_vnn; + uint8_t *remote_blob = NULL; + size_t remote_blob_len = 0; + + DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, " + "path=%s\n", (unsigned)action, (unsigned)filter, path)); /* see if change notify is enabled at all */ if (notify == NULL) { - return NT_STATUS_NOT_IMPLEMENTED; + return; } - depth = count_chars(e.path, '/'); + idx_state.mem_ctx = talloc_tos(); + idx_state.vnns = NULL; + idx_state.my_vnn = get_my_vnn(); - listel = talloc_zero(notify, struct notify_list); - if (listel == NULL) { - status = NT_STATUS_NO_MEMORY; + for (p = path; p != NULL; p = next_p) { + ptrdiff_t path_len = p - path; + bool recursive; + + next_p = strchr(p+1, '/'); + recursive = (next_p != NULL); + + idx_state.found_my_vnn = false; + + dbwrap_parse_record( + notify->db_index, + make_tdb_data((uint8_t *)path, path_len), + notify_trigger_index_parser, &idx_state); + + if (!idx_state.found_my_vnn) { + continue; + } + notify_trigger_local(notify, action, filter, + path, path_len, recursive); + } + + ctdbd_conn = messaging_ctdbd_connection(); + if (ctdbd_conn == NULL) { goto done; } - listel->private_data = private_data; - listel->callback = callback; - listel->depth = depth; - DLIST_ADD(notify->list, listel); + num_vnns = talloc_array_length(idx_state.vnns); + qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp); - if (e.filter != 0) { - notify_add_onelevel(notify, &e, private_data); - status = NT_STATUS_OK; - } + last_vnn = 0xffffffff; + remote_blob = NULL; - /* if the system notify handler couldn't handle some of the - filter bits, or couldn't handle a request for recursion - then we need to install it in the array used for the - intra-samba notify handling */ - if (e.filter != 0 || e.subdir_filter != 0) { - struct db_record *rec; + for (i=0; idb_notify, talloc_tos(), + make_tdb_data((uint8_t *)path, path_len), &data); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("dbwrap_fetch returned %s\n", + nt_errstr(status))); + return; } - - rec = dbwrap_fetch_locked( - notify->db_onelevel, array, - make_tdb_data((const uint8_t *)fid, sizeof(*fid))); - if (rec == NULL) { - DEBUG(10, ("notify_remove_onelevel: fetch_locked for %s failed" - "\n", file_id_string_tos(fid))); - TALLOC_FREE(array); - return NT_STATUS_INTERNAL_DB_CORRUPTION; + if (data.dsize == sizeof(time_t)) { + DEBUG(10, ("Got deleted record\n")); + goto done; + } + if ((data.dsize % sizeof(struct notify_db_entry)) != 0) { + DEBUG(1, ("Invalid data.dsize = %u\n", + (unsigned)data.dsize)); + goto done; } - value = dbwrap_record_get_value(rec); - blob.data = (uint8_t *)value.dptr; - blob.length = value.dsize; - - if (blob.length > 0) { - ndr_err = ndr_pull_struct_blob(&blob, array, array, - (ndr_pull_flags_fn_t)ndr_pull_notify_entry_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - DEBUG(10, ("ndr_pull_notify_entry_array failed: %s\n", - ndr_errstr(ndr_err))); - TALLOC_FREE(array); - return ndr_map_error2ntstatus(ndr_err); - } + entries = (struct notify_db_entry *)data.dptr; + num_entries = data.dsize / sizeof(struct notify_db_entry); + + DEBUG(10, ("recursive = %s pathlen=%d (%c)\n", + recursive ? "true" : "false", (int)path_len, + path[path_len])); + + for (i=0; i= 10) { - DEBUG(10, ("notify_remove_onelevel:\n")); - NDR_PRINT_DEBUG(notify_entry_array, array); + NDR_PRINT_DEBUG(notify_db_entry, e); } - } - for (i=0; inum_entries; i++) { - if ((private_data == array->entries[i].private_data) && - procid_equal(¬ify->server, &array->entries[i].server)) { - break; - } - } + e_filter = recursive ? e->subdir_filter : e->filter; - if (i == array->num_entries) { - TALLOC_FREE(array); - return NT_STATUS_OBJECT_NAME_NOT_FOUND; - } + if ((filter & e_filter) == 0) { + continue; + } - array->entries[i] = array->entries[array->num_entries-1]; - array->num_entries -= 1; + if (!procid_is_local(&e->server)) { + DEBUG(1, ("internal error: Non-local pid %s in " + "notify.tdb\n", + procid_str_static(&e->server))); + continue; + } - if (array->num_entries == 0) { - dbwrap_record_delete(rec); - TALLOC_FREE(array); - return NT_STATUS_OK; + status = notify_send(notify, &e->server, path + path_len + 1, + action, e->private_data); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("notify_send returned %s\n", + nt_errstr(status))); + } } - ndr_err = ndr_push_struct_blob(&blob, rec, array, - (ndr_push_flags_fn_t)ndr_push_notify_entry_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - DEBUG(10, ("ndr_push_notify_entry_array failed: %s\n", - ndr_errstr(ndr_err))); - TALLOC_FREE(array); - return ndr_map_error2ntstatus(ndr_err); - } +done: + TALLOC_FREE(data.dptr); +} - if (DEBUGLEVEL >= 10) { - DEBUG(10, ("notify_add_onelevel:\n")); - NDR_PRINT_DEBUG(notify_entry_array, array); - } +static NTSTATUS notify_send(struct notify_context *notify, + struct server_id *pid, + const char *path, uint32_t action, + void *private_data) +{ + struct notify_event ev; + DATA_BLOB data; + NTSTATUS status; + enum ndr_err_code ndr_err; - dbuf.dptr = blob.data; - dbuf.dsize = blob.length; + ev.action = action; + ev.path = path; + ev.private_data = private_data; - status = dbwrap_record_store(rec, dbuf, TDB_REPLACE); - TALLOC_FREE(array); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(10, ("notify_add_onelevel: store failed: %s\n", - nt_errstr(status))); - return status; + ndr_err = ndr_push_struct_blob( + &data, talloc_tos(), &ev, + (ndr_push_flags_fn_t)ndr_push_notify_event); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + return ndr_map_error2ntstatus(ndr_err); } - return NT_STATUS_OK; + status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY, + &data); + TALLOC_FREE(data.data); + return status; } -/* - remove a notify watch. Called when the directory handle is closed -*/ -NTSTATUS notify_remove(struct notify_context *notify, void *private_data) +static void notify_handler(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id server_id, DATA_BLOB *data) { - NTSTATUS status; + struct notify_context *notify = talloc_get_type_abort( + private_data, struct notify_context); + enum ndr_err_code ndr_err; + struct notify_event *n; struct notify_list *listel; - int i, depth; - struct notify_depth *d; - struct db_record *rec; - /* see if change notify is enabled at all */ - if (notify == NULL) { - return NT_STATUS_NOT_IMPLEMENTED; + n = talloc(talloc_tos(), struct notify_event); + if (n == NULL) { + DEBUG(1, ("talloc failed\n")); + return; + } + + ndr_err = ndr_pull_struct_blob( + data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + TALLOC_FREE(n); + return; + } + if (DEBUGLEVEL >= 10) { + NDR_PRINT_DEBUG(notify_event, n); } for (listel=notify->list;listel;listel=listel->next) { - if (listel->private_data == private_data) { - DLIST_REMOVE(notify->list, listel); + if (listel->private_data == n->private_data) { + listel->callback(listel->private_data, n); break; } } - if (listel == NULL) { - return NT_STATUS_OBJECT_NAME_NOT_FOUND; - } + TALLOC_FREE(n); +} - depth = listel->depth; +struct notify_walk_idx_state { + void (*fn)(const char *path, + uint32_t *vnns, size_t num_vnns, + void *private_data); + void *private_data; +}; - talloc_free(listel); +static int notify_walk_idx_fn(struct db_record *rec, void *private_data) +{ + struct notify_walk_idx_state *state = + (struct notify_walk_idx_state *)private_data; + TDB_DATA key, value; + char *path; - status = notify_fetch_locked(notify, &rec); - NT_STATUS_NOT_OK_RETURN(status); + key = dbwrap_record_get_key(rec); + value = dbwrap_record_get_value(rec); - status = notify_load(notify, rec); - if (!NT_STATUS_IS_OK(status)) { - talloc_free(rec); - return status; + if ((value.dsize % sizeof(uint32_t)) != 0) { + DEBUG(1, ("invalid value size in notify index db: %u\n", + (unsigned)(value.dsize))); + return 0; } - if (depth >= notify->array->num_depths) { - talloc_free(rec); - return NT_STATUS_OBJECT_NAME_NOT_FOUND; + path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize); + if (path == NULL) { + DEBUG(1, ("talloc_strndup failed\n")); + return 0; } + state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t), + state->private_data); + TALLOC_FREE(path); + return 0; +} - /* we only have to search at the depth of this element */ - d = ¬ify->array->depth[depth]; +void notify_walk_idx(struct notify_context *notify, + void (*fn)(const char *path, + uint32_t *vnns, size_t num_vnns, + void *private_data), + void *private_data) +{ + struct notify_walk_idx_state state; + state.fn = fn; + state.private_data = private_data; + dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state, + NULL); +} - for (i=0;inum_entries;i++) { - if (private_data == d->entries[i].private_data && - procid_equal(¬ify->server, &d->entries[i].server)) { - break; +struct notify_walk_state { + void (*fn)(const char *path, + struct notify_db_entry *entries, size_t num_entries, + time_t deleted_time, void *private_data); + void *private_data; +}; + +static int notify_walk_fn(struct db_record *rec, void *private_data) +{ + struct notify_walk_state *state = + (struct notify_walk_state *)private_data; + TDB_DATA key, value; + struct notify_db_entry *entries; + size_t num_entries; + time_t deleted_time; + char *path; + + key = dbwrap_record_get_key(rec); + value = dbwrap_record_get_value(rec); + + if (value.dsize == sizeof(deleted_time)) { + memcpy(&deleted_time, value.dptr, sizeof(deleted_time)); + entries = NULL; + num_entries = 0; + } else { + if ((value.dsize % sizeof(struct notify_db_entry)) != 0) { + DEBUG(1, ("invalid value size in notify db: %u\n", + (unsigned)(value.dsize))); + return 0; } - } - if (i == d->num_entries) { - talloc_free(rec); - return NT_STATUS_OBJECT_NAME_NOT_FOUND; + entries = (struct notify_db_entry *)value.dptr; + num_entries = value.dsize / sizeof(struct notify_db_entry); + deleted_time = 0; } - if (i < d->num_entries-1) { - memmove(&d->entries[i], &d->entries[i+1], - sizeof(d->entries[i])*(d->num_entries-(i+1))); + path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize); + if (path == NULL) { + DEBUG(1, ("talloc_strndup failed\n")); + return 0; } - d->num_entries--; - - status = notify_save(notify, rec); - - talloc_free(rec); - - return status; + state->fn(path, entries, num_entries, deleted_time, + state->private_data); + TALLOC_FREE(path); + return 0; } -/* - remove all notify watches for a messaging server -*/ -static NTSTATUS notify_remove_all(struct notify_context *notify, - const struct server_id *server) +void notify_walk(struct notify_context *notify, + void (*fn)(const char *path, + struct notify_db_entry *entries, + size_t num_entries, + time_t deleted_time, void *private_data), + void *private_data) { - NTSTATUS status; - int i, depth, del_count=0; - struct db_record *rec; + struct notify_walk_state state; + state.fn = fn; + state.private_data = private_data; + dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state, + NULL); +} - status = notify_fetch_locked(notify, &rec); - NT_STATUS_NOT_OK_RETURN(status); +struct notify_cleanup_state { + TALLOC_CTX *mem_ctx; + time_t delete_before; + ssize_t array_size; + uint32_t num_paths; + char **paths; +}; - status = notify_load(notify, rec); - if (!NT_STATUS_IS_OK(status)) { - talloc_free(rec); - return status; - } +static void notify_cleanup_collect( + const char *path, struct notify_db_entry *entries, size_t num_entries, + time_t deleted_time, void *private_data) +{ + struct notify_cleanup_state *state = + (struct notify_cleanup_state *)private_data; + char *p; - /* we have to search for all entries across all depths, looking for matches - for the server id */ - for (depth=0;deptharray->num_depths;depth++) { - struct notify_depth *d = ¬ify->array->depth[depth]; - for (i=0;inum_entries;i++) { - if (procid_equal(server, &d->entries[i].server)) { - if (i < d->num_entries-1) { - memmove(&d->entries[i], &d->entries[i+1], - sizeof(d->entries[i])*(d->num_entries-(i+1))); - } - i--; - d->num_entries--; - del_count++; - } - } + if (num_entries != 0) { + return; } - - if (del_count > 0) { - status = notify_save(notify, rec); + if (deleted_time >= state->delete_before) { + return; } - talloc_free(rec); - - return status; + p = talloc_strdup(state->mem_ctx, path); + if (p == NULL) { + DEBUG(1, ("talloc_strdup failed\n")); + return; + } + add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p, + &state->paths, &state->num_paths, + &state->array_size); + if (state->array_size == -1) { + TALLOC_FREE(p); + } } +static bool notify_cleanup_path(struct notify_context *notify, + const char *path, time_t delete_before); -/* - send a notify message to another messaging server -*/ -static NTSTATUS notify_send(struct notify_context *notify, struct notify_entry *e, - const char *path, uint32_t action) +void notify_cleanup(struct notify_context *notify) { - struct notify_event ev; - DATA_BLOB data; - NTSTATUS status; - enum ndr_err_code ndr_err; + struct notify_cleanup_state state; + uint32_t failure_pool; - ev.action = action; - ev.path = path; - ev.private_data = e->private_data; + ZERO_STRUCT(state); + state.mem_ctx = talloc_stackframe(); - ndr_err = ndr_push_struct_blob(&data, talloc_tos(), &ev, - (ndr_push_flags_fn_t)ndr_push_notify_event); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - return ndr_map_error2ntstatus(ndr_err); - } + state.delete_before = time(NULL) + - lp_parm_int(-1, "smbd", "notify cleanup interval", 60); - status = messaging_send(notify->messaging_ctx, e->server, - MSG_PVFS_NOTIFY, &data); - TALLOC_FREE(data.data); - return status; + notify_walk(notify, notify_cleanup_collect, &state); + + failure_pool = state.num_paths; + + while (state.num_paths != 0) { + size_t idx; + + /* + * This loop is designed to be as kind as possible to + * ctdb. ctdb does not like it if many smbds hammer on a + * single record. If on many nodes the cleanup process starts + * running, it can happen that all of them need to clean up + * records in the same order. This would generate a ctdb + * migrate storm on these records. Randomizing the load across + * multiple records reduces the load on the individual record. + */ + + generate_random_buffer((uint8_t *)&idx, sizeof(idx)); + idx = idx % state.num_paths; + + if (!notify_cleanup_path(notify, state.paths[idx], + state.delete_before)) { + /* + * notify_cleanup_path failed, the most likely reason + * is that dbwrap_try_fetch_locked failed due to + * contention. We allow one failed attempt per deleted + * path on average before we give up. + */ + failure_pool -= 1; + if (failure_pool == 0) { + /* + * Too many failures. We will come back here, + * maybe next time there is less contention. + */ + break; + } + } + + TALLOC_FREE(state.paths[idx]); + state.paths[idx] = state.paths[state.num_paths-1]; + state.num_paths -= 1; + } + TALLOC_FREE(state.mem_ctx); } -void notify_onelevel(struct notify_context *notify, uint32_t action, - uint32_t filter, struct file_id fid, const char *name) +static bool notify_cleanup_path(struct notify_context *notify, + const char *path, time_t delete_before) { - struct notify_entry_array *array; - TDB_DATA dbuf; - DATA_BLOB blob; - bool have_dead_entries = false; - int i; + struct db_record *notify_rec = NULL; + struct db_record *idx_rec = NULL; + TDB_DATA key = string_tdb_data(path); + TDB_DATA value; + time_t deleted; NTSTATUS status; - if (notify == NULL) { - return; + notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key); + if (notify_rec == NULL) { + DEBUG(10, ("Could not fetch notify_rec\n")); + return false; } + value = dbwrap_record_get_value(notify_rec); - array = talloc_zero(talloc_tos(), struct notify_entry_array); - if (array == NULL) { - return; + if (value.dsize != sizeof(deleted)) { + DEBUG(10, ("record %s has been re-used\n", path)); + goto done; } + memcpy(&deleted, value.dptr, sizeof(deleted)); - status = dbwrap_fetch(notify->db_onelevel, array, - make_tdb_data((uint8_t *)&fid, sizeof(fid)), - &dbuf); - if (!NT_STATUS_IS_OK(status)) { - TALLOC_FREE(array); - return; + if (deleted >= delete_before) { + DEBUG(10, ("record %s too young\n", path)); + goto done; } - blob.data = (uint8 *)dbuf.dptr; - blob.length = dbuf.dsize; - - if (blob.length > 0) { - enum ndr_err_code ndr_err; - ndr_err = ndr_pull_struct_blob(&blob, array, array, - (ndr_pull_flags_fn_t)ndr_pull_notify_entry_array); - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - DEBUG(10, ("ndr_pull_notify_entry_array failed: %s\n", - ndr_errstr(ndr_err))); - TALLOC_FREE(array); - return; - } - if (DEBUGLEVEL >= 10) { - DEBUG(10, ("notify_onelevel:\n")); - NDR_PRINT_DEBUG(notify_entry_array, array); - } + /* + * Be kind to ctdb and only try one dmaster migration at most. + */ + idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key); + if (idx_rec == NULL) { + DEBUG(10, ("Could not fetch idx_rec\n")); + goto done; } - for (i=0; inum_entries; i++) { - struct notify_entry *e = &array->entries[i]; + status = dbwrap_record_delete(notify_rec); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("Could not delete notify_rec: %s\n", + nt_errstr(status))); + } - if ((e->filter & filter) != 0) { - status = notify_send(notify, e, name, action); - if (NT_STATUS_EQUAL( - status, NT_STATUS_INVALID_HANDLE)) { - /* - * Mark the entry as dead. All entries have a - * path set. The marker used here is setting - * that to NULL. - */ - e->path = NULL; - have_dead_entries = true; - } - } + status = notify_del_idx(idx_rec, get_my_vnn()); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("Could not delete idx_rec: %s\n", + nt_errstr(status))); } - if (!have_dead_entries) { - TALLOC_FREE(array); - return; +done: + TALLOC_FREE(idx_rec); + TALLOC_FREE(notify_rec); + return true; +} + +static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn) +{ + TDB_DATA value = dbwrap_record_get_value(rec); + uint32_t *vnns; + size_t i, num_vnns; + + if ((value.dsize % sizeof(uint32_t)) != 0) { + DEBUG(1, ("Invalid value.dsize = %u\n", + (unsigned)value.dsize)); + return NT_STATUS_INTERNAL_DB_CORRUPTION; } + num_vnns = value.dsize / sizeof(uint32_t); + vnns = (uint32_t *)value.dptr; - for (i=0; inum_entries; i++) { - struct notify_entry *e = &array->entries[i]; - if (e->path != NULL) { - continue; + for (i=0; iserver))); + } + + if (i == num_vnns) { /* - * Potential TODO: This might need optimizing, - * notify_remove_onelevel() does a fetch_locked() operation at - * every call. But this would only matter if a process with - * MANY notifies has died without shutting down properly. + * Not found. Should not happen, but okay... */ - notify_remove_onelevel(notify, &e->dir_id, e->private_data); + return NT_STATUS_OK; } - TALLOC_FREE(array); - return; + memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1)); + value.dsize -= sizeof(uint32_t); + + if (value.dsize == 0) { + return dbwrap_record_delete(rec); + } + return dbwrap_record_store(rec, value, 0); } -/* - trigger a notify message for anyone waiting on a matching event +struct notify_cluster_proxy_state { + struct tevent_context *ev; + struct notify_context *notify; + struct ctdb_msg_channel *chan; +}; - This function is called a lot, and needs to be very fast. The unusual data structure - and traversal is designed to be fast in the average case, even for large numbers of - notifies -*/ -void notify_trigger(struct notify_context *notify, - uint32_t action, uint32_t filter, const char *path) +static void notify_cluster_proxy_got_chan(struct tevent_req *subreq); +static void notify_cluster_proxy_got_msg(struct tevent_req *subreq); +static void notify_cluster_proxy_trigger(struct notify_context *notify, + uint32_t action, uint32_t filter, + char *path); + +struct tevent_req *notify_cluster_proxy_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notify_context *notify) { - NTSTATUS status; - int depth; - const char *p, *next_p; + struct tevent_req *req, *subreq; + struct notify_cluster_proxy_state *state; - DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, " - "path=%s\n", (unsigned)action, (unsigned)filter, path)); + req = tevent_req_create(mem_ctx, &state, + struct notify_cluster_proxy_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->notify = notify; - /* see if change notify is enabled at all */ - if (notify == NULL) { - return; + subreq = ctdb_msg_channel_init_send( + state, state->ev, lp_ctdbd_socket(), + CTDB_SRVID_SAMBA_NOTIFY_PROXY); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); } + tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req); + return req; +} - again: - status = notify_load(notify, NULL); - if (!NT_STATUS_IS_OK(status)) { +static void notify_cluster_proxy_got_chan(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notify_cluster_proxy_state *state = tevent_req_data( + req, struct notify_cluster_proxy_state); + int ret; + + ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan); + TALLOC_FREE(subreq); + if (ret != 0) { + tevent_req_error(req, ret); return; } + subreq = ctdb_msg_read_send(state, state->ev, state->chan); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req); +} - /* loop along the given path, working with each directory depth separately */ - for (depth=0,p=path; - p && depth < notify->array->num_depths; - p=next_p,depth++) { - int p_len = p - path; - int min_i, max_i, i; - struct notify_depth *d = ¬ify->array->depth[depth]; - uint32_t d_max_mask; - next_p = strchr(p+1, '/'); - - /* see if there are any entries at this depth */ - if (d->num_entries == 0) continue; - - /* try to skip based on the maximum mask. If next_p is - NULL then we know it will be a 'this directory' - match, otherwise it must be a subdir match */ - - d_max_mask = next_p ? d->max_mask_subdir : d->max_mask; - - if ((filter & d_max_mask) == 0) { - continue; - } +static void notify_cluster_proxy_got_msg(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notify_cluster_proxy_state *state = tevent_req_data( + req, struct notify_cluster_proxy_state); + uint8_t *msg; + size_t msg_len; + uint32_t action, filter; + char *path; + int ret; + bool res; + + ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len); + TALLOC_FREE(subreq); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } - /* we know there is an entry here worth looking - for. Use a bisection search to find the first entry - with a matching path */ - min_i = 0; - max_i = d->num_entries-1; - - while (min_i < max_i) { - struct notify_entry *e; - int cmp; - i = (min_i+max_i)/2; - e = &d->entries[i]; - cmp = strncmp(path, e->path, p_len); - if (cmp == 0) { - if (p_len == e->path_len) { - max_i = i; - } else { - max_i = i-1; - } - } else if (cmp < 0) { - max_i = i-1; - } else { - min_i = i+1; - } - } + res = notify_pull_remote_blob(talloc_tos(), msg, msg_len, + &action, &filter, &path); + TALLOC_FREE(msg); + if (!res) { + tevent_req_error(req, EIO); + return; + } + notify_cluster_proxy_trigger(state->notify, action, filter, path); + TALLOC_FREE(path); - if (min_i != max_i) { - /* none match */ - continue; - } + subreq = ctdb_msg_read_send(state, state->ev, state->chan); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req); +} - /* we now know that the entries start at min_i */ - for (i=min_i;inum_entries;i++) { - struct notify_entry *e = &d->entries[i]; - uint32_t e_filter; - if (p_len != e->path_len || - strncmp(path, e->path, p_len) != 0) break; +static void notify_cluster_proxy_trigger(struct notify_context *notify, + uint32_t action, uint32_t filter, + char *path) +{ + const char *p, *next_p; - e_filter = next_p ? e->subdir_filter : e->filter; + for (p = path; p != NULL; p = next_p) { + ptrdiff_t path_len = p - path; + bool recursive; - if ((filter & e_filter) == 0) { - continue; - } + next_p = strchr(p+1, '/'); + recursive = (next_p != NULL); - status = notify_send(notify, e, path + e->path_len + 1, - action); + notify_trigger_local(notify, action, filter, + path, path_len, recursive); + } +} - if (NT_STATUS_EQUAL( - status, NT_STATUS_INVALID_HANDLE)) { - struct server_id server = e->server; +int notify_cluster_proxy_recv(struct tevent_req *req) +{ + int err; - DEBUG(10, ("Deleting notify entries for " - "process %s because it's gone\n", - procid_str_static(&e->server))); - notify_remove_all(notify, &server); - goto again; - } - } + if (tevent_req_is_unix_error(req, &err)) { + return err; } + return 0; } diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h index a770c3a2a4..4300ee3bca 100644 --- a/source3/smbd/proto.h +++ b/source3/smbd/proto.h @@ -540,19 +540,30 @@ NTSTATUS inotify_watch(struct sys_notify_context *ctx, struct notify_context *notify_init(TALLOC_CTX *mem_ctx, struct messaging_context *messaging_ctx, struct event_context *ev); -bool notify_internal_parent_init(TALLOC_CTX *mem_ctx); -NTSTATUS notify_add(struct notify_context *notify, connection_struct *conn, - struct notify_entry *e0, +NTSTATUS notify_add(struct notify_context *notify, + const char *path, uint32_t filter, uint32_t subdir_filter, void (*callback)(void *, const struct notify_event *), void *private_data); NTSTATUS notify_remove(struct notify_context *notify, void *private_data); -NTSTATUS notify_remove_onelevel(struct notify_context *notify, - const struct file_id *fid, - void *private_data); -void notify_onelevel(struct notify_context *notify, uint32_t action, - uint32_t filter, struct file_id fid, const char *name); void notify_trigger(struct notify_context *notify, uint32_t action, uint32_t filter, const char *path); +void notify_walk_idx(struct notify_context *notify, + void (*fn)(const char *path, + uint32_t *vnns, size_t num_vnns, + void *private_data), + void *private_data); +void notify_walk(struct notify_context *notify, + void (*fn)(const char *path, + struct notify_db_entry *entries, + size_t num_entries, + time_t deleted_time, void *private_data), + void *private_data); +void notify_cleanup(struct notify_context *notify); + +struct tevent_req *notify_cluster_proxy_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notify_context *notify); +int notify_cluster_proxy_recv(struct tevent_req *req); /* The following definitions come from smbd/ntquotas.c */ diff --git a/source3/smbd/server.c b/source3/smbd/server.c index 2920bbd7fb..912d7a129b 100644 --- a/source3/smbd/server.c +++ b/source3/smbd/server.c @@ -41,6 +41,7 @@ #include "smbprofile.h" #include "lib/id_cache.h" #include "lib/param/param.h" +#include "lib/background.h" struct smbd_open_socket; struct smbd_child_pid; @@ -269,6 +270,103 @@ static void smbd_parent_id_cache_delete(struct messaging_context *ctx, messaging_send_to_children(ctx, msg_type, msg_data); } +struct smbd_parent_notify_state { + struct tevent_context *ev; + struct messaging_context *msg; + uint32_t msgtype; + struct notify_context *notify; +}; + +static int smbd_parent_notify_cleanup(void *private_data); +static void smbd_parent_notify_cleanup_done(struct tevent_req *req); +static void smbd_parent_notify_proxy_done(struct tevent_req *req); + +static bool smbd_parent_notify_init(TALLOC_CTX *mem_ctx, + struct messaging_context *msg, + struct tevent_context *ev) +{ + struct smbd_parent_notify_state *state; + struct tevent_req *req; + + state = talloc(mem_ctx, struct smbd_parent_notify_state); + if (state == NULL) { + return NULL; + } + state->msg = msg; + state->ev = ev; + state->msgtype = MSG_SMB_NOTIFY_CLEANUP; + + state->notify = notify_init(state, msg, ev); + if (state->notify == NULL) { + goto fail; + } + req = background_job_send( + state, state->ev, state->msg, &state->msgtype, 1, + lp_parm_int(-1, "smbd", "notify cleanup interval", 60), + smbd_parent_notify_cleanup, state->notify); + if (req == NULL) { + goto fail; + } + tevent_req_set_callback(req, smbd_parent_notify_cleanup_done, state); + + if (!lp_clustering()) { + return true; + } + + req = notify_cluster_proxy_send(state, ev, state->notify); + if (req == NULL) { + goto fail; + } + tevent_req_set_callback(req, smbd_parent_notify_proxy_done, state); + + return true; +fail: + TALLOC_FREE(state); + return false; +} + +static int smbd_parent_notify_cleanup(void *private_data) +{ + struct notify_context *notify = talloc_get_type_abort( + private_data, struct notify_context); + notify_cleanup(notify); + return lp_parm_int(-1, "smbd", "notify cleanup interval", 60); +} + +static void smbd_parent_notify_cleanup_done(struct tevent_req *req) +{ + struct smbd_parent_notify_state *state = tevent_req_callback_data( + req, struct smbd_parent_notify_state); + NTSTATUS status; + + status = background_job_recv(req); + TALLOC_FREE(req); + DEBUG(1, ("notify cleanup job ended with %s\n", nt_errstr(status))); + + /* + * Provide self-healing: Whatever the error condition was, it + * will have printed it into log.smbd. Just retrying and + * spamming log.smbd once a minute should be fine. + */ + req = background_job_send( + state, state->ev, state->msg, &state->msgtype, 1, 60, + smbd_parent_notify_cleanup, state->notify); + if (req == NULL) { + DEBUG(1, ("background_job_send failed\n")); + return; + } + tevent_req_set_callback(req, smbd_parent_notify_cleanup_done, state); +} + +static void smbd_parent_notify_proxy_done(struct tevent_req *req) +{ + int ret; + + ret = notify_cluster_proxy_recv(req); + TALLOC_FREE(req); + DEBUG(1, ("notify proxy job ended with %s\n", strerror(ret))); +} + static void smb_parent_force_tdis(struct messaging_context *ctx, void* data, uint32_t msg_type, @@ -1319,7 +1417,7 @@ extern void build_options(bool screen); exit(1); } - if (!notify_internal_parent_init(ev_ctx)) { + if (!smbd_parent_notify_init(NULL, msg_ctx, ev_ctx)) { exit(1); } -- cgit