X-Git-Url: https://git.camperquake.de/gitweb.cgi?a=blobdiff_plain;f=module%2Fzfs%2Ftxg.c;h=6e64adf9376e5c97a8cfb823caf796c0c9cb6b8f;hb=570827e129ed81e066e894530bbe24642f473154;hp=e3c0e2a134239eb05cd9f4c98c0574f49dfcc670;hpb=fb5f0bc83330c8a0236c4d34a23723ac1974971a;p=zfs.git diff --git a/module/zfs/txg.c b/module/zfs/txg.c index e3c0e2a..6e64adf 100644 --- a/module/zfs/txg.c +++ b/module/zfs/txg.c @@ -19,14 +19,15 @@ * CDDL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved. - * Use is subject to license terms. + * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. */ #include #include #include +#include #include +#include #include /* @@ -36,7 +37,7 @@ static void txg_sync_thread(dsl_pool_t *dp); static void txg_quiesce_thread(dsl_pool_t *dp); -int zfs_txg_timeout = 30; /* max seconds worth of delta per txg */ +int zfs_txg_timeout = 5; /* max seconds worth of delta per txg */ /* * Prepare the txg subsystem. @@ -48,7 +49,7 @@ txg_init(dsl_pool_t *dp, uint64_t txg) int c; bzero(tx, sizeof (tx_state_t)); - tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); + tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); for (c = 0; c < max_ncpus; c++) { int i; @@ -57,10 +58,12 @@ txg_init(dsl_pool_t *dp, uint64_t txg) for (i = 0; i < TXG_SIZE; i++) { cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, NULL); + list_create(&tx->tx_cpu[c].tc_callbacks[i], + sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); } } - rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL); mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); @@ -83,7 +86,6 @@ txg_fini(dsl_pool_t *dp) ASSERT(tx->tx_threads == 0); - rw_destroy(&tx->tx_suspend); mutex_destroy(&tx->tx_sync_lock); cv_destroy(&tx->tx_sync_more_cv); @@ -96,11 +98,16 @@ txg_fini(dsl_pool_t *dp) int i; mutex_destroy(&tx->tx_cpu[c].tc_lock); - for (i = 0; i < TXG_SIZE; i++) + for (i = 0; i < TXG_SIZE; i++) { cv_destroy(&tx->tx_cpu[c].tc_cv[i]); + list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); + } } - kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); + if (tx->tx_commit_cb_taskq != NULL) + taskq_destroy(tx->tx_commit_cb_taskq); + + vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); bzero(tx, sizeof (tx_state_t)); } @@ -129,7 +136,7 @@ txg_sync_start(dsl_pool_t *dp) * 32-bit x86. This is due in part to nested pools and * scrub_visitbp() recursion. */ - tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread, + tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread, dp, 0, &p0, TS_RUN, minclsyspri); mutex_exit(&tx->tx_sync_lock); @@ -159,9 +166,10 @@ txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time) CALLB_CPR_SAFE_BEGIN(cpr); if (time) - (void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time); + (void) cv_timedwait_interruptible(cv, &tx->tx_sync_lock, + ddi_get_lbolt() + time); else - cv_wait(cv, &tx->tx_sync_lock); + cv_wait_interruptible(cv, &tx->tx_sync_lock); CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); } @@ -179,7 +187,11 @@ txg_sync_stop(dsl_pool_t *dp) * Finish off any work in progress. */ ASSERT(tx->tx_threads == 2); - txg_wait_synced(dp, 0); + + /* + * We need to ensure that we've vacated the deferred space_maps. + */ + txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE); /* * Wake all sync threads and wait for them to die. @@ -229,6 +241,17 @@ txg_rele_to_quiesce(txg_handle_t *th) } void +txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) +{ + tx_cpu_t *tc = th->th_cpu; + int g = th->th_txg & TXG_MASK; + + mutex_enter(&tc->tc_lock); + list_move_tail(&tc->tc_callbacks[g], tx_callbacks); + mutex_exit(&tc->tc_lock); +} + +void txg_rele_to_sync(txg_handle_t *th) { tx_cpu_t *tc = th->th_cpu; @@ -279,12 +302,89 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg) } static void +txg_do_callbacks(list_t *cb_list) +{ + dmu_tx_do_callbacks(cb_list, 0); + + list_destroy(cb_list); + + kmem_free(cb_list, sizeof (list_t)); +} + +/* + * Dispatch the commit callbacks registered on this txg to worker threads. + */ +static void +txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) +{ + int c; + tx_state_t *tx = &dp->dp_tx; + list_t *cb_list; + + for (c = 0; c < max_ncpus; c++) { + tx_cpu_t *tc = &tx->tx_cpu[c]; + /* No need to lock tx_cpu_t at this point */ + + int g = txg & TXG_MASK; + + if (list_is_empty(&tc->tc_callbacks[g])) + continue; + + if (tx->tx_commit_cb_taskq == NULL) { + /* + * Commit callback taskq hasn't been created yet. + */ + tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", + 100, minclsyspri, max_ncpus, INT_MAX, + TASKQ_THREADS_CPU_PCT | TASKQ_PREPOPULATE); + } + + cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); + list_create(cb_list, sizeof (dmu_tx_callback_t), + offsetof(dmu_tx_callback_t, dcb_node)); + + list_move_tail(cb_list, &tc->tc_callbacks[g]); + + (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) + txg_do_callbacks, cb_list, TQ_SLEEP); + } +} + +/* + * Wait for pending commit callbacks of already-synced transactions to finish + * processing. + * Calling this function from within a commit callback will deadlock. + */ +void +txg_wait_callbacks(dsl_pool_t *dp) +{ + tx_state_t *tx = &dp->dp_tx; + + if (tx->tx_commit_cb_taskq != NULL) + taskq_wait(tx->tx_commit_cb_taskq); +} + +static void txg_sync_thread(dsl_pool_t *dp) { + spa_t *spa = dp->dp_spa; tx_state_t *tx = &dp->dp_tx; callb_cpr_t cpr; uint64_t start, delta; +#ifdef _KERNEL + /* + * Disable the normal reclaim path for the txg_sync thread. This + * ensures the thread will never enter dmu_tx_assign() which can + * otherwise occur due to direct reclaim. If this is allowed to + * happen the system can deadlock. Direct reclaim call path: + * + * ->shrink_icache_memory->prune_icache->dispose_list-> + * clear_inode->zpl_clear_inode->zfs_inactive->dmu_tx_assign + */ + current->flags |= PF_MEMALLOC; +#endif /* _KERNEL */ + txg_thread_enter(tx, &cpr); start = delta = 0; @@ -293,20 +393,19 @@ txg_sync_thread(dsl_pool_t *dp) uint64_t txg; /* - * We sync when we're scrubbing, there's someone waiting + * We sync when we're scanning, there's someone waiting * on us, or the quiesce thread has handed off a txg to * us, or we have reached our timeout. */ timer = (delta >= timeout ? 0 : timeout - delta); - while ((dp->dp_scrub_func == SCRUB_FUNC_NONE || - spa_shutting_down(dp->dp_spa)) && + while (!dsl_scan_active(dp->dp_scan) && !tx->tx_exiting && timer > 0 && tx->tx_synced_txg >= tx->tx_sync_txg_waiting && tx->tx_quiesced_txg == 0) { dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); - delta = lbolt - start; + delta = ddi_get_lbolt() - start; timer = (delta > timeout ? 0 : timeout - delta); } @@ -324,8 +423,6 @@ txg_sync_thread(dsl_pool_t *dp) if (tx->tx_exiting) txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); - rw_enter(&tx->tx_suspend, RW_WRITER); - /* * Consume the quiesced txg which has been handed off to * us. This may cause the quiescing thread to now be @@ -335,22 +432,24 @@ txg_sync_thread(dsl_pool_t *dp) tx->tx_quiesced_txg = 0; tx->tx_syncing_txg = txg; cv_broadcast(&tx->tx_quiesce_more_cv); - rw_exit(&tx->tx_suspend); dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); mutex_exit(&tx->tx_sync_lock); - start = lbolt; - spa_sync(dp->dp_spa, txg); - delta = lbolt - start; + start = ddi_get_lbolt(); + spa_sync(spa, txg); + delta = ddi_get_lbolt() - start; mutex_enter(&tx->tx_sync_lock); - rw_enter(&tx->tx_suspend, RW_WRITER); tx->tx_synced_txg = txg; tx->tx_syncing_txg = 0; - rw_exit(&tx->tx_suspend); cv_broadcast(&tx->tx_sync_done_cv); + + /* + * Dispatch commit callbacks to worker threads. + */ + txg_dispatch_callbacks(dp, txg); } } @@ -407,7 +506,7 @@ void txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) { tx_state_t *tx = &dp->dp_tx; - int timeout = lbolt + ticks; + clock_t timeout = ddi_get_lbolt() + ticks; /* don't delay if this txg could transition to quiesing immediately */ if (tx->tx_open_txg > txg || @@ -420,11 +519,13 @@ txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) return; } - while (lbolt < timeout && + while (ddi_get_lbolt() < timeout && tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, timeout); + DMU_TX_STAT_BUMP(dmu_tx_delay); + mutex_exit(&tx->tx_sync_lock); } @@ -436,7 +537,7 @@ txg_wait_synced(dsl_pool_t *dp, uint64_t txg) mutex_enter(&tx->tx_sync_lock); ASSERT(tx->tx_threads == 2); if (txg == 0) - txg = tx->tx_open_txg; + txg = tx->tx_open_txg + TXG_DEFER_SIZE; if (tx->tx_sync_txg_waiting < txg) tx->tx_sync_txg_waiting = txg; dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", @@ -487,21 +588,6 @@ txg_sync_waiting(dsl_pool_t *dp) tx->tx_quiesced_txg != 0); } -void -txg_suspend(dsl_pool_t *dp) -{ - tx_state_t *tx = &dp->dp_tx; - /* XXX some code paths suspend when they are already suspended! */ - rw_enter(&tx->tx_suspend, RW_READER); -} - -void -txg_resume(dsl_pool_t *dp) -{ - tx_state_t *tx = &dp->dp_tx; - rw_exit(&tx->tx_suspend); -} - /* * Per-txg object lists. */ @@ -559,6 +645,34 @@ txg_list_add(txg_list_t *tl, void *p, uint64_t txg) } /* + * Add an entry to the end of the list (walks list to find end). + * Returns 0 if it's a new entry, 1 if it's already there. + */ +int +txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg) +{ + int t = txg & TXG_MASK; + txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); + int already_on_list; + + mutex_enter(&tl->tl_lock); + already_on_list = tn->tn_member[t]; + if (!already_on_list) { + txg_node_t **tp; + + for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t]) + continue; + + tn->tn_member[t] = 1; + tn->tn_next[t] = NULL; + *tp = tn; + } + mutex_exit(&tl->tl_lock); + + return (already_on_list); +} + +/* * Remove the head of the list and return it. */ void * @@ -637,3 +751,20 @@ txg_list_next(txg_list_t *tl, void *p, uint64_t txg) return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); } + +#if defined(_KERNEL) && defined(HAVE_SPL) +EXPORT_SYMBOL(txg_init); +EXPORT_SYMBOL(txg_fini); +EXPORT_SYMBOL(txg_sync_start); +EXPORT_SYMBOL(txg_sync_stop); +EXPORT_SYMBOL(txg_hold_open); +EXPORT_SYMBOL(txg_rele_to_quiesce); +EXPORT_SYMBOL(txg_rele_to_sync); +EXPORT_SYMBOL(txg_register_callbacks); +EXPORT_SYMBOL(txg_delay); +EXPORT_SYMBOL(txg_wait_synced); +EXPORT_SYMBOL(txg_wait_open); +EXPORT_SYMBOL(txg_wait_callbacks); +EXPORT_SYMBOL(txg_stalled); +EXPORT_SYMBOL(txg_sync_waiting); +#endif