Mercurial > nomad > old-fuse
changeset 60:858975077f7c
suntaskq: import taskq code from Delphix's illumos tree
A taskq is a thread pool used to execute functions asynchronously.
The code is not going to compile on Linux, but it shouldn't be too difficult
to fix it up to be Linux friendly.
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Tue, 07 Jul 2015 20:52:33 -0400 |
parents | af965108a9b3 |
children | 3f383dbb1c15 |
files | README src/CMakeLists.txt src/suntaskq/.gitignore src/suntaskq/CMakeLists.txt src/suntaskq/include/sys/taskq.h src/suntaskq/taskq.c |
diffstat | 6 files changed, 492 insertions(+), 11 deletions(-) [+] |
line wrap: on
line diff
--- a/README Tue Jul 07 20:40:26 2015 -0400 +++ b/README Tue Jul 07 20:52:33 2015 -0400 @@ -26,17 +26,18 @@ Internal Dependencies ===================== - | sunavl | sunlist | common | fakeumem | objstore --------------+--------+---------+--------+----------+---------- -sunavl | - | n | n | n | n -sunlist | n | - | n | n | n -common | n | n | - | y | n -fakeumem | n | n | n | - | n -objstore | n | n | n | y | - -objs. module | ? | ? | y | y | n -client | n | n | n | y | n -server | n | n | n | y | n -tool | n | n | n | y | n + | sunavl | sunlist | common | fakeumem | objstore | suntaskq +-------------+--------+---------+--------+----------+----------+---------- +sunavl | - | n | n | n | n | n +sunlist | n | - | n | n | n | n +common | n | n | - | y | n | n +fakeumem | n | n | n | - | n | n +objstore | n | n | n | y | - | n +suntaskq | n | n | y | y | n | - +objs. module | ? | ? | y | y | n | n +client | n | n | n | y | n | n +server | n | n | n | y | n | n +tool | n | n | n | y | n | n y = yes, linked against n = no, not linked against
--- a/src/CMakeLists.txt Tue Jul 07 20:40:26 2015 -0400 +++ b/src/CMakeLists.txt Tue Jul 07 20:52:33 2015 -0400 @@ -40,11 +40,14 @@ set(LIST_LIBRARY ${CMDUTILS_LIBRARY}) endif() +set(TASKQ_LIBRARY suntaskq) + # # From this point on, use these variables instead of hardcoding lib names: # # AVL_LIBRARY # LIST_LIBRARY +# TASKQ_LIBRARY # UMEM_LIBRARY # @@ -58,9 +61,11 @@ include_directories( common/include objstore/include + suntaskq/include ) add_subdirectory(common) +add_subdirectory(suntaskq) add_subdirectory(objstore) add_subdirectory(server) add_subdirectory(client)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/suntaskq/.gitignore Tue Jul 07 20:52:33 2015 -0400 @@ -0,0 +1,1 @@ +libsuntaskq.so
#
# Copyright (c) 2015 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

# Build the taskq code as the shared library "suntaskq".
add_library(suntaskq SHARED taskq.c)

# Link against the project's "common" library plus whatever ${BASE_LIBS}
# expands to (it is defined outside this file -- presumably by the parent
# CMakeLists.txt; verify there).
target_link_libraries(suntaskq ${BASE_LIBS} common)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/suntaskq/include/sys/taskq.h Tue Jul 07 20:52:33 2015 -0400 @@ -0,0 +1,79 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. + * Copyright 2011 Nexenta Systems, Inc. All rights reserved. + * Copyright (c) 2012, 2014 by Delphix. All rights reserved. + * Copyright (c) 2012, Joyent, Inc. All rights reserved. 
+ */ + +#ifndef _TASKQ_H +#define _TASKQ_H + +#include <stdint.h> +#include <umem.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct taskq taskq_t; +typedef uintptr_t taskqid_t; +typedef void (task_func_t)(void *); + +typedef struct taskq_ent { + struct taskq_ent *tqent_next; + struct taskq_ent *tqent_prev; + task_func_t *tqent_func; + void *tqent_arg; + uintptr_t tqent_flags; +} taskq_ent_t; + +#define TQENT_FLAG_PREALLOC 0x1 /* taskq_dispatch_ent used */ + +#define TASKQ_PREPOPULATE 0x0001 +#define TASKQ_CPR_SAFE 0x0002 /* Use CPR safe protocol */ +#define TASKQ_DYNAMIC 0x0004 /* Use dynamic thread scheduling */ +#define TASKQ_THREADS_CPU_PCT 0x0008 /* Scale # threads by # cpus */ +#define TASKQ_DC_BATCH 0x0010 /* Mark threads as batch */ + +#define TQ_SLEEP UMEM_NOFAIL /* Can block for memory */ +#define TQ_NOSLEEP UMEM_DEFAULT /* cannot block for memory; may fail */ +#define TQ_NOQUEUE 0x02 /* Do not enqueue if can't dispatch */ +#define TQ_FRONT 0x08 /* Queue in front */ + +extern taskq_t *system_taskq; + +extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); +extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); +extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, + taskq_ent_t *); +extern void taskq_destroy(taskq_t *); +extern void taskq_wait(taskq_t *); +extern int taskq_member(taskq_t *, void *); +extern void system_taskq_init(void); +extern void system_taskq_fini(void); + +#ifdef __cplusplus +} +#endif + +#endif /* _TASKQ_H */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/suntaskq/taskq.c Tue Jul 07 20:52:33 2015 -0400 @@ -0,0 +1,365 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright 2010 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ +/* + * Copyright 2011 Nexenta Systems, Inc. All rights reserved. + * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. + * Copyright (c) 2014 by Delphix. All rights reserved. 
+ */ + +#include <thread.h> +#include <synch.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> +#include <sys/debug.h> +#include <sys/sysmacros.h> + +#include <sys/taskq.h> + +/* Maximum percentage allowed for TASKQ_THREADS_CPU_PCT */ +static int taskq_cpupct_max_percent = 1000; + +int taskq_now; +taskq_t *system_taskq; + +#define TASKQ_ACTIVE 0x00010000 +#define TASKQ_NAMELEN 31 + +struct taskq { + char tq_name[TASKQ_NAMELEN + 1]; + mutex_t tq_lock; + rwlock_t tq_threadlock; + cond_t tq_dispatch_cv; + cond_t tq_wait_cv; + thread_t *tq_threadlist; + int tq_flags; + int tq_active; + int tq_nthreads; + int tq_nalloc; + int tq_minalloc; + int tq_maxalloc; + cond_t tq_maxalloc_cv; + int tq_maxalloc_wait; + taskq_ent_t *tq_freelist; + taskq_ent_t tq_task; +}; + +static taskq_ent_t * +task_alloc(taskq_t *tq, int tqflags) +{ + taskq_ent_t *t; + timestruc_t ts; + int err; + +again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { + tq->tq_freelist = t->tqent_next; + } else { + if (tq->tq_nalloc >= tq->tq_maxalloc) { + if (!(tqflags & UMEM_NOFAIL)) + return (NULL); + + /* + * We don't want to exceed tq_maxalloc, but we can't + * wait for other tasks to complete (and thus free up + * task structures) without risking deadlock with + * the caller. So, we just delay for one second + * to throttle the allocation rate. If we have tasks + * complete before one second timeout expires then + * taskq_ent_free will signal us and we will + * immediately retry the allocation. 
+ */ + tq->tq_maxalloc_wait++; + + ts.tv_sec = 1; + ts.tv_nsec = 0; + err = cond_reltimedwait(&tq->tq_maxalloc_cv, + &tq->tq_lock, &ts); + + tq->tq_maxalloc_wait--; + if (err == 0) + goto again; /* signaled */ + } + VERIFY0(mutex_unlock(&tq->tq_lock)); + + t = umem_alloc(sizeof (taskq_ent_t), tqflags); + + VERIFY0(mutex_lock(&tq->tq_lock)); + if (t != NULL) + tq->tq_nalloc++; + } + return (t); +} + +static void +task_free(taskq_t *tq, taskq_ent_t *t) +{ + if (tq->tq_nalloc <= tq->tq_minalloc) { + t->tqent_next = tq->tq_freelist; + tq->tq_freelist = t; + } else { + tq->tq_nalloc--; + VERIFY0(mutex_unlock(&tq->tq_lock)); + umem_free(t, sizeof (taskq_ent_t)); + VERIFY0(mutex_lock(&tq->tq_lock)); + } + + if (tq->tq_maxalloc_wait) + VERIFY0(cond_signal(&tq->tq_maxalloc_cv)); +} + +taskqid_t +taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) +{ + taskq_ent_t *t; + + if (taskq_now) { + func(arg); + return (1); + } + + VERIFY0(mutex_lock(&tq->tq_lock)); + ASSERT(tq->tq_flags & TASKQ_ACTIVE); + if ((t = task_alloc(tq, tqflags)) == NULL) { + VERIFY0(mutex_unlock(&tq->tq_lock)); + return (0); + } + if (tqflags & TQ_FRONT) { + t->tqent_next = tq->tq_task.tqent_next; + t->tqent_prev = &tq->tq_task; + } else { + t->tqent_next = &tq->tq_task; + t->tqent_prev = tq->tq_task.tqent_prev; + } + t->tqent_next->tqent_prev = t; + t->tqent_prev->tqent_next = t; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_flags = 0; + VERIFY0(cond_signal(&tq->tq_dispatch_cv)); + VERIFY0(mutex_unlock(&tq->tq_lock)); + return (1); +} + +void +taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, + taskq_ent_t *t) +{ + ASSERT(func != NULL); + ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); + + /* + * Mark it as a prealloc'd task. This is important + * to ensure that we don't free it later. + */ + t->tqent_flags |= TQENT_FLAG_PREALLOC; + /* + * Enqueue the task to the underlying queue. 
+ */ + VERIFY0(mutex_lock(&tq->tq_lock)); + + if (flags & TQ_FRONT) { + t->tqent_next = tq->tq_task.tqent_next; + t->tqent_prev = &tq->tq_task; + } else { + t->tqent_next = &tq->tq_task; + t->tqent_prev = tq->tq_task.tqent_prev; + } + t->tqent_next->tqent_prev = t; + t->tqent_prev->tqent_next = t; + t->tqent_func = func; + t->tqent_arg = arg; + VERIFY0(cond_signal(&tq->tq_dispatch_cv)); + VERIFY0(mutex_unlock(&tq->tq_lock)); +} + +void +taskq_wait(taskq_t *tq) +{ + VERIFY0(mutex_lock(&tq->tq_lock)); + while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) { + int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock); + VERIFY(ret == 0 || ret == EINTR); + } + VERIFY0(mutex_unlock(&tq->tq_lock)); +} + +static void * +taskq_thread(void *arg) +{ + taskq_t *tq = arg; + taskq_ent_t *t; + boolean_t prealloc; + + VERIFY0(mutex_lock(&tq->tq_lock)); + while (tq->tq_flags & TASKQ_ACTIVE) { + if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { + int ret; + if (--tq->tq_active == 0) + VERIFY0(cond_broadcast(&tq->tq_wait_cv)); + ret = cond_wait(&tq->tq_dispatch_cv, &tq->tq_lock); + VERIFY(ret == 0 || ret == EINTR); + tq->tq_active++; + continue; + } + t->tqent_prev->tqent_next = t->tqent_next; + t->tqent_next->tqent_prev = t->tqent_prev; + t->tqent_next = NULL; + t->tqent_prev = NULL; + prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; + VERIFY0(mutex_unlock(&tq->tq_lock)); + + VERIFY0(rw_rdlock(&tq->tq_threadlock)); + t->tqent_func(t->tqent_arg); + VERIFY0(rw_unlock(&tq->tq_threadlock)); + + VERIFY0(mutex_lock(&tq->tq_lock)); + if (!prealloc) + task_free(tq, t); + } + tq->tq_nthreads--; + VERIFY0(cond_broadcast(&tq->tq_wait_cv)); + VERIFY0(mutex_unlock(&tq->tq_lock)); + return (NULL); +} + +/*ARGSUSED*/ +taskq_t * +taskq_create(const char *name, int nthreads, pri_t pri, + int minalloc, int maxalloc, uint_t flags) +{ + taskq_t *tq = umem_zalloc(sizeof (taskq_t), UMEM_NOFAIL); + int t; + + if (flags & TASKQ_THREADS_CPU_PCT) { + int pct; + ASSERT3S(nthreads, >=, 0); + 
ASSERT3S(nthreads, <=, taskq_cpupct_max_percent); + pct = MIN(nthreads, taskq_cpupct_max_percent); + pct = MAX(pct, 0); + + nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; + nthreads = MAX(nthreads, 1); /* need at least 1 thread */ + } else { + ASSERT3S(nthreads, >=, 1); + } + + VERIFY0(rwlock_init(&tq->tq_threadlock, USYNC_THREAD, NULL)); + VERIFY0(mutex_init(&tq->tq_lock, USYNC_THREAD, NULL)); + VERIFY0(cond_init(&tq->tq_dispatch_cv, USYNC_THREAD, NULL)); + VERIFY0(cond_init(&tq->tq_wait_cv, USYNC_THREAD, NULL)); + VERIFY0(cond_init(&tq->tq_maxalloc_cv, USYNC_THREAD, NULL)); + (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); + tq->tq_flags = flags | TASKQ_ACTIVE; + tq->tq_active = nthreads; + tq->tq_nthreads = nthreads; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; + tq->tq_task.tqent_next = &tq->tq_task; + tq->tq_task.tqent_prev = &tq->tq_task; + tq->tq_threadlist = + umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL); + + if (flags & TASKQ_PREPOPULATE) { + VERIFY0(mutex_lock(&tq->tq_lock)); + while (minalloc-- > 0) + task_free(tq, task_alloc(tq, UMEM_NOFAIL)); + VERIFY0(mutex_unlock(&tq->tq_lock)); + } + + for (t = 0; t < nthreads; t++) + (void) thr_create(0, 0, taskq_thread, + tq, THR_BOUND, &tq->tq_threadlist[t]); + + return (tq); +} + +void +taskq_destroy(taskq_t *tq) +{ + int t; + int nthreads = tq->tq_nthreads; + + taskq_wait(tq); + + VERIFY0(mutex_lock(&tq->tq_lock)); + + tq->tq_flags &= ~TASKQ_ACTIVE; + VERIFY0(cond_broadcast(&tq->tq_dispatch_cv)); + + while (tq->tq_nthreads != 0) { + int ret = cond_wait(&tq->tq_wait_cv, &tq->tq_lock); + VERIFY(ret == 0 || ret == EINTR); + } + + tq->tq_minalloc = 0; + while (tq->tq_nalloc != 0) { + ASSERT(tq->tq_freelist != NULL); + task_free(tq, task_alloc(tq, UMEM_NOFAIL)); + } + + VERIFY0(mutex_unlock(&tq->tq_lock)); + + for (t = 0; t < nthreads; t++) + (void) thr_join(tq->tq_threadlist[t], NULL, NULL); + + umem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); + + 
VERIFY0(rwlock_destroy(&tq->tq_threadlock)); + VERIFY0(mutex_destroy(&tq->tq_lock)); + VERIFY0(cond_destroy(&tq->tq_dispatch_cv)); + VERIFY0(cond_destroy(&tq->tq_wait_cv)); + VERIFY0(cond_destroy(&tq->tq_maxalloc_cv)); + + umem_free(tq, sizeof (taskq_t)); +} + +int +taskq_member(taskq_t *tq, void *t) +{ + int i; + + if (taskq_now) + return (1); + + for (i = 0; i < tq->tq_nthreads; i++) + if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) + return (1); + + return (0); +} + +void +system_taskq_init(void) +{ + system_taskq = taskq_create("system_taskq", 64, 60, 4, 512, + TASKQ_DYNAMIC | TASKQ_PREPOPULATE); +} + +void +system_taskq_fini(void) +{ + taskq_destroy(system_taskq); + system_taskq = NULL; /* defensive */ +}