diff options
-rw-r--r-- | source4/cluster/ctdb/brlock_ctdb.c | 116 |
1 files changed, 80 insertions, 36 deletions
diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c index e847a6f50c..ffc5facbb4 100644 --- a/source4/cluster/ctdb/brlock_ctdb.c +++ b/source4/cluster/ctdb/brlock_ctdb.c @@ -31,8 +31,6 @@ #include "ntvfs/common/brlock.h" #include "include/ctdb.h" -#define ENABLE_NOTIFIES 0 - enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4, FUNC_BRL_CLOSE=5}; @@ -296,26 +294,19 @@ static int brl_ctdb_lock_func(struct ctdb_call_info *call) struct lock_struct lock, *locks=NULL; NTSTATUS status = NT_STATUS_OK; -#if ENABLE_NOTIFIES /* if this is a pending lock, then with the chainlock held we try to get the real lock. If we succeed then we don't need to make it pending. This prevents a possible race condition where the pending lock gets created after the lock that is preventing the real lock gets removed */ - if (lock_type >= PENDING_READ_LOCK) { - enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK); - - /* here we need to force that the last_lock isn't overwritten */ - lock = brlh->last_lock; - status = brl_ctdb_lock(brl, brlh, smbpid, start, size, rw, NULL); - brlh->last_lock = lock; - - if (NT_STATUS_IS_OK(status)) { - tdb_chainunlock(brl->w->tdb, kbuf); - return NT_STATUS_OK; + if (req->lock_type >= PENDING_READ_LOCK) { + enum brl_type lock_type = req->lock_type; + req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK); + if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) { + return 0; } + req->lock_type = lock_type; } -#endif dbuf = call->record_data; @@ -383,6 +374,7 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl, struct ctdb_lock_req req; struct ctdb_call call; int ret; + NTSTATUS status; call.call_id = FUNC_BRL_LOCK; call.key.dptr = brlh->key.data; @@ -405,18 +397,35 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl, return NT_STATUS_INTERNAL_DB_CORRUPTION; } - return NT_STATUS(call.status); + status = NT_STATUS(call.status); + + if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) { + struct lock_struct lock; + lock.context.smbpid = smbpid; + lock.context.server = brl->server; + lock.context.ctx = brl; + lock.ntvfs = brlh->ntvfs; + lock.start = start; + lock.size = size; + lock.lock_type = lock_type; + lock.notify_ptr = notify_ptr; + status = brl_ctdb_lock_failed(brlh, &lock); + } + + return status; } -#if ENABLE_NOTIFIES /* - we are removing a lock that might be holding up a pending lock. Scan for pending - locks that cover this range and if we find any then notify the server that it should - retry the lock + we are removing a lock that might be holding up a pending lock. Scan + for pending locks that cover this range and if we find any then + notify the server that it should retry the lock. In this backend, we + notify by sending the list of locks that need to be notified on back + in the reply_data of the ctdb call. The caller then does the + messaging for us. */ -static void brl_ctdb_notify_unlock(struct brl_context *brl, - struct lock_struct *locks, int count, - struct lock_struct *removed_lock) +static int brl_ctdb_notify_unlock(struct ctdb_call_info *call, + struct lock_struct *locks, int count, + struct lock_struct *removed_lock) { int i, last_notice; @@ -429,36 +438,68 @@ static void brl_ctdb_notify_unlock(struct brl_context *brl, for (i=0;i<count;i++) { if (locks[i].lock_type >= PENDING_READ_LOCK && brl_ctdb_overlap(&locks[i], removed_lock)) { + struct lock_struct *nlocks; + int ncount; + if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) { continue; } if (locks[i].lock_type == PENDING_WRITE_LOCK) { last_notice = i; } - messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, - MSG_BRL_RETRY, locks[i].notify_ptr); + if (call->reply_data == NULL) { + call->reply_data = talloc_zero(call, TDB_DATA); + if (call->reply_data == NULL) { + return CTDB_ERR_NOMEM; + } + } + /* add to the list of pending locks to notify caller of */ + ncount = call->reply_data->dsize / sizeof(struct lock_struct); + nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr, + struct lock_struct, ncount + 1); + if (nlocks == NULL) { + return CTDB_ERR_NOMEM; + } + call->reply_data->dptr = (uint8_t *)nlocks; + nlocks[ncount] = locks[i]; + call->reply_data->dsize += sizeof(struct lock_struct); } } + + return 0; } -#endif /* send notifications for all pending locks - the file is being closed by this user */ -static void brl_ctdb_notify_all(struct brl_context *brl, - struct lock_struct *locks, int count) +static int brl_ctdb_notify_all(struct ctdb_call_info *call, + struct lock_struct *locks, int count) { int i; for (i=0;i<count;i++) { if (locks->lock_type >= PENDING_READ_LOCK) { -#if ENABLE_NOTIFIES - brl_ctdb_notify_unlock(brl, locks, count, &locks[i]); -#endif + int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]); + if (ret != 0) return ret; } } + return 0; } +/* + send off any messages needed to notify of pending locks that should now retry +*/ +static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data) +{ + struct lock_struct *locks = (struct lock_struct *)reply_data->dptr; + int i, count = reply_data->dsize / sizeof(struct lock_struct); + for (i=0;i<count;i++) { + messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, + MSG_BRL_RETRY, locks[i].notify_ptr); + } +} + + struct ctdb_unlock_req { uint16_t smbpid; uint64_t start; @@ -515,9 +556,7 @@ static int brl_ctdb_unlock_func(struct ctdb_call_info *call) found: if (i < count) { -#if ENABLE_NOTIFIES struct lock_struct removed_lock = *lock; -#endif call->new_data = talloc(call, TDB_DATA); if (call->new_data == NULL) { @@ -535,9 +574,8 @@ found: (count-(i+1))*sizeof(*lock)); if (count > 1) { -#if ENABLE_NOTIFIES - brl_ctdb_notify_unlock(req->brl, locks, count, &removed_lock); -#endif + int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock); + if (ret != 0) return ret; } } @@ -584,6 +622,8 @@ static NTSTATUS brl_ctdb_unlock(struct brl_context *brl, return NT_STATUS_INTERNAL_DB_CORRUPTION; } + brl_ctdb_notify_send(brl, &call.reply_data); + return NT_STATUS(call.status); } @@ -808,6 +848,8 @@ static int brl_ctdb_close_func(struct ctdb_call_info *call) if (call->new_data == NULL) { return CTDB_ERR_NOMEM; } + + brl_ctdb_notify_all(call, locks, count); call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct)); if (call->new_data->dptr == NULL) { @@ -850,6 +892,8 @@ static NTSTATUS brl_ctdb_close(struct brl_context *brl, return NT_STATUS_INTERNAL_DB_CORRUPTION; } + brl_ctdb_notify_send(brl, &call.reply_data); + return NT_STATUS(call.status); } |