Rebase master to b105
[zfs.git] / module / zfs / txg.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #include <sys/zfs_context.h>
27 #include <sys/txg_impl.h>
28 #include <sys/dmu_impl.h>
29 #include <sys/dsl_pool.h>
30 #include <sys/callb.h>
31
32 /*
33  * Pool-wide transaction groups.
34  */
35
36 static void txg_sync_thread(dsl_pool_t *dp);
37 static void txg_quiesce_thread(dsl_pool_t *dp);
38
39 int zfs_txg_timeout = 30;       /* max seconds worth of delta per txg */
40
41 /*
42  * Prepare the txg subsystem.
43  */
44 void
45 txg_init(dsl_pool_t *dp, uint64_t txg)
46 {
47         tx_state_t *tx = &dp->dp_tx;
48         int c;
49         bzero(tx, sizeof (tx_state_t));
50
51         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
52
53         for (c = 0; c < max_ncpus; c++) {
54                 int i;
55
56                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
57                 for (i = 0; i < TXG_SIZE; i++) {
58                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
59                             NULL);
60                 }
61         }
62
63         rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
64         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
65
66         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
67         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
68         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
69         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
70         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
71
72         tx->tx_open_txg = txg;
73 }
74
75 /*
76  * Close down the txg subsystem.
77  */
78 void
79 txg_fini(dsl_pool_t *dp)
80 {
81         tx_state_t *tx = &dp->dp_tx;
82         int c;
83
84         ASSERT(tx->tx_threads == 0);
85
86         rw_destroy(&tx->tx_suspend);
87         mutex_destroy(&tx->tx_sync_lock);
88
89         cv_destroy(&tx->tx_sync_more_cv);
90         cv_destroy(&tx->tx_sync_done_cv);
91         cv_destroy(&tx->tx_quiesce_more_cv);
92         cv_destroy(&tx->tx_quiesce_done_cv);
93         cv_destroy(&tx->tx_exit_cv);
94
95         for (c = 0; c < max_ncpus; c++) {
96                 int i;
97
98                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
99                 for (i = 0; i < TXG_SIZE; i++)
100                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
101         }
102
103         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
104
105         bzero(tx, sizeof (tx_state_t));
106 }
107
108 /*
109  * Start syncing transaction groups.
110  */
111 void
112 txg_sync_start(dsl_pool_t *dp)
113 {
114         tx_state_t *tx = &dp->dp_tx;
115
116         mutex_enter(&tx->tx_sync_lock);
117
118         dprintf("pool %p\n", dp);
119
120         ASSERT(tx->tx_threads == 0);
121
122         tx->tx_threads = 2;
123
124         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
125             dp, 0, &p0, TS_RUN, minclsyspri);
126
127         /*
128          * The sync thread can need a larger-than-default stack size on
129          * 32-bit x86.  This is due in part to nested pools and
130          * scrub_visitbp() recursion.
131          */
132         tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
133             dp, 0, &p0, TS_RUN, minclsyspri);
134
135         mutex_exit(&tx->tx_sync_lock);
136 }
137
138 static void
139 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
140 {
141         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
142         mutex_enter(&tx->tx_sync_lock);
143 }
144
145 static void
146 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
147 {
148         ASSERT(*tpp != NULL);
149         *tpp = NULL;
150         tx->tx_threads--;
151         cv_broadcast(&tx->tx_exit_cv);
152         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
153         thread_exit();
154 }
155
156 static void
157 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
158 {
159         CALLB_CPR_SAFE_BEGIN(cpr);
160
161         if (time)
162                 (void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
163         else
164                 cv_wait(cv, &tx->tx_sync_lock);
165
166         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
167 }
168
169 /*
170  * Stop syncing transaction groups.
171  */
172 void
173 txg_sync_stop(dsl_pool_t *dp)
174 {
175         tx_state_t *tx = &dp->dp_tx;
176
177         dprintf("pool %p\n", dp);
178         /*
179          * Finish off any work in progress.
180          */
181         ASSERT(tx->tx_threads == 2);
182         txg_wait_synced(dp, 0);
183
184         /*
185          * Wake all sync threads and wait for them to die.
186          */
187         mutex_enter(&tx->tx_sync_lock);
188
189         ASSERT(tx->tx_threads == 2);
190
191         tx->tx_exiting = 1;
192
193         cv_broadcast(&tx->tx_quiesce_more_cv);
194         cv_broadcast(&tx->tx_quiesce_done_cv);
195         cv_broadcast(&tx->tx_sync_more_cv);
196
197         while (tx->tx_threads != 0)
198                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
199
200         tx->tx_exiting = 0;
201
202         mutex_exit(&tx->tx_sync_lock);
203 }
204
205 uint64_t
206 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
207 {
208         tx_state_t *tx = &dp->dp_tx;
209         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
210         uint64_t txg;
211
212         mutex_enter(&tc->tc_lock);
213
214         txg = tx->tx_open_txg;
215         tc->tc_count[txg & TXG_MASK]++;
216
217         th->th_cpu = tc;
218         th->th_txg = txg;
219
220         return (txg);
221 }
222
223 void
224 txg_rele_to_quiesce(txg_handle_t *th)
225 {
226         tx_cpu_t *tc = th->th_cpu;
227
228         mutex_exit(&tc->tc_lock);
229 }
230
231 void
232 txg_rele_to_sync(txg_handle_t *th)
233 {
234         tx_cpu_t *tc = th->th_cpu;
235         int g = th->th_txg & TXG_MASK;
236
237         mutex_enter(&tc->tc_lock);
238         ASSERT(tc->tc_count[g] != 0);
239         if (--tc->tc_count[g] == 0)
240                 cv_broadcast(&tc->tc_cv[g]);
241         mutex_exit(&tc->tc_lock);
242
243         th->th_cpu = NULL;      /* defensive */
244 }
245
246 static void
247 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
248 {
249         tx_state_t *tx = &dp->dp_tx;
250         int g = txg & TXG_MASK;
251         int c;
252
253         /*
254          * Grab all tx_cpu locks so nobody else can get into this txg.
255          */
256         for (c = 0; c < max_ncpus; c++)
257                 mutex_enter(&tx->tx_cpu[c].tc_lock);
258
259         ASSERT(txg == tx->tx_open_txg);
260         tx->tx_open_txg++;
261
262         /*
263          * Now that we've incremented tx_open_txg, we can let threads
264          * enter the next transaction group.
265          */
266         for (c = 0; c < max_ncpus; c++)
267                 mutex_exit(&tx->tx_cpu[c].tc_lock);
268
269         /*
270          * Quiesce the transaction group by waiting for everyone to txg_exit().
271          */
272         for (c = 0; c < max_ncpus; c++) {
273                 tx_cpu_t *tc = &tx->tx_cpu[c];
274                 mutex_enter(&tc->tc_lock);
275                 while (tc->tc_count[g] != 0)
276                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
277                 mutex_exit(&tc->tc_lock);
278         }
279 }
280
281 static void
282 txg_sync_thread(dsl_pool_t *dp)
283 {
284         tx_state_t *tx = &dp->dp_tx;
285         callb_cpr_t cpr;
286         uint64_t start, delta;
287
288         txg_thread_enter(tx, &cpr);
289
290         start = delta = 0;
291         for (;;) {
292                 uint64_t timer, timeout = zfs_txg_timeout * hz;
293                 uint64_t txg;
294
295                 /*
296                  * We sync when we're scrubbing, there's someone waiting
297                  * on us, or the quiesce thread has handed off a txg to
298                  * us, or we have reached our timeout.
299                  */
300                 timer = (delta >= timeout ? 0 : timeout - delta);
301                 while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
302                     spa_shutting_down(dp->dp_spa)) &&
303                     !tx->tx_exiting && timer > 0 &&
304                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
305                     tx->tx_quiesced_txg == 0) {
306                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
307                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
308                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
309                         delta = lbolt - start;
310                         timer = (delta > timeout ? 0 : timeout - delta);
311                 }
312
313                 /*
314                  * Wait until the quiesce thread hands off a txg to us,
315                  * prompting it to do so if necessary.
316                  */
317                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
318                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
319                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
320                         cv_broadcast(&tx->tx_quiesce_more_cv);
321                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
322                 }
323
324                 if (tx->tx_exiting)
325                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
326
327                 rw_enter(&tx->tx_suspend, RW_WRITER);
328
329                 /*
330                  * Consume the quiesced txg which has been handed off to
331                  * us.  This may cause the quiescing thread to now be
332                  * able to quiesce another txg, so we must signal it.
333                  */
334                 txg = tx->tx_quiesced_txg;
335                 tx->tx_quiesced_txg = 0;
336                 tx->tx_syncing_txg = txg;
337                 cv_broadcast(&tx->tx_quiesce_more_cv);
338                 rw_exit(&tx->tx_suspend);
339
340                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
341                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
342                 mutex_exit(&tx->tx_sync_lock);
343
344                 start = lbolt;
345                 spa_sync(dp->dp_spa, txg);
346                 delta = lbolt - start;
347
348                 mutex_enter(&tx->tx_sync_lock);
349                 rw_enter(&tx->tx_suspend, RW_WRITER);
350                 tx->tx_synced_txg = txg;
351                 tx->tx_syncing_txg = 0;
352                 rw_exit(&tx->tx_suspend);
353                 cv_broadcast(&tx->tx_sync_done_cv);
354         }
355 }
356
357 static void
358 txg_quiesce_thread(dsl_pool_t *dp)
359 {
360         tx_state_t *tx = &dp->dp_tx;
361         callb_cpr_t cpr;
362
363         txg_thread_enter(tx, &cpr);
364
365         for (;;) {
366                 uint64_t txg;
367
368                 /*
369                  * We quiesce when there's someone waiting on us.
370                  * However, we can only have one txg in "quiescing" or
371                  * "quiesced, waiting to sync" state.  So we wait until
372                  * the "quiesced, waiting to sync" txg has been consumed
373                  * by the sync thread.
374                  */
375                 while (!tx->tx_exiting &&
376                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
377                     tx->tx_quiesced_txg != 0))
378                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
379
380                 if (tx->tx_exiting)
381                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
382
383                 txg = tx->tx_open_txg;
384                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
385                     txg, tx->tx_quiesce_txg_waiting,
386                     tx->tx_sync_txg_waiting);
387                 mutex_exit(&tx->tx_sync_lock);
388                 txg_quiesce(dp, txg);
389                 mutex_enter(&tx->tx_sync_lock);
390
391                 /*
392                  * Hand this txg off to the sync thread.
393                  */
394                 dprintf("quiesce done, handing off txg %llu\n", txg);
395                 tx->tx_quiesced_txg = txg;
396                 cv_broadcast(&tx->tx_sync_more_cv);
397                 cv_broadcast(&tx->tx_quiesce_done_cv);
398         }
399 }
400
401 /*
402  * Delay this thread by 'ticks' if we are still in the open transaction
403  * group and there is already a waiting txg quiesing or quiesced.  Abort
404  * the delay if this txg stalls or enters the quiesing state.
405  */
406 void
407 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
408 {
409         tx_state_t *tx = &dp->dp_tx;
410         int timeout = lbolt + ticks;
411
412         /* don't delay if this txg could transition to quiesing immediately */
413         if (tx->tx_open_txg > txg ||
414             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
415                 return;
416
417         mutex_enter(&tx->tx_sync_lock);
418         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
419                 mutex_exit(&tx->tx_sync_lock);
420                 return;
421         }
422
423         while (lbolt < timeout &&
424             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
425                 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
426                     timeout);
427
428         mutex_exit(&tx->tx_sync_lock);
429 }
430
431 void
432 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
433 {
434         tx_state_t *tx = &dp->dp_tx;
435
436         mutex_enter(&tx->tx_sync_lock);
437         ASSERT(tx->tx_threads == 2);
438         if (txg == 0)
439                 txg = tx->tx_open_txg;
440         if (tx->tx_sync_txg_waiting < txg)
441                 tx->tx_sync_txg_waiting = txg;
442         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
443             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
444         while (tx->tx_synced_txg < txg) {
445                 dprintf("broadcasting sync more "
446                     "tx_synced=%llu waiting=%llu dp=%p\n",
447                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
448                 cv_broadcast(&tx->tx_sync_more_cv);
449                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
450         }
451         mutex_exit(&tx->tx_sync_lock);
452 }
453
454 void
455 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
456 {
457         tx_state_t *tx = &dp->dp_tx;
458
459         mutex_enter(&tx->tx_sync_lock);
460         ASSERT(tx->tx_threads == 2);
461         if (txg == 0)
462                 txg = tx->tx_open_txg + 1;
463         if (tx->tx_quiesce_txg_waiting < txg)
464                 tx->tx_quiesce_txg_waiting = txg;
465         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
466             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
467         while (tx->tx_open_txg < txg) {
468                 cv_broadcast(&tx->tx_quiesce_more_cv);
469                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
470         }
471         mutex_exit(&tx->tx_sync_lock);
472 }
473
474 boolean_t
475 txg_stalled(dsl_pool_t *dp)
476 {
477         tx_state_t *tx = &dp->dp_tx;
478         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
479 }
480
481 boolean_t
482 txg_sync_waiting(dsl_pool_t *dp)
483 {
484         tx_state_t *tx = &dp->dp_tx;
485
486         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
487             tx->tx_quiesced_txg != 0);
488 }
489
490 void
491 txg_suspend(dsl_pool_t *dp)
492 {
493         tx_state_t *tx = &dp->dp_tx;
494         /* XXX some code paths suspend when they are already suspended! */
495         rw_enter(&tx->tx_suspend, RW_READER);
496 }
497
498 void
499 txg_resume(dsl_pool_t *dp)
500 {
501         tx_state_t *tx = &dp->dp_tx;
502         rw_exit(&tx->tx_suspend);
503 }
504
505 /*
506  * Per-txg object lists.
507  */
508 void
509 txg_list_create(txg_list_t *tl, size_t offset)
510 {
511         int t;
512
513         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
514
515         tl->tl_offset = offset;
516
517         for (t = 0; t < TXG_SIZE; t++)
518                 tl->tl_head[t] = NULL;
519 }
520
521 void
522 txg_list_destroy(txg_list_t *tl)
523 {
524         int t;
525
526         for (t = 0; t < TXG_SIZE; t++)
527                 ASSERT(txg_list_empty(tl, t));
528
529         mutex_destroy(&tl->tl_lock);
530 }
531
532 int
533 txg_list_empty(txg_list_t *tl, uint64_t txg)
534 {
535         return (tl->tl_head[txg & TXG_MASK] == NULL);
536 }
537
538 /*
539  * Add an entry to the list.
540  * Returns 0 if it's a new entry, 1 if it's already there.
541  */
542 int
543 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
544 {
545         int t = txg & TXG_MASK;
546         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
547         int already_on_list;
548
549         mutex_enter(&tl->tl_lock);
550         already_on_list = tn->tn_member[t];
551         if (!already_on_list) {
552                 tn->tn_member[t] = 1;
553                 tn->tn_next[t] = tl->tl_head[t];
554                 tl->tl_head[t] = tn;
555         }
556         mutex_exit(&tl->tl_lock);
557
558         return (already_on_list);
559 }
560
561 /*
562  * Remove the head of the list and return it.
563  */
564 void *
565 txg_list_remove(txg_list_t *tl, uint64_t txg)
566 {
567         int t = txg & TXG_MASK;
568         txg_node_t *tn;
569         void *p = NULL;
570
571         mutex_enter(&tl->tl_lock);
572         if ((tn = tl->tl_head[t]) != NULL) {
573                 p = (char *)tn - tl->tl_offset;
574                 tl->tl_head[t] = tn->tn_next[t];
575                 tn->tn_next[t] = NULL;
576                 tn->tn_member[t] = 0;
577         }
578         mutex_exit(&tl->tl_lock);
579
580         return (p);
581 }
582
583 /*
584  * Remove a specific item from the list and return it.
585  */
586 void *
587 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
588 {
589         int t = txg & TXG_MASK;
590         txg_node_t *tn, **tp;
591
592         mutex_enter(&tl->tl_lock);
593
594         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
595                 if ((char *)tn - tl->tl_offset == p) {
596                         *tp = tn->tn_next[t];
597                         tn->tn_next[t] = NULL;
598                         tn->tn_member[t] = 0;
599                         mutex_exit(&tl->tl_lock);
600                         return (p);
601                 }
602         }
603
604         mutex_exit(&tl->tl_lock);
605
606         return (NULL);
607 }
608
609 int
610 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
611 {
612         int t = txg & TXG_MASK;
613         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
614
615         return (tn->tn_member[t]);
616 }
617
618 /*
619  * Walk a txg list -- only safe if you know it's not changing.
620  */
621 void *
622 txg_list_head(txg_list_t *tl, uint64_t txg)
623 {
624         int t = txg & TXG_MASK;
625         txg_node_t *tn = tl->tl_head[t];
626
627         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
628 }
629
630 void *
631 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
632 {
633         int t = txg & TXG_MASK;
634         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
635
636         tn = tn->tn_next[t];
637
638         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
639 }