Add txgs-<pool> kstat file
[zfs.git] / module / zfs / txg.c
index 9b308ca..17494bc 100644 (file)
@@ -29,6 +29,7 @@
 #include <sys/dsl_pool.h>
 #include <sys/dsl_scan.h>
 #include <sys/callb.h>
+#include <sys/spa_impl.h>
 
 /*
  * Pool-wide transaction groups.
@@ -49,7 +50,7 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
        int c;
        bzero(tx, sizeof (tx_state_t));
 
-       tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
+       tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
 
        for (c = 0; c < max_ncpus; c++) {
                int i;
@@ -107,7 +108,7 @@ txg_fini(dsl_pool_t *dp)
        if (tx->tx_commit_cb_taskq != NULL)
                taskq_destroy(tx->tx_commit_cb_taskq);
 
-       kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
+       vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
 
        bzero(tx, sizeof (tx_state_t));
 }
@@ -166,10 +167,10 @@ txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
        CALLB_CPR_SAFE_BEGIN(cpr);
 
        if (time)
-               (void) cv_timedwait(cv, &tx->tx_sync_lock,
+               (void) cv_timedwait_interruptible(cv, &tx->tx_sync_lock,
                    ddi_get_lbolt() + time);
        else
-               cv_wait(cv, &tx->tx_sync_lock);
+               cv_wait_interruptible(cv, &tx->tx_sync_lock);
 
        CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
 }
@@ -218,9 +219,19 @@ uint64_t
 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
 {
        tx_state_t *tx = &dp->dp_tx;
-       tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
+       tx_cpu_t *tc;
        uint64_t txg;
 
+       /*
+        * It appears the processor id is simply used as a "random"
+        * number to index into the array, and there isn't any other
+        * significance to the chosen tx_cpu. Because.. Why not use
+        * the current cpu to index into the array?
+        */
+       kpreempt_disable();
+       tc = &tx->tx_cpu[CPU_SEQID];
+       kpreempt_enable();
+
        mutex_enter(&tc->tc_lock);
 
        txg = tx->tx_open_txg;
@@ -269,6 +280,8 @@ txg_rele_to_sync(txg_handle_t *th)
 static void
 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
 {
+       hrtime_t start;
+       txg_history_t *th;
        tx_state_t *tx = &dp->dp_tx;
        int g = txg & TXG_MASK;
        int c;
@@ -283,6 +296,15 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg)
        tx->tx_open_txg++;
 
        /*
+        * Measure how long the txg was open and replace the kstat.
+        */
+       th = dsl_pool_txg_history_get(dp, txg);
+       th->th_kstat.open_time = gethrtime() - th->th_kstat.birth;
+       th->th_kstat.state = TXG_STATE_QUIESCING;
+       dsl_pool_txg_history_put(th);
+       dsl_pool_txg_history_add(dp, tx->tx_open_txg);
+
+       /*
         * Now that we've incremented tx_open_txg, we can let threads
         * enter the next transaction group.
         */
@@ -292,6 +314,8 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg)
        /*
         * Quiesce the transaction group by waiting for everyone to txg_exit().
         */
+       start = gethrtime();
+
        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                mutex_enter(&tc->tc_lock);
@@ -299,6 +323,13 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg)
                        cv_wait(&tc->tc_cv[g], &tc->tc_lock);
                mutex_exit(&tc->tc_lock);
        }
+
+       /*
+        * Measure how long the txg took to quiesce.
+        */
+       th = dsl_pool_txg_history_get(dp, txg);
+       th->th_kstat.quiesce_time = gethrtime() - start;
+       dsl_pool_txg_history_put(th);
 }
 
 static void
@@ -335,21 +366,35 @@ txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
                         * Commit callback taskq hasn't been created yet.
                         */
                        tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
-                           max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
-                           TASKQ_PREPOPULATE);
+                           100, minclsyspri, max_ncpus, INT_MAX,
+                           TASKQ_THREADS_CPU_PCT | TASKQ_PREPOPULATE);
                }
 
-               cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
+               cb_list = kmem_alloc(sizeof (list_t), KM_PUSHPAGE);
                list_create(cb_list, sizeof (dmu_tx_callback_t),
                    offsetof(dmu_tx_callback_t, dcb_node));
 
-               list_move_tail(&tc->tc_callbacks[g], cb_list);
+               list_move_tail(cb_list, &tc->tc_callbacks[g]);
 
                (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
                    txg_do_callbacks, cb_list, TQ_SLEEP);
        }
 }
 
+/*
+ * Wait for pending commit callbacks of already-synced transactions to finish
+ * processing.
+ * Calling this function from within a commit callback will deadlock.
+ */
+void
+txg_wait_callbacks(dsl_pool_t *dp)
+{
+       tx_state_t *tx = &dp->dp_tx;
+
+       if (tx->tx_commit_cb_taskq != NULL)
+               taskq_wait(tx->tx_commit_cb_taskq);
+}
+
 static void
 txg_sync_thread(dsl_pool_t *dp)
 {
@@ -358,13 +403,26 @@ txg_sync_thread(dsl_pool_t *dp)
        callb_cpr_t cpr;
        uint64_t start, delta;
 
+#ifdef _KERNEL
+       /*
+        * Annotate this process with a flag that indicates that it is
+        * unsafe to use KM_SLEEP during memory allocations due to the
+        * potential for a deadlock.  KM_PUSHPAGE should be used instead.
+        */
+       current->flags |= PF_NOFS;
+#endif /* _KERNEL */
+
        txg_thread_enter(tx, &cpr);
 
        start = delta = 0;
        for (;;) {
-               uint64_t timer, timeout = zfs_txg_timeout * hz;
+               hrtime_t hrstart;
+               txg_history_t *th;
+               uint64_t timer, timeout;
                uint64_t txg;
 
+               timeout = zfs_txg_timeout * hz;
+
                /*
                 * We sync when we're scanning, there's someone waiting
                 * on us, or the quiesce thread has handed off a txg to
@@ -406,11 +464,17 @@ txg_sync_thread(dsl_pool_t *dp)
                tx->tx_syncing_txg = txg;
                cv_broadcast(&tx->tx_quiesce_more_cv);
 
+               th = dsl_pool_txg_history_get(dp, txg);
+               th->th_kstat.state = TXG_STATE_SYNCING;
+               vdev_get_stats(spa->spa_root_vdev, &th->th_vs1);
+               dsl_pool_txg_history_put(th);
+
                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);
 
                start = ddi_get_lbolt();
+               hrstart = gethrtime();
                spa_sync(spa, txg);
                delta = ddi_get_lbolt() - start;
 
@@ -423,6 +487,23 @@ txg_sync_thread(dsl_pool_t *dp)
                 * Dispatch commit callbacks to worker threads.
                 */
                txg_dispatch_callbacks(dp, txg);
+
+               /*
+                * Measure the txg sync time determine the amount of I/O done.
+                */
+               th = dsl_pool_txg_history_get(dp, txg);
+               vdev_get_stats(spa->spa_root_vdev, &th->th_vs2);
+               th->th_kstat.sync_time = gethrtime() - hrstart;
+               th->th_kstat.nread = th->th_vs2.vs_bytes[ZIO_TYPE_READ] -
+                   th->th_vs1.vs_bytes[ZIO_TYPE_READ];
+               th->th_kstat.nwritten = th->th_vs2.vs_bytes[ZIO_TYPE_WRITE] -
+                   th->th_vs1.vs_bytes[ZIO_TYPE_WRITE];
+               th->th_kstat.reads = th->th_vs2.vs_ops[ZIO_TYPE_READ] -
+                   th->th_vs1.vs_ops[ZIO_TYPE_READ];
+               th->th_kstat.writes = th->th_vs2.vs_ops[ZIO_TYPE_WRITE] -
+                   th->th_vs1.vs_ops[ZIO_TYPE_WRITE];
+               th->th_kstat.state = TXG_STATE_COMMITTED;
+               dsl_pool_txg_history_put(th);
        }
 }
 
@@ -479,7 +560,7 @@ void
 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
 {
        tx_state_t *tx = &dp->dp_tx;
-       int timeout = ddi_get_lbolt() + ticks;
+       clock_t timeout = ddi_get_lbolt() + ticks;
 
        /* don't delay if this txg could transition to quiesing immediately */
        if (tx->tx_open_txg > txg ||
@@ -497,6 +578,8 @@ txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
                (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
                    timeout);
 
+       DMU_TX_STAT_BUMP(dmu_tx_delay);
+
        mutex_exit(&tx->tx_sync_lock);
 }
 
@@ -722,3 +805,23 @@ txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
 
        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 }
+
+#if defined(_KERNEL) && defined(HAVE_SPL)
+EXPORT_SYMBOL(txg_init);
+EXPORT_SYMBOL(txg_fini);
+EXPORT_SYMBOL(txg_sync_start);
+EXPORT_SYMBOL(txg_sync_stop);
+EXPORT_SYMBOL(txg_hold_open);
+EXPORT_SYMBOL(txg_rele_to_quiesce);
+EXPORT_SYMBOL(txg_rele_to_sync);
+EXPORT_SYMBOL(txg_register_callbacks);
+EXPORT_SYMBOL(txg_delay);
+EXPORT_SYMBOL(txg_wait_synced);
+EXPORT_SYMBOL(txg_wait_open);
+EXPORT_SYMBOL(txg_wait_callbacks);
+EXPORT_SYMBOL(txg_stalled);
+EXPORT_SYMBOL(txg_sync_waiting);
+
+module_param(zfs_txg_timeout, int, 0644);
+MODULE_PARM_DESC(zfs_txg_timeout, "Max seconds worth of delta per txg");
+#endif