changeset 60:858975077f7c

suntaskq: import taskq code from Delphix's illumos tree A taskq is a thread pool used to execute functions asynchronously. The code is not going to compile on Linux, but it shouldn't be too difficult to fix it up to be Linux friendly. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Tue, 07 Jul 2015 20:52:33 -0400
parents af965108a9b3
children 3f383dbb1c15
files README src/CMakeLists.txt src/suntaskq/.gitignore src/suntaskq/CMakeLists.txt src/suntaskq/include/sys/taskq.h src/suntaskq/taskq.c
diffstat 6 files changed, 492 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/README	Tue Jul 07 20:40:26 2015 -0400
+++ b/README	Tue Jul 07 20:52:33 2015 -0400
@@ -26,17 +26,18 @@
 Internal Dependencies
 =====================
 
-             | sunavl | sunlist | common | fakeumem | objstore
--------------+--------+---------+--------+----------+----------
-sunavl       |   -    |    n    |   n    |    n     |    n
-sunlist      |   n    |    -    |   n    |    n     |    n
-common       |   n    |    n    |   -    |    y     |    n
-fakeumem     |   n    |    n    |   n    |    -     |    n
-objstore     |   n    |    n    |   n    |    y     |    -
-objs. module |   ?    |    ?    |   y    |    y     |    n
-client       |   n    |    n    |   n    |    y     |    n
-server       |   n    |    n    |   n    |    y     |    n
-tool         |   n    |    n    |   n    |    y     |    n
+             | sunavl | sunlist | common | fakeumem | objstore | suntaskq
+-------------+--------+---------+--------+----------+----------+----------
+sunavl       |   -    |    n    |   n    |    n     |    n     |     n
+sunlist      |   n    |    -    |   n    |    n     |    n     |     n
+common       |   n    |    n    |   -    |    y     |    n     |     n 
+fakeumem     |   n    |    n    |   n    |    -     |    n     |     n
+objstore     |   n    |    n    |   n    |    y     |    -     |     n
+suntaskq     |   n    |    n    |   y    |    y     |    n     |     -
+objs. module |   ?    |    ?    |   y    |    y     |    n     |     n
+client       |   n    |    n    |   n    |    y     |    n     |     n
+server       |   n    |    n    |   n    |    y     |    n     |     n
+tool         |   n    |    n    |   n    |    y     |    n     |     n
 
   y = yes, linked against
   n = no, not linked against
--- a/src/CMakeLists.txt	Tue Jul 07 20:40:26 2015 -0400
+++ b/src/CMakeLists.txt	Tue Jul 07 20:52:33 2015 -0400
@@ -40,11 +40,14 @@
 	set(LIST_LIBRARY ${CMDUTILS_LIBRARY})
 endif()
 
+set(TASKQ_LIBRARY suntaskq)
+
 #
 # From this point on, use these variables instead of hardcoding lib names:
 #
 #  AVL_LIBRARY
 #  LIST_LIBRARY
+#  TASKQ_LIBRARY
 #  UMEM_LIBRARY
 #
 
@@ -58,9 +61,11 @@
 include_directories(
 	common/include
 	objstore/include
+	suntaskq/include
 )
 
 add_subdirectory(common)
+add_subdirectory(suntaskq)
 add_subdirectory(objstore)
 add_subdirectory(server)
 add_subdirectory(client)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/suntaskq/.gitignore	Tue Jul 07 20:52:33 2015 -0400
@@ -0,0 +1,1 @@
+libsuntaskq.so
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/suntaskq/CMakeLists.txt	Tue Jul 07 20:52:33 2015 -0400
@@ -0,0 +1,30 @@
+#
+# Copyright (c) 2015 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+add_library(suntaskq SHARED
+	taskq.c
+)
+
+target_link_libraries(suntaskq
+	${BASE_LIBS}
+	common
+)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/suntaskq/include/sys/taskq.h	Tue Jul 07 20:52:33 2015 -0400
@@ -0,0 +1,79 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+/*
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2012, Joyent, Inc. All rights reserved.
+ */
+
+#ifndef	_TASKQ_H
+#define	_TASKQ_H
+
+#include <stdint.h>
+#include <umem.h>
+
+#ifdef	__cplusplus
+extern "C" {
+#endif
+
+typedef struct taskq taskq_t;
+typedef uintptr_t taskqid_t;
+typedef void (task_func_t)(void *);
+
+typedef struct taskq_ent {
+	struct taskq_ent	*tqent_next;
+	struct taskq_ent	*tqent_prev;
+	task_func_t		*tqent_func;
+	void			*tqent_arg;
+	uintptr_t		tqent_flags;
+} taskq_ent_t;
+
+#define	TQENT_FLAG_PREALLOC	0x1	/* taskq_dispatch_ent used */
+
+#define	TASKQ_PREPOPULATE	0x0001
+#define	TASKQ_CPR_SAFE		0x0002	/* Use CPR safe protocol */
+#define	TASKQ_DYNAMIC		0x0004	/* Use dynamic thread scheduling */
+#define	TASKQ_THREADS_CPU_PCT	0x0008	/* Scale # threads by # cpus */
+#define	TASKQ_DC_BATCH		0x0010	/* Mark threads as batch */
+
+#define	TQ_SLEEP	UMEM_NOFAIL	/* Can block for memory */
+#define	TQ_NOSLEEP	UMEM_DEFAULT	/* cannot block for memory; may fail */
+#define	TQ_NOQUEUE	0x02		/* Do not enqueue if can't dispatch */
+#define	TQ_FRONT	0x08		/* Queue in front */
+
+extern taskq_t *system_taskq;
+
+extern taskq_t	*taskq_create(const char *, int, pri_t, int, int, uint_t);
+extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
+extern void	taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
+    taskq_ent_t *);
+extern void	taskq_destroy(taskq_t *);
+extern void	taskq_wait(taskq_t *);
+extern int	taskq_member(taskq_t *, void *);
+extern void	system_taskq_init(void);
+extern void	system_taskq_fini(void);
+
+#ifdef	__cplusplus
+}
+#endif
+
+#endif	/* _TASKQ_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/suntaskq/taskq.c	Tue Jul 07 20:52:33 2015 -0400
@@ -0,0 +1,365 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+/*
+ * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
+ * Copyright (c) 2014 by Delphix. All rights reserved.
+ */
+
+#include <thread.h>
+#include <synch.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/debug.h>
+#include <sys/sysmacros.h>
+
+#include <sys/taskq.h>
+
+/* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */
+static int taskq_cpupct_max_percent = 1000;
+
+int taskq_now;
+taskq_t *system_taskq;
+
+#define	TASKQ_ACTIVE	0x00010000
+#define	TASKQ_NAMELEN	31
+
+struct taskq {
+	char		tq_name[TASKQ_NAMELEN + 1];
+	mutex_t		tq_lock;
+	rwlock_t	tq_threadlock;
+	cond_t		tq_dispatch_cv;
+	cond_t		tq_wait_cv;
+	thread_t	*tq_threadlist;
+	int		tq_flags;
+	int		tq_active;
+	int		tq_nthreads;
+	int		tq_nalloc;
+	int		tq_minalloc;
+	int		tq_maxalloc;
+	cond_t		tq_maxalloc_cv;
+	int		tq_maxalloc_wait;
+	taskq_ent_t	*tq_freelist;
+	taskq_ent_t	tq_task;
+};
+
+static taskq_ent_t *
+task_alloc(taskq_t *tq, int tqflags)
+{
+	taskq_ent_t *t;
+	timestruc_t ts;
+	int err;
+
+again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
+		tq->tq_freelist = t->tqent_next;
+	} else {
+		if (tq->tq_nalloc >= tq->tq_maxalloc) {
+			if (!(tqflags & UMEM_NOFAIL))
+				return (NULL);
+
+			/*
+			 * We don't want to exceed tq_maxalloc, but we can't
+			 * wait for other tasks to complete (and thus free up
+			 * task structures) without risking deadlock with
+			 * the caller.  So, we just delay for one second
+			 * to throttle the allocation rate. If we have tasks
+			 * complete before one second timeout expires then
+			 * taskq_ent_free will signal us and we will
+			 * immediately retry the allocation.
+			 */
+			tq->tq_maxalloc_wait++;
+
+			ts.tv_sec = 1;
+			ts.tv_nsec = 0;
+			err = cond_reltimedwait(&tq->tq_maxalloc_cv,
+			    &tq->tq_lock, &ts);
+
+			tq->tq_maxalloc_wait--;
+			if (err == 0)
+				goto again;		/* signaled */
+		}
+		VERIFY0(mutex_unlock(&tq->tq_lock));
+
+		t = umem_alloc(sizeof (taskq_ent_t), tqflags);
+
+		VERIFY0(mutex_lock(&tq->tq_lock));
+		if (t != NULL)
+			tq->tq_nalloc++;
+	}
+	return (t);
+}
+
+static void
+task_free(taskq_t *tq, taskq_ent_t *t)
+{
+	if (tq->tq_nalloc <= tq->tq_minalloc) {
+		t->tqent_next = tq->tq_freelist;
+		tq->tq_freelist = t;
+	} else {
+		tq->tq_nalloc--;
+		VERIFY0(mutex_unlock(&tq->tq_lock));
+		umem_free(t, sizeof (taskq_ent_t));
+		VERIFY0(mutex_lock(&tq->tq_lock));
+	}
+
+	if (tq->tq_maxalloc_wait)
+		VERIFY0(cond_signal(&tq->tq_maxalloc_cv));
+}
+
+taskqid_t
+taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
+{
+	taskq_ent_t *t;
+
+	if (taskq_now) {
+		func(arg);
+		return (1);
+	}
+
+	VERIFY0(mutex_lock(&tq->tq_lock));
+	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
+	if ((t = task_alloc(tq, tqflags)) == NULL) {
+		VERIFY0(mutex_unlock(&tq->tq_lock));
+		return (0);
+	}
+	if (tqflags & TQ_FRONT) {
+		t->tqent_next = tq->tq_task.tqent_next;
+		t->tqent_prev = &tq->tq_task;
+	} else {
+		t->tqent_next = &tq->tq_task;
+		t->tqent_prev = tq->tq_task.tqent_prev;
+	}
+	t->tqent_next->tqent_prev = t;
+	t->tqent_prev->tqent_next = t;
+	t->tqent_func = func;
+	t->tqent_arg = arg;
+	t->tqent_flags = 0;
+	VERIFY0(cond_signal(&tq->tq_dispatch_cv));
+	VERIFY0(mutex_unlock(&tq->tq_lock));
+	return (1);
+}
+
+void
+taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
+    taskq_ent_t *t)
+{
+	ASSERT(func != NULL);
+	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
+
+	/*
+	 * Mark it as a prealloc'd task.  This is important
+	 * to ensure that we don't free it later.
+	 */
+	t->tqent_flags |= TQENT_FLAG_PREALLOC;
+	/*
+	 * Enqueue the task to the underlying queue.
+	 */
+	VERIFY0(mutex_lock(&tq->tq_lock));
+
+	if (flags & TQ_FRONT) {
+		t->tqent_next = tq->tq_task.tqent_next;
+		t->tqent_prev = &tq->tq_task;
+	} else {
+		t->tqent_next = &tq->tq_task;
+		t->tqent_prev = tq->tq_task.tqent_prev;
+	}
+	t->tqent_next->tqent_prev = t;
+	t->tqent_prev->tqent_next = t;
+	t->tqent_func = func;
+	t->tqent_arg = arg;
+	VERIFY0(cond_signal(&tq->tq_dispatch_cv));
+	VERIFY0(mutex_unlock(&tq->tq_lock));
+}
+
+void
+taskq_wait(taskq_t *tq)
+{
+	VERIFY0(mutex_lock(&tq->tq_lock));
+	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) {
+		int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock);
+		VERIFY(ret == 0 || ret == EINTR);
+	}
+	VERIFY0(mutex_unlock(&tq->tq_lock));
+}
+
+static void *
+taskq_thread(void *arg)
+{
+	taskq_t *tq = arg;
+	taskq_ent_t *t;
+	boolean_t prealloc;
+
+	VERIFY0(mutex_lock(&tq->tq_lock));
+	while (tq->tq_flags & TASKQ_ACTIVE) {
+		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
+			int ret;
+			if (--tq->tq_active == 0)
+				VERIFY0(cond_broadcast(&tq->tq_wait_cv));
+			ret = cond_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
+			VERIFY(ret == 0 || ret == EINTR);
+			tq->tq_active++;
+			continue;
+		}
+		t->tqent_prev->tqent_next = t->tqent_next;
+		t->tqent_next->tqent_prev = t->tqent_prev;
+		t->tqent_next = NULL;
+		t->tqent_prev = NULL;
+		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
+		VERIFY0(mutex_unlock(&tq->tq_lock));
+
+		VERIFY0(rw_rdlock(&tq->tq_threadlock));
+		t->tqent_func(t->tqent_arg);
+		VERIFY0(rw_unlock(&tq->tq_threadlock));
+
+		VERIFY0(mutex_lock(&tq->tq_lock));
+		if (!prealloc)
+			task_free(tq, t);
+	}
+	tq->tq_nthreads--;
+	VERIFY0(cond_broadcast(&tq->tq_wait_cv));
+	VERIFY0(mutex_unlock(&tq->tq_lock));
+	return (NULL);
+}
+
+/*ARGSUSED*/
+taskq_t *
+taskq_create(const char *name, int nthreads, pri_t pri,
+	int minalloc, int maxalloc, uint_t flags)
+{
+	taskq_t *tq = umem_zalloc(sizeof (taskq_t), UMEM_NOFAIL);
+	int t;
+
+	if (flags & TASKQ_THREADS_CPU_PCT) {
+		int pct;
+		ASSERT3S(nthreads, >=, 0);
+		ASSERT3S(nthreads, <=, taskq_cpupct_max_percent);
+		pct = MIN(nthreads, taskq_cpupct_max_percent);
+		pct = MAX(pct, 0);
+
+		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
+		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
+	} else {
+		ASSERT3S(nthreads, >=, 1);
+	}
+
+	VERIFY0(rwlock_init(&tq->tq_threadlock, USYNC_THREAD, NULL));
+	VERIFY0(mutex_init(&tq->tq_lock, USYNC_THREAD, NULL));
+	VERIFY0(cond_init(&tq->tq_dispatch_cv, USYNC_THREAD, NULL));
+	VERIFY0(cond_init(&tq->tq_wait_cv, USYNC_THREAD, NULL));
+	VERIFY0(cond_init(&tq->tq_maxalloc_cv, USYNC_THREAD, NULL));
+	(void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
+	tq->tq_flags = flags | TASKQ_ACTIVE;
+	tq->tq_active = nthreads;
+	tq->tq_nthreads = nthreads;
+	tq->tq_minalloc = minalloc;
+	tq->tq_maxalloc = maxalloc;
+	tq->tq_task.tqent_next = &tq->tq_task;
+	tq->tq_task.tqent_prev = &tq->tq_task;
+	tq->tq_threadlist =
+	    umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL);
+
+	if (flags & TASKQ_PREPOPULATE) {
+		VERIFY0(mutex_lock(&tq->tq_lock));
+		while (minalloc-- > 0)
+			task_free(tq, task_alloc(tq, UMEM_NOFAIL));
+		VERIFY0(mutex_unlock(&tq->tq_lock));
+	}
+
+	for (t = 0; t < nthreads; t++)
+		(void) thr_create(0, 0, taskq_thread,
+		    tq, THR_BOUND, &tq->tq_threadlist[t]);
+
+	return (tq);
+}
+
+void
+taskq_destroy(taskq_t *tq)
+{
+	int t;
+	int nthreads = tq->tq_nthreads;
+
+	taskq_wait(tq);
+
+	VERIFY0(mutex_lock(&tq->tq_lock));
+
+	tq->tq_flags &= ~TASKQ_ACTIVE;
+	VERIFY0(cond_broadcast(&tq->tq_dispatch_cv));
+
+	while (tq->tq_nthreads != 0) {
+		int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock);
+		VERIFY(ret == 0 || ret == EINTR);
+	}
+
+	tq->tq_minalloc = 0;
+	while (tq->tq_nalloc != 0) {
+		ASSERT(tq->tq_freelist != NULL);
+		task_free(tq, task_alloc(tq, UMEM_NOFAIL));
+	}
+
+	VERIFY0(mutex_unlock(&tq->tq_lock));
+
+	for (t = 0; t < nthreads; t++)
+		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);
+
+	umem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
+
+	VERIFY0(rwlock_destroy(&tq->tq_threadlock));
+	VERIFY0(mutex_destroy(&tq->tq_lock));
+	VERIFY0(cond_destroy(&tq->tq_dispatch_cv));
+	VERIFY0(cond_destroy(&tq->tq_wait_cv));
+	VERIFY0(cond_destroy(&tq->tq_maxalloc_cv));
+
+	umem_free(tq, sizeof (taskq_t));
+}
+
+int
+taskq_member(taskq_t *tq, void *t)
+{
+	int i;
+
+	if (taskq_now)
+		return (1);
+
+	for (i = 0; i < tq->tq_nthreads; i++)
+		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
+			return (1);
+
+	return (0);
+}
+
+void
+system_taskq_init(void)
+{
+	system_taskq = taskq_create("system_taskq", 64, 60, 4, 512,
+	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
+}
+
+void
+system_taskq_fini(void)
+{
+	taskq_destroy(system_taskq);
+	system_taskq = NULL; /* defensive */
+}