summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source4/cluster/ctdb/brlock_ctdb.c116
1 files changed, 80 insertions, 36 deletions
diff --git a/source4/cluster/ctdb/brlock_ctdb.c b/source4/cluster/ctdb/brlock_ctdb.c
index e847a6f50c..ffc5facbb4 100644
--- a/source4/cluster/ctdb/brlock_ctdb.c
+++ b/source4/cluster/ctdb/brlock_ctdb.c
@@ -31,8 +31,6 @@
#include "ntvfs/common/brlock.h"
#include "include/ctdb.h"
-#define ENABLE_NOTIFIES 0
-
enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2,
FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
FUNC_BRL_CLOSE=5};
@@ -296,26 +294,19 @@ static int brl_ctdb_lock_func(struct ctdb_call_info *call)
struct lock_struct lock, *locks=NULL;
NTSTATUS status = NT_STATUS_OK;
-#if ENABLE_NOTIFIES
/* if this is a pending lock, then with the chainlock held we
try to get the real lock. If we succeed then we don't need
to make it pending. This prevents a possible race condition
where the pending lock gets created after the lock that is
preventing the real lock gets removed */
- if (lock_type >= PENDING_READ_LOCK) {
- enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
-
- /* here we need to force that the last_lock isn't overwritten */
- lock = brlh->last_lock;
- status = brl_ctdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
- brlh->last_lock = lock;
-
- if (NT_STATUS_IS_OK(status)) {
- tdb_chainunlock(brl->w->tdb, kbuf);
- return NT_STATUS_OK;
+ if (req->lock_type >= PENDING_READ_LOCK) {
+ enum brl_type lock_type = req->lock_type;
+ req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
+ if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
+ return 0;
}
+ req->lock_type = lock_type;
}
-#endif
dbuf = call->record_data;
@@ -383,6 +374,7 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
struct ctdb_lock_req req;
struct ctdb_call call;
int ret;
+ NTSTATUS status;
call.call_id = FUNC_BRL_LOCK;
call.key.dptr = brlh->key.data;
@@ -405,18 +397,35 @@ static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
return NT_STATUS_INTERNAL_DB_CORRUPTION;
}
- return NT_STATUS(call.status);
+ status = NT_STATUS(call.status);
+
+ if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
+ struct lock_struct lock;
+ lock.context.smbpid = smbpid;
+ lock.context.server = brl->server;
+ lock.context.ctx = brl;
+ lock.ntvfs = brlh->ntvfs;
+ lock.start = start;
+ lock.size = size;
+ lock.lock_type = lock_type;
+ lock.notify_ptr = notify_ptr;
+ status = brl_ctdb_lock_failed(brlh, &lock);
+ }
+
+ return status;
}
-#if ENABLE_NOTIFIES
/*
- we are removing a lock that might be holding up a pending lock. Scan for pending
- locks that cover this range and if we find any then notify the server that it should
- retry the lock
+ we are removing a lock that might be holding up a pending lock. Scan
+ for pending locks that cover this range and if we find any then
+ notify the server that it should retry the lock. In this backend, we
+ notify by sending the list of locks that need to be notified on back
+ in the reply_data of the ctdb call. The caller then does the
+ messaging for us.
*/
-static void brl_ctdb_notify_unlock(struct brl_context *brl,
- struct lock_struct *locks, int count,
- struct lock_struct *removed_lock)
+static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
+ struct lock_struct *locks, int count,
+ struct lock_struct *removed_lock)
{
int i, last_notice;
@@ -429,36 +438,68 @@ static void brl_ctdb_notify_unlock(struct brl_context *brl,
for (i=0;i<count;i++) {
if (locks[i].lock_type >= PENDING_READ_LOCK &&
brl_ctdb_overlap(&locks[i], removed_lock)) {
+ struct lock_struct *nlocks;
+ int ncount;
+
if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
continue;
}
if (locks[i].lock_type == PENDING_WRITE_LOCK) {
last_notice = i;
}
- messaging_send_ptr(brl->messaging_ctx, locks[i].context.server,
- MSG_BRL_RETRY, locks[i].notify_ptr);
+ if (call->reply_data == NULL) {
+ call->reply_data = talloc_zero(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ }
+ /* add to the list of pending locks to notify caller of */
+ ncount = call->reply_data->dsize / sizeof(struct lock_struct);
+ nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr,
+ struct lock_struct, ncount + 1);
+ if (nlocks == NULL) {
+ return CTDB_ERR_NOMEM;
+ }
+ call->reply_data->dptr = (uint8_t *)nlocks;
+ nlocks[ncount] = locks[i];
+ call->reply_data->dsize += sizeof(struct lock_struct);
}
}
+
+ return 0;
}
-#endif
/*
send notifications for all pending locks - the file is being closed by this
user
*/
-static void brl_ctdb_notify_all(struct brl_context *brl,
- struct lock_struct *locks, int count)
+static int brl_ctdb_notify_all(struct ctdb_call_info *call,
+ struct lock_struct *locks, int count)
{
int i;
for (i=0;i<count;i++) {
if (locks->lock_type >= PENDING_READ_LOCK) {
-#if ENABLE_NOTIFIES
- brl_ctdb_notify_unlock(brl, locks, count, &locks[i]);
-#endif
+ int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
+ if (ret != 0) return ret;
}
}
+ return 0;
}
+/*
+ send off any messages needed to notify of pending locks that should now retry
+*/
+static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
+{
+ struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
+ int i, count = reply_data->dsize / sizeof(struct lock_struct);
+ for (i=0;i<count;i++) {
+ messaging_send_ptr(brl->messaging_ctx, locks[i].context.server,
+ MSG_BRL_RETRY, locks[i].notify_ptr);
+ }
+}
+
+
struct ctdb_unlock_req {
uint16_t smbpid;
uint64_t start;
@@ -515,9 +556,7 @@ static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
found:
if (i < count) {
-#if ENABLE_NOTIFIES
struct lock_struct removed_lock = *lock;
-#endif
call->new_data = talloc(call, TDB_DATA);
if (call->new_data == NULL) {
@@ -535,9 +574,8 @@ found:
(count-(i+1))*sizeof(*lock));
if (count > 1) {
-#if ENABLE_NOTIFIES
- brl_ctdb_notify_unlock(req->brl, locks, count, &removed_lock);
-#endif
+ int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
+ if (ret != 0) return ret;
}
}
@@ -584,6 +622,8 @@ static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
return NT_STATUS_INTERNAL_DB_CORRUPTION;
}
+ brl_ctdb_notify_send(brl, &call.reply_data);
+
return NT_STATUS(call.status);
}
@@ -808,6 +848,8 @@ static int brl_ctdb_close_func(struct ctdb_call_info *call)
if (call->new_data == NULL) {
return CTDB_ERR_NOMEM;
}
+
+ brl_ctdb_notify_all(call, locks, count);
call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
if (call->new_data->dptr == NULL) {
@@ -850,6 +892,8 @@ static NTSTATUS brl_ctdb_close(struct brl_context *brl,
return NT_STATUS_INTERNAL_DB_CORRUPTION;
}
+ brl_ctdb_notify_send(brl, &call.reply_data);
+
return NT_STATUS(call.status);
}