* CDDL HEADER END
*/
/*
- * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
+ * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
krwlock_t tq_threadlock;
kcondvar_t tq_dispatch_cv;
kcondvar_t tq_wait_cv;
- thread_t *tq_threadlist;
+ kthread_t **tq_threadlist;
int tq_flags;
int tq_active;
int tq_nthreads;
int tq_nalloc;
int tq_minalloc;
int tq_maxalloc;
+ kcondvar_t tq_maxalloc_cv;
+ int tq_maxalloc_wait;
task_t *tq_freelist;
task_t tq_task;
};
task_alloc(taskq_t *tq, int tqflags)
{
task_t *t;
+ int rv;
- if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
+again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
tq->tq_freelist = t->task_next;
} else {
- mutex_exit(&tq->tq_lock);
if (tq->tq_nalloc >= tq->tq_maxalloc) {
- if (!(tqflags & KM_SLEEP)) {
- mutex_enter(&tq->tq_lock);
+ if (!(tqflags & KM_SLEEP))
return (NULL);
- }
+
/*
* We don't want to exceed tq_maxalloc, but we can't
* wait for other tasks to complete (and thus free up
* task structures) without risking deadlock with
* the caller. So, we just delay for one second
- * to throttle the allocation rate.
+ * to throttle the allocation rate. If we have tasks
+ * complete before one second timeout expires then
+ * taskq_ent_free will signal us and we will
+ * immediately retry the allocation.
*/
- delay(hz);
+ tq->tq_maxalloc_wait++;
+ rv = cv_timedwait(&tq->tq_maxalloc_cv,
+ &tq->tq_lock, ddi_get_lbolt() + hz);
+ tq->tq_maxalloc_wait--;
+ if (rv > 0)
+ goto again; /* signaled */
}
+ mutex_exit(&tq->tq_lock);
+
t = kmem_alloc(sizeof (task_t), tqflags);
+
mutex_enter(&tq->tq_lock);
if (t != NULL)
tq->tq_nalloc++;
kmem_free(t, sizeof (task_t));
mutex_enter(&tq->tq_lock);
}
+
+ if (tq->tq_maxalloc_wait)
+ cv_signal(&tq->tq_maxalloc_cv);
}
taskqid_t
mutex_exit(&tq->tq_lock);
return (0);
}
- t->task_next = &tq->tq_task;
- t->task_prev = tq->tq_task.task_prev;
+ if (tqflags & TQ_FRONT) {
+ t->task_next = tq->tq_task.task_next;
+ t->task_prev = &tq->tq_task;
+ } else {
+ t->task_next = &tq->tq_task;
+ t->task_prev = tq->tq_task.task_prev;
+ }
t->task_next->task_prev = t;
t->task_prev->task_next = t;
t->task_func = func;
mutex_exit(&tq->tq_lock);
}
-static void *
+static void
taskq_thread(void *arg)
{
taskq_t *tq = arg;
tq->tq_nthreads--;
cv_broadcast(&tq->tq_wait_cv);
mutex_exit(&tq->tq_lock);
- return (NULL);
+ thread_exit();
}
/*ARGSUSED*/
mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
+ cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
tq->tq_flags = flags | TASKQ_ACTIVE;
tq->tq_active = nthreads;
tq->tq_nthreads = nthreads;
tq->tq_maxalloc = maxalloc;
tq->tq_task.task_next = &tq->tq_task;
tq->tq_task.task_prev = &tq->tq_task;
- tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
+ tq->tq_threadlist = kmem_alloc(nthreads*sizeof(kthread_t *), KM_SLEEP);
if (flags & TASKQ_PREPOPULATE) {
mutex_enter(&tq->tq_lock);
}
for (t = 0; t < nthreads; t++)
- (void) thr_create(0, 0, taskq_thread,
- tq, THR_BOUND, &tq->tq_threadlist[t]);
+ VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0,
+ taskq_thread, tq, TS_RUN, NULL, 0, 0)) != NULL);
return (tq);
}
void
taskq_destroy(taskq_t *tq)
{
- int t;
int nthreads = tq->tq_nthreads;
taskq_wait(tq);
mutex_exit(&tq->tq_lock);
- for (t = 0; t < nthreads; t++)
- (void) thr_join(tq->tq_threadlist[t], NULL, NULL);
-
- kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
+ kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));
rw_destroy(&tq->tq_threadlock);
mutex_destroy(&tq->tq_lock);
cv_destroy(&tq->tq_dispatch_cv);
cv_destroy(&tq->tq_wait_cv);
+ cv_destroy(&tq->tq_maxalloc_cv);
kmem_free(tq, sizeof (taskq_t));
}
int
-taskq_member(taskq_t *tq, void *t)
+taskq_member(taskq_t *tq, kthread_t *t)
{
int i;
return (1);
for (i = 0; i < tq->tq_nthreads; i++)
- if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
+ if (tq->tq_threadlist[i] == t)
return (1);
return (0);
system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}
+
+void
+system_taskq_fini(void)
+{
+ taskq_destroy(system_taskq);
+ system_taskq = NULL; /* defensive */
+}