3246 ZFS I/O deadman thread
[zfs.git] / lib / libzpool / taskq.c
index 93acdcf..64e2142 100644 (file)
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
+ * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
+ */
 
 #include <sys/zfs_context.h>
 
 int taskq_now;
 taskq_t *system_taskq;
 
-typedef struct task {
-       struct task     *task_next;
-       struct task     *task_prev;
-       task_func_t     *task_func;
-       void            *task_arg;
-} task_t;
-
 #define        TASKQ_ACTIVE    0x00010000
 
 struct taskq {
@@ -42,66 +39,85 @@ struct taskq {
        krwlock_t       tq_threadlock;
        kcondvar_t      tq_dispatch_cv;
        kcondvar_t      tq_wait_cv;
-       thread_t        *tq_threadlist;
+       kthread_t       **tq_threadlist;
        int             tq_flags;
        int             tq_active;
        int             tq_nthreads;
        int             tq_nalloc;
        int             tq_minalloc;
        int             tq_maxalloc;
-       task_t          *tq_freelist;
-       task_t          tq_task;
+       kcondvar_t      tq_maxalloc_cv;
+       int             tq_maxalloc_wait;
+       taskq_ent_t     *tq_freelist;
+       taskq_ent_t     tq_task;
 };
 
-static task_t *
+static taskq_ent_t *
 task_alloc(taskq_t *tq, int tqflags)
 {
-       task_t *t;
+       taskq_ent_t *t;
+       int rv;
 
-       if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
-               tq->tq_freelist = t->task_next;
+again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
+               ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+               tq->tq_freelist = t->tqent_next;
        } else {
-               mutex_exit(&tq->tq_lock);
                if (tq->tq_nalloc >= tq->tq_maxalloc) {
-                       if (!(tqflags & KM_SLEEP)) {
-                               mutex_enter(&tq->tq_lock);
+                       if (!(tqflags & KM_SLEEP))
                                return (NULL);
-                       }
+
                        /*
                         * We don't want to exceed tq_maxalloc, but we can't
                         * wait for other tasks to complete (and thus free up
                         * task structures) without risking deadlock with
                         * the caller.  So, we just delay for one second
-                        * to throttle the allocation rate.
+                        * to throttle the allocation rate. If we have tasks
+                        * complete before one second timeout expires then
+                        * taskq_ent_free will signal us and we will
+                        * immediately retry the allocation.
                         */
-                       delay(hz);
+                       tq->tq_maxalloc_wait++;
+                       rv = cv_timedwait(&tq->tq_maxalloc_cv,
+                           &tq->tq_lock, ddi_get_lbolt() + hz);
+                       tq->tq_maxalloc_wait--;
+                       if (rv > 0)
+                               goto again;             /* signaled */
                }
-               t = kmem_alloc(sizeof (task_t), tqflags);
+               mutex_exit(&tq->tq_lock);
+
+               t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
+
                mutex_enter(&tq->tq_lock);
-               if (t != NULL)
+               if (t != NULL) {
+                       /* Make sure we start without any flags */
+                       t->tqent_flags = 0;
                        tq->tq_nalloc++;
+               }
        }
        return (t);
 }
 
 static void
-task_free(taskq_t *tq, task_t *t)
+task_free(taskq_t *tq, taskq_ent_t *t)
 {
        if (tq->tq_nalloc <= tq->tq_minalloc) {
-               t->task_next = tq->tq_freelist;
+               t->tqent_next = tq->tq_freelist;
                tq->tq_freelist = t;
        } else {
                tq->tq_nalloc--;
                mutex_exit(&tq->tq_lock);
-               kmem_free(t, sizeof (task_t));
+               kmem_free(t, sizeof (taskq_ent_t));
                mutex_enter(&tq->tq_lock);
        }
+
+       if (tq->tq_maxalloc_wait)
+               cv_signal(&tq->tq_maxalloc_cv);
 }
 
 taskqid_t
 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
 {
-       task_t *t;
+       taskq_ent_t *t;
 
        if (taskq_now) {
                func(arg);
@@ -114,56 +130,122 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
                mutex_exit(&tq->tq_lock);
                return (0);
        }
-       t->task_next = &tq->tq_task;
-       t->task_prev = tq->tq_task.task_prev;
-       t->task_next->task_prev = t;
-       t->task_prev->task_next = t;
-       t->task_func = func;
-       t->task_arg = arg;
+       if (tqflags & TQ_FRONT) {
+               t->tqent_next = tq->tq_task.tqent_next;
+               t->tqent_prev = &tq->tq_task;
+       } else {
+               t->tqent_next = &tq->tq_task;
+               t->tqent_prev = tq->tq_task.tqent_prev;
+       }
+       t->tqent_next->tqent_prev = t;
+       t->tqent_prev->tqent_next = t;
+       t->tqent_func = func;
+       t->tqent_arg = arg;
+       t->tqent_flags = 0;
        cv_signal(&tq->tq_dispatch_cv);
        mutex_exit(&tq->tq_lock);
        return (1);
 }
 
+taskqid_t
+taskq_dispatch_delay(taskq_t *tq,  task_func_t func, void *arg, uint_t tqflags,
+    clock_t expire_time)
+{
+       return (0);
+}
+
+int
+taskq_empty_ent(taskq_ent_t *t)
+{
+       return t->tqent_next == NULL;
+}
+
+void
+taskq_init_ent(taskq_ent_t *t)
+{
+       t->tqent_next = NULL;
+       t->tqent_prev = NULL;
+       t->tqent_func = NULL;
+       t->tqent_arg = NULL;
+       t->tqent_flags = 0;
+}
+
+void
+taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
+    taskq_ent_t *t)
+{
+       ASSERT(func != NULL);
+       ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
+
+       /*
+        * Mark it as a prealloc'd task.  This is important
+        * to ensure that we don't free it later.
+        */
+       t->tqent_flags |= TQENT_FLAG_PREALLOC;
+       /*
+        * Enqueue the task to the underlying queue.
+        */
+       mutex_enter(&tq->tq_lock);
+
+       if (flags & TQ_FRONT) {
+               t->tqent_next = tq->tq_task.tqent_next;
+               t->tqent_prev = &tq->tq_task;
+       } else {
+               t->tqent_next = &tq->tq_task;
+               t->tqent_prev = tq->tq_task.tqent_prev;
+       }
+       t->tqent_next->tqent_prev = t;
+       t->tqent_prev->tqent_next = t;
+       t->tqent_func = func;
+       t->tqent_arg = arg;
+       cv_signal(&tq->tq_dispatch_cv);
+       mutex_exit(&tq->tq_lock);
+}
+
 void
 taskq_wait(taskq_t *tq)
 {
        mutex_enter(&tq->tq_lock);
-       while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0)
+       while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
                cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
        mutex_exit(&tq->tq_lock);
 }
 
-static void *
+static void
 taskq_thread(void *arg)
 {
        taskq_t *tq = arg;
-       task_t *t;
+       taskq_ent_t *t;
+       boolean_t prealloc;
 
        mutex_enter(&tq->tq_lock);
        while (tq->tq_flags & TASKQ_ACTIVE) {
-               if ((t = tq->tq_task.task_next) == &tq->tq_task) {
+               if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
                        if (--tq->tq_active == 0)
                                cv_broadcast(&tq->tq_wait_cv);
                        cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
                        tq->tq_active++;
                        continue;
                }
-               t->task_prev->task_next = t->task_next;
-               t->task_next->task_prev = t->task_prev;
+               t->tqent_prev->tqent_next = t->tqent_next;
+               t->tqent_next->tqent_prev = t->tqent_prev;
+               t->tqent_next = NULL;
+               t->tqent_prev = NULL;
+               prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
                mutex_exit(&tq->tq_lock);
 
                rw_enter(&tq->tq_threadlock, RW_READER);
-               t->task_func(t->task_arg);
+               t->tqent_func(t->tqent_arg);
                rw_exit(&tq->tq_threadlock);
 
                mutex_enter(&tq->tq_lock);
-               task_free(tq, t);
+               if (!prealloc)
+                       task_free(tq, t);
        }
        tq->tq_nthreads--;
        cv_broadcast(&tq->tq_wait_cv);
        mutex_exit(&tq->tq_lock);
-       return (NULL);
+       thread_exit();
 }
 
 /*ARGSUSED*/
@@ -174,18 +256,32 @@ taskq_create(const char *name, int nthreads, pri_t pri,
        taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
        int t;
 
+       if (flags & TASKQ_THREADS_CPU_PCT) {
+               int pct;
+               ASSERT3S(nthreads, >=, 0);
+               ASSERT3S(nthreads, <=, 100);
+               pct = MIN(nthreads, 100);
+               pct = MAX(pct, 0);
+
+               nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
+               nthreads = MAX(nthreads, 1);    /* need at least 1 thread */
+       } else {
+               ASSERT3S(nthreads, >=, 1);
+       }
+
        rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
        mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
+       cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
        tq->tq_flags = flags | TASKQ_ACTIVE;
        tq->tq_active = nthreads;
        tq->tq_nthreads = nthreads;
        tq->tq_minalloc = minalloc;
        tq->tq_maxalloc = maxalloc;
-       tq->tq_task.task_next = &tq->tq_task;
-       tq->tq_task.task_prev = &tq->tq_task;
-       tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
+       tq->tq_task.tqent_next = &tq->tq_task;
+       tq->tq_task.tqent_prev = &tq->tq_task;
+       tq->tq_threadlist = kmem_alloc(nthreads*sizeof(kthread_t *), KM_SLEEP);
 
        if (flags & TASKQ_PREPOPULATE) {
                mutex_enter(&tq->tq_lock);
@@ -195,8 +291,8 @@ taskq_create(const char *name, int nthreads, pri_t pri,
        }
 
        for (t = 0; t < nthreads; t++)
-               (void) thr_create(0, 0, taskq_thread,
-                   tq, THR_BOUND, &tq->tq_threadlist[t]);
+               VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0,
+                   taskq_thread, tq, TS_RUN, NULL, 0, 0)) != NULL);
 
        return (tq);
 }
@@ -204,7 +300,6 @@ taskq_create(const char *name, int nthreads, pri_t pri,
 void
 taskq_destroy(taskq_t *tq)
 {
-       int t;
        int nthreads = tq->tq_nthreads;
 
        taskq_wait(tq);
@@ -225,21 +320,19 @@ taskq_destroy(taskq_t *tq)
 
        mutex_exit(&tq->tq_lock);
 
-       for (t = 0; t < nthreads; t++)
-               (void) thr_join(tq->tq_threadlist[t], NULL, NULL);
-
-       kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
+       kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));
 
        rw_destroy(&tq->tq_threadlock);
        mutex_destroy(&tq->tq_lock);
        cv_destroy(&tq->tq_dispatch_cv);
        cv_destroy(&tq->tq_wait_cv);
+       cv_destroy(&tq->tq_maxalloc_cv);
 
        kmem_free(tq, sizeof (taskq_t));
 }
 
 int
-taskq_member(taskq_t *tq, void *t)
+taskq_member(taskq_t *tq, kthread_t *t)
 {
        int i;
 
@@ -247,15 +340,28 @@ taskq_member(taskq_t *tq, void *t)
                return (1);
 
        for (i = 0; i < tq->tq_nthreads; i++)
-               if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
+               if (tq->tq_threadlist[i] == t)
                        return (1);
 
        return (0);
 }
 
+int
+taskq_cancel_id(taskq_t *tq, taskqid_t id)
+{
+       return (ENOENT);
+}
+
 void
 system_taskq_init(void)
 {
        system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
            TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
 }
+
+void
+system_taskq_fini(void)
+{
+       taskq_destroy(system_taskq);
+       system_taskq = NULL; /* defensive */
+}