diff options
author | Andrew Tridgell <tridge@samba.org> | 2005-09-22 13:12:46 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 13:38:44 -0500 |
commit | bd310b792509f7305d7dc029eb4bec109322a4bf (patch) | |
tree | 7621c4474ccae12b9126e09c25bd6ff70fbc998e /source4/lib/tdb/common/traverse.c | |
parent | 05bd880626255c6547922204d7ba012aa9bc6d50 (diff) | |
download | samba-bd310b792509f7305d7dc029eb4bec109322a4bf.tar.gz samba-bd310b792509f7305d7dc029eb4bec109322a4bf.tar.bz2 samba-bd310b792509f7305d7dc029eb4bec109322a4bf.zip |
r10421: following on discussions with simo, I have worked out a way of
allowing searches to proceed while another process is in a
transaction, then only upgrading the transaction lock to a write lock
on commit.
The solution is:
- split tdb_traverse() into two calls, called tdb_traverse() and
tdb_traverse_read(). The _read() version only gets read locks, and
will fail any write operations made in the callback from the
traverse.
- the normal tdb_traverse() call allows for read or write operations
in the callback, but gets the transaction lock, preventing
transactions from starting inside the traverse
In addition we enforce the rule that you may not start a
transaction within a traverse callback, although you can start a
traverse within a transaction
With these rules in place I believe all the deadlock possibilities are
removed, and we can now allow for searches to happen in parallel with
transactions
(This used to be commit 7dd31288a701d772e45b1960ac4ce4cc1be782ed)
Diffstat (limited to 'source4/lib/tdb/common/traverse.c')
-rw-r--r-- | source4/lib/tdb/common/traverse.c | 70 |
1 files changed, 55 insertions, 15 deletions
diff --git a/source4/lib/tdb/common/traverse.c b/source4/lib/tdb/common/traverse.c index 7d1e99cbe8..335dce4152 100644 --- a/source4/lib/tdb/common/traverse.c +++ b/source4/lib/tdb/common/traverse.c @@ -71,7 +71,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc } } - if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1) + if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1) return -1; /* No previous record? Start at top of chain. */ @@ -118,7 +118,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc tdb_do_delete(tdb, current, rec) != 0) goto fail; } - tdb_unlock(tdb, tlock->hash, F_WRLCK); + tdb_unlock(tdb, tlock->hash, tlock->lock_rw); want_next = 0; } /* We finished iteration without finding anything */ @@ -126,7 +126,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc fail: tlock->off = 0; - if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0) + if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0) TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n")); return -1; } @@ -136,32 +136,33 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc if fn is NULL then it is not called a non-zero return value from fn() indicates that the traversal should stop */ -int tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *private) +static int tdb_traverse_internal(struct tdb_context *tdb, + tdb_traverse_func fn, void *private, + struct tdb_traverse_lock *tl) { TDB_DATA key, dbuf; struct list_struct rec; - struct tdb_traverse_lock tl = { NULL, 0, 0 }; int ret, count = 0; /* This was in the initializaton, above, but the IRIX compiler * did not like it. 
crh */ - tl.next = tdb->travlocks.next; + tl->next = tdb->travlocks.next; /* fcntl locks don't stack: beware traverse inside traverse */ - tdb->travlocks.next = &tl; + tdb->travlocks.next = tl; /* tdb_next_lock places locks on the record returned, and its chain */ - while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) { + while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) { count++; /* now read the full record */ - key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), + key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec), rec.key_len + rec.data_len); if (!key.dptr) { ret = -1; - if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) + if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) goto out; - if (tdb_unlock_record(tdb, tl.off) != 0) + if (tdb_unlock_record(tdb, tl->off) != 0) TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n")); goto out; } @@ -170,31 +171,70 @@ int tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *private) dbuf.dsize = rec.data_len; /* Drop chain lock, call out */ - if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) { + if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) { ret = -1; goto out; } if (fn && fn(tdb, key, dbuf, private)) { /* They want us to terminate traversal */ ret = count; - if (tdb_unlock_record(tdb, tl.off) != 0) { + if (tdb_unlock_record(tdb, tl->off) != 0) { TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));; ret = -1; } - tdb->travlocks.next = tl.next; + tdb->travlocks.next = tl->next; SAFE_FREE(key.dptr); return count; } SAFE_FREE(key.dptr); } out: - tdb->travlocks.next = tl.next; + tdb->travlocks.next = tl->next; if (ret < 0) return -1; else return count; } + +/* + a write style traverse - temporarily marks the db read only +*/ +int tdb_traverse_read(struct tdb_context *tdb, + tdb_traverse_func fn, void *private) +{ + struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK }; + int ret, read_only = tdb->read_only; + tdb->read_only = 1; + ret = tdb_traverse_internal(tdb, fn, private, &tl); + 
tdb->read_only = read_only; + return ret; +} + +/* + a write style traverse - needs to get the transaction lock to + prevent deadlocks +*/ +int tdb_traverse(struct tdb_context *tdb, + tdb_traverse_func fn, void *private) +{ + struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK }; + int ret; + + if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0) == -1) { + TDB_LOG((tdb, 0, "tdb_traverse: failed to get transaction lock\n")); + tdb->ecode = TDB_ERR_LOCK; + return -1; + } + + ret = tdb_traverse_internal(tdb, fn, private, &tl); + + tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0); + + return ret; +} + + /* find the first entry in the database and return its key */ TDB_DATA tdb_firstkey(struct tdb_context *tdb) { |