changeset 67:903ff5d135a8

suntaskq: clean up to use POSIX/common lib interfaces It still relies on some libumem interfaces so it still won't work on Linux. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Tue, 07 Jul 2015 21:43:35 -0400
parents 568b4fd9ef61
children a25c31ea6b52
files src/suntaskq/include/sys/taskq.h src/suntaskq/taskq.c
diffstat 2 files changed, 65 insertions(+), 87 deletions(-) [+]
line wrap: on
line diff
--- a/src/suntaskq/include/sys/taskq.h	Tue Jul 07 21:42:02 2015 -0400
+++ b/src/suntaskq/include/sys/taskq.h	Tue Jul 07 21:43:35 2015 -0400
@@ -62,15 +62,13 @@
 
 extern taskq_t *system_taskq;
 
-extern taskq_t	*taskq_create(const char *, int, pri_t, int, int, uint_t);
+extern taskq_t	*taskq_create(const char *, int, int, int, uint_t);
 extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
 extern void	taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
     taskq_ent_t *);
 extern void	taskq_destroy(taskq_t *);
 extern void	taskq_wait(taskq_t *);
 extern int	taskq_member(taskq_t *, void *);
-extern void	system_taskq_init(void);
-extern void	system_taskq_fini(void);
 
 #ifdef	__cplusplus
 }
--- a/src/suntaskq/taskq.c	Tue Jul 07 21:42:02 2015 -0400
+++ b/src/suntaskq/taskq.c	Tue Jul 07 21:43:35 2015 -0400
@@ -26,41 +26,38 @@
  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
  * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
  * Copyright (c) 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2015 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
  */
 
-#include <thread.h>
-#include <synch.h>
 #include <unistd.h>
-#include <string.h>
-#include <errno.h>
-#include <sys/debug.h>
-#include <sys/sysmacros.h>
 
+#include <nomad/types.h>
+#include <nomad/error.h>
+#include <nomad/mutex.h>
 #include <sys/taskq.h>
 
 /* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */
 static int taskq_cpupct_max_percent = 1000;
 
 int taskq_now;
-taskq_t *system_taskq;
 
 #define	TASKQ_ACTIVE	0x00010000
 #define	TASKQ_NAMELEN	31
 
 struct taskq {
 	char		tq_name[TASKQ_NAMELEN + 1];
-	mutex_t		tq_lock;
-	rwlock_t	tq_threadlock;
-	cond_t		tq_dispatch_cv;
-	cond_t		tq_wait_cv;
-	thread_t	*tq_threadlist;
+	pthread_mutex_t	tq_lock;
+	pthread_rwlock_t tq_threadlock;
+	pthread_cond_t	tq_dispatch_cv;
+	pthread_cond_t	tq_wait_cv;
+	pthread_t	*tq_threadlist;
 	int		tq_flags;
 	int		tq_active;
 	int		tq_nthreads;
 	int		tq_nalloc;
 	int		tq_minalloc;
 	int		tq_maxalloc;
-	cond_t		tq_maxalloc_cv;
+	pthread_cond_t	tq_maxalloc_cv;
 	int		tq_maxalloc_wait;
 	taskq_ent_t	*tq_freelist;
 	taskq_ent_t	tq_task;
@@ -94,18 +91,18 @@
 
 			ts.tv_sec = 1;
 			ts.tv_nsec = 0;
-			err = cond_reltimedwait(&tq->tq_maxalloc_cv,
+			err = condreltimedwait(&tq->tq_maxalloc_cv,
 			    &tq->tq_lock, &ts);
 
 			tq->tq_maxalloc_wait--;
 			if (err == 0)
 				goto again;		/* signaled */
 		}
-		VERIFY0(mutex_unlock(&tq->tq_lock));
+		mxunlock(&tq->tq_lock);
 
 		t = umem_alloc(sizeof (taskq_ent_t), tqflags);
 
-		VERIFY0(mutex_lock(&tq->tq_lock));
+		mxlock(&tq->tq_lock);
 		if (t != NULL)
 			tq->tq_nalloc++;
 	}
@@ -120,13 +117,13 @@
 		tq->tq_freelist = t;
 	} else {
 		tq->tq_nalloc--;
-		VERIFY0(mutex_unlock(&tq->tq_lock));
+		mxunlock(&tq->tq_lock);
 		umem_free(t, sizeof (taskq_ent_t));
-		VERIFY0(mutex_lock(&tq->tq_lock));
+		mxlock(&tq->tq_lock);
 	}
 
 	if (tq->tq_maxalloc_wait)
-		VERIFY0(cond_signal(&tq->tq_maxalloc_cv));
+		condsig(&tq->tq_maxalloc_cv);
 }
 
 taskqid_t
@@ -139,10 +136,10 @@
 		return (1);
 	}
 
-	VERIFY0(mutex_lock(&tq->tq_lock));
+	mxlock(&tq->tq_lock);
 	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
 	if ((t = task_alloc(tq, tqflags)) == NULL) {
-		VERIFY0(mutex_unlock(&tq->tq_lock));
+		mxunlock(&tq->tq_lock);
 		return (0);
 	}
 	if (tqflags & TQ_FRONT) {
@@ -157,8 +154,8 @@
 	t->tqent_func = func;
 	t->tqent_arg = arg;
 	t->tqent_flags = 0;
-	VERIFY0(cond_signal(&tq->tq_dispatch_cv));
-	VERIFY0(mutex_unlock(&tq->tq_lock));
+	condsig(&tq->tq_dispatch_cv);
+	mxunlock(&tq->tq_lock);
 	return (1);
 }
 
@@ -177,7 +174,7 @@
 	/*
 	 * Enqueue the task to the underlying queue.
 	 */
-	VERIFY0(mutex_lock(&tq->tq_lock));
+	mxlock(&tq->tq_lock);
 
 	if (flags & TQ_FRONT) {
 		t->tqent_next = tq->tq_task.tqent_next;
@@ -190,19 +187,17 @@
 	t->tqent_prev->tqent_next = t;
 	t->tqent_func = func;
 	t->tqent_arg = arg;
-	VERIFY0(cond_signal(&tq->tq_dispatch_cv));
-	VERIFY0(mutex_unlock(&tq->tq_lock));
+	condsig(&tq->tq_dispatch_cv);
+	mxunlock(&tq->tq_lock);
 }
 
 void
 taskq_wait(taskq_t *tq)
 {
-	VERIFY0(mutex_lock(&tq->tq_lock));
-	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) {
-		int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock);
-		VERIFY(ret == 0 || ret == EINTR);
-	}
-	VERIFY0(mutex_unlock(&tq->tq_lock));
+	mxlock(&tq->tq_lock);
+	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
+		condwait(&tq->tq_wait_cv, &tq->tq_lock);
+	mxunlock(&tq->tq_lock);
 }
 
 static void *
@@ -212,14 +207,12 @@
 	taskq_ent_t *t;
 	boolean_t prealloc;
 
-	VERIFY0(mutex_lock(&tq->tq_lock));
+	mxlock(&tq->tq_lock);
 	while (tq->tq_flags & TASKQ_ACTIVE) {
 		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
-			int ret;
 			if (--tq->tq_active == 0)
-				VERIFY0(cond_broadcast(&tq->tq_wait_cv));
-			ret = cond_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
-			VERIFY(ret == 0 || ret == EINTR);
+				condbcast(&tq->tq_wait_cv);
+			condwait(&tq->tq_dispatch_cv, &tq->tq_lock);
 			tq->tq_active++;
 			continue;
 		}
@@ -228,30 +221,34 @@
 		t->tqent_next = NULL;
 		t->tqent_prev = NULL;
 		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
-		VERIFY0(mutex_unlock(&tq->tq_lock));
+		mxunlock(&tq->tq_lock);
 
-		VERIFY0(rw_rdlock(&tq->tq_threadlock));
+		rwlock(&tq->tq_threadlock, false);
 		t->tqent_func(t->tqent_arg);
-		VERIFY0(rw_unlock(&tq->tq_threadlock));
+		rwunlock(&tq->tq_threadlock);
 
-		VERIFY0(mutex_lock(&tq->tq_lock));
+		mxlock(&tq->tq_lock);
 		if (!prealloc)
 			task_free(tq, t);
 	}
 	tq->tq_nthreads--;
-	VERIFY0(cond_broadcast(&tq->tq_wait_cv));
-	VERIFY0(mutex_unlock(&tq->tq_lock));
+	condbcast(&tq->tq_wait_cv);
+	mxunlock(&tq->tq_lock);
 	return (NULL);
 }
 
 /*ARGSUSED*/
 taskq_t *
-taskq_create(const char *name, int nthreads, pri_t pri,
+taskq_create(const char *name, int nthreads,
 	int minalloc, int maxalloc, uint_t flags)
 {
-	taskq_t *tq = umem_zalloc(sizeof (taskq_t), UMEM_NOFAIL);
+	taskq_t *tq;
 	int t;
 
+	tq = zalloc(sizeof(taskq_t));
+	if (!tq)
+		return NULL;
+
 	if (flags & TASKQ_THREADS_CPU_PCT) {
 		int pct;
 		ASSERT3S(nthreads, >=, 0);
@@ -265,11 +262,11 @@
 		ASSERT3S(nthreads, >=, 1);
 	}
 
-	VERIFY0(rwlock_init(&tq->tq_threadlock, USYNC_THREAD, NULL));
-	VERIFY0(mutex_init(&tq->tq_lock, USYNC_THREAD, NULL));
-	VERIFY0(cond_init(&tq->tq_dispatch_cv, USYNC_THREAD, NULL));
-	VERIFY0(cond_init(&tq->tq_wait_cv, USYNC_THREAD, NULL));
-	VERIFY0(cond_init(&tq->tq_maxalloc_cv, USYNC_THREAD, NULL));
+	rwinit(&tq->tq_threadlock);
+	mxinit(&tq->tq_lock);
+	condinit(&tq->tq_dispatch_cv);
+	condinit(&tq->tq_wait_cv);
+	condinit(&tq->tq_maxalloc_cv);
 	(void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
 	tq->tq_flags = flags | TASKQ_ACTIVE;
 	tq->tq_active = nthreads;
@@ -279,18 +276,17 @@
 	tq->tq_task.tqent_next = &tq->tq_task;
 	tq->tq_task.tqent_prev = &tq->tq_task;
 	tq->tq_threadlist =
-	    umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL);
+	    umem_alloc(nthreads * sizeof (pthread_t), UMEM_NOFAIL);
 
 	if (flags & TASKQ_PREPOPULATE) {
-		VERIFY0(mutex_lock(&tq->tq_lock));
+		mxlock(&tq->tq_lock);
 		while (minalloc-- > 0)
 			task_free(tq, task_alloc(tq, UMEM_NOFAIL));
-		VERIFY0(mutex_unlock(&tq->tq_lock));
+		mxunlock(&tq->tq_lock);
 	}
 
 	for (t = 0; t < nthreads; t++)
-		(void) thr_create(0, 0, taskq_thread,
-		    tq, THR_BOUND, &tq->tq_threadlist[t]);
+		pthread_create(&tq->tq_threadlist[t], NULL, taskq_thread, tq);
 
 	return (tq);
 }
@@ -303,15 +299,13 @@
 
 	taskq_wait(tq);
 
-	VERIFY0(mutex_lock(&tq->tq_lock));
+	mxlock(&tq->tq_lock);
 
 	tq->tq_flags &= ~TASKQ_ACTIVE;
-	VERIFY0(cond_broadcast(&tq->tq_dispatch_cv));
+	condbcast(&tq->tq_dispatch_cv);
 
-	while (tq->tq_nthreads != 0) {
-		int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock);
-		VERIFY(ret == 0 || ret == EINTR);
-	}
+	while (tq->tq_nthreads != 0)
+		condwait(&tq->tq_wait_cv, &tq->tq_lock);
 
 	tq->tq_minalloc = 0;
 	while (tq->tq_nalloc != 0) {
@@ -319,18 +313,18 @@
 		task_free(tq, task_alloc(tq, UMEM_NOFAIL));
 	}
 
-	VERIFY0(mutex_unlock(&tq->tq_lock));
+	mxunlock(&tq->tq_lock);
 
 	for (t = 0; t < nthreads; t++)
-		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);
+		pthread_join(tq->tq_threadlist[t], NULL);
 
-	umem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
+	umem_free(tq->tq_threadlist, nthreads * sizeof (pthread_t));
 
-	VERIFY0(rwlock_destroy(&tq->tq_threadlock));
-	VERIFY0(mutex_destroy(&tq->tq_lock));
-	VERIFY0(cond_destroy(&tq->tq_dispatch_cv));
-	VERIFY0(cond_destroy(&tq->tq_wait_cv));
-	VERIFY0(cond_destroy(&tq->tq_maxalloc_cv));
+	rwdestroy(&tq->tq_threadlock);
+	mxdestroy(&tq->tq_lock);
+	conddestroy(&tq->tq_dispatch_cv);
+	conddestroy(&tq->tq_wait_cv);
+	conddestroy(&tq->tq_maxalloc_cv);
 
 	umem_free(tq, sizeof (taskq_t));
 }
@@ -344,22 +338,8 @@
 		return (1);
 
 	for (i = 0; i < tq->tq_nthreads; i++)
-		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
+		if (tq->tq_threadlist[i] == (pthread_t)(uintptr_t)t)
 			return (1);
 
 	return (0);
 }
-
-void
-system_taskq_init(void)
-{
-	system_taskq = taskq_create("system_taskq", 64, 60, 4, 512,
-	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
-}
-
-void
-system_taskq_fini(void)
-{
-	taskq_destroy(system_taskq);
-	system_taskq = NULL; /* defensive */
-}