view taskq.c @ 490:a4c25e17b135

synch: make the synch API macro-based

Instead of considering the macros as compatibility wrappers, make them *the* API. This way, we can trivially pass additional debugging information into the actual functions.

Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Sun, 06 May 2018 18:37:16 -0400
parents 7d21606b771c
children ddeb5fa3ea47
line wrap: on
line source

/*
 * Copyright (c) 2017-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <unistd.h>

#include <jeffpc/error.h>
#include <jeffpc/taskq.h>
#include <jeffpc/cstr.h>

/* Lock class handed to MXINIT() when a taskq's lock is initialized. */
static LOCK_CLASS(taskq_lc);

/*
 * Append a work item to the tail of the taskq's queue and account for it in
 * the queue length.  Callers in this file invoke it with tq->lock held.
 */
static void enqueue(struct taskq *tq, struct taskq_item *item)
{
	tq->queue_len += 1;
	list_insert_tail(&tq->queue, item);
}

/*
 * Detach the item at the head of the taskq's queue.
 *
 * Returns the removed item, or NULL if the queue was empty.  The queue
 * length is only decremented when an item was actually removed.
 */
static struct taskq_item *dequeue(struct taskq *tq)
{
	struct taskq_item *head = list_remove_head(&tq->queue);

	if (head)
		tq->queue_len -= 1;

	return head;
}

/*
 * Worker thread entry point.
 *
 * arg is the struct taskq this thread services.  The thread loops until
 * tq->shutdown is set, pulling items off the queue and running them.
 * tq->lock is held whenever the shutdown flag, the queue, or the processed
 * counter are touched; it is dropped around the call to the item's function.
 *
 * Always returns NULL.
 */
static void *taskq_worker(void *arg)
{
	struct taskq *tq = arg;

	MXLOCK(&tq->lock);

	/* process items until asked to shut down */
	while (!tq->shutdown) {
		struct taskq_item *item;

		item = dequeue(tq);
		if (!item) {
			/*
			 * Nothing to do - sleep until taskq_dispatch() or
			 * taskq_destroy() signals us, then re-check the
			 * shutdown flag and the queue.
			 */
			CONDWAIT(&tq->cond_parent2worker, &tq->lock);
			continue;
		}

		/*
		 * The queue just got shorter; wake anyone in taskq_wait() so
		 * they can re-check whether it is empty.  Note that this
		 * happens before the item is run.
		 */
		CONDBCAST(&tq->cond_worker2parent);
		MXUNLOCK(&tq->lock);

		/* run the work item without holding the lock */
		item->fxn(item->arg);

		/* the item was allocated by taskq_dispatch(); we own it now */
		free(item);

		/* re-take the lock for the counter update and the loop check */
		MXLOCK(&tq->lock);

		tq->processed++;
	}

	MXUNLOCK(&tq->lock);

	return NULL;
}

/*
 * Spawn the taskq's worker threads.
 *
 * Threads are created one at a time with tq->lock held; creation stops at
 * the first failure.  tq->nstarted_threads counts the threads that were
 * successfully created, so a caller can clean up after a partial failure.
 *
 * Returns 0 if all tq->nthreads threads were created, otherwise the error
 * returned by xthr_create().
 */
static int start_threads(struct taskq *tq)
{
	int err = 0;
	long idx = 0;

	VERIFY(tq->nthreads);

	MXLOCK(&tq->lock);

	while (idx < tq->nthreads) {
		err = xthr_create(&tq->threads[idx], taskq_worker, tq);
		if (err)
			break;

		tq->nstarted_threads++;
		idx++;
	}

	MXUNLOCK(&tq->lock);

	return err;
}

/*
 * Resolve the requested number of worker threads.
 *
 * A request of -1 means "use the number of online processors" as reported
 * by sysconf(_SC_NPROCESSORS_ONLN).  Any other non-positive request is
 * rejected.
 *
 * Returns a strictly positive thread count on success, or a negative errno
 * value on failure.  It never returns 0.
 */
static long get_nthreads(long nthreads)
{
	if (nthreads == -1) {
		/*
		 * sysconf() returns -1 *without* setting errno when the
		 * value is indeterminate.  Clear errno first so that a
		 * stale value is not reported as the failure reason, and so
		 * that we never hand back -0 (i.e., "success" with zero
		 * threads) to the caller.
		 */
		errno = 0;
		nthreads = sysconf(_SC_NPROCESSORS_ONLN);
		if (nthreads == -1)
			return errno ? -errno : -EINVAL;
	}

	if (nthreads <= 0)
		return -EINVAL;

	return nthreads;
}

/*
 * Allocate a taskq backed by a fixed pool of worker threads.
 *
 * Passing -1 as the thread count requests one thread per online processor.
 * The workers are started before this function returns.
 *
 * Returns the new taskq, or an ERR_PTR-encoded negative errno if the thread
 * count is invalid, memory is exhausted, or a worker could not be started.
 */
struct taskq *taskq_create_fixed(const char *name, long nthreads)
{
	struct taskq *newq;
	int err;

	nthreads = get_nthreads(nthreads);
	if (nthreads < 0)
		return ERR_PTR(nthreads);

	VERIFY(nthreads);

	newq = malloc(sizeof(struct taskq));
	if (newq == NULL)
		return ERR_PTR(-ENOMEM);

	newq->threads = calloc(nthreads, sizeof(pthread_t));
	if (newq->threads == NULL) {
		free(newq);
		return ERR_PTR(-ENOMEM);
	}

	/* bookkeeping */
	strcpy_safe(newq->name, name, sizeof(newq->name));
	newq->nthreads = nthreads;
	newq->nstarted_threads = 0;
	newq->queue_len = 0;
	newq->shutdown = false;
	newq->processed = 0;

	/* work list and synchronization primitives */
	list_create(&newq->queue, sizeof(struct taskq_item),
		    offsetof(struct taskq_item, node));
	MXINIT(&newq->lock, &taskq_lc);
	CONDINIT(&newq->cond_worker2parent);
	CONDINIT(&newq->cond_parent2worker);

	err = start_threads(newq);
	if (!err)
		return newq;

	/* tears down whichever workers did start and frees everything */
	taskq_destroy(newq);

	return ERR_PTR(err);
}

/*
 * Queue up a call to fxn(arg) on the taskq.
 *
 * A work item is allocated here and appended to the queue; the worker
 * thread that picks it up runs fxn(arg) and then frees the item.
 *
 * Returns 0 on success, or -ENOMEM if the item could not be allocated.
 */
int taskq_dispatch(struct taskq *tq, void (*fxn)(void *), void *arg)
{
	struct taskq_item *work = malloc(sizeof(struct taskq_item));

	if (work == NULL)
		return -ENOMEM;

	work->arg = arg;
	work->fxn = fxn;

	/* publish the item and wake one sleeping worker */
	MXLOCK(&tq->lock);
	enqueue(tq, work);
	CONDSIG(&tq->cond_parent2worker);
	MXUNLOCK(&tq->lock);

	return 0;
}

/*
 * Block until the taskq's queue of pending work items is empty.
 *
 * Only the queue itself is checked; workers signal this condition as soon
 * as they remove an item, before they run it.
 */
void taskq_wait(struct taskq *tq)
{
	MXLOCK(&tq->lock);

	for (;;) {
		if (list_is_empty(&tq->queue))
			break;

		CONDWAIT(&tq->cond_worker2parent, &tq->lock);
	}

	VERIFY(list_is_empty(&tq->queue));

	MXUNLOCK(&tq->lock);
}

/*
 * Tear down a taskq and release all of its resources.
 *
 * The caller must ensure the queue is empty; this is asserted.  Every
 * worker thread that was started is asked to exit and is joined before
 * anything is freed.
 */
void taskq_destroy(struct taskq *tq)
{
	long joined;

	MXLOCK(&tq->lock);

	/* destroying a taskq with pending work is a caller bug */
	VERIFY(list_is_empty(&tq->queue));

	/* raise the shutdown flag and wake every sleeping worker */
	tq->shutdown = true;
	CONDBCAST(&tq->cond_parent2worker);

	MXUNLOCK(&tq->lock);

	/* reap only the workers that were actually started */
	joined = 0;
	while (joined < tq->nstarted_threads) {
		xthr_join(tq->threads[joined], NULL);
		joined++;
	}

	/* no threads remain, so the primitives and memory can go away */
	CONDDESTROY(&tq->cond_parent2worker);
	CONDDESTROY(&tq->cond_worker2parent);
	MXDESTROY(&tq->lock);
	list_destroy(&tq->queue);
	free(tq->threads);
	free(tq);
}