changeset 10612:89423355fa6f

6650218 Commit callbacks API for the DMU
6747932 Add a ZAP API to move a ZAP cursor to a given key.
6856020 ztest keeps creating and doesn't destroy threads
author Ricardo M. Correia <Ricardo.M.Correia@Sun.COM>
date Tue, 22 Sep 2009 15:59:55 -0600
parents 5facf6222c62
children 97ab7c18ce44
files usr/src/cmd/ztest/ztest.c usr/src/lib/libzpool/common/kernel.c usr/src/lib/libzpool/common/sys/zfs_context.h usr/src/lib/libzpool/common/taskq.c usr/src/uts/common/fs/zfs/dmu_tx.c usr/src/uts/common/fs/zfs/sys/dmu.h usr/src/uts/common/fs/zfs/sys/dmu_tx.h usr/src/uts/common/fs/zfs/sys/txg.h usr/src/uts/common/fs/zfs/sys/txg_impl.h usr/src/uts/common/fs/zfs/sys/zap.h usr/src/uts/common/fs/zfs/sys/zap_impl.h usr/src/uts/common/fs/zfs/txg.c usr/src/uts/common/fs/zfs/zap.c usr/src/uts/common/fs/zfs/zap_micro.c
diffstat 14 files changed, 465 insertions(+), 9 deletions(-)
--- a/usr/src/cmd/ztest/ztest.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/cmd/ztest/ztest.c	Tue Sep 22 15:59:55 2009 -0600
@@ -168,6 +168,7 @@
 ztest_func_t ztest_dmu_read_write_zcopy;
 ztest_func_t ztest_dmu_write_parallel;
 ztest_func_t ztest_dmu_object_alloc_free;
+ztest_func_t ztest_dmu_commit_callbacks;
 ztest_func_t ztest_zap;
 ztest_func_t ztest_fzap;
 ztest_func_t ztest_zap_parallel;
@@ -205,6 +206,7 @@
 	{ ztest_dmu_read_write_zcopy,		1,	&zopt_always	},
 	{ ztest_dmu_write_parallel,		30,	&zopt_always	},
 	{ ztest_dmu_object_alloc_free,		1,	&zopt_always	},
+	{ ztest_dmu_commit_callbacks,		10,	&zopt_always	},
 	{ ztest_zap,				30,	&zopt_always	},
 	{ ztest_fzap,				30,	&zopt_always	},
 	{ ztest_zap_parallel,			100,	&zopt_always	},
@@ -227,6 +229,16 @@
 #define	ZTEST_SYNC_LOCKS	16
 
 /*
+ * The following struct is used to hold a list of uncalled commit callbacks.
+ *
+ * The callbacks are ordered by txg number.
+ */
+typedef struct ztest_cb_list {
+	mutex_t	zcl_callbacks_lock;
+	list_t	zcl_callbacks;
+} ztest_cb_list_t;
+
+/*
  * Stuff we need to share writably between parent and child.
  */
 typedef struct ztest_shared {
@@ -254,6 +266,9 @@
 static uint64_t metaslab_sz;
 static boolean_t ztest_exiting;
 
+/* Global commit callback list */
+static ztest_cb_list_t zcl;
+
 extern uint64_t metaslab_gang_bang;
 extern uint64_t metaslab_df_alloc_threshold;
 
@@ -3198,6 +3213,206 @@
 		dmu_tx_commit(tx);
 }
 
+/*
+ * Commit callback data.
+ */
+typedef struct ztest_cb_data {
+	list_node_t		zcd_node;
+	uint64_t		zcd_txg;
+	int			zcd_expected_err;
+	boolean_t		zcd_added;
+	boolean_t		zcd_called;
+	spa_t			*zcd_spa;
+} ztest_cb_data_t;
+
+/* This is the actual commit callback function */
+static void
+ztest_commit_callback(void *arg, int error)
+{
+	ztest_cb_data_t *data = arg;
+	uint64_t synced_txg;
+
+	VERIFY(data != NULL);
+	VERIFY3S(data->zcd_expected_err, ==, error);
+	VERIFY(!data->zcd_called);
+
+	synced_txg = spa_last_synced_txg(data->zcd_spa);
+	if (data->zcd_txg > synced_txg)
+		fatal(0, "commit callback of txg %" PRIu64 " called prematurely"
+		    ", last synced txg = %" PRIu64 "\n", data->zcd_txg,
+		    synced_txg);
+
+	data->zcd_called = B_TRUE;
+
+	if (error == ECANCELED) {
+		ASSERT3U(data->zcd_txg, ==, 0);
+		ASSERT(!data->zcd_added);
+
+		/*
+		 * The private callback data should be destroyed here, but
+		 * since we are going to check the zcd_called field after
+		 * dmu_tx_abort(), we will destroy it there.
+		 */
+		return;
+	}
+
+	/* Was this callback added to the global callback list? */
+	if (!data->zcd_added)
+		goto out;
+
+	ASSERT3U(data->zcd_txg, !=, 0);
+
+	/* Remove our callback from the list */
+	(void) mutex_lock(&zcl.zcl_callbacks_lock);
+	list_remove(&zcl.zcl_callbacks, data);
+	(void) mutex_unlock(&zcl.zcl_callbacks_lock);
+
+out:
+	umem_free(data, sizeof (ztest_cb_data_t));
+}
+
+/* Allocate and initialize callback data structure */
+static ztest_cb_data_t *
+ztest_create_cb_data(objset_t *os, uint64_t txg)
+{
+	ztest_cb_data_t *cb_data;
+
+	cb_data = umem_zalloc(sizeof (ztest_cb_data_t), UMEM_NOFAIL);
+
+	cb_data->zcd_txg = txg;
+	cb_data->zcd_spa = dmu_objset_spa(os);
+
+	return (cb_data);
+}
+
+/*
+ * If a number of txgs equal to this threshold have been created after a commit
+ * callback has been registered but not called, then we assume there is an
+ * implementation bug.  (TXG_CONCURRENT_STATES txgs can legitimately be in
+ * flight at once -- open, quiescing and syncing -- plus two txgs of slack.)
+ */
+#define	ZTEST_COMMIT_CALLBACK_THRESH	(TXG_CONCURRENT_STATES + 2)
+
+/*
+ * Commit callback test.
+ */
+void
+ztest_dmu_commit_callbacks(ztest_args_t *za)
+{
+	objset_t *os = za->za_os;
+	dmu_tx_t *tx;
+	ztest_cb_data_t *cb_data[3], *tmp_cb;
+	uint64_t old_txg, txg;
+	int i, error = 0;
+
+	tx = dmu_tx_create(os);
+
+	cb_data[0] = ztest_create_cb_data(os, 0);
+	dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[0]);
+
+	dmu_tx_hold_write(tx, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t));
+
+	/* Every once in a while, abort the transaction on purpose */
+	if (ztest_random(100) == 0)
+		error = -1;
+
+	if (!error)
+		error = dmu_tx_assign(tx, TXG_NOWAIT);
+
+	txg = error ? 0 : dmu_tx_get_txg(tx);
+
+	cb_data[0]->zcd_txg = txg;
+	cb_data[1] = ztest_create_cb_data(os, txg);
+	dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[1]);
+
+	if (error) {
+		/*
+		 * It's not a strict requirement that the registered
+		 * callbacks be called from inside dmu_tx_abort(), but
+		 * that is what happens in the current implementation,
+		 * so we check for that behavior here.
+		 */
+		for (i = 0; i < 2; i++) {
+			cb_data[i]->zcd_expected_err = ECANCELED;
+			VERIFY(!cb_data[i]->zcd_called);
+		}
+
+		dmu_tx_abort(tx);
+
+		for (i = 0; i < 2; i++) {
+			VERIFY(cb_data[i]->zcd_called);
+			umem_free(cb_data[i], sizeof (ztest_cb_data_t));
+		}
+
+		return;
+	}
+
+	cb_data[2] = ztest_create_cb_data(os, txg);
+	dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[2]);
+
+	/*
+	 * Read existing data to make sure there isn't a future leak.
+	 */
+	VERIFY(0 == dmu_read(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t),
+	    &old_txg, DMU_READ_PREFETCH));
+
+	if (old_txg > txg)
+		fatal(0, "future leak: got %" PRIu64 ", open txg is %" PRIu64,
+		    old_txg, txg);
+
+	dmu_write(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t), &txg, tx);
+
+	(void) mutex_lock(&zcl.zcl_callbacks_lock);
+
+	/*
+	 * Since commit callbacks don't have any ordering requirement and since
+	 * it is theoretically possible for a commit callback to be called
+	 * after an arbitrary amount of time has elapsed since its txg has been
+	 * synced, it is difficult to reliably determine whether a commit
+	 * callback hasn't been called due to high load or due to a flawed
+	 * implementation.
+	 *
+	 * In practice, we will assume that if after a certain number of txgs a
+	 * commit callback hasn't been called, then most likely there's an
+	 * implementation bug.
+	 */
+	tmp_cb = list_head(&zcl.zcl_callbacks);
+	if (tmp_cb != NULL &&
+	    tmp_cb->zcd_txg + ZTEST_COMMIT_CALLBACK_THRESH < txg) {
+		fatal(0, "Commit callback threshold exceeded, oldest txg: %"
+		    PRIu64 ", open txg: %" PRIu64 "\n", tmp_cb->zcd_txg, txg);
+	}
+
+	/*
+	 * Let's find the place to insert our callbacks.
+	 *
+	 * Even though the list is ordered by txg, it is possible for the
+	 * insertion point to not be the end because our txg may already be
+	 * quiescing at this point and other callbacks in the open txg
+	 * (from other objsets) may have sneaked in.
+	 */
+	tmp_cb = list_tail(&zcl.zcl_callbacks);
+	while (tmp_cb != NULL && tmp_cb->zcd_txg > txg)
+		tmp_cb = list_prev(&zcl.zcl_callbacks, tmp_cb);
+
+	/* Add the 3 callbacks to the list */
+	for (i = 0; i < 3; i++) {
+		if (tmp_cb == NULL)
+			list_insert_head(&zcl.zcl_callbacks, cb_data[i]);
+		else
+			list_insert_after(&zcl.zcl_callbacks, tmp_cb,
+			    cb_data[i]);
+
+		cb_data[i]->zcd_added = B_TRUE;
+		VERIFY(!cb_data[i]->zcd_called);
+
+		tmp_cb = cb_data[i];
+	}
+
+	(void) mutex_unlock(&zcl.zcl_callbacks_lock);
+
+	dmu_tx_commit(tx);
+}
+
 void
 ztest_dsl_prop_get_set(ztest_args_t *za)
 {
@@ -3807,6 +4022,12 @@
 	(void) _mutex_init(&zs->zs_vdev_lock, USYNC_THREAD, NULL);
 	(void) rwlock_init(&zs->zs_name_lock, USYNC_THREAD, NULL);
 
+	(void) _mutex_init(&zcl.zcl_callbacks_lock, USYNC_THREAD,
+	    NULL);
+
+	list_create(&zcl.zcl_callbacks, sizeof (ztest_cb_data_t),
+	    offsetof(ztest_cb_data_t, zcd_node));
+
 	for (t = 0; t < ZTEST_SYNC_LOCKS; t++)
 		(void) _mutex_init(&zs->zs_sync_lock[t], USYNC_THREAD, NULL);
 
@@ -4008,6 +4229,13 @@
 	spa_close(spa, FTAG);
 
 	kernel_fini();
+
+	list_destroy(&zcl.zcl_callbacks);
+
+	(void) _mutex_destroy(&zcl.zcl_callbacks_lock);
+
+	(void) rwlock_destroy(&zs->zs_name_lock);
+	(void) _mutex_destroy(&zs->zs_vdev_lock);
 }
 
 void
--- a/usr/src/lib/libzpool/common/kernel.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/lib/libzpool/common/kernel.c	Tue Sep 22 15:59:55 2009 -0600
@@ -794,6 +794,8 @@
 {
 	spa_fini();
 
+	system_taskq_fini();
+
 	close(random_fd);
 	close(urandom_fd);
 
--- a/usr/src/lib/libzpool/common/sys/zfs_context.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/lib/libzpool/common/sys/zfs_context.h	Tue Sep 22 15:59:55 2009 -0600
@@ -332,6 +332,7 @@
 extern void	taskq_wait(taskq_t *);
 extern int	taskq_member(taskq_t *, void *);
 extern void	system_taskq_init(void);
+extern void	system_taskq_fini(void);
 
 #define	XVA_MAPSIZE	3
 #define	XVA_MAGIC	0x78766174
--- a/usr/src/lib/libzpool/common/taskq.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/lib/libzpool/common/taskq.c	Tue Sep 22 15:59:55 2009 -0600
@@ -272,3 +272,10 @@
 	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
 	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 }
+
+void
+system_taskq_fini(void)
+{
+	taskq_destroy(system_taskq);
+	system_taskq = NULL; /* defensive */
+}
--- a/usr/src/uts/common/fs/zfs/dmu_tx.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/dmu_tx.c	Tue Sep 22 15:59:55 2009 -0600
@@ -48,6 +48,8 @@
 		tx->tx_pool = dd->dd_pool;
 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
 	    offsetof(dmu_tx_hold_t, txh_node));
+	list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t),
+	    offsetof(dmu_tx_callback_t, dcb_node));
 #ifdef ZFS_DEBUG
 	refcount_create(&tx->tx_space_written);
 	refcount_create(&tx->tx_space_freed);
@@ -1112,8 +1114,13 @@
 	if (tx->tx_tempreserve_cookie)
 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
 
+	if (!list_is_empty(&tx->tx_callbacks))
+		txg_register_callbacks(&tx->tx_txgh, &tx->tx_callbacks);
+
 	if (tx->tx_anyobj == FALSE)
 		txg_rele_to_sync(&tx->tx_txgh);
+
+	list_destroy(&tx->tx_callbacks);
 	list_destroy(&tx->tx_holds);
 #ifdef ZFS_DEBUG
 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
@@ -1142,6 +1149,14 @@
 		if (dn != NULL)
 			dnode_rele(dn, tx);
 	}
+
+	/*
+	 * Call any registered callbacks with an error code.
+	 */
+	if (!list_is_empty(&tx->tx_callbacks))
+		dmu_tx_do_callbacks(&tx->tx_callbacks, ECANCELED);
+
+	list_destroy(&tx->tx_callbacks);
 	list_destroy(&tx->tx_holds);
 #ifdef ZFS_DEBUG
 	refcount_destroy_many(&tx->tx_space_written,
@@ -1158,3 +1173,31 @@
 	ASSERT(tx->tx_txg != 0);
 	return (tx->tx_txg);
 }
+
+void
+dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *func, void *data)
+{
+	dmu_tx_callback_t *dcb;
+
+	dcb = kmem_alloc(sizeof (dmu_tx_callback_t), KM_SLEEP);
+
+	dcb->dcb_func = func;
+	dcb->dcb_data = data;
+
+	list_insert_tail(&tx->tx_callbacks, dcb);
+}
+
+/*
+ * Call all the commit callbacks on a list, with a given error code.
+ */
+void
+dmu_tx_do_callbacks(list_t *cb_list, int error)
+{
+	dmu_tx_callback_t *dcb;
+
+	while ((dcb = list_head(cb_list)) != NULL) {
+		list_remove(cb_list, dcb);
+		dcb->dcb_func(dcb->dcb_data, error);
+		kmem_free(dcb, sizeof (dmu_tx_callback_t));
+	}
+}
--- a/usr/src/uts/common/fs/zfs/sys/dmu.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/dmu.h	Tue Sep 22 15:59:55 2009 -0600
@@ -436,6 +436,26 @@
 void dmu_tx_commit(dmu_tx_t *tx);
 
 /*
+ * To register a commit callback, dmu_tx_callback_register() must be called.
+ *
+ * dcb_data is a pointer to caller private data that is passed on as a
+ * callback parameter. The caller is responsible for properly allocating and
+ * freeing it.
+ *
+ * When registering a callback, the transaction must already be created, but
+ * it must not yet be committed or aborted.  It may or may not have been
+ * assigned to a txg.
+ *
+ * The callback will be called after the transaction has been safely written
+ * to stable storage, and will also be called if the dmu_tx is aborted.  If
+ * any error prevents the transaction from being committed to disk, the
+ * callback will be called with a nonzero error value.
+ */
+typedef void dmu_tx_callback_func_t(void *dcb_data, int error);
+
+void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
+    void *dcb_data);
+
+/*
  * Free up the data blocks for a defined range of a file.  If size is
  * zero, the range from offset to end-of-file is freed.
  */
--- a/usr/src/uts/common/fs/zfs/sys/dmu_tx.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/dmu_tx.h	Tue Sep 22 15:59:55 2009 -0600
@@ -19,15 +19,13 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
 #ifndef	_SYS_DMU_TX_H
 #define	_SYS_DMU_TX_H
 
-#pragma ident	"%Z%%M%	%I%	%E% SMI"
-
 #include <sys/inttypes.h>
 #include <sys/dmu.h>
 #include <sys/txg.h>
@@ -59,6 +57,7 @@
 	txg_handle_t tx_txgh;
 	void *tx_tempreserve_cookie;
 	struct dmu_tx_hold *tx_needassign_txh;
+	list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */
 	uint8_t tx_anyobj;
 	int tx_err;
 #ifdef ZFS_DEBUG
@@ -98,6 +97,11 @@
 #endif
 } dmu_tx_hold_t;
 
+typedef struct dmu_tx_callback {
+	list_node_t		dcb_node;    /* linked to tx_callbacks list */
+	dmu_tx_callback_func_t	*dcb_func;   /* caller function pointer */
+	void			*dcb_data;   /* caller private data */
+} dmu_tx_callback_t;
 
 /*
  * These routines are defined in dmu.h, and are called by the user.
@@ -109,6 +113,10 @@
 uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
 void dmu_tx_wait(dmu_tx_t *tx);
 
+void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
+    void *dcb_data);
+void dmu_tx_do_callbacks(list_t *cb_list, int error);
+
 /*
  * These routines are defined in dmu_spa.h, and are called by the SPA.
  */
--- a/usr/src/uts/common/fs/zfs/sys/txg.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/txg.h	Tue Sep 22 15:59:55 2009 -0600
@@ -19,15 +19,13 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
 #ifndef _SYS_TXG_H
 #define	_SYS_TXG_H
 
-#pragma ident	"%Z%%M%	%I%	%E% SMI"
-
 #include <sys/spa.h>
 #include <sys/zfs_context.h>
 
@@ -71,6 +69,7 @@
 extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
 extern void txg_rele_to_quiesce(txg_handle_t *txghp);
 extern void txg_rele_to_sync(txg_handle_t *txghp);
+extern void txg_register_callbacks(txg_handle_t *txghp, list_t *tx_callbacks);
 extern void txg_suspend(struct dsl_pool *dp);
 extern void txg_resume(struct dsl_pool *dp);
 
--- a/usr/src/uts/common/fs/zfs/sys/txg_impl.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/txg_impl.h	Tue Sep 22 15:59:55 2009 -0600
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -37,6 +37,7 @@
 	kmutex_t	tc_lock;
 	kcondvar_t	tc_cv[TXG_SIZE];
 	uint64_t	tc_count[TXG_SIZE];
+	list_t		tc_callbacks[TXG_SIZE]; /* commit cb list */
 	char		tc_pad[16];
 };
 
@@ -64,6 +65,8 @@
 
 	kthread_t	*tx_sync_thread;
 	kthread_t	*tx_quiesce_thread;
+
+	taskq_t		*tx_commit_cb_taskq; /* commit callback taskq */
 } tx_state_t;
 
 #ifdef	__cplusplus
--- a/usr/src/uts/common/fs/zfs/sys/zap.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/zap.h	Tue Sep 22 15:59:55 2009 -0600
@@ -317,6 +317,11 @@
 uint64_t zap_cursor_serialize(zap_cursor_t *zc);
 
 /*
+ * Advance the cursor to the attribute having the given key.
+ */
+int zap_cursor_move_to_key(zap_cursor_t *zc, const char *name, matchtype_t mt);
+
+/*
  * Initialize a zap cursor pointing to the position recorded by
  * zap_cursor_serialize (in the "serialized" argument).  You can also
  * use a "serialized" argument of 0 to start at the beginning of the
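Again as a reading aid (not part of the changeset): a sketch of how the new
call composes with the existing cursor API.  dump_from_key() is a
hypothetical helper name, and note that iteration resumes in ZAP hash order
from the given key, not in any lexical order.

#include <sys/zfs_context.h>
#include <sys/zap.h>

/* Hypothetical helper: resume iteration of a ZAP object at a known key. */
static int
dump_from_key(objset_t *os, uint64_t zapobj, const char *startkey)
{
	zap_cursor_t zc;
	zap_attribute_t za;
	int err;

	zap_cursor_init(&zc, os, zapobj);

	/* New in this changeset: position the cursor on "startkey". */
	err = zap_cursor_move_to_key(&zc, startkey, MT_EXACT);
	if (err != 0) {
		/* e.g. ENOENT if the key doesn't exist */
		zap_cursor_fini(&zc);
		return (err);
	}

	/* Walk from that entry to the end of the object (hash order). */
	while ((err = zap_cursor_retrieve(&zc, &za)) == 0) {
		dprintf("%s = %llu\n", za.za_name,
		    (u_longlong_t)za.za_first_integer);
		zap_cursor_advance(&zc);
	}
	zap_cursor_fini(&zc);

	return (err == ENOENT ? 0 : err);
}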
--- a/usr/src/uts/common/fs/zfs/sys/zap_impl.h	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/zap_impl.h	Tue Sep 22 15:59:55 2009 -0600
@@ -210,6 +210,7 @@
     uint64_t integer_size, uint64_t num_integers,
     const void *val, uint32_t cd, dmu_tx_t *tx);
 void fzap_upgrade(zap_t *zap, dmu_tx_t *tx);
+int fzap_cursor_move_to_key(zap_cursor_t *zc, zap_name_t *zn);
 
 #ifdef	__cplusplus
 }
--- a/usr/src/uts/common/fs/zfs/txg.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/txg.c	Tue Sep 22 15:59:55 2009 -0600
@@ -19,13 +19,14 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
 #include <sys/zfs_context.h>
 #include <sys/txg_impl.h>
 #include <sys/dmu_impl.h>
+#include <sys/dmu_tx.h>
 #include <sys/dsl_pool.h>
 #include <sys/callb.h>
 
@@ -57,6 +58,9 @@
 		for (i = 0; i < TXG_SIZE; i++) {
 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
 			    NULL);
+			list_create(&tx->tx_cpu[c].tc_callbacks[i],
+			    sizeof (dmu_tx_callback_t),
+			    offsetof(dmu_tx_callback_t, dcb_node));
 		}
 	}
 
@@ -96,10 +100,15 @@
 		int i;
 
 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
-		for (i = 0; i < TXG_SIZE; i++)
+		for (i = 0; i < TXG_SIZE; i++) {
 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
+			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
+		}
 	}
 
+	if (tx->tx_commit_cb_taskq != NULL)
+		taskq_destroy(tx->tx_commit_cb_taskq);
+
 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
 
 	bzero(tx, sizeof (tx_state_t));
@@ -229,6 +238,17 @@
 }
 
 void
+txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
+{
+	tx_cpu_t *tc = th->th_cpu;
+	int g = th->th_txg & TXG_MASK;
+
+	mutex_enter(&tc->tc_lock);
+	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
+	mutex_exit(&tc->tc_lock);
+}
+
+void
 txg_rele_to_sync(txg_handle_t *th)
 {
 	tx_cpu_t *tc = th->th_cpu;
@@ -279,6 +299,55 @@
 }
 
 static void
+txg_do_callbacks(list_t *cb_list)
+{
+	dmu_tx_do_callbacks(cb_list, 0);
+
+	list_destroy(cb_list);
+
+	kmem_free(cb_list, sizeof (list_t));
+}
+
+/*
+ * Dispatch the commit callbacks registered on this txg to worker threads.
+ */
+static void
+txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
+{
+	int c;
+	tx_state_t *tx = &dp->dp_tx;
+	list_t *cb_list;
+
+	for (c = 0; c < max_ncpus; c++) {
+		tx_cpu_t *tc = &tx->tx_cpu[c];
+		/* No need to lock tx_cpu_t at this point */
+
+		int g = txg & TXG_MASK;
+
+		if (list_is_empty(&tc->tc_callbacks[g]))
+			continue;
+
+		if (tx->tx_commit_cb_taskq == NULL) {
+			/*
+			 * Commit callback taskq hasn't been created yet.
+			 */
+			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
+			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
+			    TASKQ_PREPOPULATE);
+		}
+
+		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
+		list_create(cb_list, sizeof (dmu_tx_callback_t),
+		    offsetof(dmu_tx_callback_t, dcb_node));
+
+		list_move_tail(&tc->tc_callbacks[g], cb_list);
+
+		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
+		    txg_do_callbacks, cb_list, TQ_SLEEP);
+	}
+}
+
+static void
 txg_sync_thread(dsl_pool_t *dp)
 {
 	tx_state_t *tx = &dp->dp_tx;
@@ -351,6 +420,11 @@
 		tx->tx_syncing_txg = 0;
 		rw_exit(&tx->tx_suspend);
 		cv_broadcast(&tx->tx_sync_done_cv);
+
+		/*
+		 * Dispatch commit callbacks to worker threads.
+		 */
+		txg_dispatch_callbacks(dp, txg);
 	}
 }
 
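One property of the dispatch scheme above is worth calling out: callbacks are
batched per tx_cpu_t and handed to a multi-threaded taskq, so callbacks from
the same txg may run concurrently and in no particular order (the ztest
comment earlier in this changeset makes the same point).  A consumer that
cares about ordering must impose it itself; a hypothetical sketch follows
(ordered_cb_t and the names below are illustrative, not part of the patch):

#include <sys/zfs_context.h>

/* Hypothetical per-callback state carrying a caller-assigned sequence. */
typedef struct ordered_cb {
	uint64_t	oc_seq;
} ordered_cb_t;

static kmutex_t seq_lock;
static uint64_t highest_committed_seq;	/* protected by seq_lock */

static void
ordered_commit_cb(void *arg, int error)
{
	ordered_cb_t *oc = arg;

	/*
	 * Callbacks may arrive out of order, so track a high-water mark
	 * instead of assuming FIFO delivery.
	 */
	mutex_enter(&seq_lock);
	if (error == 0 && oc->oc_seq > highest_committed_seq)
		highest_committed_seq = oc->oc_seq;
	mutex_exit(&seq_lock);

	kmem_free(oc, sizeof (ordered_cb_t));
}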
--- a/usr/src/uts/common/fs/zfs/zap.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/zap.c	Tue Sep 22 15:59:55 2009 -0600
@@ -1102,6 +1102,31 @@
 	}
 }
 
+int
+fzap_cursor_move_to_key(zap_cursor_t *zc, zap_name_t *zn)
+{
+	int err;
+	zap_leaf_t *l;
+	zap_entry_handle_t zeh;
+
+	if (zn->zn_name_orij && strlen(zn->zn_name_orij) > ZAP_MAXNAMELEN)
+		return (E2BIG);
+
+	err = zap_deref_leaf(zc->zc_zap, zn->zn_hash, NULL, RW_READER, &l);
+	if (err != 0)
+		return (err);
+
+	err = zap_leaf_lookup(l, zn, &zeh);
+	if (err != 0) {
+		zap_put_leaf(l);
+		return (err);
+	}
+
+	zc->zc_leaf = l;
+	zc->zc_hash = zeh.zeh_hash;
+	zc->zc_cd = zeh.zeh_cd;
+
+	return (err);
+}
+
 void
 fzap_get_stats(zap_t *zap, zap_stats_t *zs)
 {
--- a/usr/src/uts/common/fs/zfs/zap_micro.c	Tue Sep 22 14:23:05 2009 -0700
+++ b/usr/src/uts/common/fs/zfs/zap_micro.c	Tue Sep 22 15:59:55 2009 -0600
@@ -1044,6 +1044,46 @@
 }
 
 int
+zap_cursor_move_to_key(zap_cursor_t *zc, const char *name, matchtype_t mt)
+{
+	int err = 0;
+	mzap_ent_t *mze;
+	zap_name_t *zn;
+
+	if (zc->zc_zap == NULL) {
+		err = zap_lockdir(zc->zc_objset, zc->zc_zapobj, NULL,
+		    RW_READER, TRUE, FALSE, &zc->zc_zap);
+		if (err)
+			return (err);
+	} else {
+		rw_enter(&zc->zc_zap->zap_rwlock, RW_READER);
+	}
+
+	zn = zap_name_alloc(zc->zc_zap, name, mt);
+	if (zn == NULL) {
+		rw_exit(&zc->zc_zap->zap_rwlock);
+		return (ENOTSUP);
+	}
+
+	if (!zc->zc_zap->zap_ismicro) {
+		err = fzap_cursor_move_to_key(zc, zn);
+	} else {
+		mze = mze_find(zn);
+		if (mze == NULL) {
+			err = ENOENT;
+			goto out;
+		}
+		zc->zc_hash = mze->mze_hash;
+		zc->zc_cd = mze->mze_phys.mze_cd;
+	}
+
+out:
+	zap_name_free(zn);
+	rw_exit(&zc->zc_zap->zap_rwlock);
+	return (err);
+}
+
+int
 zap_get_stats(objset_t *os, uint64_t zapobj, zap_stats_t *zs)
 {
 	int err;