c7c3df3f8f909d83b264026759d59cc30dd4c053
[zfs.git] / module / zfs / txg.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Portions Copyright 2011 Martin Matuska
24  * Copyright (c) 2013 by Delphix. All rights reserved.
25  */
26
27 #include <sys/zfs_context.h>
28 #include <sys/txg_impl.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dsl_pool.h>
32 #include <sys/dsl_scan.h>
33 #include <sys/callb.h>
34 #include <sys/spa_impl.h>
35
36 /*
37  * ZFS Transaction Groups
38  * ----------------------
39  *
40  * ZFS transaction groups are, as the name implies, groups of transactions
41  * that act on persistent state. ZFS asserts consistency at the granularity of
42  * these transaction groups. Each successive transaction group (txg) is
43  * assigned a 64-bit consecutive identifier. There are three active
44  * transaction group states: open, quiescing, or syncing. At any given time,
45  * there may be an active txg associated with each state; each active txg may
46  * either be processing, or blocked waiting to enter the next state. There may
47  * be up to three active txgs, and there is always a txg in the open state
48  * (though it may be blocked waiting to enter the quiescing state). In broad
49  * strokes, transactions — operations that change in-memory structures — are
50  * accepted into the txg in the open state, and are completed while the txg is
51  * in the open or quiescing states. The accumulated changes are written to
52  * disk in the syncing state.
53  *
54  * Open
55  *
56  * When a new txg becomes active, it first enters the open state. New
57  * transactions — updates to in-memory structures — are assigned to the
58  * currently open txg. There is always a txg in the open state so that ZFS can
59  * accept new changes (though the txg may refuse new changes if it has hit
60  * some limit). ZFS advances the open txg to the next state for a variety of
61  * reasons such as it hitting a time or size threshold, or the execution of an
62  * administrative action that must be completed in the syncing state.
63  *
64  * Quiescing
65  *
66  * After a txg exits the open state, it enters the quiescing state. The
67  * quiescing state is intended to provide a buffer between accepting new
68  * transactions in the open state and writing them out to stable storage in
69  * the syncing state. While quiescing, transactions can continue their
70  * operation without delaying either of the other states. Typically, a txg is
71  * in the quiescing state very briefly since the operations are bounded by
72  * software latencies rather than, say, slower I/O latencies. After all
73  * transactions complete, the txg is ready to enter the next state.
74  *
75  * Syncing
76  *
77  * In the syncing state, the in-memory state built up during the open and (to
78  * a lesser degree) the quiescing states is written to stable storage. The
79  * process of writing out modified data can, in turn modify more data. For
80  * example when we write new blocks, we need to allocate space for them; those
81  * allocations modify metadata (space maps)... which themselves must be
82  * written to stable storage. During the sync state, ZFS iterates, writing out
83  * data until it converges and all in-memory changes have been written out.
84  * The first such pass is the largest as it encompasses all the modified user
85  * data (as opposed to filesystem metadata). Subsequent passes typically have
86  * far less data to write as they consist exclusively of filesystem metadata.
87  *
88  * To ensure convergence, after a certain number of passes ZFS begins
89  * overwriting locations on stable storage that had been allocated earlier in
90  * the syncing state (and subsequently freed). ZFS usually allocates new
91  * blocks to optimize for large, continuous, writes. For the syncing state to
92  * converge however it must complete a pass where no new blocks are allocated
93  * since each allocation requires a modification of persistent metadata.
94  * Further, to hasten convergence, after a prescribed number of passes, ZFS
95  * also defers frees, and stops compressing.
96  *
97  * In addition to writing out user data, we must also execute synctasks during
98  * the syncing context. A synctask is the mechanism by which some
99  * administrative activities work such as creating and destroying snapshots or
100  * datasets. Note that when a synctask is initiated it enters the open txg,
101  * and ZFS then pushes that txg as quickly as possible to completion of the
102  * syncing state in order to reduce the latency of the administrative
103  * activity. To complete the syncing state, ZFS writes out a new uberblock,
104  * the root of the tree of blocks that comprise all state stored on the ZFS
105  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
106  * now transition to the syncing state.
107  */
108
109 static void txg_sync_thread(dsl_pool_t *dp);
110 static void txg_quiesce_thread(dsl_pool_t *dp);
111
112 int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
113
114 /*
115  * Prepare the txg subsystem.
116  */
117 void
118 txg_init(dsl_pool_t *dp, uint64_t txg)
119 {
120         tx_state_t *tx = &dp->dp_tx;
121         int c;
122         bzero(tx, sizeof (tx_state_t));
123
124         tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
125
126         for (c = 0; c < max_ncpus; c++) {
127                 int i;
128
129                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
130                 for (i = 0; i < TXG_SIZE; i++) {
131                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
132                             NULL);
133                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
134                             sizeof (dmu_tx_callback_t),
135                             offsetof(dmu_tx_callback_t, dcb_node));
136                 }
137         }
138
139         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
140
141         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
142         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
143         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
144         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
145         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
146
147         tx->tx_open_txg = txg;
148 }
149
150 /*
151  * Close down the txg subsystem.
152  */
153 void
154 txg_fini(dsl_pool_t *dp)
155 {
156         tx_state_t *tx = &dp->dp_tx;
157         int c;
158
159         ASSERT(tx->tx_threads == 0);
160
161         mutex_destroy(&tx->tx_sync_lock);
162
163         cv_destroy(&tx->tx_sync_more_cv);
164         cv_destroy(&tx->tx_sync_done_cv);
165         cv_destroy(&tx->tx_quiesce_more_cv);
166         cv_destroy(&tx->tx_quiesce_done_cv);
167         cv_destroy(&tx->tx_exit_cv);
168
169         for (c = 0; c < max_ncpus; c++) {
170                 int i;
171
172                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
173                 for (i = 0; i < TXG_SIZE; i++) {
174                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
175                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
176                 }
177         }
178
179         if (tx->tx_commit_cb_taskq != NULL)
180                 taskq_destroy(tx->tx_commit_cb_taskq);
181
182         vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
183
184         bzero(tx, sizeof (tx_state_t));
185 }
186
187 /*
188  * Start syncing transaction groups.
189  */
190 void
191 txg_sync_start(dsl_pool_t *dp)
192 {
193         tx_state_t *tx = &dp->dp_tx;
194
195         mutex_enter(&tx->tx_sync_lock);
196
197         dprintf("pool %p\n", dp);
198
199         ASSERT(tx->tx_threads == 0);
200
201         tx->tx_threads = 2;
202
203         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
204             dp, 0, &p0, TS_RUN, minclsyspri);
205
206         /*
207          * The sync thread can need a larger-than-default stack size on
208          * 32-bit x86.  This is due in part to nested pools and
209          * scrub_visitbp() recursion.
210          */
211         tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
212             dp, 0, &p0, TS_RUN, minclsyspri);
213
214         mutex_exit(&tx->tx_sync_lock);
215 }
216
217 static void
218 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
219 {
220         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
221         mutex_enter(&tx->tx_sync_lock);
222 }
223
224 static void
225 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
226 {
227         ASSERT(*tpp != NULL);
228         *tpp = NULL;
229         tx->tx_threads--;
230         cv_broadcast(&tx->tx_exit_cv);
231         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
232         thread_exit();
233 }
234
235 static void
236 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
237 {
238         CALLB_CPR_SAFE_BEGIN(cpr);
239
240         if (time)
241                 (void) cv_timedwait_interruptible(cv, &tx->tx_sync_lock,
242                     ddi_get_lbolt() + time);
243         else
244                 cv_wait_interruptible(cv, &tx->tx_sync_lock);
245
246         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
247 }
248
249 /*
250  * Stop syncing transaction groups.
251  */
252 void
253 txg_sync_stop(dsl_pool_t *dp)
254 {
255         tx_state_t *tx = &dp->dp_tx;
256
257         dprintf("pool %p\n", dp);
258         /*
259          * Finish off any work in progress.
260          */
261         ASSERT(tx->tx_threads == 2);
262
263         /*
264          * We need to ensure that we've vacated the deferred space_maps.
265          */
266         txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
267
268         /*
269          * Wake all sync threads and wait for them to die.
270          */
271         mutex_enter(&tx->tx_sync_lock);
272
273         ASSERT(tx->tx_threads == 2);
274
275         tx->tx_exiting = 1;
276
277         cv_broadcast(&tx->tx_quiesce_more_cv);
278         cv_broadcast(&tx->tx_quiesce_done_cv);
279         cv_broadcast(&tx->tx_sync_more_cv);
280
281         while (tx->tx_threads != 0)
282                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
283
284         tx->tx_exiting = 0;
285
286         mutex_exit(&tx->tx_sync_lock);
287 }
288
289 uint64_t
290 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
291 {
292         tx_state_t *tx = &dp->dp_tx;
293         tx_cpu_t *tc;
294         uint64_t txg;
295
296         /*
297          * It appears the processor id is simply used as a "random"
298          * number to index into the array, and there isn't any other
299          * significance to the chosen tx_cpu. Because.. Why not use
300          * the current cpu to index into the array?
301          */
302         kpreempt_disable();
303         tc = &tx->tx_cpu[CPU_SEQID];
304         kpreempt_enable();
305
306         mutex_enter(&tc->tc_lock);
307
308         txg = tx->tx_open_txg;
309         tc->tc_count[txg & TXG_MASK]++;
310
311         th->th_cpu = tc;
312         th->th_txg = txg;
313
314         return (txg);
315 }
316
317 void
318 txg_rele_to_quiesce(txg_handle_t *th)
319 {
320         tx_cpu_t *tc = th->th_cpu;
321
322         mutex_exit(&tc->tc_lock);
323 }
324
325 void
326 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
327 {
328         tx_cpu_t *tc = th->th_cpu;
329         int g = th->th_txg & TXG_MASK;
330
331         mutex_enter(&tc->tc_lock);
332         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
333         mutex_exit(&tc->tc_lock);
334 }
335
336 void
337 txg_rele_to_sync(txg_handle_t *th)
338 {
339         tx_cpu_t *tc = th->th_cpu;
340         int g = th->th_txg & TXG_MASK;
341
342         mutex_enter(&tc->tc_lock);
343         ASSERT(tc->tc_count[g] != 0);
344         if (--tc->tc_count[g] == 0)
345                 cv_broadcast(&tc->tc_cv[g]);
346         mutex_exit(&tc->tc_lock);
347
348         th->th_cpu = NULL;      /* defensive */
349 }
350
351 static void
352 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
353 {
354         hrtime_t start;
355         txg_history_t *th;
356         tx_state_t *tx = &dp->dp_tx;
357         int g = txg & TXG_MASK;
358         int c;
359
360         /*
361          * Grab all tx_cpu locks so nobody else can get into this txg.
362          */
363         for (c = 0; c < max_ncpus; c++)
364                 mutex_enter(&tx->tx_cpu[c].tc_lock);
365
366         ASSERT(txg == tx->tx_open_txg);
367         tx->tx_open_txg++;
368
369         /*
370          * Measure how long the txg was open and replace the kstat.
371          */
372         th = dsl_pool_txg_history_get(dp, txg);
373         th->th_kstat.open_time = gethrtime() - th->th_kstat.birth;
374         th->th_kstat.state = TXG_STATE_QUIESCING;
375         dsl_pool_txg_history_put(th);
376         dsl_pool_txg_history_add(dp, tx->tx_open_txg);
377
378         /*
379          * Now that we've incremented tx_open_txg, we can let threads
380          * enter the next transaction group.
381          */
382         for (c = 0; c < max_ncpus; c++)
383                 mutex_exit(&tx->tx_cpu[c].tc_lock);
384
385         /*
386          * Quiesce the transaction group by waiting for everyone to txg_exit().
387          */
388         start = gethrtime();
389
390         for (c = 0; c < max_ncpus; c++) {
391                 tx_cpu_t *tc = &tx->tx_cpu[c];
392                 mutex_enter(&tc->tc_lock);
393                 while (tc->tc_count[g] != 0)
394                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
395                 mutex_exit(&tc->tc_lock);
396         }
397
398         /*
399          * Measure how long the txg took to quiesce.
400          */
401         th = dsl_pool_txg_history_get(dp, txg);
402         th->th_kstat.quiesce_time = gethrtime() - start;
403         dsl_pool_txg_history_put(th);
404 }
405
406 static void
407 txg_do_callbacks(list_t *cb_list)
408 {
409         dmu_tx_do_callbacks(cb_list, 0);
410
411         list_destroy(cb_list);
412
413         kmem_free(cb_list, sizeof (list_t));
414 }
415
416 /*
417  * Dispatch the commit callbacks registered on this txg to worker threads.
418  */
419 static void
420 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
421 {
422         int c;
423         tx_state_t *tx = &dp->dp_tx;
424         list_t *cb_list;
425
426         for (c = 0; c < max_ncpus; c++) {
427                 tx_cpu_t *tc = &tx->tx_cpu[c];
428                 /* No need to lock tx_cpu_t at this point */
429
430                 int g = txg & TXG_MASK;
431
432                 if (list_is_empty(&tc->tc_callbacks[g]))
433                         continue;
434
435                 if (tx->tx_commit_cb_taskq == NULL) {
436                         /*
437                          * Commit callback taskq hasn't been created yet.
438                          */
439                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
440                             100, minclsyspri, max_ncpus, INT_MAX,
441                             TASKQ_THREADS_CPU_PCT | TASKQ_PREPOPULATE);
442                 }
443
444                 cb_list = kmem_alloc(sizeof (list_t), KM_PUSHPAGE);
445                 list_create(cb_list, sizeof (dmu_tx_callback_t),
446                     offsetof(dmu_tx_callback_t, dcb_node));
447
448                 list_move_tail(cb_list, &tc->tc_callbacks[g]);
449
450                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
451                     txg_do_callbacks, cb_list, TQ_SLEEP);
452         }
453 }
454
455 /*
456  * Wait for pending commit callbacks of already-synced transactions to finish
457  * processing.
458  * Calling this function from within a commit callback will deadlock.
459  */
460 void
461 txg_wait_callbacks(dsl_pool_t *dp)
462 {
463         tx_state_t *tx = &dp->dp_tx;
464
465         if (tx->tx_commit_cb_taskq != NULL)
466                 taskq_wait(tx->tx_commit_cb_taskq);
467 }
468
469 static void
470 txg_sync_thread(dsl_pool_t *dp)
471 {
472         spa_t *spa = dp->dp_spa;
473         tx_state_t *tx = &dp->dp_tx;
474         callb_cpr_t cpr;
475         uint64_t start, delta;
476
477 #ifdef _KERNEL
478         /*
479          * Annotate this process with a flag that indicates that it is
480          * unsafe to use KM_SLEEP during memory allocations due to the
481          * potential for a deadlock.  KM_PUSHPAGE should be used instead.
482          */
483         current->flags |= PF_NOFS;
484 #endif /* _KERNEL */
485
486         txg_thread_enter(tx, &cpr);
487
488         start = delta = 0;
489         for (;;) {
490                 hrtime_t hrstart;
491                 txg_history_t *th;
492                 uint64_t timer, timeout;
493                 uint64_t txg;
494
495                 timeout = zfs_txg_timeout * hz;
496
497                 /*
498                  * We sync when we're scanning, there's someone waiting
499                  * on us, or the quiesce thread has handed off a txg to
500                  * us, or we have reached our timeout.
501                  */
502                 timer = (delta >= timeout ? 0 : timeout - delta);
503                 while (!dsl_scan_active(dp->dp_scan) &&
504                     !tx->tx_exiting && timer > 0 &&
505                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
506                     tx->tx_quiesced_txg == 0) {
507                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
508                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
509                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
510                         delta = ddi_get_lbolt() - start;
511                         timer = (delta > timeout ? 0 : timeout - delta);
512                 }
513
514                 /*
515                  * Wait until the quiesce thread hands off a txg to us,
516                  * prompting it to do so if necessary.
517                  */
518                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
519                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
520                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
521                         cv_broadcast(&tx->tx_quiesce_more_cv);
522                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
523                 }
524
525                 if (tx->tx_exiting)
526                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
527
528                 /*
529                  * Consume the quiesced txg which has been handed off to
530                  * us.  This may cause the quiescing thread to now be
531                  * able to quiesce another txg, so we must signal it.
532                  */
533                 txg = tx->tx_quiesced_txg;
534                 tx->tx_quiesced_txg = 0;
535                 tx->tx_syncing_txg = txg;
536                 cv_broadcast(&tx->tx_quiesce_more_cv);
537
538                 th = dsl_pool_txg_history_get(dp, txg);
539                 th->th_kstat.state = TXG_STATE_SYNCING;
540                 vdev_get_stats(spa->spa_root_vdev, &th->th_vs1);
541                 dsl_pool_txg_history_put(th);
542
543                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
544                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
545                 mutex_exit(&tx->tx_sync_lock);
546
547                 start = ddi_get_lbolt();
548                 hrstart = gethrtime();
549                 spa_sync(spa, txg);
550                 delta = ddi_get_lbolt() - start;
551
552                 mutex_enter(&tx->tx_sync_lock);
553                 tx->tx_synced_txg = txg;
554                 tx->tx_syncing_txg = 0;
555                 cv_broadcast(&tx->tx_sync_done_cv);
556
557                 /*
558                  * Dispatch commit callbacks to worker threads.
559                  */
560                 txg_dispatch_callbacks(dp, txg);
561
562                 /*
563                  * Measure the txg sync time determine the amount of I/O done.
564                  */
565                 th = dsl_pool_txg_history_get(dp, txg);
566                 vdev_get_stats(spa->spa_root_vdev, &th->th_vs2);
567                 th->th_kstat.sync_time = gethrtime() - hrstart;
568                 th->th_kstat.nread = th->th_vs2.vs_bytes[ZIO_TYPE_READ] -
569                     th->th_vs1.vs_bytes[ZIO_TYPE_READ];
570                 th->th_kstat.nwritten = th->th_vs2.vs_bytes[ZIO_TYPE_WRITE] -
571                     th->th_vs1.vs_bytes[ZIO_TYPE_WRITE];
572                 th->th_kstat.reads = th->th_vs2.vs_ops[ZIO_TYPE_READ] -
573                     th->th_vs1.vs_ops[ZIO_TYPE_READ];
574                 th->th_kstat.writes = th->th_vs2.vs_ops[ZIO_TYPE_WRITE] -
575                     th->th_vs1.vs_ops[ZIO_TYPE_WRITE];
576                 th->th_kstat.state = TXG_STATE_COMMITTED;
577                 dsl_pool_txg_history_put(th);
578         }
579 }
580
581 static void
582 txg_quiesce_thread(dsl_pool_t *dp)
583 {
584         tx_state_t *tx = &dp->dp_tx;
585         callb_cpr_t cpr;
586
587         txg_thread_enter(tx, &cpr);
588
589         for (;;) {
590                 uint64_t txg;
591
592                 /*
593                  * We quiesce when there's someone waiting on us.
594                  * However, we can only have one txg in "quiescing" or
595                  * "quiesced, waiting to sync" state.  So we wait until
596                  * the "quiesced, waiting to sync" txg has been consumed
597                  * by the sync thread.
598                  */
599                 while (!tx->tx_exiting &&
600                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
601                     tx->tx_quiesced_txg != 0))
602                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
603
604                 if (tx->tx_exiting)
605                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
606
607                 txg = tx->tx_open_txg;
608                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
609                     txg, tx->tx_quiesce_txg_waiting,
610                     tx->tx_sync_txg_waiting);
611                 mutex_exit(&tx->tx_sync_lock);
612                 txg_quiesce(dp, txg);
613                 mutex_enter(&tx->tx_sync_lock);
614
615                 /*
616                  * Hand this txg off to the sync thread.
617                  */
618                 dprintf("quiesce done, handing off txg %llu\n", txg);
619                 tx->tx_quiesced_txg = txg;
620                 cv_broadcast(&tx->tx_sync_more_cv);
621                 cv_broadcast(&tx->tx_quiesce_done_cv);
622         }
623 }
624
625 /*
626  * Delay this thread by 'ticks' if we are still in the open transaction
627  * group and there is already a waiting txg quiesing or quiesced.  Abort
628  * the delay if this txg stalls or enters the quiesing state.
629  */
630 void
631 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
632 {
633         tx_state_t *tx = &dp->dp_tx;
634         clock_t timeout = ddi_get_lbolt() + ticks;
635
636         /* don't delay if this txg could transition to quiesing immediately */
637         if (tx->tx_open_txg > txg ||
638             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
639                 return;
640
641         mutex_enter(&tx->tx_sync_lock);
642         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
643                 mutex_exit(&tx->tx_sync_lock);
644                 return;
645         }
646
647         while (ddi_get_lbolt() < timeout &&
648             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
649                 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
650                     timeout);
651
652         DMU_TX_STAT_BUMP(dmu_tx_delay);
653
654         mutex_exit(&tx->tx_sync_lock);
655 }
656
657 void
658 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
659 {
660         tx_state_t *tx = &dp->dp_tx;
661
662         mutex_enter(&tx->tx_sync_lock);
663         ASSERT(tx->tx_threads == 2);
664         if (txg == 0)
665                 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
666         if (tx->tx_sync_txg_waiting < txg)
667                 tx->tx_sync_txg_waiting = txg;
668         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
669             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
670         while (tx->tx_synced_txg < txg) {
671                 dprintf("broadcasting sync more "
672                     "tx_synced=%llu waiting=%llu dp=%p\n",
673                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
674                 cv_broadcast(&tx->tx_sync_more_cv);
675                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
676         }
677         mutex_exit(&tx->tx_sync_lock);
678 }
679
680 void
681 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
682 {
683         tx_state_t *tx = &dp->dp_tx;
684
685         mutex_enter(&tx->tx_sync_lock);
686         ASSERT(tx->tx_threads == 2);
687         if (txg == 0)
688                 txg = tx->tx_open_txg + 1;
689         if (tx->tx_quiesce_txg_waiting < txg)
690                 tx->tx_quiesce_txg_waiting = txg;
691         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
692             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
693         while (tx->tx_open_txg < txg) {
694                 cv_broadcast(&tx->tx_quiesce_more_cv);
695                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
696         }
697         mutex_exit(&tx->tx_sync_lock);
698 }
699
700 boolean_t
701 txg_stalled(dsl_pool_t *dp)
702 {
703         tx_state_t *tx = &dp->dp_tx;
704         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
705 }
706
707 boolean_t
708 txg_sync_waiting(dsl_pool_t *dp)
709 {
710         tx_state_t *tx = &dp->dp_tx;
711
712         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
713             tx->tx_quiesced_txg != 0);
714 }
715
716 /*
717  * Per-txg object lists.
718  */
719 void
720 txg_list_create(txg_list_t *tl, size_t offset)
721 {
722         int t;
723
724         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
725
726         tl->tl_offset = offset;
727
728         for (t = 0; t < TXG_SIZE; t++)
729                 tl->tl_head[t] = NULL;
730 }
731
732 void
733 txg_list_destroy(txg_list_t *tl)
734 {
735         int t;
736
737         for (t = 0; t < TXG_SIZE; t++)
738                 ASSERT(txg_list_empty(tl, t));
739
740         mutex_destroy(&tl->tl_lock);
741 }
742
743 boolean_t
744 txg_list_empty(txg_list_t *tl, uint64_t txg)
745 {
746         return (tl->tl_head[txg & TXG_MASK] == NULL);
747 }
748
749 /*
750  * Add an entry to the list.
751  * Returns 0 if it's a new entry, 1 if it's already there.
752  */
753 int
754 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
755 {
756         int t = txg & TXG_MASK;
757         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
758         int already_on_list;
759
760         mutex_enter(&tl->tl_lock);
761         already_on_list = tn->tn_member[t];
762         if (!already_on_list) {
763                 tn->tn_member[t] = 1;
764                 tn->tn_next[t] = tl->tl_head[t];
765                 tl->tl_head[t] = tn;
766         }
767         mutex_exit(&tl->tl_lock);
768
769         return (already_on_list);
770 }
771
772 /*
773  * Add an entry to the end of the list (walks list to find end).
774  * Returns 0 if it's a new entry, 1 if it's already there.
775  */
776 int
777 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
778 {
779         int t = txg & TXG_MASK;
780         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
781         int already_on_list;
782
783         mutex_enter(&tl->tl_lock);
784         already_on_list = tn->tn_member[t];
785         if (!already_on_list) {
786                 txg_node_t **tp;
787
788                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
789                         continue;
790
791                 tn->tn_member[t] = 1;
792                 tn->tn_next[t] = NULL;
793                 *tp = tn;
794         }
795         mutex_exit(&tl->tl_lock);
796
797         return (already_on_list);
798 }
799
800 /*
801  * Remove the head of the list and return it.
802  */
803 void *
804 txg_list_remove(txg_list_t *tl, uint64_t txg)
805 {
806         int t = txg & TXG_MASK;
807         txg_node_t *tn;
808         void *p = NULL;
809
810         mutex_enter(&tl->tl_lock);
811         if ((tn = tl->tl_head[t]) != NULL) {
812                 p = (char *)tn - tl->tl_offset;
813                 tl->tl_head[t] = tn->tn_next[t];
814                 tn->tn_next[t] = NULL;
815                 tn->tn_member[t] = 0;
816         }
817         mutex_exit(&tl->tl_lock);
818
819         return (p);
820 }
821
822 /*
823  * Remove a specific item from the list and return it.
824  */
825 void *
826 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
827 {
828         int t = txg & TXG_MASK;
829         txg_node_t *tn, **tp;
830
831         mutex_enter(&tl->tl_lock);
832
833         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
834                 if ((char *)tn - tl->tl_offset == p) {
835                         *tp = tn->tn_next[t];
836                         tn->tn_next[t] = NULL;
837                         tn->tn_member[t] = 0;
838                         mutex_exit(&tl->tl_lock);
839                         return (p);
840                 }
841         }
842
843         mutex_exit(&tl->tl_lock);
844
845         return (NULL);
846 }
847
848 int
849 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
850 {
851         int t = txg & TXG_MASK;
852         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
853
854         return (tn->tn_member[t]);
855 }
856
857 /*
858  * Walk a txg list -- only safe if you know it's not changing.
859  */
860 void *
861 txg_list_head(txg_list_t *tl, uint64_t txg)
862 {
863         int t = txg & TXG_MASK;
864         txg_node_t *tn = tl->tl_head[t];
865
866         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
867 }
868
869 void *
870 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
871 {
872         int t = txg & TXG_MASK;
873         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
874
875         tn = tn->tn_next[t];
876
877         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
878 }
879
880 #if defined(_KERNEL) && defined(HAVE_SPL)
881 EXPORT_SYMBOL(txg_init);
882 EXPORT_SYMBOL(txg_fini);
883 EXPORT_SYMBOL(txg_sync_start);
884 EXPORT_SYMBOL(txg_sync_stop);
885 EXPORT_SYMBOL(txg_hold_open);
886 EXPORT_SYMBOL(txg_rele_to_quiesce);
887 EXPORT_SYMBOL(txg_rele_to_sync);
888 EXPORT_SYMBOL(txg_register_callbacks);
889 EXPORT_SYMBOL(txg_delay);
890 EXPORT_SYMBOL(txg_wait_synced);
891 EXPORT_SYMBOL(txg_wait_open);
892 EXPORT_SYMBOL(txg_wait_callbacks);
893 EXPORT_SYMBOL(txg_stalled);
894 EXPORT_SYMBOL(txg_sync_waiting);
895
896 module_param(zfs_txg_timeout, int, 0644);
897 MODULE_PARM_DESC(zfs_txg_timeout, "Max seconds worth of delta per txg");
898 #endif