83fef0d87435ea02d40be7a1030f74aa5d9e1b33
[zfs.git] / module / zfs / zil.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #include <sys/zfs_context.h>
27 #include <sys/spa.h>
28 #include <sys/dmu.h>
29 #include <sys/zap.h>
30 #include <sys/arc.h>
31 #include <sys/stat.h>
32 #include <sys/resource.h>
33 #include <sys/zil.h>
34 #include <sys/zil_impl.h>
35 #include <sys/dsl_dataset.h>
36 #include <sys/vdev.h>
37 #include <sys/dmu_tx.h>
38
39 /*
40  * The zfs intent log (ZIL) saves transaction records of system calls
41  * that change the file system in memory with enough information
42  * to be able to replay them. These are stored in memory until
43  * either the DMU transaction group (txg) commits them to the stable pool
44  * and they can be discarded, or they are flushed to the stable log
45  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
46  * requirement. In the event of a panic or power fail then those log
47  * records (transactions) are replayed.
48  *
49  * There is one ZIL per file system. Its on-disk (pool) format consists
50  * of 3 parts:
51  *
52  *      - ZIL header
53  *      - ZIL blocks
54  *      - ZIL records
55  *
56  * A log record holds a system call transaction. Log blocks can
57  * hold many log records and the blocks are chained together.
58  * Each ZIL block contains a block pointer (blkptr_t) to the next
59  * ZIL block in the chain. The ZIL header points to the first
60  * block in the chain. Note there is not a fixed place in the pool
61  * to hold blocks. They are dynamically allocated and freed as
62  * needed from the blocks available. Figure X shows the ZIL structure:
63  */
64
65 /*
66  * This global ZIL switch affects all pools
67  */
68 int zil_disable = 0;    /* disable intent logging */
69
70 /*
71  * Tunable parameter for debugging or performance analysis.  Setting
72  * zfs_nocacheflush will cause corruption on power loss if a volatile
73  * out-of-order write cache is enabled.
74  */
75 boolean_t zfs_nocacheflush = B_FALSE;
76
77 static kmem_cache_t *zil_lwb_cache;
78
79 static int
80 zil_dva_compare(const void *x1, const void *x2)
81 {
82         const dva_t *dva1 = x1;
83         const dva_t *dva2 = x2;
84
85         if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
86                 return (-1);
87         if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
88                 return (1);
89
90         if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
91                 return (-1);
92         if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
93                 return (1);
94
95         return (0);
96 }
97
98 static void
99 zil_dva_tree_init(avl_tree_t *t)
100 {
101         avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
102             offsetof(zil_dva_node_t, zn_node));
103 }
104
105 static void
106 zil_dva_tree_fini(avl_tree_t *t)
107 {
108         zil_dva_node_t *zn;
109         void *cookie = NULL;
110
111         while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
112                 kmem_free(zn, sizeof (zil_dva_node_t));
113
114         avl_destroy(t);
115 }
116
117 static int
118 zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
119 {
120         zil_dva_node_t *zn;
121         avl_index_t where;
122
123         if (avl_find(t, dva, &where) != NULL)
124                 return (EEXIST);
125
126         zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
127         zn->zn_dva = *dva;
128         avl_insert(t, zn, where);
129
130         return (0);
131 }
132
133 static zil_header_t *
134 zil_header_in_syncing_context(zilog_t *zilog)
135 {
136         return ((zil_header_t *)zilog->zl_header);
137 }
138
139 static void
140 zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
141 {
142         zio_cksum_t *zc = &bp->blk_cksum;
143
144         zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
145         zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
146         zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
147         zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
148 }
149
150 /*
151  * Read a log block, make sure it's valid, and byteswap it if necessary.
152  */
153 static int
154 zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, arc_buf_t **abufpp)
155 {
156         blkptr_t blk = *bp;
157         zbookmark_t zb;
158         uint32_t aflags = ARC_WAIT;
159         int error;
160
161         zb.zb_objset = bp->blk_cksum.zc_word[ZIL_ZC_OBJSET];
162         zb.zb_object = 0;
163         zb.zb_level = -1;
164         zb.zb_blkid = bp->blk_cksum.zc_word[ZIL_ZC_SEQ];
165
166         *abufpp = NULL;
167
168         /*
169          * We shouldn't be doing any scrubbing while we're doing log
170          * replay, it's OK to not lock.
171          */
172         error = arc_read_nolock(NULL, zilog->zl_spa, &blk,
173             arc_getbuf_func, abufpp, ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL |
174             ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB, &aflags, &zb);
175
176         if (error == 0) {
177                 char *data = (*abufpp)->b_data;
178                 uint64_t blksz = BP_GET_LSIZE(bp);
179                 zil_trailer_t *ztp = (zil_trailer_t *)(data + blksz) - 1;
180                 zio_cksum_t cksum = bp->blk_cksum;
181
182                 /*
183                  * Validate the checksummed log block.
184                  *
185                  * Sequence numbers should be... sequential.  The checksum
186                  * verifier for the next block should be bp's checksum plus 1.
187                  *
188                  * Also check the log chain linkage and size used.
189                  */
190                 cksum.zc_word[ZIL_ZC_SEQ]++;
191
192                 if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum,
193                     sizeof (cksum)) || BP_IS_HOLE(&ztp->zit_next_blk) ||
194                     (ztp->zit_nused > (blksz - sizeof (zil_trailer_t)))) {
195                         error = ECKSUM;
196                 }
197
198                 if (error) {
199                         VERIFY(arc_buf_remove_ref(*abufpp, abufpp) == 1);
200                         *abufpp = NULL;
201                 }
202         }
203
204         dprintf("error %d on %llu:%llu\n", error, zb.zb_objset, zb.zb_blkid);
205
206         return (error);
207 }
208
209 /*
210  * Parse the intent log, and call parse_func for each valid record within.
211  * Return the highest sequence number.
212  */
213 uint64_t
214 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
215     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
216 {
217         const zil_header_t *zh = zilog->zl_header;
218         uint64_t claim_seq = zh->zh_claim_seq;
219         uint64_t seq = 0;
220         uint64_t max_seq = 0;
221         blkptr_t blk = zh->zh_log;
222         arc_buf_t *abuf;
223         char *lrbuf, *lrp;
224         zil_trailer_t *ztp;
225         int reclen, error;
226
227         if (BP_IS_HOLE(&blk))
228                 return (max_seq);
229
230         /*
231          * Starting at the block pointed to by zh_log we read the log chain.
232          * For each block in the chain we strongly check that block to
233          * ensure its validity.  We stop when an invalid block is found.
234          * For each block pointer in the chain we call parse_blk_func().
235          * For each record in each valid block we call parse_lr_func().
236          * If the log has been claimed, stop if we encounter a sequence
237          * number greater than the highest claimed sequence number.
238          */
239         zil_dva_tree_init(&zilog->zl_dva_tree);
240         for (;;) {
241                 seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
242
243                 if (claim_seq != 0 && seq > claim_seq)
244                         break;
245
246                 ASSERT(max_seq < seq);
247                 max_seq = seq;
248
249                 error = zil_read_log_block(zilog, &blk, &abuf);
250
251                 if (parse_blk_func != NULL)
252                         parse_blk_func(zilog, &blk, arg, txg);
253
254                 if (error)
255                         break;
256
257                 lrbuf = abuf->b_data;
258                 ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
259                 blk = ztp->zit_next_blk;
260
261                 if (parse_lr_func == NULL) {
262                         VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
263                         continue;
264                 }
265
266                 for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
267                         lr_t *lr = (lr_t *)lrp;
268                         reclen = lr->lrc_reclen;
269                         ASSERT3U(reclen, >=, sizeof (lr_t));
270                         parse_lr_func(zilog, lr, arg, txg);
271                 }
272                 VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
273         }
274         zil_dva_tree_fini(&zilog->zl_dva_tree);
275
276         return (max_seq);
277 }
278
279 /* ARGSUSED */
280 static void
281 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
282 {
283         spa_t *spa = zilog->zl_spa;
284         int err;
285
286         /*
287          * Claim log block if not already committed and not already claimed.
288          */
289         if (bp->blk_birth >= first_txg &&
290             zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
291                 err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL,
292                     ZIO_FLAG_MUSTSUCCEED));
293                 ASSERT(err == 0);
294         }
295 }
296
297 static void
298 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
299 {
300         if (lrc->lrc_txtype == TX_WRITE) {
301                 lr_write_t *lr = (lr_write_t *)lrc;
302                 zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
303         }
304 }
305
306 /* ARGSUSED */
307 static void
308 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
309 {
310         zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
311 }
312
313 static void
314 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
315 {
316         /*
317          * If we previously claimed it, we need to free it.
318          */
319         if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
320                 lr_write_t *lr = (lr_write_t *)lrc;
321                 blkptr_t *bp = &lr->lr_blkptr;
322                 if (bp->blk_birth >= claim_txg &&
323                     !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
324                         (void) arc_free(NULL, zilog->zl_spa,
325                             dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
326                 }
327         }
328 }
329
330 /*
331  * Create an on-disk intent log.
332  */
333 static void
334 zil_create(zilog_t *zilog)
335 {
336         const zil_header_t *zh = zilog->zl_header;
337         lwb_t *lwb;
338         uint64_t txg = 0;
339         dmu_tx_t *tx = NULL;
340         blkptr_t blk;
341         int error = 0;
342
343         /*
344          * Wait for any previous destroy to complete.
345          */
346         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
347
348         ASSERT(zh->zh_claim_txg == 0);
349         ASSERT(zh->zh_replay_seq == 0);
350
351         blk = zh->zh_log;
352
353         /*
354          * If we don't already have an initial log block or we have one
355          * but it's the wrong endianness then allocate one.
356          */
357         if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
358                 tx = dmu_tx_create(zilog->zl_os);
359                 (void) dmu_tx_assign(tx, TXG_WAIT);
360                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
361                 txg = dmu_tx_get_txg(tx);
362
363                 if (!BP_IS_HOLE(&blk)) {
364                         zio_free_blk(zilog->zl_spa, &blk, txg);
365                         BP_ZERO(&blk);
366                 }
367
368                 error = zio_alloc_blk(zilog->zl_spa, ZIL_MIN_BLKSZ, &blk,
369                     NULL, txg);
370
371                 if (error == 0)
372                         zil_init_log_chain(zilog, &blk);
373         }
374
375         /*
376          * Allocate a log write buffer (lwb) for the first log block.
377          */
378         if (error == 0) {
379                 lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
380                 lwb->lwb_zilog = zilog;
381                 lwb->lwb_blk = blk;
382                 lwb->lwb_nused = 0;
383                 lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
384                 lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
385                 lwb->lwb_max_txg = txg;
386                 lwb->lwb_zio = NULL;
387
388                 mutex_enter(&zilog->zl_lock);
389                 list_insert_tail(&zilog->zl_lwb_list, lwb);
390                 mutex_exit(&zilog->zl_lock);
391         }
392
393         /*
394          * If we just allocated the first log block, commit our transaction
395          * and wait for zil_sync() to stuff the block poiner into zh_log.
396          * (zh is part of the MOS, so we cannot modify it in open context.)
397          */
398         if (tx != NULL) {
399                 dmu_tx_commit(tx);
400                 txg_wait_synced(zilog->zl_dmu_pool, txg);
401         }
402
403         ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
404 }
405
406 /*
407  * In one tx, free all log blocks and clear the log header.
408  * If keep_first is set, then we're replaying a log with no content.
409  * We want to keep the first block, however, so that the first
410  * synchronous transaction doesn't require a txg_wait_synced()
411  * in zil_create().  We don't need to txg_wait_synced() here either
412  * when keep_first is set, because both zil_create() and zil_destroy()
413  * will wait for any in-progress destroys to complete.
414  */
415 void
416 zil_destroy(zilog_t *zilog, boolean_t keep_first)
417 {
418         const zil_header_t *zh = zilog->zl_header;
419         lwb_t *lwb;
420         dmu_tx_t *tx;
421         uint64_t txg;
422
423         /*
424          * Wait for any previous destroy to complete.
425          */
426         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
427
428         if (BP_IS_HOLE(&zh->zh_log))
429                 return;
430
431         tx = dmu_tx_create(zilog->zl_os);
432         (void) dmu_tx_assign(tx, TXG_WAIT);
433         dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
434         txg = dmu_tx_get_txg(tx);
435
436         mutex_enter(&zilog->zl_lock);
437
438         /*
439          * It is possible for the ZIL to get the previously mounted zilog
440          * structure of the same dataset if quickly remounted and the dbuf
441          * eviction has not completed. In this case we can see a non
442          * empty lwb list and keep_first will be set. We fix this by
443          * clearing the keep_first. This will be slower but it's very rare.
444          */
445         if (!list_is_empty(&zilog->zl_lwb_list) && keep_first)
446                 keep_first = B_FALSE;
447
448         ASSERT3U(zilog->zl_destroy_txg, <, txg);
449         zilog->zl_destroy_txg = txg;
450         zilog->zl_keep_first = keep_first;
451
452         if (!list_is_empty(&zilog->zl_lwb_list)) {
453                 ASSERT(zh->zh_claim_txg == 0);
454                 ASSERT(!keep_first);
455                 while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
456                         list_remove(&zilog->zl_lwb_list, lwb);
457                         if (lwb->lwb_buf != NULL)
458                                 zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
459                         zio_free_blk(zilog->zl_spa, &lwb->lwb_blk, txg);
460                         kmem_cache_free(zil_lwb_cache, lwb);
461                 }
462         } else {
463                 if (!keep_first) {
464                         (void) zil_parse(zilog, zil_free_log_block,
465                             zil_free_log_record, tx, zh->zh_claim_txg);
466                 }
467         }
468         mutex_exit(&zilog->zl_lock);
469
470         dmu_tx_commit(tx);
471 }
472
473 /*
474  * zil_rollback_destroy() is only called by the rollback code.
475  * We already have a syncing tx. Rollback has exclusive access to the
476  * dataset, so we don't have to worry about concurrent zil access.
477  * The actual freeing of any log blocks occurs in zil_sync() later in
478  * this txg syncing phase.
479  */
480 void
481 zil_rollback_destroy(zilog_t *zilog, dmu_tx_t *tx)
482 {
483         const zil_header_t *zh = zilog->zl_header;
484         uint64_t txg;
485
486         if (BP_IS_HOLE(&zh->zh_log))
487                 return;
488
489         txg = dmu_tx_get_txg(tx);
490         ASSERT3U(zilog->zl_destroy_txg, <, txg);
491         zilog->zl_destroy_txg = txg;
492         zilog->zl_keep_first = B_FALSE;
493
494         /*
495          * Ensure there's no outstanding ZIL IO.  No lwbs or just the
496          * unused one that allocated in advance is ok.
497          */
498         ASSERT(zilog->zl_lwb_list.list_head.list_next ==
499             zilog->zl_lwb_list.list_head.list_prev);
500         (void) zil_parse(zilog, zil_free_log_block, zil_free_log_record,
501             tx, zh->zh_claim_txg);
502 }
503
504 int
505 zil_claim(char *osname, void *txarg)
506 {
507         dmu_tx_t *tx = txarg;
508         uint64_t first_txg = dmu_tx_get_txg(tx);
509         zilog_t *zilog;
510         zil_header_t *zh;
511         objset_t *os;
512         int error;
513
514         error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
515         if (error) {
516                 cmn_err(CE_WARN, "can't open objset for %s", osname);
517                 return (0);
518         }
519
520         zilog = dmu_objset_zil(os);
521         zh = zil_header_in_syncing_context(zilog);
522
523         /*
524          * Claim all log blocks if we haven't already done so, and remember
525          * the highest claimed sequence number.  This ensures that if we can
526          * read only part of the log now (e.g. due to a missing device),
527          * but we can read the entire log later, we will not try to replay
528          * or destroy beyond the last block we successfully claimed.
529          */
530         ASSERT3U(zh->zh_claim_txg, <=, first_txg);
531         if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
532                 zh->zh_claim_txg = first_txg;
533                 zh->zh_claim_seq = zil_parse(zilog, zil_claim_log_block,
534                     zil_claim_log_record, tx, first_txg);
535                 dsl_dataset_dirty(dmu_objset_ds(os), tx);
536         }
537
538         ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
539         dmu_objset_close(os);
540         return (0);
541 }
542
543 /*
544  * Check the log by walking the log chain.
545  * Checksum errors are ok as they indicate the end of the chain.
546  * Any other error (no device or read failure) returns an error.
547  */
548 /* ARGSUSED */
549 int
550 zil_check_log_chain(char *osname, void *txarg)
551 {
552         zilog_t *zilog;
553         zil_header_t *zh;
554         blkptr_t blk;
555         arc_buf_t *abuf;
556         objset_t *os;
557         char *lrbuf;
558         zil_trailer_t *ztp;
559         int error;
560
561         error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
562         if (error) {
563                 cmn_err(CE_WARN, "can't open objset for %s", osname);
564                 return (0);
565         }
566
567         zilog = dmu_objset_zil(os);
568         zh = zil_header_in_syncing_context(zilog);
569         blk = zh->zh_log;
570         if (BP_IS_HOLE(&blk)) {
571                 dmu_objset_close(os);
572                 return (0); /* no chain */
573         }
574
575         for (;;) {
576                 error = zil_read_log_block(zilog, &blk, &abuf);
577                 if (error)
578                         break;
579                 lrbuf = abuf->b_data;
580                 ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
581                 blk = ztp->zit_next_blk;
582                 VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
583         }
584         dmu_objset_close(os);
585         if (error == ECKSUM)
586                 return (0); /* normal end of chain */
587         return (error);
588 }
589
590 /*
591  * Clear a log chain
592  */
593 /* ARGSUSED */
594 int
595 zil_clear_log_chain(char *osname, void *txarg)
596 {
597         zilog_t *zilog;
598         zil_header_t *zh;
599         objset_t *os;
600         dmu_tx_t *tx;
601         int error;
602
603         error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
604         if (error) {
605                 cmn_err(CE_WARN, "can't open objset for %s", osname);
606                 return (0);
607         }
608
609         zilog = dmu_objset_zil(os);
610         tx = dmu_tx_create(zilog->zl_os);
611         (void) dmu_tx_assign(tx, TXG_WAIT);
612         zh = zil_header_in_syncing_context(zilog);
613         BP_ZERO(&zh->zh_log);
614         dsl_dataset_dirty(dmu_objset_ds(os), tx);
615         dmu_tx_commit(tx);
616         dmu_objset_close(os);
617         return (0);
618 }
619
620 static int
621 zil_vdev_compare(const void *x1, const void *x2)
622 {
623         uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
624         uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
625
626         if (v1 < v2)
627                 return (-1);
628         if (v1 > v2)
629                 return (1);
630
631         return (0);
632 }
633
634 void
635 zil_add_block(zilog_t *zilog, blkptr_t *bp)
636 {
637         avl_tree_t *t = &zilog->zl_vdev_tree;
638         avl_index_t where;
639         zil_vdev_node_t *zv, zvsearch;
640         int ndvas = BP_GET_NDVAS(bp);
641         int i;
642
643         if (zfs_nocacheflush)
644                 return;
645
646         ASSERT(zilog->zl_writer);
647
648         /*
649          * Even though we're zl_writer, we still need a lock because the
650          * zl_get_data() callbacks may have dmu_sync() done callbacks
651          * that will run concurrently.
652          */
653         mutex_enter(&zilog->zl_vdev_lock);
654         for (i = 0; i < ndvas; i++) {
655                 zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
656                 if (avl_find(t, &zvsearch, &where) == NULL) {
657                         zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
658                         zv->zv_vdev = zvsearch.zv_vdev;
659                         avl_insert(t, zv, where);
660                 }
661         }
662         mutex_exit(&zilog->zl_vdev_lock);
663 }
664
665 void
666 zil_flush_vdevs(zilog_t *zilog)
667 {
668         spa_t *spa = zilog->zl_spa;
669         avl_tree_t *t = &zilog->zl_vdev_tree;
670         void *cookie = NULL;
671         zil_vdev_node_t *zv;
672         zio_t *zio;
673
674         ASSERT(zilog->zl_writer);
675
676         /*
677          * We don't need zl_vdev_lock here because we're the zl_writer,
678          * and all zl_get_data() callbacks are done.
679          */
680         if (avl_numnodes(t) == 0)
681                 return;
682
683         spa_config_enter(spa, SCL_STATE, FTAG, RW_READER);
684
685         zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
686
687         while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
688                 vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
689                 if (vd != NULL)
690                         zio_flush(zio, vd);
691                 kmem_free(zv, sizeof (*zv));
692         }
693
694         /*
695          * Wait for all the flushes to complete.  Not all devices actually
696          * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
697          */
698         (void) zio_wait(zio);
699
700         spa_config_exit(spa, SCL_STATE, FTAG);
701 }
702
703 /*
704  * Function called when a log block write completes
705  */
706 static void
707 zil_lwb_write_done(zio_t *zio)
708 {
709         lwb_t *lwb = zio->io_private;
710         zilog_t *zilog = lwb->lwb_zilog;
711
712         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
713         ASSERT(BP_GET_CHECKSUM(zio->io_bp) == ZIO_CHECKSUM_ZILOG);
714         ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
715         ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
716         ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
717         ASSERT(!BP_IS_GANG(zio->io_bp));
718         ASSERT(!BP_IS_HOLE(zio->io_bp));
719         ASSERT(zio->io_bp->blk_fill == 0);
720
721         /*
722          * Now that we've written this log block, we have a stable pointer
723          * to the next block in the chain, so it's OK to let the txg in
724          * which we allocated the next block sync.
725          */
726         txg_rele_to_sync(&lwb->lwb_txgh);
727
728         zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
729         mutex_enter(&zilog->zl_lock);
730         lwb->lwb_buf = NULL;
731         if (zio->io_error)
732                 zilog->zl_log_error = B_TRUE;
733         mutex_exit(&zilog->zl_lock);
734 }
735
736 /*
737  * Initialize the io for a log block.
738  */
739 static void
740 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
741 {
742         zbookmark_t zb;
743
744         zb.zb_objset = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET];
745         zb.zb_object = 0;
746         zb.zb_level = -1;
747         zb.zb_blkid = lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
748
749         if (zilog->zl_root_zio == NULL) {
750                 zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
751                     ZIO_FLAG_CANFAIL);
752         }
753         if (lwb->lwb_zio == NULL) {
754                 lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
755                     0, &lwb->lwb_blk, lwb->lwb_buf,
756                     lwb->lwb_sz, zil_lwb_write_done, lwb,
757                     ZIO_PRIORITY_LOG_WRITE, ZIO_FLAG_CANFAIL, &zb);
758         }
759 }
760
761 /*
762  * Start a log block write and advance to the next log block.
763  * Calls are serialized.
764  */
765 static lwb_t *
766 zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
767 {
768         lwb_t *nlwb;
769         zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
770         spa_t *spa = zilog->zl_spa;
771         blkptr_t *bp = &ztp->zit_next_blk;
772         uint64_t txg;
773         uint64_t zil_blksz;
774         int error;
775
776         ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
777
778         /*
779          * Allocate the next block and save its address in this block
780          * before writing it in order to establish the log chain.
781          * Note that if the allocation of nlwb synced before we wrote
782          * the block that points at it (lwb), we'd leak it if we crashed.
783          * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
784          */
785         txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
786         txg_rele_to_quiesce(&lwb->lwb_txgh);
787
788         /*
789          * Pick a ZIL blocksize. We request a size that is the
790          * maximum of the previous used size, the current used size and
791          * the amount waiting in the queue.
792          */
793         zil_blksz = MAX(zilog->zl_prev_used,
794             zilog->zl_cur_used + sizeof (*ztp));
795         zil_blksz = MAX(zil_blksz, zilog->zl_itx_list_sz + sizeof (*ztp));
796         zil_blksz = P2ROUNDUP_TYPED(zil_blksz, ZIL_MIN_BLKSZ, uint64_t);
797         if (zil_blksz > ZIL_MAX_BLKSZ)
798                 zil_blksz = ZIL_MAX_BLKSZ;
799
800         BP_ZERO(bp);
801         /* pass the old blkptr in order to spread log blocks across devs */
802         error = zio_alloc_blk(spa, zil_blksz, bp, &lwb->lwb_blk, txg);
803         if (error) {
804                 dmu_tx_t *tx = dmu_tx_create_assigned(zilog->zl_dmu_pool, txg);
805
806                 /*
807                  * We dirty the dataset to ensure that zil_sync() will
808                  * be called to remove this lwb from our zl_lwb_list.
809                  * Failing to do so, may leave an lwb with a NULL lwb_buf
810                  * hanging around on the zl_lwb_list.
811                  */
812                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
813                 dmu_tx_commit(tx);
814
815                 /*
816                  * Since we've just experienced an allocation failure so we
817                  * terminate the current lwb and send it on its way.
818                  */
819                 ztp->zit_pad = 0;
820                 ztp->zit_nused = lwb->lwb_nused;
821                 ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
822                 zio_nowait(lwb->lwb_zio);
823
824                 /*
825                  * By returning NULL the caller will call tx_wait_synced()
826                  */
827                 return (NULL);
828         }
829
830         ASSERT3U(bp->blk_birth, ==, txg);
831         ztp->zit_pad = 0;
832         ztp->zit_nused = lwb->lwb_nused;
833         ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
834         bp->blk_cksum = lwb->lwb_blk.blk_cksum;
835         bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
836
837         /*
838          * Allocate a new log write buffer (lwb).
839          */
840         nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
841
842         nlwb->lwb_zilog = zilog;
843         nlwb->lwb_blk = *bp;
844         nlwb->lwb_nused = 0;
845         nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
846         nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
847         nlwb->lwb_max_txg = txg;
848         nlwb->lwb_zio = NULL;
849
850         /*
851          * Put new lwb at the end of the log chain
852          */
853         mutex_enter(&zilog->zl_lock);
854         list_insert_tail(&zilog->zl_lwb_list, nlwb);
855         mutex_exit(&zilog->zl_lock);
856
857         /* Record the block for later vdev flushing */
858         zil_add_block(zilog, &lwb->lwb_blk);
859
860         /*
861          * kick off the write for the old log block
862          */
863         dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
864         ASSERT(lwb->lwb_zio);
865         zio_nowait(lwb->lwb_zio);
866
867         return (nlwb);
868 }
869
870 static lwb_t *
871 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
872 {
873         lr_t *lrc = &itx->itx_lr; /* common log record */
874         lr_write_t *lr = (lr_write_t *)lrc;
875         uint64_t txg = lrc->lrc_txg;
876         uint64_t reclen = lrc->lrc_reclen;
877         uint64_t dlen;
878
879         if (lwb == NULL)
880                 return (NULL);
881         ASSERT(lwb->lwb_buf != NULL);
882
883         if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
884                 dlen = P2ROUNDUP_TYPED(
885                     lr->lr_length, sizeof (uint64_t), uint64_t);
886         else
887                 dlen = 0;
888
889         zilog->zl_cur_used += (reclen + dlen);
890
891         zil_lwb_write_init(zilog, lwb);
892
893         /*
894          * If this record won't fit in the current log block, start a new one.
895          */
896         if (lwb->lwb_nused + reclen + dlen > ZIL_BLK_DATA_SZ(lwb)) {
897                 lwb = zil_lwb_write_start(zilog, lwb);
898                 if (lwb == NULL)
899                         return (NULL);
900                 zil_lwb_write_init(zilog, lwb);
901                 ASSERT(lwb->lwb_nused == 0);
902                 if (reclen + dlen > ZIL_BLK_DATA_SZ(lwb)) {
903                         txg_wait_synced(zilog->zl_dmu_pool, txg);
904                         return (lwb);
905                 }
906         }
907
908         /*
909          * Update the lrc_seq, to be log record sequence number. See zil.h
910          * Then copy the record to the log buffer.
911          */
912         lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
913         bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
914
915         /*
916          * If it's a write, fetch the data or get its blkptr as appropriate.
917          */
918         if (lrc->lrc_txtype == TX_WRITE) {
919                 if (txg > spa_freeze_txg(zilog->zl_spa))
920                         txg_wait_synced(zilog->zl_dmu_pool, txg);
921                 if (itx->itx_wr_state != WR_COPIED) {
922                         char *dbuf;
923                         int error;
924
925                         /* alignment is guaranteed */
926                         lr = (lr_write_t *)(lwb->lwb_buf + lwb->lwb_nused);
927                         if (dlen) {
928                                 ASSERT(itx->itx_wr_state == WR_NEED_COPY);
929                                 dbuf = lwb->lwb_buf + lwb->lwb_nused + reclen;
930                                 lr->lr_common.lrc_reclen += dlen;
931                         } else {
932                                 ASSERT(itx->itx_wr_state == WR_INDIRECT);
933                                 dbuf = NULL;
934                         }
935                         error = zilog->zl_get_data(
936                             itx->itx_private, lr, dbuf, lwb->lwb_zio);
937                         if (error) {
938                                 ASSERT(error == ENOENT || error == EEXIST ||
939                                     error == EALREADY);
940                                 return (lwb);
941                         }
942                 }
943         }
944
945         lwb->lwb_nused += reclen + dlen;
946         lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
947         ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
948         ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
949
950         return (lwb);
951 }
952
953 itx_t *
954 zil_itx_create(uint64_t txtype, size_t lrsize)
955 {
956         itx_t *itx;
957
958         lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
959
960         itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
961         itx->itx_lr.lrc_txtype = txtype;
962         itx->itx_lr.lrc_reclen = lrsize;
963         itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
964         itx->itx_lr.lrc_seq = 0;        /* defensive */
965
966         return (itx);
967 }
968
969 uint64_t
970 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
971 {
972         uint64_t seq;
973
974         ASSERT(itx->itx_lr.lrc_seq == 0);
975
976         mutex_enter(&zilog->zl_lock);
977         list_insert_tail(&zilog->zl_itx_list, itx);
978         zilog->zl_itx_list_sz += itx->itx_sod;
979         itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
980         itx->itx_lr.lrc_seq = seq = ++zilog->zl_itx_seq;
981         mutex_exit(&zilog->zl_lock);
982
983         return (seq);
984 }
985
986 /*
987  * Free up all in-memory intent log transactions that have now been synced.
988  */
989 static void
990 zil_itx_clean(zilog_t *zilog)
991 {
992         uint64_t synced_txg = spa_last_synced_txg(zilog->zl_spa);
993         uint64_t freeze_txg = spa_freeze_txg(zilog->zl_spa);
994         list_t clean_list;
995         itx_t *itx;
996
997         list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
998
999         mutex_enter(&zilog->zl_lock);
1000         /* wait for a log writer to finish walking list */
1001         while (zilog->zl_writer) {
1002                 cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
1003         }
1004
1005         /*
1006          * Move the sync'd log transactions to a separate list so we can call
1007          * kmem_free without holding the zl_lock.
1008          *
1009          * There is no need to set zl_writer as we don't drop zl_lock here
1010          */
1011         while ((itx = list_head(&zilog->zl_itx_list)) != NULL &&
1012             itx->itx_lr.lrc_txg <= MIN(synced_txg, freeze_txg)) {
1013                 list_remove(&zilog->zl_itx_list, itx);
1014                 zilog->zl_itx_list_sz -= itx->itx_sod;
1015                 list_insert_tail(&clean_list, itx);
1016         }
1017         cv_broadcast(&zilog->zl_cv_writer);
1018         mutex_exit(&zilog->zl_lock);
1019
1020         /* destroy sync'd log transactions */
1021         while ((itx = list_head(&clean_list)) != NULL) {
1022                 list_remove(&clean_list, itx);
1023                 kmem_free(itx, offsetof(itx_t, itx_lr)
1024                     + itx->itx_lr.lrc_reclen);
1025         }
1026         list_destroy(&clean_list);
1027 }
1028
1029 /*
1030  * If there are any in-memory intent log transactions which have now been
1031  * synced then start up a taskq to free them.
1032  */
1033 void
1034 zil_clean(zilog_t *zilog)
1035 {
1036         itx_t *itx;
1037
1038         mutex_enter(&zilog->zl_lock);
1039         itx = list_head(&zilog->zl_itx_list);
1040         if ((itx != NULL) &&
1041             (itx->itx_lr.lrc_txg <= spa_last_synced_txg(zilog->zl_spa))) {
1042                 (void) taskq_dispatch(zilog->zl_clean_taskq,
1043                     (void (*)(void *))zil_itx_clean, zilog, TQ_NOSLEEP);
1044         }
1045         mutex_exit(&zilog->zl_lock);
1046 }
1047
1048 static void
1049 zil_commit_writer(zilog_t *zilog, uint64_t seq, uint64_t foid)
1050 {
1051         uint64_t txg;
1052         uint64_t commit_seq = 0;
1053         itx_t *itx, *itx_next = (itx_t *)-1;
1054         lwb_t *lwb;
1055         spa_t *spa;
1056
1057         zilog->zl_writer = B_TRUE;
1058         ASSERT(zilog->zl_root_zio == NULL);
1059         spa = zilog->zl_spa;
1060
1061         if (zilog->zl_suspend) {
1062                 lwb = NULL;
1063         } else {
1064                 lwb = list_tail(&zilog->zl_lwb_list);
1065                 if (lwb == NULL) {
1066                         /*
1067                          * Return if there's nothing to flush before we
1068                          * dirty the fs by calling zil_create()
1069                          */
1070                         if (list_is_empty(&zilog->zl_itx_list)) {
1071                                 zilog->zl_writer = B_FALSE;
1072                                 return;
1073                         }
1074                         mutex_exit(&zilog->zl_lock);
1075                         zil_create(zilog);
1076                         mutex_enter(&zilog->zl_lock);
1077                         lwb = list_tail(&zilog->zl_lwb_list);
1078                 }
1079         }
1080
1081         /* Loop through in-memory log transactions filling log blocks. */
1082         DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
1083         for (;;) {
1084                 /*
1085                  * Find the next itx to push:
1086                  * Push all transactions related to specified foid and all
1087                  * other transactions except TX_WRITE, TX_TRUNCATE,
1088                  * TX_SETATTR and TX_ACL for all other files.
1089                  */
1090                 if (itx_next != (itx_t *)-1)
1091                         itx = itx_next;
1092                 else
1093                         itx = list_head(&zilog->zl_itx_list);
1094                 for (; itx != NULL; itx = list_next(&zilog->zl_itx_list, itx)) {
1095                         if (foid == 0) /* push all foids? */
1096                                 break;
1097                         if (itx->itx_sync) /* push all O_[D]SYNC */
1098                                 break;
1099                         switch (itx->itx_lr.lrc_txtype) {
1100                         case TX_SETATTR:
1101                         case TX_WRITE:
1102                         case TX_TRUNCATE:
1103                         case TX_ACL:
1104                                 /* lr_foid is same offset for these records */
1105                                 if (((lr_write_t *)&itx->itx_lr)->lr_foid
1106                                     != foid) {
1107                                         continue; /* skip this record */
1108                                 }
1109                         }
1110                         break;
1111                 }
1112                 if (itx == NULL)
1113                         break;
1114
1115                 if ((itx->itx_lr.lrc_seq > seq) &&
1116                     ((lwb == NULL) || (lwb->lwb_nused == 0) ||
1117                     (lwb->lwb_nused + itx->itx_sod > ZIL_BLK_DATA_SZ(lwb)))) {
1118                         break;
1119                 }
1120
1121                 /*
1122                  * Save the next pointer.  Even though we soon drop
1123                  * zl_lock all threads that may change the list
1124                  * (another writer or zil_itx_clean) can't do so until
1125                  * they have zl_writer.
1126                  */
1127                 itx_next = list_next(&zilog->zl_itx_list, itx);
1128                 list_remove(&zilog->zl_itx_list, itx);
1129                 zilog->zl_itx_list_sz -= itx->itx_sod;
1130                 mutex_exit(&zilog->zl_lock);
1131                 txg = itx->itx_lr.lrc_txg;
1132                 ASSERT(txg);
1133
1134                 if (txg > spa_last_synced_txg(spa) ||
1135                     txg > spa_freeze_txg(spa))
1136                         lwb = zil_lwb_commit(zilog, itx, lwb);
1137                 kmem_free(itx, offsetof(itx_t, itx_lr)
1138                     + itx->itx_lr.lrc_reclen);
1139                 mutex_enter(&zilog->zl_lock);
1140         }
1141         DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
1142         /* determine commit sequence number */
1143         itx = list_head(&zilog->zl_itx_list);
1144         if (itx)
1145                 commit_seq = itx->itx_lr.lrc_seq;
1146         else
1147                 commit_seq = zilog->zl_itx_seq;
1148         mutex_exit(&zilog->zl_lock);
1149
1150         /* write the last block out */
1151         if (lwb != NULL && lwb->lwb_zio != NULL)
1152                 lwb = zil_lwb_write_start(zilog, lwb);
1153
1154         zilog->zl_prev_used = zilog->zl_cur_used;
1155         zilog->zl_cur_used = 0;
1156
1157         /*
1158          * Wait if necessary for the log blocks to be on stable storage.
1159          */
1160         if (zilog->zl_root_zio) {
1161                 DTRACE_PROBE1(zil__cw3, zilog_t *, zilog);
1162                 (void) zio_wait(zilog->zl_root_zio);
1163                 zilog->zl_root_zio = NULL;
1164                 DTRACE_PROBE1(zil__cw4, zilog_t *, zilog);
1165                 zil_flush_vdevs(zilog);
1166         }
1167
1168         if (zilog->zl_log_error || lwb == NULL) {
1169                 zilog->zl_log_error = 0;
1170                 txg_wait_synced(zilog->zl_dmu_pool, 0);
1171         }
1172
1173         mutex_enter(&zilog->zl_lock);
1174         zilog->zl_writer = B_FALSE;
1175
1176         ASSERT3U(commit_seq, >=, zilog->zl_commit_seq);
1177         zilog->zl_commit_seq = commit_seq;
1178 }
1179
1180 /*
1181  * Push zfs transactions to stable storage up to the supplied sequence number.
1182  * If foid is 0 push out all transactions, otherwise push only those
1183  * for that file or might have been used to create that file.
1184  */
1185 void
1186 zil_commit(zilog_t *zilog, uint64_t seq, uint64_t foid)
1187 {
1188         if (zilog == NULL || seq == 0)
1189                 return;
1190
1191         mutex_enter(&zilog->zl_lock);
1192
1193         seq = MIN(seq, zilog->zl_itx_seq);      /* cap seq at largest itx seq */
1194
1195         while (zilog->zl_writer) {
1196                 cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
1197                 if (seq < zilog->zl_commit_seq) {
1198                         mutex_exit(&zilog->zl_lock);
1199                         return;
1200                 }
1201         }
1202         zil_commit_writer(zilog, seq, foid); /* drops zl_lock */
1203         /* wake up others waiting on the commit */
1204         cv_broadcast(&zilog->zl_cv_writer);
1205         mutex_exit(&zilog->zl_lock);
1206 }
1207
1208 /*
1209  * Called in syncing context to free committed log blocks and update log header.
1210  */
1211 void
1212 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
1213 {
1214         zil_header_t *zh = zil_header_in_syncing_context(zilog);
1215         uint64_t txg = dmu_tx_get_txg(tx);
1216         spa_t *spa = zilog->zl_spa;
1217         lwb_t *lwb;
1218
1219         mutex_enter(&zilog->zl_lock);
1220
1221         ASSERT(zilog->zl_stop_sync == 0);
1222
1223         zh->zh_replay_seq = zilog->zl_replayed_seq[txg & TXG_MASK];
1224
1225         if (zilog->zl_destroy_txg == txg) {
1226                 blkptr_t blk = zh->zh_log;
1227
1228                 ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
1229                 ASSERT(spa_sync_pass(spa) == 1);
1230
1231                 bzero(zh, sizeof (zil_header_t));
1232                 bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));
1233
1234                 if (zilog->zl_keep_first) {
1235                         /*
1236                          * If this block was part of log chain that couldn't
1237                          * be claimed because a device was missing during
1238                          * zil_claim(), but that device later returns,
1239                          * then this block could erroneously appear valid.
1240                          * To guard against this, assign a new GUID to the new
1241                          * log chain so it doesn't matter what blk points to.
1242                          */
1243                         zil_init_log_chain(zilog, &blk);
1244                         zh->zh_log = blk;
1245                 }
1246         }
1247
1248         for (;;) {
1249                 lwb = list_head(&zilog->zl_lwb_list);
1250                 if (lwb == NULL) {
1251                         mutex_exit(&zilog->zl_lock);
1252                         return;
1253                 }
1254                 zh->zh_log = lwb->lwb_blk;
1255                 if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
1256                         break;
1257                 list_remove(&zilog->zl_lwb_list, lwb);
1258                 zio_free_blk(spa, &lwb->lwb_blk, txg);
1259                 kmem_cache_free(zil_lwb_cache, lwb);
1260
1261                 /*
1262                  * If we don't have anything left in the lwb list then
1263                  * we've had an allocation failure and we need to zero
1264                  * out the zil_header blkptr so that we don't end
1265                  * up freeing the same block twice.
1266                  */
1267                 if (list_head(&zilog->zl_lwb_list) == NULL)
1268                         BP_ZERO(&zh->zh_log);
1269         }
1270         mutex_exit(&zilog->zl_lock);
1271 }
1272
1273 void
1274 zil_init(void)
1275 {
1276         zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
1277             sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
1278 }
1279
1280 void
1281 zil_fini(void)
1282 {
1283         kmem_cache_destroy(zil_lwb_cache);
1284 }
1285
1286 zilog_t *
1287 zil_alloc(objset_t *os, zil_header_t *zh_phys)
1288 {
1289         zilog_t *zilog;
1290
1291         zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
1292
1293         zilog->zl_header = zh_phys;
1294         zilog->zl_os = os;
1295         zilog->zl_spa = dmu_objset_spa(os);
1296         zilog->zl_dmu_pool = dmu_objset_pool(os);
1297         zilog->zl_destroy_txg = TXG_INITIAL - 1;
1298
1299         mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
1300
1301         list_create(&zilog->zl_itx_list, sizeof (itx_t),
1302             offsetof(itx_t, itx_node));
1303
1304         list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
1305             offsetof(lwb_t, lwb_node));
1306
1307         mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
1308
1309         avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
1310             sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));
1311
1312         cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
1313         cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
1314
1315         return (zilog);
1316 }
1317
1318 void
1319 zil_free(zilog_t *zilog)
1320 {
1321         lwb_t *lwb;
1322
1323         zilog->zl_stop_sync = 1;
1324
1325         while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1326                 list_remove(&zilog->zl_lwb_list, lwb);
1327                 if (lwb->lwb_buf != NULL)
1328                         zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1329                 kmem_cache_free(zil_lwb_cache, lwb);
1330         }
1331         list_destroy(&zilog->zl_lwb_list);
1332
1333         avl_destroy(&zilog->zl_vdev_tree);
1334         mutex_destroy(&zilog->zl_vdev_lock);
1335
1336         ASSERT(list_head(&zilog->zl_itx_list) == NULL);
1337         list_destroy(&zilog->zl_itx_list);
1338         mutex_destroy(&zilog->zl_lock);
1339
1340         cv_destroy(&zilog->zl_cv_writer);
1341         cv_destroy(&zilog->zl_cv_suspend);
1342
1343         kmem_free(zilog, sizeof (zilog_t));
1344 }
1345
1346 /*
1347  * return true if the initial log block is not valid
1348  */
1349 static boolean_t
1350 zil_empty(zilog_t *zilog)
1351 {
1352         const zil_header_t *zh = zilog->zl_header;
1353         arc_buf_t *abuf = NULL;
1354
1355         if (BP_IS_HOLE(&zh->zh_log))
1356                 return (B_TRUE);
1357
1358         if (zil_read_log_block(zilog, &zh->zh_log, &abuf) != 0)
1359                 return (B_TRUE);
1360
1361         VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
1362         return (B_FALSE);
1363 }
1364
1365 /*
1366  * Open an intent log.
1367  */
1368 zilog_t *
1369 zil_open(objset_t *os, zil_get_data_t *get_data)
1370 {
1371         zilog_t *zilog = dmu_objset_zil(os);
1372
1373         zilog->zl_get_data = get_data;
1374         zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
1375             2, 2, TASKQ_PREPOPULATE);
1376
1377         return (zilog);
1378 }
1379
1380 /*
1381  * Close an intent log.
1382  */
1383 void
1384 zil_close(zilog_t *zilog)
1385 {
1386         /*
1387          * If the log isn't already committed, mark the objset dirty
1388          * (so zil_sync() will be called) and wait for that txg to sync.
1389          */
1390         if (!zil_is_committed(zilog)) {
1391                 uint64_t txg;
1392                 dmu_tx_t *tx = dmu_tx_create(zilog->zl_os);
1393                 (void) dmu_tx_assign(tx, TXG_WAIT);
1394                 dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1395                 txg = dmu_tx_get_txg(tx);
1396                 dmu_tx_commit(tx);
1397                 txg_wait_synced(zilog->zl_dmu_pool, txg);
1398         }
1399
1400         taskq_destroy(zilog->zl_clean_taskq);
1401         zilog->zl_clean_taskq = NULL;
1402         zilog->zl_get_data = NULL;
1403
1404         zil_itx_clean(zilog);
1405         ASSERT(list_head(&zilog->zl_itx_list) == NULL);
1406 }
1407
1408 /*
1409  * Suspend an intent log.  While in suspended mode, we still honor
1410  * synchronous semantics, but we rely on txg_wait_synced() to do it.
1411  * We suspend the log briefly when taking a snapshot so that the snapshot
1412  * contains all the data it's supposed to, and has an empty intent log.
1413  */
1414 int
1415 zil_suspend(zilog_t *zilog)
1416 {
1417         const zil_header_t *zh = zilog->zl_header;
1418
1419         mutex_enter(&zilog->zl_lock);
1420         if (zh->zh_claim_txg != 0) {            /* unplayed log */
1421                 mutex_exit(&zilog->zl_lock);
1422                 return (EBUSY);
1423         }
1424         if (zilog->zl_suspend++ != 0) {
1425                 /*
1426                  * Someone else already began a suspend.
1427                  * Just wait for them to finish.
1428                  */
1429                 while (zilog->zl_suspending)
1430                         cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
1431                 mutex_exit(&zilog->zl_lock);
1432                 return (0);
1433         }
1434         zilog->zl_suspending = B_TRUE;
1435         mutex_exit(&zilog->zl_lock);
1436
1437         zil_commit(zilog, UINT64_MAX, 0);
1438
1439         /*
1440          * Wait for any in-flight log writes to complete.
1441          */
1442         mutex_enter(&zilog->zl_lock);
1443         while (zilog->zl_writer)
1444                 cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
1445         mutex_exit(&zilog->zl_lock);
1446
1447         zil_destroy(zilog, B_FALSE);
1448
1449         mutex_enter(&zilog->zl_lock);
1450         zilog->zl_suspending = B_FALSE;
1451         cv_broadcast(&zilog->zl_cv_suspend);
1452         mutex_exit(&zilog->zl_lock);
1453
1454         return (0);
1455 }
1456
1457 void
1458 zil_resume(zilog_t *zilog)
1459 {
1460         mutex_enter(&zilog->zl_lock);
1461         ASSERT(zilog->zl_suspend != 0);
1462         zilog->zl_suspend--;
1463         mutex_exit(&zilog->zl_lock);
1464 }
1465
1466 typedef struct zil_replay_arg {
1467         objset_t        *zr_os;
1468         zil_replay_func_t **zr_replay;
1469         void            *zr_arg;
1470         boolean_t       zr_byteswap;
1471         char            *zr_lrbuf;
1472 } zil_replay_arg_t;
1473
1474 static void
1475 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
1476 {
1477         zil_replay_arg_t *zr = zra;
1478         const zil_header_t *zh = zilog->zl_header;
1479         uint64_t reclen = lr->lrc_reclen;
1480         uint64_t txtype = lr->lrc_txtype;
1481         char *name;
1482         int pass, error;
1483
1484         if (!zilog->zl_replay)                  /* giving up */
1485                 return;
1486
1487         if (lr->lrc_txg < claim_txg)            /* already committed */
1488                 return;
1489
1490         if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
1491                 return;
1492
1493         /* Strip case-insensitive bit, still present in log record */
1494         txtype &= ~TX_CI;
1495
1496         if (txtype == 0 || txtype >= TX_MAX_TYPE) {
1497                 error = EINVAL;
1498                 goto bad;
1499         }
1500
1501         /*
1502          * Make a copy of the data so we can revise and extend it.
1503          */
1504         bcopy(lr, zr->zr_lrbuf, reclen);
1505
1506         /*
1507          * The log block containing this lr may have been byteswapped
1508          * so that we can easily examine common fields like lrc_txtype.
1509          * However, the log is a mix of different data types, and only the
1510          * replay vectors know how to byteswap their records.  Therefore, if
1511          * the lr was byteswapped, undo it before invoking the replay vector.
1512          */
1513         if (zr->zr_byteswap)
1514                 byteswap_uint64_array(zr->zr_lrbuf, reclen);
1515
1516         /*
1517          * If this is a TX_WRITE with a blkptr, suck in the data.
1518          */
1519         if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
1520                 lr_write_t *lrw = (lr_write_t *)lr;
1521                 blkptr_t *wbp = &lrw->lr_blkptr;
1522                 uint64_t wlen = lrw->lr_length;
1523                 char *wbuf = zr->zr_lrbuf + reclen;
1524
1525                 if (BP_IS_HOLE(wbp)) {  /* compressed to a hole */
1526                         bzero(wbuf, wlen);
1527                 } else {
1528                         /*
1529                          * A subsequent write may have overwritten this block,
1530                          * in which case wbp may have been been freed and
1531                          * reallocated, and our read of wbp may fail with a
1532                          * checksum error.  We can safely ignore this because
1533                          * the later write will provide the correct data.
1534                          */
1535                         zbookmark_t zb;
1536
1537                         zb.zb_objset = dmu_objset_id(zilog->zl_os);
1538                         zb.zb_object = lrw->lr_foid;
1539                         zb.zb_level = -1;
1540                         zb.zb_blkid = lrw->lr_offset / BP_GET_LSIZE(wbp);
1541
1542                         (void) zio_wait(zio_read(NULL, zilog->zl_spa,
1543                             wbp, wbuf, BP_GET_LSIZE(wbp), NULL, NULL,
1544                             ZIO_PRIORITY_SYNC_READ,
1545                             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &zb));
1546                         (void) memmove(wbuf, wbuf + lrw->lr_blkoff, wlen);
1547                 }
1548         }
1549
1550         /*
1551          * We must now do two things atomically: replay this log record,
1552          * and update the log header sequence number to reflect the fact that
1553          * we did so. At the end of each replay function the sequence number
1554          * is updated if we are in replay mode.
1555          */
1556         for (pass = 1; pass <= 2; pass++) {
1557                 zilog->zl_replaying_seq = lr->lrc_seq;
1558                 /* Only byteswap (if needed) on the 1st pass.  */
1559                 error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
1560                     zr->zr_byteswap && pass == 1);
1561
1562                 if (!error)
1563                         return;
1564
1565                 /*
1566                  * The DMU's dnode layer doesn't see removes until the txg
1567                  * commits, so a subsequent claim can spuriously fail with
1568                  * EEXIST. So if we receive any error we try syncing out
1569                  * any removes then retry the transaction.
1570                  */
1571                 if (pass == 1)
1572                         txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
1573         }
1574
1575 bad:
1576         ASSERT(error);
1577         name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
1578         dmu_objset_name(zr->zr_os, name);
1579         cmn_err(CE_WARN, "ZFS replay transaction error %d, "
1580             "dataset %s, seq 0x%llx, txtype %llu %s\n",
1581             error, name, (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype,
1582             (lr->lrc_txtype & TX_CI) ? "CI" : "");
1583         zilog->zl_replay = B_FALSE;
1584         kmem_free(name, MAXNAMELEN);
1585 }
1586
1587 /* ARGSUSED */
1588 static void
1589 zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
1590 {
1591         zilog->zl_replay_blks++;
1592 }
1593
1594 /*
1595  * If this dataset has a non-empty intent log, replay it and destroy it.
1596  */
1597 void
1598 zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
1599 {
1600         zilog_t *zilog = dmu_objset_zil(os);
1601         const zil_header_t *zh = zilog->zl_header;
1602         zil_replay_arg_t zr;
1603
1604         if (zil_empty(zilog)) {
1605                 zil_destroy(zilog, B_TRUE);
1606                 return;
1607         }
1608
1609         zr.zr_os = os;
1610         zr.zr_replay = replay_func;
1611         zr.zr_arg = arg;
1612         zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
1613         zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1614
1615         /*
1616          * Wait for in-progress removes to sync before starting replay.
1617          */
1618         txg_wait_synced(zilog->zl_dmu_pool, 0);
1619
1620         zilog->zl_replay = B_TRUE;
1621         zilog->zl_replay_time = lbolt;
1622         ASSERT(zilog->zl_replay_blks == 0);
1623         (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
1624             zh->zh_claim_txg);
1625         kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
1626
1627         zil_destroy(zilog, B_FALSE);
1628         txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
1629         zilog->zl_replay = B_FALSE;
1630 }
1631
1632 /*
1633  * Report whether all transactions are committed
1634  */
1635 int
1636 zil_is_committed(zilog_t *zilog)
1637 {
1638         lwb_t *lwb;
1639         int ret;
1640
1641         mutex_enter(&zilog->zl_lock);
1642         while (zilog->zl_writer)
1643                 cv_wait(&zilog->zl_cv_writer, &zilog->zl_lock);
1644
1645         /* recent unpushed intent log transactions? */
1646         if (!list_is_empty(&zilog->zl_itx_list)) {
1647                 ret = B_FALSE;
1648                 goto out;
1649         }
1650
1651         /* intent log never used? */
1652         lwb = list_head(&zilog->zl_lwb_list);
1653         if (lwb == NULL) {
1654                 ret = B_TRUE;
1655                 goto out;
1656         }
1657
1658         /*
1659          * more than 1 log buffer means zil_sync() hasn't yet freed
1660          * entries after a txg has committed
1661          */
1662         if (list_next(&zilog->zl_lwb_list, lwb)) {
1663                 ret = B_FALSE;
1664                 goto out;
1665         }
1666
1667         ASSERT(zil_empty(zilog));
1668         ret = B_TRUE;
1669 out:
1670         cv_broadcast(&zilog->zl_cv_writer);
1671         mutex_exit(&zilog->zl_lock);
1672         return (ret);
1673 }