summaryrefslogtreecommitdiff
path: root/source4/lib
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2005-09-22 03:56:41 +0000
committerGerald (Jerry) Carter <jerry@samba.org>2007-10-10 13:38:41 -0500
commitede8415d61b6791114c65de1c283a4e8c11f1585 (patch)
treef57fed9af4459d650db40546527ba230382a3248 /source4/lib
parentd6de10b409329084075c59d9371695871c738362 (diff)
downloadsamba-ede8415d61b6791114c65de1c283a4e8c11f1585.tar.gz
samba-ede8415d61b6791114c65de1c283a4e8c11f1585.tar.bz2
samba-ede8415d61b6791114c65de1c283a4e8c11f1585.zip
r10405: added transactions into tdb, and hook them into ldb. See my
samba-technical posting for more details on the transactions design. This also adds a number of command line arguments to tdbtorture, making it more flexible, and fixes some lock deadlock conditions in the tdbtorture code. (This used to be commit 06bd8abba942ec9f1e23f5c5d546cbb71ca3a701)
Diffstat (limited to 'source4/lib')
-rw-r--r--source4/lib/ldb/Makefile.in3
-rw-r--r--source4/lib/ldb/configure.in1
-rw-r--r--source4/lib/ldb/ldb_tdb/ldb_tdb.c24
-rw-r--r--source4/lib/tdb/Makefile.in5
-rw-r--r--source4/lib/tdb/common/dump.c6
-rw-r--r--source4/lib/tdb/common/error.c1
-rw-r--r--source4/lib/tdb/common/freelist.c16
-rw-r--r--source4/lib/tdb/common/io.c60
-rw-r--r--source4/lib/tdb/common/lock.c50
-rw-r--r--source4/lib/tdb/common/open.c42
-rw-r--r--source4/lib/tdb/common/tdb.c4
-rw-r--r--source4/lib/tdb/common/tdb_private.h47
-rw-r--r--source4/lib/tdb/common/transaction.c976
-rw-r--r--source4/lib/tdb/common/traverse.c2
-rw-r--r--source4/lib/tdb/config.mk2
-rw-r--r--source4/lib/tdb/configure.in2
-rw-r--r--source4/lib/tdb/docs/README51
-rw-r--r--source4/lib/tdb/include/tdb.h17
-rw-r--r--source4/lib/tdb/tools/tdbtool.c26
-rw-r--r--source4/lib/tdb/tools/tdbtorture.c129
20 files changed, 1348 insertions, 116 deletions
diff --git a/source4/lib/ldb/Makefile.in b/source4/lib/ldb/Makefile.in
index b40933bc0d..2ffe6d25b8 100644
--- a/source4/lib/ldb/Makefile.in
+++ b/source4/lib/ldb/Makefile.in
@@ -44,7 +44,8 @@ LIB_FLAGS=-Llib -lldb $(LDAP_LIBS) $(SQLITE3_LIBS) $(GCOV_LIBS) @LIBS@
TDB_OBJ=$(TDBDIR)/common/tdb.o $(TDBDIR)/common/dump.o \
$(TDBDIR)/common/io.o $(TDBDIR)/common/lock.o \
$(TDBDIR)/common/open.o $(TDBDIR)/common/traverse.o \
- $(TDBDIR)/common/freelist.o $(TDBDIR)/common/error.o
+ $(TDBDIR)/common/freelist.o $(TDBDIR)/common/error.o \
+ $(TDBDIR)/common/transaction.o
TALLOC_OBJ=$(TALLOCDIR)/talloc.o
LDB_TDB_OBJ=ldb_tdb/ldb_tdb.o \
diff --git a/source4/lib/ldb/configure.in b/source4/lib/ldb/configure.in
index 7ae339c103..8b9e599892 100644
--- a/source4/lib/ldb/configure.in
+++ b/source4/lib/ldb/configure.in
@@ -34,4 +34,5 @@ WITH_SQLITE3=$with_sqlite3_support
AC_SUBST(WITH_SQLITE3)
sinclude(config.m4)
sinclude(../talloc/config.m4)
+sinclude(../tdb/config.m4)
AC_OUTPUT(Makefile ldb.pc)
diff --git a/source4/lib/ldb/ldb_tdb/ldb_tdb.c b/source4/lib/ldb/ldb_tdb/ldb_tdb.c
index 265e04a057..c056bd2e2d 100644
--- a/source4/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/source4/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -833,14 +833,28 @@ failed:
static int ltdb_start_trans(struct ldb_module *module)
{
- /* TODO: implement transactions */
+ struct ltdb_private *ltdb = module->private_data;
+
+ if (tdb_transaction_start(ltdb->tdb) != 0) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
return LDB_ERR_SUCCESS;
}
static int ltdb_end_trans(struct ldb_module *module, int status)
{
- /* TODO: implement transactions */
+ struct ltdb_private *ltdb = module->private_data;
+
+ if (status != LDB_ERR_SUCCESS) {
+ if (tdb_transaction_cancel(ltdb->tdb) != 0) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ } else {
+ if (tdb_transaction_commit(ltdb->tdb) != 0) {
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ }
return status;
}
@@ -881,6 +895,12 @@ int ltdb_connect(struct ldb_context *ldb, const char *url,
tdb_flags = TDB_DEFAULT;
+#if 0
+ /* set this to run tdb without disk sync, but still with
+ transactions */
+ tdb_flags |= TDB_NOSYNC;
+#endif
+
if (flags & LDB_FLG_RDONLY) {
open_flags = O_RDONLY;
} else {
diff --git a/source4/lib/tdb/Makefile.in b/source4/lib/tdb/Makefile.in
index 2fc77242b2..1942d9050a 100644
--- a/source4/lib/tdb/Makefile.in
+++ b/source4/lib/tdb/Makefile.in
@@ -2,7 +2,7 @@
# Makefile for tdb directory
#
-CFLAGS = -DTDB_DEBUG -g -Iinclude
+CFLAGS = -Iinclude @CFLAGS@
CC = @CC@
prefix = @prefix@
exec_prefix = @exec_prefix@
@@ -12,7 +12,8 @@ libdir = @libdir@
PROGS = bin/tdbtool bin/tdbtorture
TDB_OBJ = common/tdb.o common/dump.o common/io.o common/lock.o \
- common/open.o common/traverse.o common/freelist.o common/error.o
+ common/open.o common/traverse.o common/freelist.o common/error.o \
+ common/transaction.o
all: $(PROGS)
diff --git a/source4/lib/tdb/common/dump.c b/source4/lib/tdb/common/dump.c
index 0e203cc0d8..577f23aac6 100644
--- a/source4/lib/tdb/common/dump.c
+++ b/source4/lib/tdb/common/dump.c
@@ -33,7 +33,8 @@ static tdb_off_t tdb_dump_record(struct tdb_context *tdb, tdb_off_t offset)
struct list_struct rec;
tdb_off_t tailer_ofs, tailer;
- if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
+ if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
+ sizeof(rec), DOCONV()) == -1) {
printf("ERROR: failed to read record at %u\n", offset);
return 0;
}
@@ -107,7 +108,8 @@ int tdb_printfreelist(struct tdb_context *tdb)
printf("freelist top=[0x%08x]\n", rec_ptr );
while (rec_ptr) {
- if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
+ if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
+ sizeof(rec), DOCONV()) == -1) {
tdb_unlock(tdb, -1, F_WRLCK);
return -1;
}
diff --git a/source4/lib/tdb/common/error.c b/source4/lib/tdb/common/error.c
index 5b7696877c..270a441463 100644
--- a/source4/lib/tdb/common/error.c
+++ b/source4/lib/tdb/common/error.c
@@ -42,6 +42,7 @@ static struct tdb_errname {
{TDB_ERR_OOM, "Out of memory"},
{TDB_ERR_EXISTS, "Record exists"},
{TDB_ERR_NOLOCK, "Lock exists on other keys"},
+ {TDB_ERR_EINVAL, "Invalid parameter"},
{TDB_ERR_NOEXIST, "Record does not exist"} };
/* Error string for the last tdb error */
diff --git a/source4/lib/tdb/common/freelist.c b/source4/lib/tdb/common/freelist.c
index 018b7bc298..9707295ec3 100644
--- a/source4/lib/tdb/common/freelist.c
+++ b/source4/lib/tdb/common/freelist.c
@@ -31,7 +31,7 @@
/* read a freelist record and check for simple errors */
static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
{
- if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
+ if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
return -1;
if (rec->magic == TDB_MAGIC) {
@@ -40,7 +40,7 @@ static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_str
TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
rec->magic, off));
rec->magic = TDB_FREE_MAGIC;
- if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
+ if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
return -1;
}
@@ -51,7 +51,7 @@ static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_str
rec->magic, off));
return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
}
- if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
+ if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
return -1;
return 0;
}
@@ -111,7 +111,7 @@ int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
if (right + sizeof(*rec) <= tdb->map_size) {
struct list_struct r;
- if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
+ if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
goto left;
}
@@ -138,10 +138,16 @@ left:
TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
goto update;
}
+
+ /* it could be uninitialised data */
+ if (leftsize == 0 || leftsize == TDB_PAD_U32) {
+ goto update;
+ }
+
left = offset - leftsize;
/* Now read in record */
- if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
+ if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
goto update;
}
diff --git a/source4/lib/tdb/common/io.c b/source4/lib/tdb/common/io.c
index 97da809bc6..f02cd1a0e1 100644
--- a/source4/lib/tdb/common/io.c
+++ b/source4/lib/tdb/common/io.c
@@ -56,7 +56,7 @@
if necessary
note that "len" is the minimum length needed for the db
*/
-int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
+static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
{
struct stat st;
if (len <= tdb->map_size)
@@ -94,9 +94,10 @@ int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
}
/* write a lump of data at a specified offset */
-int tdb_write(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len)
+static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
+ const void *buf, tdb_len_t len)
{
- if (tdb_oob(tdb, off + len, 0) != 0)
+ if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
return -1;
if (tdb->map_ptr) {
@@ -122,9 +123,10 @@ void *tdb_convert(void *buf, u32 size)
/* read a lump of data at a specified offset, maybe convert */
-int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len, int cv)
+static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
+ tdb_len_t len, int cv)
{
- if (tdb_oob(tdb, off + len, 0) != 0) {
+ if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
return -1;
}
@@ -140,8 +142,9 @@ int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len, i
return TDB_ERRCODE(TDB_ERR_IO, -1);
}
}
- if (cv)
+ if (cv) {
tdb_convert(buf, len);
+ }
return 0;
}
@@ -151,7 +154,7 @@ int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len, i
do an unlocked scan of the hash table heads to find the next non-zero head. The value
will then be confirmed with the lock held
*/
-void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
+static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
{
u32 h = *chain;
if (tdb->map_ptr) {
@@ -230,9 +233,10 @@ static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t ad
}
}
- /* now fill the file with something. This ensures that the file isn't sparse, which would be
- very bad if we ran out of disk. This must be done with write, not via mmap */
- memset(buf, 0x42, sizeof(buf));
+ /* now fill the file with something. This ensures that the
+ file isn't sparse, which would be very bad if we ran out of
+ disk. This must be done with write, not via mmap */
+ memset(buf, TDB_PAD_BYTE, sizeof(buf));
while (addition) {
int n = addition>sizeof(buf)?sizeof(buf):addition;
int ret = pwrite(tdb->fd, buf, n, size);
@@ -261,11 +265,11 @@ int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
}
/* must know about any previous expansions by another process */
- tdb_oob(tdb, tdb->map_size + 1, 1);
+ tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
/* always make room for at least 10 more records, and round
- the database up to a multiple of TDB_PAGE_SIZE */
- size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
+ the database up to a multiple of the page size */
+ size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
if (!(tdb->flags & TDB_INTERNAL))
tdb_munmap(tdb);
@@ -278,7 +282,7 @@ int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
/* expand the file itself */
if (!(tdb->flags & TDB_INTERNAL)) {
- if (tdb_expand_file(tdb, tdb->map_size, size) != 0)
+ if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
goto fail;
}
@@ -321,13 +325,13 @@ int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
/* read/write a tdb_off_t */
int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
{
- return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
+ return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
}
int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
{
tdb_off_t off = *d;
- return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
+ return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
}
@@ -343,7 +347,7 @@ unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len
len, strerror(errno)));
return TDB_ERRCODE(TDB_ERR_OOM, buf);
}
- if (tdb_read(tdb, offset, buf, len, 0) == -1) {
+ if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
SAFE_FREE(buf);
return NULL;
}
@@ -353,7 +357,7 @@ unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len
/* read/write a record */
int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
{
- if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
+ if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
return -1;
if (TDB_BAD_MAGIC(rec)) {
/* Ensure ecode is set for log fn. */
@@ -361,12 +365,28 @@ int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *
TDB_LOG((tdb, 0,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
}
- return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
+ return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
}
int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
{
struct list_struct r = *rec;
- return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
+ return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
}
+static const struct tdb_methods io_methods = {
+ .tdb_read = tdb_read,
+ .tdb_write = tdb_write,
+ .next_hash_chain = tdb_next_hash_chain,
+ .tdb_oob = tdb_oob,
+ .tdb_expand_file = tdb_expand_file,
+ .tdb_brlock = tdb_brlock
+};
+
+/*
+ initialise the default methods table
+*/
+void tdb_io_init(struct tdb_context *tdb)
+{
+ tdb->methods = &io_methods;
+}
diff --git a/source4/lib/tdb/common/lock.c b/source4/lib/tdb/common/lock.c
index 596f7bfc95..8061bab7b1 100644
--- a/source4/lib/tdb/common/lock.c
+++ b/source4/lib/tdb/common/lock.c
@@ -32,9 +32,12 @@
this functions locks/unlocks 1 byte at the specified offset.
On error, errno is also set so that errors are passed back properly
- through tdb_open(). */
-int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
- int rw_type, int lck_type, int probe)
+ through tdb_open().
+
+ note that a len of zero means lock to end of file
+*/
+int tdb_brlock_len(struct tdb_context *tdb, tdb_off_t offset,
+ int rw_type, int lck_type, int probe, size_t len)
{
struct flock fl;
int ret;
@@ -49,7 +52,7 @@ int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
fl.l_type = rw_type;
fl.l_whence = SEEK_SET;
fl.l_start = offset;
- fl.l_len = 1;
+ fl.l_len = len;
fl.l_pid = 0;
do {
@@ -57,25 +60,36 @@ int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
- if (!probe && lck_type != F_SETLK) {
- /* Ensure error code is set for log fun to examine. */
- tdb->ecode = TDB_ERR_LOCK;
- TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n",
- tdb->fd, offset, rw_type, lck_type));
- }
/* Generic lock error. errno set by fcntl.
* EAGAIN is an expected return from non-blocking
* locks. */
if (errno != EAGAIN) {
- TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n",
+ TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n",
tdb->fd, offset, rw_type, lck_type,
strerror(errno)));
+ } else if (!probe && lck_type != F_SETLK) {
+ /* Ensure error code is set for log fun to examine. */
+ tdb->ecode = TDB_ERR_LOCK;
+ TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n",
+ tdb->fd, offset, rw_type, lck_type));
}
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
}
return 0;
}
+
+/* a byte range locking function - return 0 on success
+ this functions locks/unlocks 1 byte at the specified offset.
+
+ On error, errno is also set so that errors are passed back properly
+ through tdb_open(). */
+int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
+ int rw_type, int lck_type, int probe)
+{
+ return tdb_brlock_len(tdb, offset, rw_type, lck_type, probe, 1);
+}
+
/* lock a list in the database. list -1 is the alloc list */
int tdb_lock(struct tdb_context *tdb, int list, int ltype)
{
@@ -90,12 +104,13 @@ int tdb_lock(struct tdb_context *tdb, int list, int ltype)
/* Since fcntl locks don't nest, we do a lock for the first one,
and simply bump the count for future ones */
if (tdb->locked[list+1].count == 0) {
- if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
+ if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n",
list, ltype, strerror(errno)));
return -1;
}
tdb->locked[list+1].ltype = ltype;
+ tdb->num_locks++;
}
tdb->locked[list+1].count++;
return 0;
@@ -124,7 +139,8 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
if (tdb->locked[list+1].count == 1) {
/* Down to last nested lock: unlock underneath */
- ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
+ ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
+ tdb->num_locks--;
} else {
ret = 0;
}
@@ -194,7 +210,7 @@ int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
/* record lock stops delete underneath */
int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
{
- return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
+ return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
}
/*
@@ -208,7 +224,7 @@ int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
for (i = &tdb->travlocks; i; i = i->next)
if (i->off == off)
return -1;
- return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
+ return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
}
/*
@@ -217,7 +233,7 @@ int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
*/
int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
{
- return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
+ return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
}
/* fcntl locks don't stack: avoid unlocking someone else's */
@@ -231,5 +247,5 @@ int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
for (i = &tdb->travlocks; i; i = i->next)
if (i->off == off)
count++;
- return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
+ return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
}
diff --git a/source4/lib/tdb/common/open.c b/source4/lib/tdb/common/open.c
index d86ffff646..117152d4d9 100644
--- a/source4/lib/tdb/common/open.c
+++ b/source4/lib/tdb/common/open.c
@@ -144,6 +144,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
errno = ENOMEM;
goto fail;
}
+ tdb_io_init(tdb);
tdb->fd = -1;
tdb->name = NULL;
tdb->map_ptr = NULL;
@@ -152,6 +153,12 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
tdb->log_fn = log_fn?log_fn:null_log_fn;
tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
+ /* cache the page size */
+ tdb->page_size = getpagesize();
+ if (tdb->page_size <= 0) {
+ tdb->page_size = 0x2000;
+ }
+
if ((open_flags & O_ACCMODE) == O_WRONLY) {
TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
name));
@@ -186,7 +193,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
}
/* ensure there is only one process initialising at once */
- if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
+ if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
name, strerror(errno)));
goto fail; /* errno set by tdb_brlock */
@@ -194,7 +201,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
/* we need to zero database if we are the only one with it open */
if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
- (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
+ (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
open_flags |= O_CREAT;
if (ftruncate(tdb->fd, 0) == -1) {
TDB_LOG((tdb, 0, "tdb_open_ex: "
@@ -260,7 +267,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
}
tdb_mmap(tdb);
if (locked) {
- if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
+ if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
TDB_LOG((tdb, 0, "tdb_open_ex: "
"failed to take ACTIVE_LOCK on %s: %s\n",
name, strerror(errno)));
@@ -275,15 +282,20 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
if (tdb_flags & TDB_CLEAR_IF_FIRST) {
/* leave this lock in place to indicate it's in use */
- if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
+ if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
goto fail;
}
+ /* if needed, run recovery */
+ if (tdb_transaction_recover(tdb) == -1) {
+ goto fail;
+ }
+
internal:
/* Internal (memory-only) databases skip all the code above to
* do with disk files, and resume here by releasing their
* global lock and hooking into the active list. */
- if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
+ if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
goto fail;
tdb->next = tdbs;
tdbs = tdb;
@@ -322,6 +334,10 @@ int tdb_close(struct tdb_context *tdb)
struct tdb_context **i;
int ret = 0;
+ if (tdb->transaction) {
+ tdb_transaction_cancel(tdb);
+ }
+
if (tdb->map_ptr) {
if (tdb->flags & TDB_INTERNAL)
SAFE_FREE(tdb->map_ptr);
@@ -360,8 +376,20 @@ int tdb_reopen(struct tdb_context *tdb)
{
struct stat st;
- if (tdb->flags & TDB_INTERNAL)
+ if (tdb->flags & TDB_INTERNAL) {
return 0; /* Nothing to do. */
+ }
+
+ if (tdb->num_locks != 0) {
+ TDB_LOG((tdb, 0, "tdb_reopen: reopen not allowed with locks held\n"));
+ goto fail;
+ }
+
+ if (tdb->transaction != 0) {
+ TDB_LOG((tdb, 0, "tdb_reopen: reopen not allowed inside a transaction\n"));
+ goto fail;
+ }
+
if (tdb_munmap(tdb) != 0) {
TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
goto fail;
@@ -374,7 +402,7 @@ int tdb_reopen(struct tdb_context *tdb)
goto fail;
}
if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
- (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
+ (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
goto fail;
}
diff --git a/source4/lib/tdb/common/tdb.c b/source4/lib/tdb/common/tdb.c
index f099c2d1aa..c37d37a4f2 100644
--- a/source4/lib/tdb/common/tdb.c
+++ b/source4/lib/tdb/common/tdb.c
@@ -98,7 +98,7 @@ static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_
return -1;
}
- if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
+ if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
dbuf.dptr, dbuf.dsize) == -1)
return -1;
@@ -285,7 +285,7 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
/* write out and point the top of the hash chain at it */
if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
- || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
+ || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
|| tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
/* Need to tdb_unallocate() here */
goto fail;
diff --git a/source4/lib/tdb/common/tdb_private.h b/source4/lib/tdb/common/tdb_private.h
index 8e246e3283..eefcc52557 100644
--- a/source4/lib/tdb/common/tdb_private.h
+++ b/source4/lib/tdb/common/tdb_private.h
@@ -29,6 +29,8 @@
#include <config.h>
#endif
+#define _XOPEN_SOURCE 500
+
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
@@ -39,7 +41,7 @@
#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
-#include "tdbconfig.h"
+#include "config.h"
#include "tdb.h"
#else
#include "includes.h"
@@ -56,23 +58,30 @@
typedef u32 tdb_len_t;
typedef u32 tdb_off_t;
+#ifndef offsetof
+#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
+#endif
+
#define TDB_MAGIC_FOOD "TDB file\n"
#define TDB_VERSION (0x26011967 + 6)
#define TDB_MAGIC (0x26011999U)
#define TDB_FREE_MAGIC (~TDB_MAGIC)
#define TDB_DEAD_MAGIC (0xFEE1DEAD)
+#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
#define TDB_ALIGNMENT 4
#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
#define DEFAULT_HASH_SIZE 131
-#define TDB_PAGE_SIZE 0x2000
#define FREELIST_TOP (sizeof(struct tdb_header))
#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
+#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
-
+#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
+#define TDB_PAD_BYTE 0x42
+#define TDB_PAD_U32 0x42424242
/* NB assumes there is a local variable called "tdb" that is the
* current context, also takes doubly-parenthesized print-style
@@ -80,8 +89,9 @@ typedef u32 tdb_off_t;
#define TDB_LOG(x) tdb->log_fn x
/* lock offsets */
-#define GLOBAL_LOCK 0
-#define ACTIVE_LOCK 4
+#define GLOBAL_LOCK 0
+#define ACTIVE_LOCK 4
+#define TRANSACTION_LOCK 8
#ifndef MAP_FILE
#define MAP_FILE 0
@@ -138,8 +148,9 @@ struct tdb_header {
char magic_food[32]; /* for /etc/magic */
u32 version; /* version of the code */
u32 hash_size; /* number of hash entries */
- tdb_off_t rwlocks;
- tdb_off_t reserved[31];
+ tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
+ tdb_off_t recovery_start; /* offset of transaction recovery region */
+ tdb_off_t reserved[30];
};
struct tdb_lock_type {
@@ -154,6 +165,15 @@ struct tdb_traverse_lock {
};
+struct tdb_methods {
+ int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
+ int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
+ void (*next_hash_chain)(struct tdb_context *, u32 *);
+ int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
+ int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
+ int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int);
+};
+
struct tdb_context {
char *name; /* the name of the database */
void *map_ptr; /* where it is currently mapped */
@@ -171,7 +191,10 @@ struct tdb_context {
void (*log_fn)(struct tdb_context *tdb, int level, const char *, ...) PRINTF_ATTRIBUTE(3,4); /* logging function */
unsigned int (*hash_fn)(TDB_DATA *key);
int open_flags; /* flags used in the open - needed by reopen */
+ unsigned int num_locks; /* number of chain locks held */
+ const struct tdb_methods *methods;
struct tdb_transaction *transaction;
+ int page_size;
};
@@ -180,13 +203,11 @@ struct tdb_context {
*/
int tdb_munmap(struct tdb_context *tdb);
void tdb_mmap(struct tdb_context *tdb);
-int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len, int cv);
-int tdb_write(struct tdb_context *tdb, tdb_off_t off, void *buf, tdb_len_t len);
-int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe);
int tdb_lock(struct tdb_context *tdb, int list, int ltype);
int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
-int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe);
+int tdb_brlock_len(struct tdb_context *tdb, tdb_off_t offset,
+ int rw_type, int lck_type, int probe, size_t len);
int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
@@ -204,5 +225,7 @@ int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct
unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
struct list_struct *rec);
-void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain);
+void tdb_io_init(struct tdb_context *tdb);
+int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
+
diff --git a/source4/lib/tdb/common/transaction.c b/source4/lib/tdb/common/transaction.c
new file mode 100644
index 0000000000..b9d44a7283
--- /dev/null
+++ b/source4/lib/tdb/common/transaction.c
@@ -0,0 +1,976 @@
+ /*
+ Unix SMB/CIFS implementation.
+
+ trivial database library
+
+ Copyright (C) Andrew Tridgell 2005
+
+ ** NOTE! The following LGPL license applies to the tdb
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "tdb_private.h"
+
+/*
+ transaction design:
+
+ - only allow a single transaction at a time per database. This makes
+ using the transaction API simpler, as otherwise the caller would
+ have to cope with temporary failures in transactions that conflict
+ with other current transactions
+
+ - keep the transaction recovery information in the same file as the
+ database, using a special 'transaction recovery' record pointed at
+ by the header. This removes the need for extra journal files as
+ used by some other databases
+
+ - dymacially allocated the transaction recover record, re-using it
+ for subsequent transactions. If a larger record is needed then
+ tdb_free() the old record to place it on the normal tdb freelist
+ before allocating the new record
+
+ - during transactions, keep a linked list of writes all that have
+ been performed by intercepting all tdb_write() calls. The hooked
+ transaction versions of tdb_read() and tdb_write() check this
+ linked list and try to use the elements of the list in preference
+ to the real database.
+
+ - don't allow any locks to be held when a transaction starts,
+ otherwise we can end up with deadlock (plus lack of lock nesting
+ in posix locks would mean the lock is lost)
+
+ - if the caller gains a lock during the transaction but doesn't
+ release it then fail the commit
+
+ - allow for nested calls to tdb_transaction_start(), re-using the
+ existing transaction record. If the inner transaction is cancelled
+ then a subsequent commit will fail
+
+ - keep a mirrored copy of the tdb hash chain heads to allow for the
+ fast hash heads scan on traverse, updating the mirrored copy in
+ the transaction version of tdb_write
+
+ - allow callers to mix transaction and non-transaction use of tdb,
+ although once a transaction is started then an exclusive lock is
+ gained until the transaction is committed or cancelled
+
+ - the commit stategy involves first saving away all modified data
+ into a linearised buffer in the transaction recovery area, then
+ marking the transaction recovery area with a magic value to
+ indicate a valid recovery record. In total 4 fsync/msync calls are
+ needed per commit to prevent race conditions. It might be possible
+ to reduce this to 3 or even 2 with some more work.
+
+ - check for a valid recovery record on open of the tdb, while the
+ global lock is held. Automatically recover from the transaction
+ recovery area if needed, then continue with the open as
+ usual. This allows for smooth crash recovery with no administrator
+ intervention.
+
+ - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
+ still available, but no transaction recovery area is used and no
+ fsync/msync calls are made.
+
+*/
+
+
+/*
+ hold the context of any current transaction
+*/
+struct tdb_transaction {
+ /* we keep a mirrored copy of the tdb hash heads here so
+ tdb_next_hash_chain() can operate efficiently */
+ u32 *hash_heads;
+
+ /* the original io methods - used to do IOs to the real db */
+ const struct tdb_methods *io_methods;
+
+ /* the list of transaction elements. We use a doubly linked
+ list with a last pointer to allow us to keep the list
+ ordered, with first element at the front of the list. It
+ needs to be doubly linked as the read/write traversals need
+ to be backwards, while the commit needs to be forwards */
+ struct tdb_transaction_el {
+ struct tdb_transaction_el *next, *prev;
+ tdb_off_t offset;
+ tdb_len_t length;
+ unsigned char *data;
+ } *elements, *elements_last;
+
+ /* non-zero when an internal transaction error has
+ occurred. All write operations will then fail until the
+ transaction is ended */
+ int transaction_error;
+
+ /* when inside a transaction we need to keep track of any
+ nested tdb_transaction_start() calls, as these are allowed,
+ but don't create a new transaction */
+ int nesting;
+
+ /* old file size before transaction */
+ tdb_len_t old_map_size;
+};
+
+
+/*
+ read while in a transaction. We need to check first if the data is in our list
+ of transaction elements, then if not do a real read
+*/
+static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
+ tdb_len_t len, int cv)
+{
+ struct tdb_transaction_el *el;
+
+ /* we need to walk the list backwards to get the most recent data */
+ for (el=tdb->transaction->elements_last;el;el=el->prev) {
+ tdb_len_t partial;
+
+ if (off+len <= el->offset) {
+ continue;
+ }
+ if (off >= el->offset + el->length) {
+ continue;
+ }
+
+ /* an overlapping read - needs to be split into up to
+ 2 reads and a memcpy */
+ if (off < el->offset) {
+ partial = el->offset - off;
+ if (transaction_read(tdb, off, buf, partial, cv) != 0) {
+ goto fail;
+ }
+ len -= partial;
+ off += partial;
+ buf = (void *)(partial + (char *)buf);
+ }
+ if (off + len <= el->offset + el->length) {
+ partial = len;
+ } else {
+ partial = el->offset + el->length - off;
+ }
+ memcpy(buf, el->data + (off - el->offset), partial);
+ if (cv) {
+ tdb_convert(buf, len);
+ }
+ len -= partial;
+ off += partial;
+ buf = (void *)(partial + (char *)buf);
+
+ if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
+ goto fail;
+ }
+
+ return 0;
+ }
+
+ /* its not in the transaction elements - do a real read */
+ return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
+
+fail:
+ TDB_LOG((tdb, 0, "transaction_read: failed at off=%d len=%d\n", off, len));
+ tdb->ecode = TDB_ERR_IO;
+ tdb->transaction->transaction_error = 1;
+ return -1;
+}
+
+
+/*
+ write while in a transaction
+*/
+static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
+ const void *buf, tdb_len_t len)
+{
+ struct tdb_transaction_el *el;
+
+ /* if the write is to a hash head, then update the transaction
+ hash heads */
+ if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
+ off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
+ u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
+ memcpy(&tdb->transaction->hash_heads[chain], buf, len);
+ }
+
+ /* first see if we can replace an existing entry */
+ for (el=tdb->transaction->elements_last;el;el=el->prev) {
+ tdb_len_t partial;
+
+ if (off+len <= el->offset) {
+ continue;
+ }
+ if (off >= el->offset + el->length) {
+ continue;
+ }
+
+ /* an overlapping write - needs to be split into up to
+ 2 writes and a memcpy */
+ if (off < el->offset) {
+ partial = el->offset - off;
+ if (transaction_write(tdb, off, buf, partial) != 0) {
+ goto fail;
+ }
+ len -= partial;
+ off += partial;
+ buf = (const void *)(partial + (const char *)buf);
+ }
+ if (off + len <= el->offset + el->length) {
+ partial = len;
+ } else {
+ partial = el->offset + el->length - off;
+ }
+ memcpy(el->data + (off - el->offset), buf, partial);
+ len -= partial;
+ off += partial;
+ buf = (const void *)(partial + (const char *)buf);
+
+ if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
+ goto fail;
+ }
+
+ return 0;
+ }
+
+ /* add a new entry at the end of the list */
+ el = malloc(sizeof(*el));
+ if (el == NULL) {
+ tdb->ecode = TDB_ERR_OOM;
+ tdb->transaction->transaction_error = 1;
+ return -1;
+ }
+ el->next = NULL;
+ el->prev = tdb->transaction->elements_last;
+ el->offset = off;
+ el->length = len;
+ el->data = malloc(len);
+ if (el->data == NULL) {
+ free(el);
+ tdb->ecode = TDB_ERR_OOM;
+ tdb->transaction->transaction_error = 1;
+ return -1;
+ }
+ if (buf) {
+ memcpy(el->data, buf, len);
+ } else {
+ memset(el->data, TDB_PAD_BYTE, len);
+ }
+ if (el->prev) {
+ el->prev->next = el;
+ } else {
+ tdb->transaction->elements = el;
+ }
+ tdb->transaction->elements_last = el;
+ return 0;
+
+fail:
+ TDB_LOG((tdb, 0, "transaction_write: failed at off=%d len=%d\n", off, len));
+ tdb->ecode = TDB_ERR_IO;
+ tdb->transaction->transaction_error = 1;
+ return -1;
+}
+
+/*
+ accelerated hash chain head search, using the cached hash heads
+*/
+static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
+{
+ u32 h = *chain;
+ for (;h < tdb->header.hash_size;h++) {
+ /* the +1 takes account of the freelist */
+ if (0 != tdb->transaction->hash_heads[h+1]) {
+ break;
+ }
+ }
+ (*chain) = h;
+}
+
+/*
+ out of bounds check during a transaction
+*/
+static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
+{
+ if (len <= tdb->map_size) {
+ return 0;
+ }
+ return TDB_ERRCODE(TDB_ERR_IO, -1);
+}
+
+/*
+ transaction version of tdb_expand().
+*/
+static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
+ tdb_off_t addition)
+{
+ /* add a write to the transaction elements, so subsequent
+ reads see the zero data */
+ if (transaction_write(tdb, size, NULL, addition) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ brlock during a transaction - ignore them
+*/
+int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
+ int rw_type, int lck_type, int probe)
+{
+ return 0;
+}
+
+static const struct tdb_methods transaction_methods = {
+ .tdb_read = transaction_read,
+ .tdb_write = transaction_write,
+ .next_hash_chain = transaction_next_hash_chain,
+ .tdb_oob = transaction_oob,
+ .tdb_expand_file = transaction_expand_file,
+ .tdb_brlock = transaction_brlock
+};
+
+
+/*
+ start a tdb transaction. No token is returned, as only a single
+ transaction is allowed to be pending per tdb_context
+*/
+int tdb_transaction_start(struct tdb_context *tdb)
+{
+ /* some sanity checks */
+ if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
+ TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
+ tdb->ecode = TDB_ERR_EINVAL;
+ return -1;
+ }
+
+ /* cope with nested tdb_transaction_start() calls */
+ if (tdb->transaction != NULL) {
+ tdb->transaction->nesting++;
+ TDB_LOG((tdb, 0, "tdb_transaction_start: nesting %d\n",
+ tdb->transaction->nesting));
+ return 0;
+ }
+
+ if (tdb->num_locks != 0) {
+ /* the caller must not have any locks when starting a
+ transaction as otherwise we'll be screwed by lack
+ of nested locks in posix */
+ TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction with locks held\n"));
+ tdb->ecode = TDB_ERR_LOCK;
+ return -1;
+ }
+
+ tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
+ if (tdb->transaction == NULL) {
+ tdb->ecode = TDB_ERR_OOM;
+ return -1;
+ }
+
+ /* get the transaction write lock. This is a blocking lock. As
+ discussed with Volker, there are a number of ways we could
+ make this async, which we will probably do in the future */
+ if (tdb_brlock_len(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get transaction lock\n"));
+ tdb->ecode = TDB_ERR_LOCK;
+ SAFE_FREE(tdb->transaction);
+ return -1;
+ }
+
+ /* get a write lock from the freelist to the end of file. It
+ would be much better to make this a read lock as it would
+ increase parallelism, but it could lead to deadlocks on
+ commit when a write lock needs to be taken.
+
+ TODO: look at alternative locking strategies to allow this
+ to be a read lock
+ */
+ if (tdb_brlock_len(tdb, FREELIST_TOP, F_WRLCK, F_SETLKW, 0, 0) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get hash locks\n"));
+ tdb->ecode = TDB_ERR_LOCK;
+ goto fail;
+ }
+
+ /* setup a copy of the hash table heads so the hash scan in
+ traverse can be fast */
+ tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
+ if (tdb->transaction->hash_heads == NULL) {
+ tdb->ecode = TDB_ERR_OOM;
+ goto fail;
+ }
+ if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
+ TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
+ TDB_LOG((tdb, 0, "tdb_transaction_start: failed to read hash heads\n"));
+ tdb->ecode = TDB_ERR_IO;
+ goto fail;
+ }
+
+ /* make sure we know about any file expansions already done by
+ anyone else */
+ tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+ tdb->transaction->old_map_size = tdb->map_size;
+
+ /* finally hook the io methods, replacing them with
+ transaction specific methods */
+ tdb->transaction->io_methods = tdb->methods;
+ tdb->methods = &transaction_methods;
+
+ return 0;
+
+fail:
+ tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
+ tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ SAFE_FREE(tdb->transaction->hash_heads);
+ SAFE_FREE(tdb->transaction);
+ return -1;
+}
+
+
+/*
+ cancel the current transaction
+*/
+int tdb_transaction_cancel(struct tdb_context *tdb)
+{
+ if (tdb->transaction == NULL) {
+ TDB_LOG((tdb, 0, "tdb_transaction_cancel: no transaction\n"));
+ return -1;
+ }
+
+ if (tdb->transaction->nesting != 0) {
+ tdb->transaction->transaction_error = 1;
+ tdb->transaction->nesting--;
+ return 0;
+ }
+
+ tdb->map_size = tdb->transaction->old_map_size;
+
+ /* free all the transaction elements */
+ while (tdb->transaction->elements) {
+ struct tdb_transaction_el *el = tdb->transaction->elements;
+ tdb->transaction->elements = el->next;
+ free(el->data);
+ free(el);
+ }
+
+ /* remove any locks created during the transaction */
+ if (tdb->num_locks != 0) {
+ int h;
+ for (h=0;h<tdb->header.hash_size+1;h++) {
+ if (tdb->locked[h].count != 0) {
+ tdb_brlock_len(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
+ tdb->locked[h].count = 0;
+ }
+ }
+ tdb->num_locks = 0;
+ }
+
+ /* restore the normal io methods */
+ tdb->methods = tdb->transaction->io_methods;
+
+ tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
+ tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ SAFE_FREE(tdb->transaction->hash_heads);
+ SAFE_FREE(tdb->transaction);
+
+ return 0;
+}
+
+/*
+ sync to disk
+*/
+static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
+{
+ if (fsync(tdb->fd) != 0) {
+ tdb->ecode = TDB_ERR_IO;
+ TDB_LOG((tdb, 0, "tdb_transaction: fsync failed\n"));
+ return -1;
+ }
+#ifdef MS_SYNC
+ if (tdb->map_ptr) {
+ tdb_off_t moffset = offset & ~(tdb->page_size-1);
+ if (msync(moffset + (char *)tdb->map_ptr,
+ length + (offset - moffset), MS_SYNC) != 0) {
+ tdb->ecode = TDB_ERR_IO;
+ TDB_LOG((tdb, 0, "tdb_transaction: msync failed\n"));
+ return -1;
+ }
+ }
+#endif
+ return 0;
+}
+
+
+/*
+ work out how much space the linearised recovery data will consume
+*/
+static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
+{
+ struct tdb_transaction_el *el;
+ tdb_len_t recovery_size = 0;
+
+ recovery_size = sizeof(u32);
+ for (el=tdb->transaction->elements;el;el=el->next) {
+ if (el->offset >= tdb->transaction->old_map_size) {
+ continue;
+ }
+ recovery_size += 2*sizeof(tdb_off_t) + el->length;
+ }
+
+ return recovery_size;
+}
+
+/*
+ allocate the recovery area, or use an existing recovery area if it is
+ large enough
+*/
+static int tdb_recovery_allocate(struct tdb_context *tdb,
+ tdb_len_t *recovery_size,
+ tdb_off_t *recovery_offset,
+ tdb_len_t *recovery_max_size)
+{
+ struct list_struct rec;
+ const struct tdb_methods *methods = tdb->transaction->io_methods;
+ tdb_off_t recovery_head;
+
+ if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+ TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery head\n"));
+ return -1;
+ }
+
+ rec.rec_len = 0;
+
+ if (recovery_head != 0 &&
+ methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
+ TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery record\n"));
+ return -1;
+ }
+
+ *recovery_size = tdb_recovery_size(tdb);
+
+ if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
+ /* it fits in the existing area */
+ *recovery_max_size = rec.rec_len;
+ *recovery_offset = recovery_head;
+ return 0;
+ }
+
+ /* we need to free up the old recovery area, then allocate a
+ new one at the end of the file. Note that we cannot use
+ tdb_allocate() to allocate the new one as that might return
+ us an area that is being currently used (as of the start of
+ the transaction) */
+ if (recovery_head != 0) {
+ if (tdb_free(tdb, recovery_head, &rec) == -1) {
+ TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to free previous recovery area\n"));
+ return -1;
+ }
+ }
+
+ /* the tdb_free() call might have increased the recovery size */
+ *recovery_size = tdb_recovery_size(tdb);
+
+ /* round up to a multiple of page size */
+ *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
+ *recovery_offset = tdb->map_size;
+ recovery_head = *recovery_offset;
+
+ if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+ (tdb->map_size - tdb->transaction->old_map_size) +
+ sizeof(rec) + *recovery_max_size) == -1) {
+ TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to create recovery area\n"));
+ return -1;
+ }
+
+ /* remap the file (if using mmap) */
+ methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+
+ /* we have to reset the old map size so that we don't try to expand the file
+ again in the transaction commit, which would destroy the recovery area */
+ tdb->transaction->old_map_size = tdb->map_size;
+
+ /* write the recovery header offset and sync - we can sync without a race here
+ as the magic ptr in the recovery record has not been set */
+ CONVERT(recovery_head);
+ if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
+ &recovery_head, sizeof(tdb_off_t)) == -1) {
+ TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to write recovery head\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/*
+ setup the recovery data that will be used on a crash during commit
+*/
+static int transaction_setup_recovery(struct tdb_context *tdb,
+ tdb_off_t *magic_offset)
+{
+ struct tdb_transaction_el *el;
+ tdb_len_t recovery_size;
+ unsigned char *data, *p;
+ const struct tdb_methods *methods = tdb->transaction->io_methods;
+ struct list_struct *rec;
+ tdb_off_t recovery_offset, recovery_max_size;
+ tdb_off_t old_map_size = tdb->transaction->old_map_size;
+ u32 magic;
+
+ /*
+ check that the recovery area has enough space
+ */
+ if (tdb_recovery_allocate(tdb, &recovery_size,
+ &recovery_offset, &recovery_max_size) == -1) {
+ return -1;
+ }
+
+ data = malloc(recovery_size + sizeof(*rec));
+ if (data == NULL) {
+ tdb->ecode = TDB_ERR_OOM;
+ return -1;
+ }
+
+ rec = (struct list_struct *)data;
+ memset(rec, 0, sizeof(*rec));
+
+ rec->magic = 0;
+ rec->data_len = recovery_size;
+ rec->rec_len = recovery_max_size;
+ rec->key_len = old_map_size;
+ CONVERT(rec);
+
+ /* build the recovery data into a single blob to allow us to do a single
+ large write, which should be more efficient */
+ p = data + sizeof(*rec);
+ for (el=tdb->transaction->elements;el;el=el->next) {
+ if (el->offset >= old_map_size) {
+ continue;
+ }
+ if (el->offset + el->length > tdb->transaction->old_map_size) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction data over new region boundary\n"));
+ free(data);
+ tdb->ecode = TDB_ERR_CORRUPT;
+ return -1;
+ }
+ ((u32 *)p)[0] = el->offset;
+ ((u32 *)p)[1] = el->length;
+ if (DOCONV()) {
+ tdb_convert(p, 8);
+ }
+ /* the recovery area contains the old data, not the
+ new data, so we have to call the original tdb_read
+ method to get it */
+ if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
+ free(data);
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+ p += 8 + el->length;
+ }
+
+ /* and the tailer */
+ *(u32 *)p = sizeof(*rec) + recovery_max_size;
+ CONVERT(p);
+
+ /* write the recovery data to the recovery area */
+ if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery data\n"));
+ free(data);
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ /* as we don't have ordered writes, we have to sync the recovery
+ data before we update the magic to indicate that the recovery
+ data is present */
+ if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
+ free(data);
+ return -1;
+ }
+
+ free(data);
+
+ magic = TDB_RECOVERY_MAGIC;
+ CONVERT(magic);
+
+ *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
+
+ if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery magic\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ /* ensure the recovery magic marker is on disk */
+ if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ commit the current transaction
+*/
+int tdb_transaction_commit(struct tdb_context *tdb)
+{
+ const struct tdb_methods *methods;
+ tdb_off_t magic_offset;
+ u32 zero = 0;
+
+ if (tdb->transaction == NULL) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: no transaction\n"));
+ return -1;
+ }
+
+ if (tdb->transaction->transaction_error) {
+ tdb->ecode = TDB_ERR_IO;
+ tdb_transaction_cancel(tdb);
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction error pending\n"));
+ return -1;
+ }
+
+ if (tdb->transaction->nesting != 0) {
+ tdb->transaction->nesting--;
+ return 0;
+ }
+
+ /* check for a null transaction */
+ if (tdb->transaction->elements == NULL) {
+ tdb_transaction_cancel(tdb);
+ return 0;
+ }
+
+ methods = tdb->transaction->io_methods;
+
+ /* if there are any locks pending then the caller has not
+ nested their locks properly, so fail the transaction */
+ if (tdb->num_locks) {
+ tdb->ecode = TDB_ERR_LOCK;
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: locks pending on commit\n"));
+ tdb_transaction_cancel(tdb);
+ return -1;
+ }
+
+ /* get the global lock - this prevents new users attaching to the database
+ during the commit */
+ if (tdb_brlock_len(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to get global lock\n"));
+ tdb->ecode = TDB_ERR_LOCK;
+ tdb_transaction_cancel(tdb);
+ return -1;
+ }
+
+ if (!(tdb->flags & TDB_NOSYNC)) {
+ /* write the recovery data to the end of the file */
+ if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to setup recovery data\n"));
+ tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ tdb_transaction_cancel(tdb);
+ return -1;
+ }
+ }
+
+ /* expand the file to the new size if needed */
+ if (tdb->map_size != tdb->transaction->old_map_size) {
+ if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+ tdb->map_size -
+ tdb->transaction->old_map_size) == -1) {
+ tdb->ecode = TDB_ERR_IO;
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: expansion failed\n"));
+ tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ tdb_transaction_cancel(tdb);
+ return -1;
+ }
+ tdb->map_size = tdb->transaction->old_map_size;
+ methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+ }
+
+ /* perform all the writes */
+ while (tdb->transaction->elements) {
+ struct tdb_transaction_el *el = tdb->transaction->elements;
+
+ if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed during commit\n"));
+
+ /* we've overwritten part of the data and
+ possibly expanded the file, so we need to
+ run the crash recovery code */
+ tdb->methods = methods;
+ tdb_transaction_recover(tdb);
+
+ tdb_transaction_cancel(tdb);
+ tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed\n"));
+ return -1;
+ }
+ tdb->transaction->elements = el->next;
+ free(el->data);
+ free(el);
+ }
+
+ if (!(tdb->flags & TDB_NOSYNC)) {
+ /* ensure the new data is on disk */
+ if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
+ return -1;
+ }
+
+ /* remove the recovery marker */
+ if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to remove recovery magic\n"));
+ return -1;
+ }
+
+ /* ensure the recovery marker has been removed on disk */
+ if (transaction_sync(tdb, magic_offset, 4) == -1) {
+ return -1;
+ }
+ }
+
+ tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+
+ /* use a transaction cancel to free memory and remove the
+ transaction locks */
+ tdb_transaction_cancel(tdb);
+ return 0;
+}
+
+
+/*
+ recover from an aborted transaction. Must be called with exclusive
+ database write access already established (including the global
+ lock to prevent new processes attaching)
+*/
+int tdb_transaction_recover(struct tdb_context *tdb)
+{
+ tdb_off_t recovery_head, recovery_eof;
+ unsigned char *data, *p;
+ u32 zero = 0;
+ struct list_struct rec;
+
+ /* find the recovery area */
+ if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery head\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ if (recovery_head == 0) {
+ /* we have never allocated a recovery record */
+ return 0;
+ }
+
+ /* read the recovery record */
+ if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
+ sizeof(rec), DOCONV()) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery record\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ if (rec.magic != TDB_RECOVERY_MAGIC) {
+ /* there is no valid recovery data */
+ return 0;
+ }
+
+ if (tdb->read_only) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: attempt to recover read only database\n"));
+ tdb->ecode = TDB_ERR_CORRUPT;
+ return -1;
+ }
+
+ recovery_eof = rec.key_len;
+
+ data = malloc(rec.data_len);
+ if (data == NULL) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to allocate recovery data\n"));
+ tdb->ecode = TDB_ERR_OOM;
+ return -1;
+ }
+
+ /* read the full recovery data */
+ if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
+ rec.data_len, 0) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery data\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ /* recover the file data */
+ p = data;
+ while (p+8 < data + rec.data_len) {
+ u32 ofs, len;
+ if (DOCONV()) {
+ tdb_convert(p, 8);
+ }
+ ofs = ((u32 *)p)[0];
+ len = ((u32 *)p)[1];
+
+ if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
+ free(data);
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+ p += 8 + len;
+ }
+
+ free(data);
+
+ if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync recovery\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ /* if the recovery area is after the recovered eof then remove it */
+ if (recovery_eof <= recovery_head) {
+ if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery head\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+ }
+
+ /* remove the recovery magic */
+ if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
+ &zero) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery magic\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ /* reduce the file size to the old size */
+ tdb_munmap(tdb);
+ if (ftruncate(tdb->fd, recovery_eof) != 0) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to reduce to recovery size\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+ tdb->map_size = recovery_eof;
+ tdb_mmap(tdb);
+
+ if (transaction_sync(tdb, 0, recovery_eof) == -1) {
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync2 recovery\n"));
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
+ }
+
+ TDB_LOG((tdb, 0, "tdb_transaction_recover: recovered %d byte database\n",
+ recovery_eof));
+
+ /* all done */
+ return 0;
+}
diff --git a/source4/lib/tdb/common/traverse.c b/source4/lib/tdb/common/traverse.c
index d14be355a2..7d1e99cbe8 100644
--- a/source4/lib/tdb/common/traverse.c
+++ b/source4/lib/tdb/common/traverse.c
@@ -65,7 +65,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc
factor of around 80 in speed on a linux 2.6.x
system (testing using ldbtest).
*/
- tdb_next_hash_chain(tdb, &tlock->hash);
+ tdb->methods->next_hash_chain(tdb, &tlock->hash);
if (tlock->hash == tdb->header.hash_size) {
continue;
}
diff --git a/source4/lib/tdb/config.mk b/source4/lib/tdb/config.mk
index ddb57acf0d..bfc1900299 100644
--- a/source4/lib/tdb/config.mk
+++ b/source4/lib/tdb/config.mk
@@ -5,7 +5,7 @@ INIT_OBJ_FILES = \
lib/tdb/common/tdb.o
ADD_OBJ_FILES = lib/tdb/common/dump.o lib/tdb/common/io.o lib/tdb/common/lock.o \
lib/tdb/common/open.o lib/tdb/common/traverse.o lib/tdb/common/freelist.o \
- lib/tdb/common/error.o lib/tdb/common/tdbutil.o
+ lib/tdb/common/error.o lib/tdb/common/transaction.o lib/tdb/common/tdbutil.o
NOPROTO=YES
REQUIRED_SUBSYSTEMS = \
LIBREPLACE
diff --git a/source4/lib/tdb/configure.in b/source4/lib/tdb/configure.in
index 68ea39c140..4e5c42c351 100644
--- a/source4/lib/tdb/configure.in
+++ b/source4/lib/tdb/configure.in
@@ -1,7 +1,7 @@
AC_DEFUN([SMB_MODULE_DEFAULT], [echo -n ""])
AC_DEFUN([SMB_LIBRARY_ENABLE], [echo -n ""])
AC_INIT(include/tdb.h)
-AC_CONFIG_HEADER(include/tdbconfig.h)
+AC_CONFIG_HEADER(include/config.h)
AC_PROG_CC
AC_FUNC_MMAP
AC_DEFINE([_GNU_SOURCE],[],[Pull in GNU extensions])
diff --git a/source4/lib/tdb/docs/README b/source4/lib/tdb/docs/README
index 664d65ffd9..18b32de37f 100644
--- a/source4/lib/tdb/docs/README
+++ b/source4/lib/tdb/docs/README
@@ -40,6 +40,7 @@ The interface is very similar to gdbm except for the following:
- no tdbm_reorganise() function
- no tdbm_sync() function. No operations are cached in the library anyway
- added a tdb_traverse() function for traversing the whole database
+- added transactions support
A general rule for using tdb is that the caller frees any returned
TDB_DATA structures. Just call free(p.dptr) to free a TDB_DATA
@@ -67,6 +68,19 @@ TDB_CONTEXT *tdb_open(char *name, int hash_size, int tdb_flags,
memory. The filename is ignored in this case.
TDB_NOLOCK - don't do any locking
TDB_NOMMAP - don't use mmap
+ TDB_NOSYNC - don't synchronise transactions to disk
+
+----------------------------------------------------------------------
+TDB_CONTEXT *tdb_open_ex(char *name, int hash_size, int tdb_flags,
+ int open_flags, mode_t mode,
+ tdb_log_func log_fn,
+ tdb_hash_func hash_fn)
+
+This is like tdb_open(), but allows you to pass an initial logging and
+hash function. Be careful when passing a hash function - all users of
+the database must use the same hash function or you will get data
+corruption.
+
----------------------------------------------------------------------
char *tdb_error(TDB_CONTEXT *tdb);
@@ -164,3 +178,40 @@ int tdb_lockchain(TDB_CONTEXT *tdb, TDB_DATA key);
int tdb_unlockchain(TDB_CONTEXT *tdb, TDB_DATA key);
unlock one hash chain
+
+----------------------------------------------------------------------
+int tdb_transaction_start(TDB_CONTEXT *tdb)
+
+ start a transaction. All operations after the transaction start can
+ either be committed with tdb_transaction_commit() or cancelled with
+ tdb_transaction_cancel().
+
+ If you call tdb_transaction_start() again on the same tdb context
+ while a transaction is in progress, then the same transaction
+ buffer is re-used. The number of tdb_transaction_{commit,cancel}
+ operations must match the number of successful
+ tdb_transaction_start() calls.
+
+ Note that transactions are by default disk synchronous, and use a
+ recover area in the database to automatically recover the database
+ on the next open if the system crashes during a transaction. You
+ can disable the synchronous transaction recovery setup using the
+ TDB_NOSYNC flag, which will greatly speed up operations at the risk
+ of corrupting your database if the system crashes.
+
+ Operations made within a transaction are not visible to other users
+ of the database until a successful commit.
+
+----------------------------------------------------------------------
+int tdb_transaction_cancel(TDB_CONTEXT *tdb)
+
+ cancel a current transaction, discarding all write and lock
+ operations that have been made since the transaction started.
+
+
+----------------------------------------------------------------------
+int tdb_transaction_commit(TDB_CONTEXT *tdb)
+
+ commit a current transaction, updating the database and releasing
+ the transaction locks.
+
diff --git a/source4/lib/tdb/include/tdb.h b/source4/lib/tdb/include/tdb.h
index cc3f3c18d8..3f123d814c 100644
--- a/source4/lib/tdb/include/tdb.h
+++ b/source4/lib/tdb/include/tdb.h
@@ -45,13 +45,14 @@ extern "C" {
#define TDB_NOMMAP 8 /* don't use mmap */
#define TDB_CONVERT 16 /* convert endian (internal use) */
#define TDB_BIGENDIAN 32 /* header is big-endian (internal use) */
+#define TDB_NOSYNC 64 /* don't use synchronous transactions */
#define TDB_ERRCODE(code, ret) ((tdb->ecode = (code)), ret)
/* error codes */
enum TDB_ERROR {TDB_SUCCESS=0, TDB_ERR_CORRUPT, TDB_ERR_IO, TDB_ERR_LOCK,
TDB_ERR_OOM, TDB_ERR_EXISTS, TDB_ERR_NOLOCK, TDB_ERR_LOCK_TIMEOUT,
- TDB_ERR_NOEXIST};
+ TDB_ERR_NOEXIST, TDB_ERR_EINVAL};
typedef struct TDB_DATA {
unsigned char *dptr;
@@ -59,7 +60,15 @@ typedef struct TDB_DATA {
} TDB_DATA;
#ifndef PRINTF_ATTRIBUTE
-#define PRINTF_ATTRIBUTE(a,b)
+#if (__GNUC__ >= 3)
+/** Use gcc attribute to check printf fns. a1 is the 1-based index of
+ * the parameter containing the format, and a2 the index of the first
+ * argument. Note that some gcc 2.x versions don't handle this
+ * properly **/
+#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
+#else
+#define PRINTF_ATTRIBUTE(a1, a2)
+#endif
#endif
/* this is the context structure that is returned from a db open */
@@ -95,6 +104,10 @@ void tdb_unlockall(struct tdb_context *tdb);
const char *tdb_name(struct tdb_context *tdb);
int tdb_fd(struct tdb_context *tdb);
tdb_log_func tdb_log_fn(struct tdb_context *tdb);
+int tdb_transaction_start(struct tdb_context *tdb);
+int tdb_transaction_commit(struct tdb_context *tdb);
+int tdb_transaction_cancel(struct tdb_context *tdb);
+int tdb_transaction_recover(struct tdb_context *tdb);
/* Low level locking functions: use with care */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key);
diff --git a/source4/lib/tdb/tools/tdbtool.c b/source4/lib/tdb/tools/tdbtool.c
index 5a8c871699..0941a73118 100644
--- a/source4/lib/tdb/tools/tdbtool.c
+++ b/source4/lib/tdb/tools/tdbtool.c
@@ -34,6 +34,7 @@
#include <sys/time.h>
#include <ctype.h>
#include <signal.h>
+#include <stdarg.h>
#include "tdb.h"
/* a tdb tool for manipulating a tdb database */
@@ -77,6 +78,19 @@ static void print_asc(unsigned char *buf,int len)
printf("%c",isprint(buf[i])?buf[i]:'.');
}
+#ifdef PRINTF_ATTRIBUTE
+static void tdb_log(struct tdb_context *t, int level, const char *format, ...) PRINTF_ATTRIBUTE(3,4);
+#endif
+static void tdb_log(struct tdb_context *t, int level, const char *format, ...)
+{
+ va_list ap;
+
+ va_start(ap, format);
+ vfprintf(stdout, format, ap);
+ va_end(ap);
+ fflush(stdout);
+}
+
static void print_data(unsigned char *buf,int len)
{
int i=0;
@@ -131,7 +145,7 @@ static void help(void)
"\n");
}
-static void terror(char *why)
+static void terror(const char *why)
{
printf("%s\n", why);
}
@@ -175,8 +189,8 @@ static void create_tdb(void)
return;
}
if (tdb) tdb_close(tdb);
- tdb = tdb_open(tok, 0, TDB_CLEAR_IF_FIRST,
- O_RDWR | O_CREAT | O_TRUNC, 0600);
+ tdb = tdb_open_ex(tok, 0, TDB_CLEAR_IF_FIRST,
+ O_RDWR | O_CREAT | O_TRUNC, 0600, tdb_log, NULL);
if (!tdb) {
printf("Could not create %s: %s\n", tok, strerror(errno));
}
@@ -190,7 +204,7 @@ static void open_tdb(void)
return;
}
if (tdb) tdb_close(tdb);
- tdb = tdb_open(tok, 0, 0, O_RDWR, 0600);
+ tdb = tdb_open_ex(tok, 0, 0, O_RDWR, 0600, tdb_log, NULL);
if (!tdb) {
printf("Could not open %s: %s\n", tok, strerror(errno));
}
@@ -326,7 +340,7 @@ static void move_rec(void)
print_rec(tdb, key, dbuf, NULL);
- dst_tdb = tdb_open(file, 0, 0, O_RDWR, 0600);
+ dst_tdb = tdb_open_ex(file, 0, 0, O_RDWR, 0600, tdb_log, NULL);
if ( !dst_tdb ) {
terror("unable to open destination tdb");
return;
@@ -377,7 +391,7 @@ static void info_tdb(void)
printf("%d records totalling %d bytes\n", count, total_bytes);
}
-static char *tdb_getline(char *prompt)
+static char *tdb_getline(const char *prompt)
{
static char line[1024];
char *p;
diff --git a/source4/lib/tdb/tools/tdbtorture.c b/source4/lib/tdb/tools/tdbtorture.c
index 0cf82fa5a5..b0a2e7484f 100644
--- a/source4/lib/tdb/tools/tdbtorture.c
+++ b/source4/lib/tdb/tools/tdbtorture.c
@@ -1,6 +1,8 @@
/* this tests tdb by doing lots of ops from several simultaneous
- writers - that stresses the locking code. Build with TDB_DEBUG=1
- for best effect */
+ writers - that stresses the locking code.
+*/
+
+#define _GNU_SOURCE
#ifndef _SAMBA_BUILD_
#include <stdlib.h>
@@ -28,11 +30,14 @@
#endif
+#include <getopt.h>
+
#define REOPEN_PROB 30
#define DELETE_PROB 8
#define STORE_PROB 4
#define APPEND_PROB 6
-#define LOCKSTORE_PROB 0
+#define TRANSACTION_PROB 10
+#define LOCKSTORE_PROB 5
#define TRAVERSE_PROB 20
#define CULL_PROB 100
#define KEYLEN 3
@@ -40,6 +45,7 @@
#define LOCKLEN 20
static struct tdb_context *db;
+static int in_transaction;
#ifdef PRINTF_ATTRIBUTE
static void tdb_log(struct tdb_context *tdb, int level, const char *format, ...) PRINTF_ATTRIBUTE(3,4);
@@ -84,25 +90,25 @@ static char *randbuf(int len)
static int cull_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
void *state)
{
+#if CULL_PROB
if (random() % CULL_PROB == 0) {
tdb_delete(tdb, key);
}
+#endif
return 0;
}
static void addrec_db(void)
{
- int klen, dlen, slen;
- char *k, *d, *s;
- TDB_DATA key, data, lockkey;
+ int klen, dlen;
+ char *k, *d;
+ TDB_DATA key, data;
klen = 1 + (rand() % KEYLEN);
dlen = 1 + (rand() % DATALEN);
- slen = 1 + (rand() % LOCKLEN);
k = randbuf(klen);
d = randbuf(dlen);
- s = randbuf(slen);
key.dptr = (unsigned char *)k;
key.dsize = klen+1;
@@ -110,11 +116,32 @@ static void addrec_db(void)
data.dptr = (unsigned char *)d;
data.dsize = dlen+1;
- lockkey.dptr = (unsigned char *)s;
- lockkey.dsize = slen+1;
+#if TRANSACTION_PROB
+ if (in_transaction == 0 && random() % TRANSACTION_PROB == 0) {
+ if (tdb_transaction_start(db) != 0) {
+ fatal("tdb_transaction_start failed");
+ }
+ in_transaction++;
+ goto next;
+ }
+ if (in_transaction && random() % TRANSACTION_PROB == 0) {
+ if (tdb_transaction_commit(db) != 0) {
+ fatal("tdb_transaction_commit failed");
+ }
+ in_transaction--;
+ goto next;
+ }
+ if (in_transaction && random() % TRANSACTION_PROB == 0) {
+ if (tdb_transaction_cancel(db) != 0) {
+ fatal("tdb_transaction_cancel failed");
+ }
+ in_transaction--;
+ goto next;
+ }
+#endif
#if REOPEN_PROB
- if (random() % REOPEN_PROB == 0) {
+ if (in_transaction == 0 && random() % REOPEN_PROB == 0) {
tdb_reopen_all();
goto next;
}
@@ -147,13 +174,13 @@ static void addrec_db(void)
#if LOCKSTORE_PROB
if (random() % LOCKSTORE_PROB == 0) {
- tdb_chainlock(db, lockkey);
+ tdb_chainlock(db, key);
data = tdb_fetch(db, key);
if (tdb_store(db, key, data, TDB_REPLACE) != 0) {
fatal("tdb_store failed");
}
if (data.dptr) free(data.dptr);
- tdb_chainunlock(db, lockkey);
+ tdb_chainunlock(db, key);
goto next;
}
#endif
@@ -171,7 +198,6 @@ static void addrec_db(void)
next:
free(k);
free(d);
- free(s);
}
static int traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
@@ -181,38 +207,71 @@ static int traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
return 0;
}
-#ifndef NPROC
-#define NPROC 2
-#endif
-
-#ifndef NLOOPS
-#define NLOOPS 5000
-#endif
-
- int main(int argc, const char *argv[])
+static void usage(void)
{
- int i, seed=0;
- int loops = NLOOPS;
- pid_t pids[NPROC];
+ printf("Usage: tdbtorture [-n NUM_PROCS] [-l NUM_LOOPS] [-s SEED] [-H HASH_SIZE]\n");
+ exit(0);
+}
- pids[0] = getpid();
+ int main(int argc, char * const *argv)
+{
+ int i, seed = -1;
+ int num_procs = 2;
+ int num_loops = 5000;
+ int hash_size = 2;
+ int c;
+ extern char *optarg;
+ pid_t *pids;
+
+ while ((c = getopt(argc, argv, "n:l:s:H:h")) != -1) {
+ switch (c) {
+ case 'n':
+ num_procs = strtol(optarg, NULL, 0);
+ break;
+ case 'l':
+ num_loops = strtol(optarg, NULL, 0);
+ break;
+ case 'H':
+ hash_size = strtol(optarg, NULL, 0);
+ break;
+ case 's':
+ seed = strtol(optarg, NULL, 0);
+ break;
+ default:
+ usage();
+ }
+ }
unlink("torture.tdb");
- for (i=0;i<NPROC-1;i++) {
+ pids = calloc(sizeof(pid_t), num_procs);
+ pids[0] = getpid();
+
+ for (i=0;i<num_procs-1;i++) {
if ((pids[i+1]=fork()) == 0) break;
}
- db = tdb_open("torture.tdb", 2, TDB_CLEAR_IF_FIRST,
- O_RDWR | O_CREAT, 0600);
+ db = tdb_open_ex("torture.tdb", hash_size, TDB_CLEAR_IF_FIRST,
+ O_RDWR | O_CREAT, 0600, tdb_log, NULL);
if (!db) {
fatal("db open failed");
}
- tdb_logging_function(db, tdb_log);
- srand(seed + getpid());
- srandom(seed + getpid() + time(NULL));
- for (i=0;i<loops;i++) addrec_db();
+ if (seed == -1) {
+ seed = (getpid() + time(NULL)) & 0x7FFFFFFF;
+ }
+
+ if (i == 0) {
+ printf("testing with %d processes, %d loops, %d hash_size, seed=%d\n",
+ num_procs, num_loops, hash_size, seed);
+ }
+
+ srand(seed + i);
+ srandom(seed + i);
+
+ for (i=0;i<num_loops;i++) {
+ addrec_db();
+ }
tdb_traverse(db, NULL, NULL);
tdb_traverse(db, traverse_fn, NULL);
@@ -221,7 +280,7 @@ static int traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf,
tdb_close(db);
if (getpid() == pids[0]) {
- for (i=0;i<NPROC-1;i++) {
+ for (i=0;i<num_procs-1;i++) {
int status;
if (waitpid(pids[i+1], &status, 0) != pids[i+1]) {
printf("failed to wait for %d\n",