Mercurial > nomad > old-fuse
changeset 67:903ff5d135a8
suntaskq: clean up to use POSIX/common lib interfaces
It still relies on some libumem interfaces so it still won't work on Linux.
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Tue, 07 Jul 2015 21:43:35 -0400 |
parents | 568b4fd9ef61 |
children | a25c31ea6b52 |
files | src/suntaskq/include/sys/taskq.h src/suntaskq/taskq.c |
diffstat | 2 files changed, 65 insertions(+), 87 deletions(-) [+] |
line wrap: on
line diff
--- a/src/suntaskq/include/sys/taskq.h Tue Jul 07 21:42:02 2015 -0400 +++ b/src/suntaskq/include/sys/taskq.h Tue Jul 07 21:43:35 2015 -0400 @@ -62,15 +62,13 @@ extern taskq_t *system_taskq; -extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); +extern taskq_t *taskq_create(const char *, int, int, int, uint_t); extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *); extern void taskq_destroy(taskq_t *); extern void taskq_wait(taskq_t *); extern int taskq_member(taskq_t *, void *); -extern void system_taskq_init(void); -extern void system_taskq_fini(void); #ifdef __cplusplus }
--- a/src/suntaskq/taskq.c Tue Jul 07 21:42:02 2015 -0400 +++ b/src/suntaskq/taskq.c Tue Jul 07 21:43:35 2015 -0400 @@ -26,41 +26,38 @@ * Copyright 2011 Nexenta Systems, Inc. All rights reserved. * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. * Copyright (c) 2014 by Delphix. All rights reserved. + * Copyright (c) 2015 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> */ -#include <thread.h> -#include <synch.h> #include <unistd.h> -#include <string.h> -#include <errno.h> -#include <sys/debug.h> -#include <sys/sysmacros.h> +#include <nomad/types.h> +#include <nomad/error.h> +#include <nomad/mutex.h> #include <sys/taskq.h> /* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */ static int taskq_cpupct_max_percent = 1000; int taskq_now; -taskq_t *system_taskq; #define TASKQ_ACTIVE 0x00010000 #define TASKQ_NAMELEN 31 struct taskq { char tq_name[TASKQ_NAMELEN + 1]; - mutex_t tq_lock; - rwlock_t tq_threadlock; - cond_t tq_dispatch_cv; - cond_t tq_wait_cv; - thread_t *tq_threadlist; + pthread_mutex_t tq_lock; + pthread_rwlock_t tq_threadlock; + pthread_cond_t tq_dispatch_cv; + pthread_cond_t tq_wait_cv; + pthread_t *tq_threadlist; int tq_flags; int tq_active; int tq_nthreads; int tq_nalloc; int tq_minalloc; int tq_maxalloc; - cond_t tq_maxalloc_cv; + pthread_cond_t tq_maxalloc_cv; int tq_maxalloc_wait; taskq_ent_t *tq_freelist; taskq_ent_t tq_task; @@ -94,18 +91,18 @@ ts.tv_sec = 1; ts.tv_nsec = 0; - err = cond_reltimedwait(&tq->tq_maxalloc_cv, + err = condreltimedwait(&tq->tq_maxalloc_cv, &tq->tq_lock, &ts); tq->tq_maxalloc_wait--; if (err == 0) goto again; /* signaled */ } - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxunlock(&tq->tq_lock); t = umem_alloc(sizeof (taskq_ent_t), tqflags); - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); if (t != NULL) tq->tq_nalloc++; } @@ -120,13 +117,13 @@ tq->tq_freelist = t; } else { tq->tq_nalloc--; - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxunlock(&tq->tq_lock); umem_free(t, sizeof (taskq_ent_t)); - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); } if (tq->tq_maxalloc_wait) - VERIFY0(cond_signal(&tq->tq_maxalloc_cv)); + condsig(&tq->tq_maxalloc_cv); } taskqid_t @@ -139,10 +136,10 @@ return (1); } - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); ASSERT(tq->tq_flags & TASKQ_ACTIVE); if ((t = task_alloc(tq, tqflags)) == NULL) { - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxunlock(&tq->tq_lock); return (0); } if (tqflags & TQ_FRONT) { @@ -157,8 +154,8 @@ t->tqent_func = func; t->tqent_arg = arg; t->tqent_flags = 0; - VERIFY0(cond_signal(&tq->tq_dispatch_cv)); - VERIFY0(mutex_unlock(&tq->tq_lock)); + condsig(&tq->tq_dispatch_cv); + mxunlock(&tq->tq_lock); return (1); } @@ -177,7 +174,7 @@ /* * Enqueue the task to the underlying queue. */ - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); if (flags & TQ_FRONT) { t->tqent_next = tq->tq_task.tqent_next; @@ -190,19 +187,17 @@ t->tqent_prev->tqent_next = t; t->tqent_func = func; t->tqent_arg = arg; - VERIFY0(cond_signal(&tq->tq_dispatch_cv)); - VERIFY0(mutex_unlock(&tq->tq_lock)); + condsig(&tq->tq_dispatch_cv); + mxunlock(&tq->tq_lock); } void taskq_wait(taskq_t *tq) { - VERIFY0(mutex_lock(&tq->tq_lock)); - while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) { - int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock); - VERIFY(ret == 0 || ret == EINTR); - } - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxlock(&tq->tq_lock); + while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) + condwait(&tq->tq_wait_cv, &tq->tq_lock); + mxunlock(&tq->tq_lock); } static void * @@ -212,14 +207,12 @@ taskq_ent_t *t; boolean_t prealloc; - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); while (tq->tq_flags & TASKQ_ACTIVE) { if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { - int ret; if (--tq->tq_active == 0) - VERIFY0(cond_broadcast(&tq->tq_wait_cv)); - ret = cond_wait(&tq->tq_dispatch_cv, &tq->tq_lock); - VERIFY(ret == 0 || ret == EINTR); + condbcast(&tq->tq_wait_cv); + condwait(&tq->tq_dispatch_cv, &tq->tq_lock); tq->tq_active++; continue; } @@ -228,30 +221,34 @@ t->tqent_next = NULL; t->tqent_prev = NULL; prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxunlock(&tq->tq_lock); - VERIFY0(rw_rdlock(&tq->tq_threadlock)); + rwlock(&tq->tq_threadlock, false); t->tqent_func(t->tqent_arg); - VERIFY0(rw_unlock(&tq->tq_threadlock)); + rwunlock(&tq->tq_threadlock); - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); if (!prealloc) task_free(tq, t); } tq->tq_nthreads--; - VERIFY0(cond_broadcast(&tq->tq_wait_cv)); - VERIFY0(mutex_unlock(&tq->tq_lock)); + condbcast(&tq->tq_wait_cv); + mxunlock(&tq->tq_lock); return (NULL); } /*ARGSUSED*/ taskq_t * -taskq_create(const char *name, int nthreads, pri_t pri, +taskq_create(const char *name, int nthreads, int minalloc, int maxalloc, uint_t flags) { - taskq_t *tq = umem_zalloc(sizeof (taskq_t), UMEM_NOFAIL); + taskq_t *tq; int t; + tq = zalloc(sizeof(taskq_t)); + if (!tq) + return NULL; + if (flags & TASKQ_THREADS_CPU_PCT) { int pct; ASSERT3S(nthreads, >=, 0); @@ -265,11 +262,11 @@ ASSERT3S(nthreads, >=, 1); } - VERIFY0(rwlock_init(&tq->tq_threadlock, USYNC_THREAD, NULL)); - VERIFY0(mutex_init(&tq->tq_lock, USYNC_THREAD, NULL)); - VERIFY0(cond_init(&tq->tq_dispatch_cv, USYNC_THREAD, NULL)); - VERIFY0(cond_init(&tq->tq_wait_cv, USYNC_THREAD, NULL)); - VERIFY0(cond_init(&tq->tq_maxalloc_cv, USYNC_THREAD, NULL)); + rwinit(&tq->tq_threadlock); + mxinit(&tq->tq_lock); + condinit(&tq->tq_dispatch_cv); + condinit(&tq->tq_wait_cv); + condinit(&tq->tq_maxalloc_cv); (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); tq->tq_flags = flags | TASKQ_ACTIVE; tq->tq_active = nthreads; @@ -279,18 +276,17 @@ tq->tq_task.tqent_next = &tq->tq_task; tq->tq_task.tqent_prev = &tq->tq_task; tq->tq_threadlist = - umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL); + umem_alloc(nthreads * sizeof (pthread_t), UMEM_NOFAIL); if (flags & TASKQ_PREPOPULATE) { - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); while (minalloc-- > 0) task_free(tq, task_alloc(tq, UMEM_NOFAIL)); - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxunlock(&tq->tq_lock); } for (t = 0; t < nthreads; t++) - (void) thr_create(0, 0, taskq_thread, - tq, THR_BOUND, &tq->tq_threadlist[t]); + pthread_create(&tq->tq_threadlist[t], NULL, taskq_thread, tq); return (tq); } @@ -303,15 +299,13 @@ taskq_wait(tq); - VERIFY0(mutex_lock(&tq->tq_lock)); + mxlock(&tq->tq_lock); tq->tq_flags &= ~TASKQ_ACTIVE; - VERIFY0(cond_broadcast(&tq->tq_dispatch_cv)); + condbcast(&tq->tq_dispatch_cv); - while (tq->tq_nthreads != 0) { - int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock); - VERIFY(ret == 0 || ret == EINTR); - } + while (tq->tq_nthreads != 0) + condwait(&tq->tq_wait_cv, &tq->tq_lock); tq->tq_minalloc = 0; while (tq->tq_nalloc != 0) { @@ -319,18 +313,18 @@ task_free(tq, task_alloc(tq, UMEM_NOFAIL)); } - VERIFY0(mutex_unlock(&tq->tq_lock)); + mxunlock(&tq->tq_lock); for (t = 0; t < nthreads; t++) - (void) thr_join(tq->tq_threadlist[t], NULL, NULL); + pthread_join(tq->tq_threadlist[t], NULL); - umem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); + umem_free(tq->tq_threadlist, nthreads * sizeof (pthread_t)); - VERIFY0(rwlock_destroy(&tq->tq_threadlock)); - VERIFY0(mutex_destroy(&tq->tq_lock)); - VERIFY0(cond_destroy(&tq->tq_dispatch_cv)); - VERIFY0(cond_destroy(&tq->tq_wait_cv)); - VERIFY0(cond_destroy(&tq->tq_maxalloc_cv)); + rwdestroy(&tq->tq_threadlock); + mxdestroy(&tq->tq_lock); + conddestroy(&tq->tq_dispatch_cv); + conddestroy(&tq->tq_wait_cv); + conddestroy(&tq->tq_maxalloc_cv); umem_free(tq, sizeof (taskq_t)); } @@ -344,22 +338,8 @@ return (1); for (i = 0; i < tq->tq_nthreads; i++) - if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) + if (tq->tq_threadlist[i] == (pthread_t)(uintptr_t)t) return (1); return (0); } - -void -system_taskq_init(void) -{ - system_taskq = taskq_create("system_taskq", 64, 60, 4, 512, - TASKQ_DYNAMIC | TASKQ_PREPOPULATE); -} - -void -system_taskq_fini(void) -{ - taskq_destroy(system_taskq); - system_taskq = NULL; /* defensive */ -}