Improve meta data performance
[zfs.git] / module / zfs / zio.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011 by Delphix. All rights reserved.
24  */
25
26 #include <sys/zfs_context.h>
27 #include <sys/fm/fs/zfs.h>
28 #include <sys/spa.h>
29 #include <sys/txg.h>
30 #include <sys/spa_impl.h>
31 #include <sys/vdev_impl.h>
32 #include <sys/zio_impl.h>
33 #include <sys/zio_compress.h>
34 #include <sys/zio_checksum.h>
35 #include <sys/dmu_objset.h>
36 #include <sys/arc.h>
37 #include <sys/ddt.h>
38
39 /*
40  * ==========================================================================
41  * I/O priority table
42  * ==========================================================================
43  */
44 uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
45         0,      /* ZIO_PRIORITY_NOW             */
46         0,      /* ZIO_PRIORITY_SYNC_READ       */
47         0,      /* ZIO_PRIORITY_SYNC_WRITE      */
48         0,      /* ZIO_PRIORITY_LOG_WRITE       */
49         1,      /* ZIO_PRIORITY_CACHE_FILL      */
50         1,      /* ZIO_PRIORITY_AGG             */
51         4,      /* ZIO_PRIORITY_FREE            */
52         4,      /* ZIO_PRIORITY_ASYNC_WRITE     */
53         6,      /* ZIO_PRIORITY_ASYNC_READ      */
54         10,     /* ZIO_PRIORITY_RESILVER        */
55         20,     /* ZIO_PRIORITY_SCRUB           */
56         2,      /* ZIO_PRIORITY_DDT_PREFETCH    */
57 };
58
59 /*
60  * ==========================================================================
61  * I/O type descriptions
62  * ==========================================================================
63  */
64 char *zio_type_name[ZIO_TYPES] = {
65         "z_null", "z_rd", "z_wr", "z_fr", "z_cl", "z_ioctl"
66 };
67
68 /*
69  * ==========================================================================
70  * I/O kmem caches
71  * ==========================================================================
72  */
73 kmem_cache_t *zio_cache;
74 kmem_cache_t *zio_link_cache;
75 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
76 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
77 int zio_bulk_flags = 0;
78 int zio_delay_max = ZIO_DELAY_MAX;
79
80 #ifdef _KERNEL
81 extern vmem_t *zio_alloc_arena;
82 #endif
83 extern int zfs_mg_alloc_failures;
84
85 /*
86  * An allocating zio is one that either currently has the DVA allocate
87  * stage set or will have it later in its lifetime.
88  */
89 #define IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
90
91 int zio_requeue_io_start_cut_in_line = 1;
92
93 #ifdef ZFS_DEBUG
94 int zio_buf_debug_limit = 16384;
95 #else
96 int zio_buf_debug_limit = 0;
97 #endif
98
99 static inline void __zio_execute(zio_t *zio);
100
101 void
102 zio_init(void)
103 {
104         size_t c;
105         vmem_t *data_alloc_arena = NULL;
106
107 #ifdef _KERNEL
108         data_alloc_arena = zio_alloc_arena;
109 #endif
110         zio_cache = kmem_cache_create("zio_cache",
111             sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, KMC_KMEM);
112         zio_link_cache = kmem_cache_create("zio_link_cache",
113             sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, KMC_KMEM);
114
115         /*
116          * For small buffers, we want a cache for each multiple of
117          * SPA_MINBLOCKSIZE.  For medium-size buffers, we want a cache
118          * for each quarter-power of 2.  For large buffers, we want
119          * a cache for each multiple of PAGESIZE.
120          */
121         for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
122                 size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
123                 size_t p2 = size;
124                 size_t align = 0;
125
126                 while (p2 & (p2 - 1))
127                         p2 &= p2 - 1;
128
129                 if (size <= 4 * SPA_MINBLOCKSIZE) {
130                         align = SPA_MINBLOCKSIZE;
131                 } else if (P2PHASE(size, PAGESIZE) == 0) {
132                         align = PAGESIZE;
133                 } else if (P2PHASE(size, p2 >> 2) == 0) {
134                         align = p2 >> 2;
135                 }
136
137                 if (align != 0) {
138                         char name[36];
139                         int flags = zio_bulk_flags;
140
141                         /*
142                          * The smallest buffers (512b) are heavily used and
143                          * experience a lot of churn.  The slabs allocated
144                          * for them are also relatively small (32K).  Thus
145                          * in over to avoid expensive calls to vmalloc() we
146                          * make an exception to the usual slab allocation
147                          * policy and force these buffers to be kmem backed.
148                          */
149                         if (size == (1 << SPA_MINBLOCKSHIFT))
150                                 flags |= KMC_KMEM;
151
152                         (void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
153                         zio_buf_cache[c] = kmem_cache_create(name, size,
154                             align, NULL, NULL, NULL, NULL, NULL, flags);
155
156                         (void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
157                         zio_data_buf_cache[c] = kmem_cache_create(name, size,
158                             align, NULL, NULL, NULL, NULL,
159                             data_alloc_arena, flags);
160                 }
161         }
162
163         while (--c != 0) {
164                 ASSERT(zio_buf_cache[c] != NULL);
165                 if (zio_buf_cache[c - 1] == NULL)
166                         zio_buf_cache[c - 1] = zio_buf_cache[c];
167
168                 ASSERT(zio_data_buf_cache[c] != NULL);
169                 if (zio_data_buf_cache[c - 1] == NULL)
170                         zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
171         }
172
173         /*
174          * The zio write taskqs have 1 thread per cpu, allow 1/2 of the taskqs
175          * to fail 3 times per txg or 8 failures, whichever is greater.
176          */
177         zfs_mg_alloc_failures = MAX((3 * max_ncpus / 2), 8);
178
179         zio_inject_init();
180 }
181
182 void
183 zio_fini(void)
184 {
185         size_t c;
186         kmem_cache_t *last_cache = NULL;
187         kmem_cache_t *last_data_cache = NULL;
188
189         for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
190                 if (zio_buf_cache[c] != last_cache) {
191                         last_cache = zio_buf_cache[c];
192                         kmem_cache_destroy(zio_buf_cache[c]);
193                 }
194                 zio_buf_cache[c] = NULL;
195
196                 if (zio_data_buf_cache[c] != last_data_cache) {
197                         last_data_cache = zio_data_buf_cache[c];
198                         kmem_cache_destroy(zio_data_buf_cache[c]);
199                 }
200                 zio_data_buf_cache[c] = NULL;
201         }
202
203         kmem_cache_destroy(zio_link_cache);
204         kmem_cache_destroy(zio_cache);
205
206         zio_inject_fini();
207 }
208
209 /*
210  * ==========================================================================
211  * Allocate and free I/O buffers
212  * ==========================================================================
213  */
214
215 /*
216  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
217  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
218  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
219  * excess / transient data in-core during a crashdump.
220  */
221 void *
222 zio_buf_alloc(size_t size)
223 {
224         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
225
226         ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
227
228         return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
229 }
230
231 /*
232  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
233  * crashdump if the kernel panics.  This exists so that we will limit the amount
234  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
235  * of kernel heap dumped to disk when the kernel panics)
236  */
237 void *
238 zio_data_buf_alloc(size_t size)
239 {
240         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
241
242         ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
243
244         return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
245 }
246
247 void
248 zio_buf_free(void *buf, size_t size)
249 {
250         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
251
252         ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
253
254         kmem_cache_free(zio_buf_cache[c], buf);
255 }
256
257 void
258 zio_data_buf_free(void *buf, size_t size)
259 {
260         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
261
262         ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
263
264         kmem_cache_free(zio_data_buf_cache[c], buf);
265 }
266
267 /*
268  * ==========================================================================
269  * Push and pop I/O transform buffers
270  * ==========================================================================
271  */
272 static void
273 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
274         zio_transform_func_t *transform)
275 {
276         zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_PUSHPAGE);
277
278         zt->zt_orig_data = zio->io_data;
279         zt->zt_orig_size = zio->io_size;
280         zt->zt_bufsize = bufsize;
281         zt->zt_transform = transform;
282
283         zt->zt_next = zio->io_transform_stack;
284         zio->io_transform_stack = zt;
285
286         zio->io_data = data;
287         zio->io_size = size;
288 }
289
290 static void
291 zio_pop_transforms(zio_t *zio)
292 {
293         zio_transform_t *zt;
294
295         while ((zt = zio->io_transform_stack) != NULL) {
296                 if (zt->zt_transform != NULL)
297                         zt->zt_transform(zio,
298                             zt->zt_orig_data, zt->zt_orig_size);
299
300                 if (zt->zt_bufsize != 0)
301                         zio_buf_free(zio->io_data, zt->zt_bufsize);
302
303                 zio->io_data = zt->zt_orig_data;
304                 zio->io_size = zt->zt_orig_size;
305                 zio->io_transform_stack = zt->zt_next;
306
307                 kmem_free(zt, sizeof (zio_transform_t));
308         }
309 }
310
311 /*
312  * ==========================================================================
313  * I/O transform callbacks for subblocks and decompression
314  * ==========================================================================
315  */
316 static void
317 zio_subblock(zio_t *zio, void *data, uint64_t size)
318 {
319         ASSERT(zio->io_size > size);
320
321         if (zio->io_type == ZIO_TYPE_READ)
322                 bcopy(zio->io_data, data, size);
323 }
324
325 static void
326 zio_decompress(zio_t *zio, void *data, uint64_t size)
327 {
328         if (zio->io_error == 0 &&
329             zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
330             zio->io_data, data, zio->io_size, size) != 0)
331                 zio->io_error = EIO;
332 }
333
334 /*
335  * ==========================================================================
336  * I/O parent/child relationships and pipeline interlocks
337  * ==========================================================================
338  */
339 /*
340  * NOTE - Callers to zio_walk_parents() and zio_walk_children must
341  *        continue calling these functions until they return NULL.
342  *        Otherwise, the next caller will pick up the list walk in
343  *        some indeterminate state.  (Otherwise every caller would
344  *        have to pass in a cookie to keep the state represented by
345  *        io_walk_link, which gets annoying.)
346  */
347 zio_t *
348 zio_walk_parents(zio_t *cio)
349 {
350         zio_link_t *zl = cio->io_walk_link;
351         list_t *pl = &cio->io_parent_list;
352
353         zl = (zl == NULL) ? list_head(pl) : list_next(pl, zl);
354         cio->io_walk_link = zl;
355
356         if (zl == NULL)
357                 return (NULL);
358
359         ASSERT(zl->zl_child == cio);
360         return (zl->zl_parent);
361 }
362
363 zio_t *
364 zio_walk_children(zio_t *pio)
365 {
366         zio_link_t *zl = pio->io_walk_link;
367         list_t *cl = &pio->io_child_list;
368
369         zl = (zl == NULL) ? list_head(cl) : list_next(cl, zl);
370         pio->io_walk_link = zl;
371
372         if (zl == NULL)
373                 return (NULL);
374
375         ASSERT(zl->zl_parent == pio);
376         return (zl->zl_child);
377 }
378
379 zio_t *
380 zio_unique_parent(zio_t *cio)
381 {
382         zio_t *pio = zio_walk_parents(cio);
383
384         VERIFY(zio_walk_parents(cio) == NULL);
385         return (pio);
386 }
387
388 void
389 zio_add_child(zio_t *pio, zio_t *cio)
390 {
391         zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_PUSHPAGE);
392         int w;
393
394         /*
395          * Logical I/Os can have logical, gang, or vdev children.
396          * Gang I/Os can have gang or vdev children.
397          * Vdev I/Os can only have vdev children.
398          * The following ASSERT captures all of these constraints.
399          */
400         ASSERT(cio->io_child_type <= pio->io_child_type);
401
402         zl->zl_parent = pio;
403         zl->zl_child = cio;
404
405         mutex_enter(&cio->io_lock);
406         mutex_enter(&pio->io_lock);
407
408         ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
409
410         for (w = 0; w < ZIO_WAIT_TYPES; w++)
411                 pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
412
413         list_insert_head(&pio->io_child_list, zl);
414         list_insert_head(&cio->io_parent_list, zl);
415
416         pio->io_child_count++;
417         cio->io_parent_count++;
418
419         mutex_exit(&pio->io_lock);
420         mutex_exit(&cio->io_lock);
421 }
422
423 static void
424 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
425 {
426         ASSERT(zl->zl_parent == pio);
427         ASSERT(zl->zl_child == cio);
428
429         mutex_enter(&cio->io_lock);
430         mutex_enter(&pio->io_lock);
431
432         list_remove(&pio->io_child_list, zl);
433         list_remove(&cio->io_parent_list, zl);
434
435         pio->io_child_count--;
436         cio->io_parent_count--;
437
438         mutex_exit(&pio->io_lock);
439         mutex_exit(&cio->io_lock);
440
441         kmem_cache_free(zio_link_cache, zl);
442 }
443
444 static boolean_t
445 zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
446 {
447         uint64_t *countp = &zio->io_children[child][wait];
448         boolean_t waiting = B_FALSE;
449
450         mutex_enter(&zio->io_lock);
451         ASSERT(zio->io_stall == NULL);
452         if (*countp != 0) {
453                 zio->io_stage >>= 1;
454                 zio->io_stall = countp;
455                 waiting = B_TRUE;
456         }
457         mutex_exit(&zio->io_lock);
458
459         return (waiting);
460 }
461
462 __attribute__((always_inline))
463 static inline void
464 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
465 {
466         uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
467         int *errorp = &pio->io_child_error[zio->io_child_type];
468
469         mutex_enter(&pio->io_lock);
470         if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
471                 *errorp = zio_worst_error(*errorp, zio->io_error);
472         pio->io_reexecute |= zio->io_reexecute;
473         ASSERT3U(*countp, >, 0);
474         if (--*countp == 0 && pio->io_stall == countp) {
475                 pio->io_stall = NULL;
476                 mutex_exit(&pio->io_lock);
477                 __zio_execute(pio);
478         } else {
479                 mutex_exit(&pio->io_lock);
480         }
481 }
482
483 static void
484 zio_inherit_child_errors(zio_t *zio, enum zio_child c)
485 {
486         if (zio->io_child_error[c] != 0 && zio->io_error == 0)
487                 zio->io_error = zio->io_child_error[c];
488 }
489
490 /*
491  * ==========================================================================
492  * Create the various types of I/O (read, write, free, etc)
493  * ==========================================================================
494  */
495 static zio_t *
496 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
497     void *data, uint64_t size, zio_done_func_t *done, void *private,
498     zio_type_t type, int priority, enum zio_flag flags,
499     vdev_t *vd, uint64_t offset, const zbookmark_t *zb,
500     enum zio_stage stage, enum zio_stage pipeline)
501 {
502         zio_t *zio;
503
504         ASSERT3U(size, <=, SPA_MAXBLOCKSIZE);
505         ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
506         ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
507
508         ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
509         ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
510         ASSERT(vd || stage == ZIO_STAGE_OPEN);
511
512         zio = kmem_cache_alloc(zio_cache, KM_PUSHPAGE);
513         bzero(zio, sizeof (zio_t));
514
515         mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
516         cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
517
518         list_create(&zio->io_parent_list, sizeof (zio_link_t),
519             offsetof(zio_link_t, zl_parent_node));
520         list_create(&zio->io_child_list, sizeof (zio_link_t),
521             offsetof(zio_link_t, zl_child_node));
522
523         if (vd != NULL)
524                 zio->io_child_type = ZIO_CHILD_VDEV;
525         else if (flags & ZIO_FLAG_GANG_CHILD)
526                 zio->io_child_type = ZIO_CHILD_GANG;
527         else if (flags & ZIO_FLAG_DDT_CHILD)
528                 zio->io_child_type = ZIO_CHILD_DDT;
529         else
530                 zio->io_child_type = ZIO_CHILD_LOGICAL;
531
532         if (bp != NULL) {
533                 zio->io_bp = (blkptr_t *)bp;
534                 zio->io_bp_copy = *bp;
535                 zio->io_bp_orig = *bp;
536                 if (type != ZIO_TYPE_WRITE ||
537                     zio->io_child_type == ZIO_CHILD_DDT)
538                         zio->io_bp = &zio->io_bp_copy;  /* so caller can free */
539                 if (zio->io_child_type == ZIO_CHILD_LOGICAL)
540                         zio->io_logical = zio;
541                 if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
542                         pipeline |= ZIO_GANG_STAGES;
543         }
544
545         zio->io_spa = spa;
546         zio->io_txg = txg;
547         zio->io_done = done;
548         zio->io_private = private;
549         zio->io_type = type;
550         zio->io_priority = priority;
551         zio->io_vd = vd;
552         zio->io_offset = offset;
553         zio->io_orig_data = zio->io_data = data;
554         zio->io_orig_size = zio->io_size = size;
555         zio->io_orig_flags = zio->io_flags = flags;
556         zio->io_orig_stage = zio->io_stage = stage;
557         zio->io_orig_pipeline = zio->io_pipeline = pipeline;
558
559         zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
560         zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
561
562         if (zb != NULL)
563                 zio->io_bookmark = *zb;
564
565         if (pio != NULL) {
566                 if (zio->io_logical == NULL)
567                         zio->io_logical = pio->io_logical;
568                 if (zio->io_child_type == ZIO_CHILD_GANG)
569                         zio->io_gang_leader = pio->io_gang_leader;
570                 zio_add_child(pio, zio);
571         }
572
573         return (zio);
574 }
575
576 static void
577 zio_destroy(zio_t *zio)
578 {
579         list_destroy(&zio->io_parent_list);
580         list_destroy(&zio->io_child_list);
581         mutex_destroy(&zio->io_lock);
582         cv_destroy(&zio->io_cv);
583         kmem_cache_free(zio_cache, zio);
584 }
585
586 zio_t *
587 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
588     void *private, enum zio_flag flags)
589 {
590         zio_t *zio;
591
592         zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
593             ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
594             ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
595
596         return (zio);
597 }
598
599 zio_t *
600 zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
601 {
602         return (zio_null(NULL, spa, NULL, done, private, flags));
603 }
604
605 zio_t *
606 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
607     void *data, uint64_t size, zio_done_func_t *done, void *private,
608     int priority, enum zio_flag flags, const zbookmark_t *zb)
609 {
610         zio_t *zio;
611
612         zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
613             data, size, done, private,
614             ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
615             ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
616             ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
617
618         return (zio);
619 }
620
621 zio_t *
622 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
623     void *data, uint64_t size, const zio_prop_t *zp,
624     zio_done_func_t *ready, zio_done_func_t *done, void *private,
625     int priority, enum zio_flag flags, const zbookmark_t *zb)
626 {
627         zio_t *zio;
628
629         ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
630             zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
631             zp->zp_compress >= ZIO_COMPRESS_OFF &&
632             zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
633             zp->zp_type < DMU_OT_NUMTYPES &&
634             zp->zp_level < 32 &&
635             zp->zp_copies > 0 &&
636             zp->zp_copies <= spa_max_replication(spa) &&
637             zp->zp_dedup <= 1 &&
638             zp->zp_dedup_verify <= 1);
639
640         zio = zio_create(pio, spa, txg, bp, data, size, done, private,
641             ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
642             ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
643             ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
644
645         zio->io_ready = ready;
646         zio->io_prop = *zp;
647
648         return (zio);
649 }
650
651 zio_t *
652 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
653     uint64_t size, zio_done_func_t *done, void *private, int priority,
654     enum zio_flag flags, zbookmark_t *zb)
655 {
656         zio_t *zio;
657
658         zio = zio_create(pio, spa, txg, bp, data, size, done, private,
659             ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
660             ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
661
662         return (zio);
663 }
664
665 void
666 zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
667 {
668         ASSERT(zio->io_type == ZIO_TYPE_WRITE);
669         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
670         ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
671         ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
672
673         zio->io_prop.zp_copies = copies;
674         zio->io_bp_override = bp;
675 }
676
677 void
678 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
679 {
680         bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
681 }
682
683 zio_t *
684 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
685     enum zio_flag flags)
686 {
687         zio_t *zio;
688
689         dprintf_bp(bp, "freeing in txg %llu, pass %u",
690             (longlong_t)txg, spa->spa_sync_pass);
691
692         ASSERT(!BP_IS_HOLE(bp));
693         ASSERT(spa_syncing_txg(spa) == txg);
694         ASSERT(spa_sync_pass(spa) <= SYNC_PASS_DEFERRED_FREE);
695
696         zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
697             NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, flags,
698             NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PIPELINE);
699
700         return (zio);
701 }
702
703 zio_t *
704 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
705     zio_done_func_t *done, void *private, enum zio_flag flags)
706 {
707         zio_t *zio;
708
709         /*
710          * A claim is an allocation of a specific block.  Claims are needed
711          * to support immediate writes in the intent log.  The issue is that
712          * immediate writes contain committed data, but in a txg that was
713          * *not* committed.  Upon opening the pool after an unclean shutdown,
714          * the intent log claims all blocks that contain immediate write data
715          * so that the SPA knows they're in use.
716          *
717          * All claims *must* be resolved in the first txg -- before the SPA
718          * starts allocating blocks -- so that nothing is allocated twice.
719          * If txg == 0 we just verify that the block is claimable.
720          */
721         ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
722         ASSERT(txg == spa_first_txg(spa) || txg == 0);
723         ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));       /* zdb(1M) */
724
725         zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
726             done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW, flags,
727             NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
728
729         return (zio);
730 }
731
732 zio_t *
733 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
734     zio_done_func_t *done, void *private, int priority, enum zio_flag flags)
735 {
736         zio_t *zio;
737         int c;
738
739         if (vd->vdev_children == 0) {
740                 zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
741                     ZIO_TYPE_IOCTL, priority, flags, vd, 0, NULL,
742                     ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
743
744                 zio->io_cmd = cmd;
745         } else {
746                 zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
747
748                 for (c = 0; c < vd->vdev_children; c++)
749                         zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
750                             done, private, priority, flags));
751         }
752
753         return (zio);
754 }
755
756 zio_t *
757 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
758     void *data, int checksum, zio_done_func_t *done, void *private,
759     int priority, enum zio_flag flags, boolean_t labels)
760 {
761         zio_t *zio;
762
763         ASSERT(vd->vdev_children == 0);
764         ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
765             offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
766         ASSERT3U(offset + size, <=, vd->vdev_psize);
767
768         zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
769             ZIO_TYPE_READ, priority, flags, vd, offset, NULL,
770             ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
771
772         zio->io_prop.zp_checksum = checksum;
773
774         return (zio);
775 }
776
777 zio_t *
778 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
779     void *data, int checksum, zio_done_func_t *done, void *private,
780     int priority, enum zio_flag flags, boolean_t labels)
781 {
782         zio_t *zio;
783
784         ASSERT(vd->vdev_children == 0);
785         ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
786             offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
787         ASSERT3U(offset + size, <=, vd->vdev_psize);
788
789         zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
790             ZIO_TYPE_WRITE, priority, flags, vd, offset, NULL,
791             ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
792
793         zio->io_prop.zp_checksum = checksum;
794
795         if (zio_checksum_table[checksum].ci_eck) {
796                 /*
797                  * zec checksums are necessarily destructive -- they modify
798                  * the end of the write buffer to hold the verifier/checksum.
799                  * Therefore, we must make a local copy in case the data is
800                  * being written to multiple places in parallel.
801                  */
802                 void *wbuf = zio_buf_alloc(size);
803                 bcopy(data, wbuf, size);
804                 zio_push_transform(zio, wbuf, size, size, NULL);
805         }
806
807         return (zio);
808 }
809
810 /*
811  * Create a child I/O to do some work for us.
812  */
813 zio_t *
814 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
815         void *data, uint64_t size, int type, int priority, enum zio_flag flags,
816         zio_done_func_t *done, void *private)
817 {
818         enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
819         zio_t *zio;
820
821         ASSERT(vd->vdev_parent ==
822             (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
823
824         if (type == ZIO_TYPE_READ && bp != NULL) {
825                 /*
826                  * If we have the bp, then the child should perform the
827                  * checksum and the parent need not.  This pushes error
828                  * detection as close to the leaves as possible and
829                  * eliminates redundant checksums in the interior nodes.
830                  */
831                 pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
832                 pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
833         }
834
835         if (vd->vdev_children == 0)
836                 offset += VDEV_LABEL_START_SIZE;
837
838         flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
839
840         /*
841          * If we've decided to do a repair, the write is not speculative --
842          * even if the original read was.
843          */
844         if (flags & ZIO_FLAG_IO_REPAIR)
845                 flags &= ~ZIO_FLAG_SPECULATIVE;
846
847         zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size,
848             done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
849             ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
850
851         return (zio);
852 }
853
854 zio_t *
855 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
856         int type, int priority, enum zio_flag flags,
857         zio_done_func_t *done, void *private)
858 {
859         zio_t *zio;
860
861         ASSERT(vd->vdev_ops->vdev_op_leaf);
862
863         zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
864             data, size, done, private, type, priority,
865             flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY,
866             vd, offset, NULL,
867             ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
868
869         return (zio);
870 }
871
872 void
873 zio_flush(zio_t *zio, vdev_t *vd)
874 {
875         zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
876             NULL, NULL, ZIO_PRIORITY_NOW,
877             ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
878 }
879
880 void
881 zio_shrink(zio_t *zio, uint64_t size)
882 {
883         ASSERT(zio->io_executor == NULL);
884         ASSERT(zio->io_orig_size == zio->io_size);
885         ASSERT(size <= zio->io_size);
886
887         /*
888          * We don't shrink for raidz because of problems with the
889          * reconstruction when reading back less than the block size.
890          * Note, BP_IS_RAIDZ() assumes no compression.
891          */
892         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
893         if (!BP_IS_RAIDZ(zio->io_bp))
894                 zio->io_orig_size = zio->io_size = size;
895 }
896
897 /*
898  * ==========================================================================
899  * Prepare to read and write logical blocks
900  * ==========================================================================
901  */
902
903 static int
904 zio_read_bp_init(zio_t *zio)
905 {
906         blkptr_t *bp = zio->io_bp;
907
908         if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
909             zio->io_child_type == ZIO_CHILD_LOGICAL &&
910             !(zio->io_flags & ZIO_FLAG_RAW)) {
911                 uint64_t psize = BP_GET_PSIZE(bp);
912                 void *cbuf = zio_buf_alloc(psize);
913
914                 zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
915         }
916
917         if (!dmu_ot[BP_GET_TYPE(bp)].ot_metadata && BP_GET_LEVEL(bp) == 0)
918                 zio->io_flags |= ZIO_FLAG_DONT_CACHE;
919
920         if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
921                 zio->io_flags |= ZIO_FLAG_DONT_CACHE;
922
923         if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
924                 zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
925
926         return (ZIO_PIPELINE_CONTINUE);
927 }
928
929 static int
930 zio_write_bp_init(zio_t *zio)
931 {
932         spa_t *spa = zio->io_spa;
933         zio_prop_t *zp = &zio->io_prop;
934         enum zio_compress compress = zp->zp_compress;
935         blkptr_t *bp = zio->io_bp;
936         uint64_t lsize = zio->io_size;
937         uint64_t psize = lsize;
938         int pass = 1;
939
940         /*
941          * If our children haven't all reached the ready stage,
942          * wait for them and then repeat this pipeline stage.
943          */
944         if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
945             zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
946                 return (ZIO_PIPELINE_STOP);
947
948         if (!IO_IS_ALLOCATING(zio))
949                 return (ZIO_PIPELINE_CONTINUE);
950
951         ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
952
953         if (zio->io_bp_override) {
954                 ASSERT(bp->blk_birth != zio->io_txg);
955                 ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
956
957                 *bp = *zio->io_bp_override;
958                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
959
960                 if (BP_IS_HOLE(bp) || !zp->zp_dedup)
961                         return (ZIO_PIPELINE_CONTINUE);
962
963                 ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup ||
964                     zp->zp_dedup_verify);
965
966                 if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
967                         BP_SET_DEDUP(bp, 1);
968                         zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
969                         return (ZIO_PIPELINE_CONTINUE);
970                 }
971                 zio->io_bp_override = NULL;
972                 BP_ZERO(bp);
973         }
974
975         if (bp->blk_birth == zio->io_txg) {
976                 /*
977                  * We're rewriting an existing block, which means we're
978                  * working on behalf of spa_sync().  For spa_sync() to
979                  * converge, it must eventually be the case that we don't
980                  * have to allocate new blocks.  But compression changes
981                  * the blocksize, which forces a reallocate, and makes
982                  * convergence take longer.  Therefore, after the first
983                  * few passes, stop compressing to ensure convergence.
984                  */
985                 pass = spa_sync_pass(spa);
986
987                 ASSERT(zio->io_txg == spa_syncing_txg(spa));
988                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
989                 ASSERT(!BP_GET_DEDUP(bp));
990
991                 if (pass > SYNC_PASS_DONT_COMPRESS)
992                         compress = ZIO_COMPRESS_OFF;
993
994                 /* Make sure someone doesn't change their mind on overwrites */
995                 ASSERT(MIN(zp->zp_copies + BP_IS_GANG(bp),
996                     spa_max_replication(spa)) == BP_GET_NDVAS(bp));
997         }
998
999         if (compress != ZIO_COMPRESS_OFF) {
1000                 void *cbuf = zio_buf_alloc(lsize);
1001                 psize = zio_compress_data(compress, zio->io_data, cbuf, lsize);
1002                 if (psize == 0 || psize == lsize) {
1003                         compress = ZIO_COMPRESS_OFF;
1004                         zio_buf_free(cbuf, lsize);
1005                 } else {
1006                         ASSERT(psize < lsize);
1007                         zio_push_transform(zio, cbuf, psize, lsize, NULL);
1008                 }
1009         }
1010
1011         /*
1012          * The final pass of spa_sync() must be all rewrites, but the first
1013          * few passes offer a trade-off: allocating blocks defers convergence,
1014          * but newly allocated blocks are sequential, so they can be written
1015          * to disk faster.  Therefore, we allow the first few passes of
1016          * spa_sync() to allocate new blocks, but force rewrites after that.
1017          * There should only be a handful of blocks after pass 1 in any case.
1018          */
1019         if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == psize &&
1020             pass > SYNC_PASS_REWRITE) {
1021                 enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
1022                 ASSERT(psize != 0);
1023                 zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
1024                 zio->io_flags |= ZIO_FLAG_IO_REWRITE;
1025         } else {
1026                 BP_ZERO(bp);
1027                 zio->io_pipeline = ZIO_WRITE_PIPELINE;
1028         }
1029
1030         if (psize == 0) {
1031                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1032         } else {
1033                 ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
1034                 BP_SET_LSIZE(bp, lsize);
1035                 BP_SET_PSIZE(bp, psize);
1036                 BP_SET_COMPRESS(bp, compress);
1037                 BP_SET_CHECKSUM(bp, zp->zp_checksum);
1038                 BP_SET_TYPE(bp, zp->zp_type);
1039                 BP_SET_LEVEL(bp, zp->zp_level);
1040                 BP_SET_DEDUP(bp, zp->zp_dedup);
1041                 BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
1042                 if (zp->zp_dedup) {
1043                         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1044                         ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1045                         zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
1046                 }
1047         }
1048
1049         return (ZIO_PIPELINE_CONTINUE);
1050 }
1051
1052 static int
1053 zio_free_bp_init(zio_t *zio)
1054 {
1055         blkptr_t *bp = zio->io_bp;
1056
1057         if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
1058                 if (BP_GET_DEDUP(bp))
1059                         zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
1060         }
1061
1062         return (ZIO_PIPELINE_CONTINUE);
1063 }
1064
1065 /*
1066  * ==========================================================================
1067  * Execute the I/O pipeline
1068  * ==========================================================================
1069  */
1070
1071 static void
1072 zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
1073 {
1074         spa_t *spa = zio->io_spa;
1075         zio_type_t t = zio->io_type;
1076         int flags = TQ_NOSLEEP | (cutinline ? TQ_FRONT : 0);
1077
1078         /*
1079          * If we're a config writer or a probe, the normal issue and
1080          * interrupt threads may all be blocked waiting for the config lock.
1081          * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
1082          */
1083         if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
1084                 t = ZIO_TYPE_NULL;
1085
1086         /*
1087          * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
1088          */
1089         if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
1090                 t = ZIO_TYPE_NULL;
1091
1092         /*
1093          * If this is a high priority I/O, then use the high priority taskq.
1094          */
1095         if (zio->io_priority == ZIO_PRIORITY_NOW &&
1096             spa->spa_zio_taskq[t][q + 1] != NULL)
1097                 q++;
1098
1099         ASSERT3U(q, <, ZIO_TASKQ_TYPES);
1100
1101         while (taskq_dispatch(spa->spa_zio_taskq[t][q],
1102             (task_func_t *)zio_execute, zio, flags) == 0); /* do nothing */
1103 }
1104
1105 static boolean_t
1106 zio_taskq_member(zio_t *zio, enum zio_taskq_type q)
1107 {
1108         kthread_t *executor = zio->io_executor;
1109         spa_t *spa = zio->io_spa;
1110         zio_type_t t;
1111
1112         for (t = 0; t < ZIO_TYPES; t++)
1113                 if (taskq_member(spa->spa_zio_taskq[t][q], executor))
1114                         return (B_TRUE);
1115
1116         return (B_FALSE);
1117 }
1118
1119 static int
1120 zio_issue_async(zio_t *zio)
1121 {
1122         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1123
1124         return (ZIO_PIPELINE_STOP);
1125 }
1126
1127 void
1128 zio_interrupt(zio_t *zio)
1129 {
1130         zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
1131 }
1132
1133 /*
1134  * Execute the I/O pipeline until one of the following occurs:
1135  * (1) the I/O completes; (2) the pipeline stalls waiting for
1136  * dependent child I/Os; (3) the I/O issues, so we're waiting
1137  * for an I/O completion interrupt; (4) the I/O is delegated by
1138  * vdev-level caching or aggregation; (5) the I/O is deferred
1139  * due to vdev-level queueing; (6) the I/O is handed off to
1140  * another thread.  In all cases, the pipeline stops whenever
1141  * there's no CPU work; it never burns a thread in cv_wait().
1142  *
1143  * There's no locking on io_stage because there's no legitimate way
1144  * for multiple threads to be attempting to process the same I/O.
1145  */
1146 static zio_pipe_stage_t *zio_pipeline[];
1147
1148 /*
1149  * zio_execute() is a wrapper around the static function
1150  * __zio_execute() so that we can force  __zio_execute() to be
1151  * inlined.  This reduces stack overhead which is important
1152  * because __zio_execute() is called recursively in several zio
1153  * code paths.  zio_execute() itself cannot be inlined because
1154  * it is externally visible.
1155  */
1156 void
1157 zio_execute(zio_t *zio)
1158 {
1159         __zio_execute(zio);
1160 }
1161
1162 __attribute__((always_inline))
1163 static inline void
1164 __zio_execute(zio_t *zio)
1165 {
1166         zio->io_executor = curthread;
1167
1168         while (zio->io_stage < ZIO_STAGE_DONE) {
1169                 enum zio_stage pipeline = zio->io_pipeline;
1170                 enum zio_stage stage = zio->io_stage;
1171                 dsl_pool_t *dsl;
1172                 boolean_t cut;
1173                 int rv;
1174
1175                 ASSERT(!MUTEX_HELD(&zio->io_lock));
1176                 ASSERT(ISP2(stage));
1177                 ASSERT(zio->io_stall == NULL);
1178
1179                 do {
1180                         stage <<= 1;
1181                 } while ((stage & pipeline) == 0);
1182
1183                 ASSERT(stage <= ZIO_STAGE_DONE);
1184
1185                 dsl = spa_get_dsl(zio->io_spa);
1186                 cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
1187                     zio_requeue_io_start_cut_in_line : B_FALSE;
1188
1189                 /*
1190                  * If we are in interrupt context and this pipeline stage
1191                  * will grab a config lock that is held across I/O,
1192                  * or may wait for an I/O that needs an interrupt thread
1193                  * to complete, issue async to avoid deadlock.
1194                  *
1195                  * If we are in the txg_sync_thread or being called
1196                  * during pool init issue async to minimize stack depth.
1197                  * Both of these call paths may be recursively called.
1198                  *
1199                  * For VDEV_IO_START, we cut in line so that the io will
1200                  * be sent to disk promptly.
1201                  */
1202                 if (((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
1203                     zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) ||
1204                     (dsl != NULL && dsl_pool_sync_context(dsl))) {
1205                         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
1206                         return;
1207                 }
1208
1209                 zio->io_stage = stage;
1210                 rv = zio_pipeline[highbit(stage) - 1](zio);
1211
1212                 if (rv == ZIO_PIPELINE_STOP)
1213                         return;
1214
1215                 ASSERT(rv == ZIO_PIPELINE_CONTINUE);
1216         }
1217 }
1218
1219
1220 /*
1221  * ==========================================================================
1222  * Initiate I/O, either sync or async
1223  * ==========================================================================
1224  */
1225 int
1226 zio_wait(zio_t *zio)
1227 {
1228         int error;
1229
1230         ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
1231         ASSERT(zio->io_executor == NULL);
1232
1233         zio->io_waiter = curthread;
1234
1235         __zio_execute(zio);
1236
1237         mutex_enter(&zio->io_lock);
1238         while (zio->io_executor != NULL)
1239                 cv_wait(&zio->io_cv, &zio->io_lock);
1240         mutex_exit(&zio->io_lock);
1241
1242         error = zio->io_error;
1243         zio_destroy(zio);
1244
1245         return (error);
1246 }
1247
1248 void
1249 zio_nowait(zio_t *zio)
1250 {
1251         ASSERT(zio->io_executor == NULL);
1252
1253         if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
1254             zio_unique_parent(zio) == NULL) {
1255                 /*
1256                  * This is a logical async I/O with no parent to wait for it.
1257                  * We add it to the spa_async_root_zio "Godfather" I/O which
1258                  * will ensure they complete prior to unloading the pool.
1259                  */
1260                 spa_t *spa = zio->io_spa;
1261
1262                 zio_add_child(spa->spa_async_zio_root, zio);
1263         }
1264
1265         __zio_execute(zio);
1266 }
1267
1268 /*
1269  * ==========================================================================
1270  * Reexecute or suspend/resume failed I/O
1271  * ==========================================================================
1272  */
1273
1274 static void
1275 zio_reexecute(zio_t *pio)
1276 {
1277         zio_t *cio, *cio_next;
1278         int c, w;
1279
1280         ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
1281         ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
1282         ASSERT(pio->io_gang_leader == NULL);
1283         ASSERT(pio->io_gang_tree == NULL);
1284
1285         pio->io_flags = pio->io_orig_flags;
1286         pio->io_stage = pio->io_orig_stage;
1287         pio->io_pipeline = pio->io_orig_pipeline;
1288         pio->io_reexecute = 0;
1289         pio->io_error = 0;
1290         for (w = 0; w < ZIO_WAIT_TYPES; w++)
1291                 pio->io_state[w] = 0;
1292         for (c = 0; c < ZIO_CHILD_TYPES; c++)
1293                 pio->io_child_error[c] = 0;
1294
1295         if (IO_IS_ALLOCATING(pio))
1296                 BP_ZERO(pio->io_bp);
1297
1298         /*
1299          * As we reexecute pio's children, new children could be created.
1300          * New children go to the head of pio's io_child_list, however,
1301          * so we will (correctly) not reexecute them.  The key is that
1302          * the remainder of pio's io_child_list, from 'cio_next' onward,
1303          * cannot be affected by any side effects of reexecuting 'cio'.
1304          */
1305         for (cio = zio_walk_children(pio); cio != NULL; cio = cio_next) {
1306                 cio_next = zio_walk_children(pio);
1307                 mutex_enter(&pio->io_lock);
1308                 for (w = 0; w < ZIO_WAIT_TYPES; w++)
1309                         pio->io_children[cio->io_child_type][w]++;
1310                 mutex_exit(&pio->io_lock);
1311                 zio_reexecute(cio);
1312         }
1313
1314         /*
1315          * Now that all children have been reexecuted, execute the parent.
1316          * We don't reexecute "The Godfather" I/O here as it's the
1317          * responsibility of the caller to wait on him.
1318          */
1319         if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
1320                 __zio_execute(pio);
1321 }
1322
1323 void
1324 zio_suspend(spa_t *spa, zio_t *zio)
1325 {
1326         if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
1327                 fm_panic("Pool '%s' has encountered an uncorrectable I/O "
1328                     "failure and the failure mode property for this pool "
1329                     "is set to panic.", spa_name(spa));
1330
1331         zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
1332
1333         mutex_enter(&spa->spa_suspend_lock);
1334
1335         if (spa->spa_suspend_zio_root == NULL)
1336                 spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
1337                     ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
1338                     ZIO_FLAG_GODFATHER);
1339
1340         spa->spa_suspended = B_TRUE;
1341
1342         if (zio != NULL) {
1343                 ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
1344                 ASSERT(zio != spa->spa_suspend_zio_root);
1345                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1346                 ASSERT(zio_unique_parent(zio) == NULL);
1347                 ASSERT(zio->io_stage == ZIO_STAGE_DONE);
1348                 zio_add_child(spa->spa_suspend_zio_root, zio);
1349         }
1350
1351         mutex_exit(&spa->spa_suspend_lock);
1352 }
1353
1354 int
1355 zio_resume(spa_t *spa)
1356 {
1357         zio_t *pio;
1358
1359         /*
1360          * Reexecute all previously suspended i/o.
1361          */
1362         mutex_enter(&spa->spa_suspend_lock);
1363         spa->spa_suspended = B_FALSE;
1364         cv_broadcast(&spa->spa_suspend_cv);
1365         pio = spa->spa_suspend_zio_root;
1366         spa->spa_suspend_zio_root = NULL;
1367         mutex_exit(&spa->spa_suspend_lock);
1368
1369         if (pio == NULL)
1370                 return (0);
1371
1372         zio_reexecute(pio);
1373         return (zio_wait(pio));
1374 }
1375
1376 void
1377 zio_resume_wait(spa_t *spa)
1378 {
1379         mutex_enter(&spa->spa_suspend_lock);
1380         while (spa_suspended(spa))
1381                 cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
1382         mutex_exit(&spa->spa_suspend_lock);
1383 }
1384
1385 /*
1386  * ==========================================================================
1387  * Gang blocks.
1388  *
1389  * A gang block is a collection of small blocks that looks to the DMU
1390  * like one large block.  When zio_dva_allocate() cannot find a block
1391  * of the requested size, due to either severe fragmentation or the pool
1392  * being nearly full, it calls zio_write_gang_block() to construct the
1393  * block from smaller fragments.
1394  *
1395  * A gang block consists of a gang header (zio_gbh_phys_t) and up to
1396  * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
1397  * an indirect block: it's an array of block pointers.  It consumes
1398  * only one sector and hence is allocatable regardless of fragmentation.
1399  * The gang header's bps point to its gang members, which hold the data.
1400  *
1401  * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
1402  * as the verifier to ensure uniqueness of the SHA256 checksum.
1403  * Critically, the gang block bp's blk_cksum is the checksum of the data,
1404  * not the gang header.  This ensures that data block signatures (needed for
1405  * deduplication) are independent of how the block is physically stored.
1406  *
1407  * Gang blocks can be nested: a gang member may itself be a gang block.
1408  * Thus every gang block is a tree in which root and all interior nodes are
1409  * gang headers, and the leaves are normal blocks that contain user data.
1410  * The root of the gang tree is called the gang leader.
1411  *
1412  * To perform any operation (read, rewrite, free, claim) on a gang block,
1413  * zio_gang_assemble() first assembles the gang tree (minus data leaves)
1414  * in the io_gang_tree field of the original logical i/o by recursively
1415  * reading the gang leader and all gang headers below it.  This yields
1416  * an in-core tree containing the contents of every gang header and the
1417  * bps for every constituent of the gang block.
1418  *
1419  * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
1420  * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
1421  * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
1422  * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
1423  * zio_read_gang() is a wrapper around zio_read() that omits reading gang
1424  * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
1425  * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
1426  * of the gang header plus zio_checksum_compute() of the data to update the
1427  * gang header's blk_cksum as described above.
1428  *
1429  * The two-phase assemble/issue model solves the problem of partial failure --
1430  * what if you'd freed part of a gang block but then couldn't read the
1431  * gang header for another part?  Assembling the entire gang tree first
1432  * ensures that all the necessary gang header I/O has succeeded before
1433  * starting the actual work of free, claim, or write.  Once the gang tree
1434  * is assembled, free and claim are in-memory operations that cannot fail.
1435  *
1436  * In the event that a gang write fails, zio_dva_unallocate() walks the
1437  * gang tree to immediately free (i.e. insert back into the space map)
1438  * everything we've allocated.  This ensures that we don't get ENOSPC
1439  * errors during repeated suspend/resume cycles due to a flaky device.
1440  *
1441  * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
1442  * the gang tree, we won't modify the block, so we can safely defer the free
1443  * (knowing that the block is still intact).  If we *can* assemble the gang
1444  * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
1445  * each constituent bp and we can allocate a new block on the next sync pass.
1446  *
1447  * In all cases, the gang tree allows complete recovery from partial failure.
1448  * ==========================================================================
1449  */
1450
1451 static zio_t *
1452 zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1453 {
1454         if (gn != NULL)
1455                 return (pio);
1456
1457         return (zio_read(pio, pio->io_spa, bp, data, BP_GET_PSIZE(bp),
1458             NULL, NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1459             &pio->io_bookmark));
1460 }
1461
1462 zio_t *
1463 zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1464 {
1465         zio_t *zio;
1466
1467         if (gn != NULL) {
1468                 zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1469                     gn->gn_gbh, SPA_GANGBLOCKSIZE, NULL, NULL, pio->io_priority,
1470                     ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1471                 /*
1472                  * As we rewrite each gang header, the pipeline will compute
1473                  * a new gang block header checksum for it; but no one will
1474                  * compute a new data checksum, so we do that here.  The one
1475                  * exception is the gang leader: the pipeline already computed
1476                  * its data checksum because that stage precedes gang assembly.
1477                  * (Presently, nothing actually uses interior data checksums;
1478                  * this is just good hygiene.)
1479                  */
1480                 if (gn != pio->io_gang_leader->io_gang_tree) {
1481                         zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
1482                             data, BP_GET_PSIZE(bp));
1483                 }
1484                 /*
1485                  * If we are here to damage data for testing purposes,
1486                  * leave the GBH alone so that we can detect the damage.
1487                  */
1488                 if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
1489                         zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
1490         } else {
1491                 zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
1492                     data, BP_GET_PSIZE(bp), NULL, NULL, pio->io_priority,
1493                     ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1494         }
1495
1496         return (zio);
1497 }
1498
1499 /* ARGSUSED */
1500 zio_t *
1501 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1502 {
1503         return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
1504             ZIO_GANG_CHILD_FLAGS(pio)));
1505 }
1506
1507 /* ARGSUSED */
1508 zio_t *
1509 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
1510 {
1511         return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
1512             NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
1513 }
1514
1515 static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
1516         NULL,
1517         zio_read_gang,
1518         zio_rewrite_gang,
1519         zio_free_gang,
1520         zio_claim_gang,
1521         NULL
1522 };
1523
1524 static void zio_gang_tree_assemble_done(zio_t *zio);
1525
1526 static zio_gang_node_t *
1527 zio_gang_node_alloc(zio_gang_node_t **gnpp)
1528 {
1529         zio_gang_node_t *gn;
1530
1531         ASSERT(*gnpp == NULL);
1532
1533         gn = kmem_zalloc(sizeof (*gn), KM_PUSHPAGE);
1534         gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
1535         *gnpp = gn;
1536
1537         return (gn);
1538 }
1539
1540 static void
1541 zio_gang_node_free(zio_gang_node_t **gnpp)
1542 {
1543         zio_gang_node_t *gn = *gnpp;
1544         int g;
1545
1546         for (g = 0; g < SPA_GBH_NBLKPTRS; g++)
1547                 ASSERT(gn->gn_child[g] == NULL);
1548
1549         zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
1550         kmem_free(gn, sizeof (*gn));
1551         *gnpp = NULL;
1552 }
1553
1554 static void
1555 zio_gang_tree_free(zio_gang_node_t **gnpp)
1556 {
1557         zio_gang_node_t *gn = *gnpp;
1558         int g;
1559
1560         if (gn == NULL)
1561                 return;
1562
1563         for (g = 0; g < SPA_GBH_NBLKPTRS; g++)
1564                 zio_gang_tree_free(&gn->gn_child[g]);
1565
1566         zio_gang_node_free(gnpp);
1567 }
1568
1569 static void
1570 zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
1571 {
1572         zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
1573
1574         ASSERT(gio->io_gang_leader == gio);
1575         ASSERT(BP_IS_GANG(bp));
1576
1577         zio_nowait(zio_read(gio, gio->io_spa, bp, gn->gn_gbh,
1578             SPA_GANGBLOCKSIZE, zio_gang_tree_assemble_done, gn,
1579             gio->io_priority, ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
1580 }
1581
1582 static void
1583 zio_gang_tree_assemble_done(zio_t *zio)
1584 {
1585         zio_t *gio = zio->io_gang_leader;
1586         zio_gang_node_t *gn = zio->io_private;
1587         blkptr_t *bp = zio->io_bp;
1588         int g;
1589
1590         ASSERT(gio == zio_unique_parent(zio));
1591         ASSERT(zio->io_child_count == 0);
1592
1593         if (zio->io_error)
1594                 return;
1595
1596         if (BP_SHOULD_BYTESWAP(bp))
1597                 byteswap_uint64_array(zio->io_data, zio->io_size);
1598
1599         ASSERT(zio->io_data == gn->gn_gbh);
1600         ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
1601         ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
1602
1603         for (g = 0; g < SPA_GBH_NBLKPTRS; g++) {
1604                 blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
1605                 if (!BP_IS_GANG(gbp))
1606                         continue;
1607                 zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
1608         }
1609 }
1610
1611 static void
1612 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
1613 {
1614         zio_t *gio = pio->io_gang_leader;
1615         zio_t *zio;
1616         int g;
1617
1618         ASSERT(BP_IS_GANG(bp) == !!gn);
1619         ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
1620         ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
1621
1622         /*
1623          * If you're a gang header, your data is in gn->gn_gbh.
1624          * If you're a gang member, your data is in 'data' and gn == NULL.
1625          */
1626         zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data);
1627
1628         if (gn != NULL) {
1629                 ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
1630
1631                 for (g = 0; g < SPA_GBH_NBLKPTRS; g++) {
1632                         blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
1633                         if (BP_IS_HOLE(gbp))
1634                                 continue;
1635                         zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data);
1636                         data = (char *)data + BP_GET_PSIZE(gbp);
1637                 }
1638         }
1639
1640         if (gn == gio->io_gang_tree)
1641                 ASSERT3P((char *)gio->io_data + gio->io_size, ==, data);
1642
1643         if (zio != pio)
1644                 zio_nowait(zio);
1645 }
1646
1647 static int
1648 zio_gang_assemble(zio_t *zio)
1649 {
1650         blkptr_t *bp = zio->io_bp;
1651
1652         ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
1653         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
1654
1655         zio->io_gang_leader = zio;
1656
1657         zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
1658
1659         return (ZIO_PIPELINE_CONTINUE);
1660 }
1661
1662 static int
1663 zio_gang_issue(zio_t *zio)
1664 {
1665         blkptr_t *bp = zio->io_bp;
1666
1667         if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
1668                 return (ZIO_PIPELINE_STOP);
1669
1670         ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
1671         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
1672
1673         if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
1674                 zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_data);
1675         else
1676                 zio_gang_tree_free(&zio->io_gang_tree);
1677
1678         zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1679
1680         return (ZIO_PIPELINE_CONTINUE);
1681 }
1682
1683 static void
1684 zio_write_gang_member_ready(zio_t *zio)
1685 {
1686         zio_t *pio = zio_unique_parent(zio);
1687         ASSERTV(zio_t *gio = zio->io_gang_leader;)
1688         dva_t *cdva = zio->io_bp->blk_dva;
1689         dva_t *pdva = pio->io_bp->blk_dva;
1690         uint64_t asize;
1691         int d;
1692
1693         if (BP_IS_HOLE(zio->io_bp))
1694                 return;
1695
1696         ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
1697
1698         ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
1699         ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
1700         ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
1701         ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
1702         ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
1703
1704         mutex_enter(&pio->io_lock);
1705         for (d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
1706                 ASSERT(DVA_GET_GANG(&pdva[d]));
1707                 asize = DVA_GET_ASIZE(&pdva[d]);
1708                 asize += DVA_GET_ASIZE(&cdva[d]);
1709                 DVA_SET_ASIZE(&pdva[d], asize);
1710         }
1711         mutex_exit(&pio->io_lock);
1712 }
1713
1714 static int
1715 zio_write_gang_block(zio_t *pio)
1716 {
1717         spa_t *spa = pio->io_spa;
1718         blkptr_t *bp = pio->io_bp;
1719         zio_t *gio = pio->io_gang_leader;
1720         zio_t *zio;
1721         zio_gang_node_t *gn, **gnpp;
1722         zio_gbh_phys_t *gbh;
1723         uint64_t txg = pio->io_txg;
1724         uint64_t resid = pio->io_size;
1725         uint64_t lsize;
1726         int copies = gio->io_prop.zp_copies;
1727         int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
1728         zio_prop_t zp;
1729         int g, error;
1730
1731         error = metaslab_alloc(spa, spa_normal_class(spa), SPA_GANGBLOCKSIZE,
1732             bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp,
1733             METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER);
1734         if (error) {
1735                 pio->io_error = error;
1736                 return (ZIO_PIPELINE_CONTINUE);
1737         }
1738
1739         if (pio == gio) {
1740                 gnpp = &gio->io_gang_tree;
1741         } else {
1742                 gnpp = pio->io_private;
1743                 ASSERT(pio->io_ready == zio_write_gang_member_ready);
1744         }
1745
1746         gn = zio_gang_node_alloc(gnpp);
1747         gbh = gn->gn_gbh;
1748         bzero(gbh, SPA_GANGBLOCKSIZE);
1749
1750         /*
1751          * Create the gang header.
1752          */
1753         zio = zio_rewrite(pio, spa, txg, bp, gbh, SPA_GANGBLOCKSIZE, NULL, NULL,
1754             pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
1755
1756         /*
1757          * Create and nowait the gang children.
1758          */
1759         for (g = 0; resid != 0; resid -= lsize, g++) {
1760                 lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
1761                     SPA_MINBLOCKSIZE);
1762                 ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
1763
1764                 zp.zp_checksum = gio->io_prop.zp_checksum;
1765                 zp.zp_compress = ZIO_COMPRESS_OFF;
1766                 zp.zp_type = DMU_OT_NONE;
1767                 zp.zp_level = 0;
1768                 zp.zp_copies = gio->io_prop.zp_copies;
1769                 zp.zp_dedup = 0;
1770                 zp.zp_dedup_verify = 0;
1771
1772                 zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
1773                     (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
1774                     zio_write_gang_member_ready, NULL, &gn->gn_child[g],
1775                     pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
1776                     &pio->io_bookmark));
1777         }
1778
1779         /*
1780          * Set pio's pipeline to just wait for zio to finish.
1781          */
1782         pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1783
1784         zio_nowait(zio);
1785
1786         return (ZIO_PIPELINE_CONTINUE);
1787 }
1788
1789 /*
1790  * ==========================================================================
1791  * Dedup
1792  * ==========================================================================
1793  */
1794 static void
1795 zio_ddt_child_read_done(zio_t *zio)
1796 {
1797         blkptr_t *bp = zio->io_bp;
1798         ddt_entry_t *dde = zio->io_private;
1799         ddt_phys_t *ddp;
1800         zio_t *pio = zio_unique_parent(zio);
1801
1802         mutex_enter(&pio->io_lock);
1803         ddp = ddt_phys_select(dde, bp);
1804         if (zio->io_error == 0)
1805                 ddt_phys_clear(ddp);    /* this ddp doesn't need repair */
1806         if (zio->io_error == 0 && dde->dde_repair_data == NULL)
1807                 dde->dde_repair_data = zio->io_data;
1808         else
1809                 zio_buf_free(zio->io_data, zio->io_size);
1810         mutex_exit(&pio->io_lock);
1811 }
1812
1813 static int
1814 zio_ddt_read_start(zio_t *zio)
1815 {
1816         blkptr_t *bp = zio->io_bp;
1817         int p;
1818
1819         ASSERT(BP_GET_DEDUP(bp));
1820         ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
1821         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1822
1823         if (zio->io_child_error[ZIO_CHILD_DDT]) {
1824                 ddt_t *ddt = ddt_select(zio->io_spa, bp);
1825                 ddt_entry_t *dde = ddt_repair_start(ddt, bp);
1826                 ddt_phys_t *ddp = dde->dde_phys;
1827                 ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
1828                 blkptr_t blk;
1829
1830                 ASSERT(zio->io_vsd == NULL);
1831                 zio->io_vsd = dde;
1832
1833                 if (ddp_self == NULL)
1834                         return (ZIO_PIPELINE_CONTINUE);
1835
1836                 for (p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
1837                         if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
1838                                 continue;
1839                         ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
1840                             &blk);
1841                         zio_nowait(zio_read(zio, zio->io_spa, &blk,
1842                             zio_buf_alloc(zio->io_size), zio->io_size,
1843                             zio_ddt_child_read_done, dde, zio->io_priority,
1844                             ZIO_DDT_CHILD_FLAGS(zio) | ZIO_FLAG_DONT_PROPAGATE,
1845                             &zio->io_bookmark));
1846                 }
1847                 return (ZIO_PIPELINE_CONTINUE);
1848         }
1849
1850         zio_nowait(zio_read(zio, zio->io_spa, bp,
1851             zio->io_data, zio->io_size, NULL, NULL, zio->io_priority,
1852             ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
1853
1854         return (ZIO_PIPELINE_CONTINUE);
1855 }
1856
1857 static int
1858 zio_ddt_read_done(zio_t *zio)
1859 {
1860         blkptr_t *bp = zio->io_bp;
1861
1862         if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
1863                 return (ZIO_PIPELINE_STOP);
1864
1865         ASSERT(BP_GET_DEDUP(bp));
1866         ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
1867         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1868
1869         if (zio->io_child_error[ZIO_CHILD_DDT]) {
1870                 ddt_t *ddt = ddt_select(zio->io_spa, bp);
1871                 ddt_entry_t *dde = zio->io_vsd;
1872                 if (ddt == NULL) {
1873                         ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
1874                         return (ZIO_PIPELINE_CONTINUE);
1875                 }
1876                 if (dde == NULL) {
1877                         zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
1878                         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1879                         return (ZIO_PIPELINE_STOP);
1880                 }
1881                 if (dde->dde_repair_data != NULL) {
1882                         bcopy(dde->dde_repair_data, zio->io_data, zio->io_size);
1883                         zio->io_child_error[ZIO_CHILD_DDT] = 0;
1884                 }
1885                 ddt_repair_done(ddt, dde);
1886                 zio->io_vsd = NULL;
1887         }
1888
1889         ASSERT(zio->io_vsd == NULL);
1890
1891         return (ZIO_PIPELINE_CONTINUE);
1892 }
1893
1894 static boolean_t
1895 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
1896 {
1897         spa_t *spa = zio->io_spa;
1898         int p;
1899
1900         /*
1901          * Note: we compare the original data, not the transformed data,
1902          * because when zio->io_bp is an override bp, we will not have
1903          * pushed the I/O transforms.  That's an important optimization
1904          * because otherwise we'd compress/encrypt all dmu_sync() data twice.
1905          */
1906         for (p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
1907                 zio_t *lio = dde->dde_lead_zio[p];
1908
1909                 if (lio != NULL) {
1910                         return (lio->io_orig_size != zio->io_orig_size ||
1911                             bcmp(zio->io_orig_data, lio->io_orig_data,
1912                             zio->io_orig_size) != 0);
1913                 }
1914         }
1915
1916         for (p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
1917                 ddt_phys_t *ddp = &dde->dde_phys[p];
1918
1919                 if (ddp->ddp_phys_birth != 0) {
1920                         arc_buf_t *abuf = NULL;
1921                         uint32_t aflags = ARC_WAIT;
1922                         blkptr_t blk = *zio->io_bp;
1923                         int error;
1924
1925                         ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
1926
1927                         ddt_exit(ddt);
1928
1929                         error = arc_read_nolock(NULL, spa, &blk,
1930                             arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
1931                             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
1932                             &aflags, &zio->io_bookmark);
1933
1934                         if (error == 0) {
1935                                 if (arc_buf_size(abuf) != zio->io_orig_size ||
1936                                     bcmp(abuf->b_data, zio->io_orig_data,
1937                                     zio->io_orig_size) != 0)
1938                                         error = EEXIST;
1939                                 VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
1940                         }
1941
1942                         ddt_enter(ddt);
1943                         return (error != 0);
1944                 }
1945         }
1946
1947         return (B_FALSE);
1948 }
1949
1950 static void
1951 zio_ddt_child_write_ready(zio_t *zio)
1952 {
1953         int p = zio->io_prop.zp_copies;
1954         ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
1955         ddt_entry_t *dde = zio->io_private;
1956         ddt_phys_t *ddp = &dde->dde_phys[p];
1957         zio_t *pio;
1958
1959         if (zio->io_error)
1960                 return;
1961
1962         ddt_enter(ddt);
1963
1964         ASSERT(dde->dde_lead_zio[p] == zio);
1965
1966         ddt_phys_fill(ddp, zio->io_bp);
1967
1968         while ((pio = zio_walk_parents(zio)) != NULL)
1969                 ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
1970
1971         ddt_exit(ddt);
1972 }
1973
1974 static void
1975 zio_ddt_child_write_done(zio_t *zio)
1976 {
1977         int p = zio->io_prop.zp_copies;
1978         ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
1979         ddt_entry_t *dde = zio->io_private;
1980         ddt_phys_t *ddp = &dde->dde_phys[p];
1981
1982         ddt_enter(ddt);
1983
1984         ASSERT(ddp->ddp_refcnt == 0);
1985         ASSERT(dde->dde_lead_zio[p] == zio);
1986         dde->dde_lead_zio[p] = NULL;
1987
1988         if (zio->io_error == 0) {
1989                 while (zio_walk_parents(zio) != NULL)
1990                         ddt_phys_addref(ddp);
1991         } else {
1992                 ddt_phys_clear(ddp);
1993         }
1994
1995         ddt_exit(ddt);
1996 }
1997
1998 static void
1999 zio_ddt_ditto_write_done(zio_t *zio)
2000 {
2001         int p = DDT_PHYS_DITTO;
2002         blkptr_t *bp = zio->io_bp;
2003         ddt_t *ddt = ddt_select(zio->io_spa, bp);
2004         ddt_entry_t *dde = zio->io_private;
2005         ddt_phys_t *ddp = &dde->dde_phys[p];
2006         ddt_key_t *ddk = &dde->dde_key;
2007         ASSERTV(zio_prop_t *zp = &zio->io_prop);
2008
2009         ddt_enter(ddt);
2010
2011         ASSERT(ddp->ddp_refcnt == 0);
2012         ASSERT(dde->dde_lead_zio[p] == zio);
2013         dde->dde_lead_zio[p] = NULL;
2014
2015         if (zio->io_error == 0) {
2016                 ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
2017                 ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
2018                 ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
2019                 if (ddp->ddp_phys_birth != 0)
2020                         ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
2021                 ddt_phys_fill(ddp, bp);
2022         }
2023
2024         ddt_exit(ddt);
2025 }
2026
2027 static int
2028 zio_ddt_write(zio_t *zio)
2029 {
2030         spa_t *spa = zio->io_spa;
2031         blkptr_t *bp = zio->io_bp;
2032         uint64_t txg = zio->io_txg;
2033         zio_prop_t *zp = &zio->io_prop;
2034         int p = zp->zp_copies;
2035         int ditto_copies;
2036         zio_t *cio = NULL;
2037         zio_t *dio = NULL;
2038         ddt_t *ddt = ddt_select(spa, bp);
2039         ddt_entry_t *dde;
2040         ddt_phys_t *ddp;
2041
2042         ASSERT(BP_GET_DEDUP(bp));
2043         ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
2044         ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
2045
2046         ddt_enter(ddt);
2047         dde = ddt_lookup(ddt, bp, B_TRUE);
2048         ddp = &dde->dde_phys[p];
2049
2050         if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
2051                 /*
2052                  * If we're using a weak checksum, upgrade to a strong checksum
2053                  * and try again.  If we're already using a strong checksum,
2054                  * we can't resolve it, so just convert to an ordinary write.
2055                  * (And automatically e-mail a paper to Nature?)
2056                  */
2057                 if (!zio_checksum_table[zp->zp_checksum].ci_dedup) {
2058                         zp->zp_checksum = spa_dedup_checksum(spa);
2059                         zio_pop_transforms(zio);
2060                         zio->io_stage = ZIO_STAGE_OPEN;
2061                         BP_ZERO(bp);
2062                 } else {
2063                         zp->zp_dedup = 0;
2064                 }
2065                 zio->io_pipeline = ZIO_WRITE_PIPELINE;
2066                 ddt_exit(ddt);
2067                 return (ZIO_PIPELINE_CONTINUE);
2068         }
2069
2070         ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
2071         ASSERT(ditto_copies < SPA_DVAS_PER_BP);
2072
2073         if (ditto_copies > ddt_ditto_copies_present(dde) &&
2074             dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
2075                 zio_prop_t czp = *zp;
2076
2077                 czp.zp_copies = ditto_copies;
2078
2079                 /*
2080                  * If we arrived here with an override bp, we won't have run
2081                  * the transform stack, so we won't have the data we need to
2082                  * generate a child i/o.  So, toss the override bp and restart.
2083                  * This is safe, because using the override bp is just an
2084                  * optimization; and it's rare, so the cost doesn't matter.
2085                  */
2086                 if (zio->io_bp_override) {
2087                         zio_pop_transforms(zio);
2088                         zio->io_stage = ZIO_STAGE_OPEN;
2089                         zio->io_pipeline = ZIO_WRITE_PIPELINE;
2090                         zio->io_bp_override = NULL;
2091                         BP_ZERO(bp);
2092                         ddt_exit(ddt);
2093                         return (ZIO_PIPELINE_CONTINUE);
2094                 }
2095
2096                 dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
2097                     zio->io_orig_size, &czp, NULL,
2098                     zio_ddt_ditto_write_done, dde, zio->io_priority,
2099                     ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2100
2101                 zio_push_transform(dio, zio->io_data, zio->io_size, 0, NULL);
2102                 dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
2103         }
2104
2105         if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
2106                 if (ddp->ddp_phys_birth != 0)
2107                         ddt_bp_fill(ddp, bp, txg);
2108                 if (dde->dde_lead_zio[p] != NULL)
2109                         zio_add_child(zio, dde->dde_lead_zio[p]);
2110                 else
2111                         ddt_phys_addref(ddp);
2112         } else if (zio->io_bp_override) {
2113                 ASSERT(bp->blk_birth == txg);
2114                 ASSERT(BP_EQUAL(bp, zio->io_bp_override));
2115                 ddt_phys_fill(ddp, bp);
2116                 ddt_phys_addref(ddp);
2117         } else {
2118                 cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
2119                     zio->io_orig_size, zp, zio_ddt_child_write_ready,
2120                     zio_ddt_child_write_done, dde, zio->io_priority,
2121                     ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2122
2123                 zio_push_transform(cio, zio->io_data, zio->io_size, 0, NULL);
2124                 dde->dde_lead_zio[p] = cio;
2125         }
2126
2127         ddt_exit(ddt);
2128
2129         if (cio)
2130                 zio_nowait(cio);
2131         if (dio)
2132                 zio_nowait(dio);
2133
2134         return (ZIO_PIPELINE_CONTINUE);
2135 }
2136
2137 ddt_entry_t *freedde; /* for debugging */
2138
2139 static int
2140 zio_ddt_free(zio_t *zio)
2141 {
2142         spa_t *spa = zio->io_spa;
2143         blkptr_t *bp = zio->io_bp;
2144         ddt_t *ddt = ddt_select(spa, bp);
2145         ddt_entry_t *dde;
2146         ddt_phys_t *ddp;
2147
2148         ASSERT(BP_GET_DEDUP(bp));
2149         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2150
2151         ddt_enter(ddt);
2152         freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
2153         ddp = ddt_phys_select(dde, bp);
2154         ddt_phys_decref(ddp);
2155         ddt_exit(ddt);
2156
2157         return (ZIO_PIPELINE_CONTINUE);
2158 }
2159
2160 /*
2161  * ==========================================================================
2162  * Allocate and free blocks
2163  * ==========================================================================
2164  */
2165 static int
2166 zio_dva_allocate(zio_t *zio)
2167 {
2168         spa_t *spa = zio->io_spa;
2169         metaslab_class_t *mc = spa_normal_class(spa);
2170         blkptr_t *bp = zio->io_bp;
2171         int error;
2172         int flags = 0;
2173
2174         if (zio->io_gang_leader == NULL) {
2175                 ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2176                 zio->io_gang_leader = zio;
2177         }
2178
2179         ASSERT(BP_IS_HOLE(bp));
2180         ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
2181         ASSERT3U(zio->io_prop.zp_copies, >, 0);
2182         ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
2183         ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
2184
2185         /*
2186          * The dump device does not support gang blocks so allocation on
2187          * behalf of the dump device (i.e. ZIO_FLAG_NODATA) must avoid
2188          * the "fast" gang feature.
2189          */
2190         flags |= (zio->io_flags & ZIO_FLAG_NODATA) ? METASLAB_GANG_AVOID : 0;
2191         flags |= (zio->io_flags & ZIO_FLAG_GANG_CHILD) ?
2192             METASLAB_GANG_CHILD : 0;
2193         error = metaslab_alloc(spa, mc, zio->io_size, bp,
2194             zio->io_prop.zp_copies, zio->io_txg, NULL, flags);
2195
2196         if (error) {
2197                 spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
2198                     "size %llu, error %d", spa_name(spa), zio, zio->io_size,
2199                     error);
2200                 if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
2201                         return (zio_write_gang_block(zio));
2202                 zio->io_error = error;
2203         }
2204
2205         return (ZIO_PIPELINE_CONTINUE);
2206 }
2207
2208 static int
2209 zio_dva_free(zio_t *zio)
2210 {
2211         metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
2212
2213         return (ZIO_PIPELINE_CONTINUE);
2214 }
2215
2216 static int
2217 zio_dva_claim(zio_t *zio)
2218 {
2219         int error;
2220
2221         error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
2222         if (error)
2223                 zio->io_error = error;
2224
2225         return (ZIO_PIPELINE_CONTINUE);
2226 }
2227
2228 /*
2229  * Undo an allocation.  This is used by zio_done() when an I/O fails
2230  * and we want to give back the block we just allocated.
2231  * This handles both normal blocks and gang blocks.
2232  */
2233 static void
2234 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
2235 {
2236         int g;
2237
2238         ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
2239         ASSERT(zio->io_bp_override == NULL);
2240
2241         if (!BP_IS_HOLE(bp))
2242                 metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
2243
2244         if (gn != NULL) {
2245                 for (g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2246                         zio_dva_unallocate(zio, gn->gn_child[g],
2247                             &gn->gn_gbh->zg_blkptr[g]);
2248                 }
2249         }
2250 }
2251
2252 /*
2253  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
2254  */
2255 int
2256 zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
2257     uint64_t size, boolean_t use_slog)
2258 {
2259         int error = 1;
2260
2261         ASSERT(txg > spa_syncing_txg(spa));
2262
2263         if (use_slog)
2264                 error = metaslab_alloc(spa, spa_log_class(spa), size,
2265                     new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
2266
2267         if (error)
2268                 error = metaslab_alloc(spa, spa_normal_class(spa), size,
2269                     new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
2270
2271         if (error == 0) {
2272                 BP_SET_LSIZE(new_bp, size);
2273                 BP_SET_PSIZE(new_bp, size);
2274                 BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
2275                 BP_SET_CHECKSUM(new_bp,
2276                     spa_version(spa) >= SPA_VERSION_SLIM_ZIL
2277                     ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
2278                 BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
2279                 BP_SET_LEVEL(new_bp, 0);
2280                 BP_SET_DEDUP(new_bp, 0);
2281                 BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
2282         }
2283
2284         return (error);
2285 }
2286
2287 /*
2288  * Free an intent log block.
2289  */
2290 void
2291 zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
2292 {
2293         ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
2294         ASSERT(!BP_IS_GANG(bp));
2295
2296         zio_free(spa, txg, bp);
2297 }
2298
2299 /*
2300  * ==========================================================================
2301  * Read and write to physical devices
2302  * ==========================================================================
2303  */
2304 static int
2305 zio_vdev_io_start(zio_t *zio)
2306 {
2307         vdev_t *vd = zio->io_vd;
2308         uint64_t align;
2309         spa_t *spa = zio->io_spa;
2310
2311         ASSERT(zio->io_error == 0);
2312         ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
2313
2314         if (vd == NULL) {
2315                 if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
2316                         spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
2317
2318                 /*
2319                  * The mirror_ops handle multiple DVAs in a single BP.
2320                  */
2321                 return (vdev_mirror_ops.vdev_op_io_start(zio));
2322         }
2323
2324         /*
2325          * We keep track of time-sensitive I/Os so that the scan thread
2326          * can quickly react to certain workloads.  In particular, we care
2327          * about non-scrubbing, top-level reads and writes with the following
2328          * characteristics:
2329          *      - synchronous writes of user data to non-slog devices
2330          *      - any reads of user data
2331          * When these conditions are met, adjust the timestamp of spa_last_io
2332          * which allows the scan thread to adjust its workload accordingly.
2333          */
2334         if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
2335             vd == vd->vdev_top && !vd->vdev_islog &&
2336             zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
2337             zio->io_txg != spa_syncing_txg(spa)) {
2338                 uint64_t old = spa->spa_last_io;
2339                 uint64_t new = ddi_get_lbolt64();
2340                 if (old != new)
2341                         (void) atomic_cas_64(&spa->spa_last_io, old, new);
2342         }
2343
2344         align = 1ULL << vd->vdev_top->vdev_ashift;
2345
2346         if (P2PHASE(zio->io_size, align) != 0) {
2347                 uint64_t asize = P2ROUNDUP(zio->io_size, align);
2348                 char *abuf = zio_buf_alloc(asize);
2349                 ASSERT(vd == vd->vdev_top);
2350                 if (zio->io_type == ZIO_TYPE_WRITE) {
2351                         bcopy(zio->io_data, abuf, zio->io_size);
2352                         bzero(abuf + zio->io_size, asize - zio->io_size);
2353                 }
2354                 zio_push_transform(zio, abuf, asize, asize, zio_subblock);
2355         }
2356
2357         ASSERT(P2PHASE(zio->io_offset, align) == 0);
2358         ASSERT(P2PHASE(zio->io_size, align) == 0);
2359         VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
2360
2361         /*
2362          * If this is a repair I/O, and there's no self-healing involved --
2363          * that is, we're just resilvering what we expect to resilver --
2364          * then don't do the I/O unless zio's txg is actually in vd's DTL.
2365          * This prevents spurious resilvering with nested replication.
2366          * For example, given a mirror of mirrors, (A+B)+(C+D), if only
2367          * A is out of date, we'll read from C+D, then use the data to
2368          * resilver A+B -- but we don't actually want to resilver B, just A.
2369          * The top-level mirror has no way to know this, so instead we just
2370          * discard unnecessary repairs as we work our way down the vdev tree.
2371          * The same logic applies to any form of nested replication:
2372          * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
2373          */
2374         if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
2375             !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
2376             zio->io_txg != 0 && /* not a delegated i/o */
2377             !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
2378                 ASSERT(zio->io_type == ZIO_TYPE_WRITE);
2379                 zio_vdev_io_bypass(zio);
2380                 return (ZIO_PIPELINE_CONTINUE);
2381         }
2382
2383         if (vd->vdev_ops->vdev_op_leaf &&
2384             (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
2385
2386                 if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio) == 0)
2387                         return (ZIO_PIPELINE_CONTINUE);
2388
2389                 if ((zio = vdev_queue_io(zio)) == NULL)
2390                         return (ZIO_PIPELINE_STOP);
2391
2392                 if (!vdev_accessible(vd, zio)) {
2393                         zio->io_error = ENXIO;
2394                         zio_interrupt(zio);
2395                         return (ZIO_PIPELINE_STOP);
2396                 }
2397         }
2398
2399         return (vd->vdev_ops->vdev_op_io_start(zio));
2400 }
2401
2402 static int
2403 zio_vdev_io_done(zio_t *zio)
2404 {
2405         vdev_t *vd = zio->io_vd;
2406         vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
2407         boolean_t unexpected_error = B_FALSE;
2408
2409         if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
2410                 return (ZIO_PIPELINE_STOP);
2411
2412         ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
2413
2414         if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
2415
2416                 vdev_queue_io_done(zio);
2417
2418                 if (zio->io_type == ZIO_TYPE_WRITE)
2419                         vdev_cache_write(zio);
2420
2421                 if (zio_injection_enabled && zio->io_error == 0)
2422                         zio->io_error = zio_handle_device_injection(vd,
2423                             zio, EIO);
2424
2425                 if (zio_injection_enabled && zio->io_error == 0)
2426                         zio->io_error = zio_handle_label_injection(zio, EIO);
2427
2428                 if (zio->io_error) {
2429                         if (!vdev_accessible(vd, zio)) {
2430                                 zio->io_error = ENXIO;
2431                         } else {
2432                                 unexpected_error = B_TRUE;
2433                         }
2434                 }
2435         }
2436
2437         ops->vdev_op_io_done(zio);
2438
2439         if (unexpected_error)
2440                 VERIFY(vdev_probe(vd, zio) == NULL);
2441
2442         return (ZIO_PIPELINE_CONTINUE);
2443 }
2444
2445 /*
2446  * For non-raidz ZIOs, we can just copy aside the bad data read from the
2447  * disk, and use that to finish the checksum ereport later.
2448  */
2449 static void
2450 zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
2451     const void *good_buf)
2452 {
2453         /* no processing needed */
2454         zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
2455 }
2456
2457 /*ARGSUSED*/
2458 void
2459 zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
2460 {
2461         void *buf = zio_buf_alloc(zio->io_size);
2462
2463         bcopy(zio->io_data, buf, zio->io_size);
2464
2465         zcr->zcr_cbinfo = zio->io_size;
2466         zcr->zcr_cbdata = buf;
2467         zcr->zcr_finish = zio_vsd_default_cksum_finish;
2468         zcr->zcr_free = zio_buf_free;
2469 }
2470
2471 static int
2472 zio_vdev_io_assess(zio_t *zio)
2473 {
2474         vdev_t *vd = zio->io_vd;
2475
2476         if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
2477                 return (ZIO_PIPELINE_STOP);
2478
2479         if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
2480                 spa_config_exit(zio->io_spa, SCL_ZIO, zio);
2481
2482         if (zio->io_vsd != NULL) {
2483                 zio->io_vsd_ops->vsd_free(zio);
2484                 zio->io_vsd = NULL;
2485         }
2486
2487         if (zio_injection_enabled && zio->io_error == 0)
2488                 zio->io_error = zio_handle_fault_injection(zio, EIO);
2489
2490         /*
2491          * If the I/O failed, determine whether we should attempt to retry it.
2492          *
2493          * On retry, we cut in line in the issue queue, since we don't want
2494          * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
2495          */
2496         if (zio->io_error && vd == NULL &&
2497             !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
2498                 ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE)); /* not a leaf */
2499                 ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));  /* not a leaf */
2500                 zio->io_error = 0;
2501                 zio->io_flags |= ZIO_FLAG_IO_RETRY |
2502                     ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
2503                 zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
2504                 zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
2505                     zio_requeue_io_start_cut_in_line);
2506                 return (ZIO_PIPELINE_STOP);
2507         }
2508
2509         /*
2510          * If we got an error on a leaf device, convert it to ENXIO
2511          * if the device is not accessible at all.
2512          */
2513         if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
2514             !vdev_accessible(vd, zio))
2515                 zio->io_error = ENXIO;
2516
2517         /*
2518          * If we can't write to an interior vdev (mirror or RAID-Z),
2519          * set vdev_cant_write so that we stop trying to allocate from it.
2520          */
2521         if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
2522             vd != NULL && !vd->vdev_ops->vdev_op_leaf)
2523                 vd->vdev_cant_write = B_TRUE;
2524
2525         if (zio->io_error)
2526                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2527
2528         return (ZIO_PIPELINE_CONTINUE);
2529 }
2530
2531 void
2532 zio_vdev_io_reissue(zio_t *zio)
2533 {
2534         ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
2535         ASSERT(zio->io_error == 0);
2536
2537         zio->io_stage >>= 1;
2538 }
2539
2540 void
2541 zio_vdev_io_redone(zio_t *zio)
2542 {
2543         ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
2544
2545         zio->io_stage >>= 1;
2546 }
2547
2548 void
2549 zio_vdev_io_bypass(zio_t *zio)
2550 {
2551         ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
2552         ASSERT(zio->io_error == 0);
2553
2554         zio->io_flags |= ZIO_FLAG_IO_BYPASS;
2555         zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
2556 }
2557
2558 /*
2559  * ==========================================================================
2560  * Generate and verify checksums
2561  * ==========================================================================
2562  */
2563 static int
2564 zio_checksum_generate(zio_t *zio)
2565 {
2566         blkptr_t *bp = zio->io_bp;
2567         enum zio_checksum checksum;
2568
2569         if (bp == NULL) {
2570                 /*
2571                  * This is zio_write_phys().
2572                  * We're either generating a label checksum, or none at all.
2573                  */
2574                 checksum = zio->io_prop.zp_checksum;
2575
2576                 if (checksum == ZIO_CHECKSUM_OFF)
2577                         return (ZIO_PIPELINE_CONTINUE);
2578
2579                 ASSERT(checksum == ZIO_CHECKSUM_LABEL);
2580         } else {
2581                 if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
2582                         ASSERT(!IO_IS_ALLOCATING(zio));
2583                         checksum = ZIO_CHECKSUM_GANG_HEADER;
2584                 } else {
2585                         checksum = BP_GET_CHECKSUM(bp);
2586                 }
2587         }
2588
2589         zio_checksum_compute(zio, checksum, zio->io_data, zio->io_size);
2590
2591         return (ZIO_PIPELINE_CONTINUE);
2592 }
2593
2594 static int
2595 zio_checksum_verify(zio_t *zio)
2596 {
2597         zio_bad_cksum_t info;
2598         blkptr_t *bp = zio->io_bp;
2599         int error;
2600
2601         ASSERT(zio->io_vd != NULL);
2602
2603         if (bp == NULL) {
2604                 /*
2605                  * This is zio_read_phys().
2606                  * We're either verifying a label checksum, or nothing at all.
2607                  */
2608                 if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
2609                         return (ZIO_PIPELINE_CONTINUE);
2610
2611                 ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
2612         }
2613
2614         if ((error = zio_checksum_error(zio, &info)) != 0) {
2615                 zio->io_error = error;
2616                 if (!(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
2617                         zfs_ereport_start_checksum(zio->io_spa,
2618                             zio->io_vd, zio, zio->io_offset,
2619                             zio->io_size, NULL, &info);
2620                 }
2621         }
2622
2623         return (ZIO_PIPELINE_CONTINUE);
2624 }
2625
2626 /*
2627  * Called by RAID-Z to ensure we don't compute the checksum twice.
2628  */
2629 void
2630 zio_checksum_verified(zio_t *zio)
2631 {
2632         zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
2633 }
2634
2635 /*
2636  * ==========================================================================
2637  * Error rank.  Error are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
2638  * An error of 0 indictes success.  ENXIO indicates whole-device failure,
2639  * which may be transient (e.g. unplugged) or permament.  ECKSUM and EIO
2640  * indicate errors that are specific to one I/O, and most likely permanent.
2641  * Any other error is presumed to be worse because we weren't expecting it.
2642  * ==========================================================================
2643  */
2644 int
2645 zio_worst_error(int e1, int e2)
2646 {
2647         static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
2648         int r1, r2;
2649
2650         for (r1 = 0; r1 < sizeof (zio_error_rank) / sizeof (int); r1++)
2651                 if (e1 == zio_error_rank[r1])
2652                         break;
2653
2654         for (r2 = 0; r2 < sizeof (zio_error_rank) / sizeof (int); r2++)
2655                 if (e2 == zio_error_rank[r2])
2656                         break;
2657
2658         return (r1 > r2 ? e1 : e2);
2659 }
2660
2661 /*
2662  * ==========================================================================
2663  * I/O completion
2664  * ==========================================================================
2665  */
2666 static int
2667 zio_ready(zio_t *zio)
2668 {
2669         blkptr_t *bp = zio->io_bp;
2670         zio_t *pio, *pio_next;
2671
2672         if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
2673             zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
2674                 return (ZIO_PIPELINE_STOP);
2675
2676         if (zio->io_ready) {
2677                 ASSERT(IO_IS_ALLOCATING(zio));
2678                 ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
2679                 ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
2680
2681                 zio->io_ready(zio);
2682         }
2683
2684         if (bp != NULL && bp != &zio->io_bp_copy)
2685                 zio->io_bp_copy = *bp;
2686
2687         if (zio->io_error)
2688                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2689
2690         mutex_enter(&zio->io_lock);
2691         zio->io_state[ZIO_WAIT_READY] = 1;
2692         pio = zio_walk_parents(zio);
2693         mutex_exit(&zio->io_lock);
2694
2695         /*
2696          * As we notify zio's parents, new parents could be added.
2697          * New parents go to the head of zio's io_parent_list, however,
2698          * so we will (correctly) not notify them.  The remainder of zio's
2699          * io_parent_list, from 'pio_next' onward, cannot change because
2700          * all parents must wait for us to be done before they can be done.
2701          */
2702         for (; pio != NULL; pio = pio_next) {
2703                 pio_next = zio_walk_parents(zio);
2704                 zio_notify_parent(pio, zio, ZIO_WAIT_READY);
2705         }
2706
2707         if (zio->io_flags & ZIO_FLAG_NODATA) {
2708                 if (BP_IS_GANG(bp)) {
2709                         zio->io_flags &= ~ZIO_FLAG_NODATA;
2710                 } else {
2711                         ASSERT((uintptr_t)zio->io_data < SPA_MAXBLOCKSIZE);
2712                         zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
2713                 }
2714         }
2715
2716         if (zio_injection_enabled &&
2717             zio->io_spa->spa_syncing_txg == zio->io_txg)
2718                 zio_handle_ignored_writes(zio);
2719
2720         return (ZIO_PIPELINE_CONTINUE);
2721 }
2722
2723 static int
2724 zio_done(zio_t *zio)
2725 {
2726         zio_t *pio, *pio_next;
2727         int c, w;
2728
2729         /*
2730          * If our children haven't all completed,
2731          * wait for them and then repeat this pipeline stage.
2732          */
2733         if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
2734             zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
2735             zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
2736             zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
2737                 return (ZIO_PIPELINE_STOP);
2738
2739         for (c = 0; c < ZIO_CHILD_TYPES; c++)
2740                 for (w = 0; w < ZIO_WAIT_TYPES; w++)
2741                         ASSERT(zio->io_children[c][w] == 0);
2742
2743         if (zio->io_bp != NULL) {
2744                 ASSERT(zio->io_bp->blk_pad[0] == 0);
2745                 ASSERT(zio->io_bp->blk_pad[1] == 0);
2746                 ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
2747                     (zio->io_bp == zio_unique_parent(zio)->io_bp));
2748                 if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(zio->io_bp) &&
2749                     zio->io_bp_override == NULL &&
2750                     !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
2751                         ASSERT(!BP_SHOULD_BYTESWAP(zio->io_bp));
2752                         ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
2753                         ASSERT(BP_COUNT_GANG(zio->io_bp) == 0 ||
2754                             (BP_COUNT_GANG(zio->io_bp) == BP_GET_NDVAS(zio->io_bp)));
2755                 }
2756         }
2757
2758         /*
2759          * If there were child vdev/gang/ddt errors, they apply to us now.
2760          */
2761         zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
2762         zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
2763         zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
2764
2765         /*
2766          * If the I/O on the transformed data was successful, generate any
2767          * checksum reports now while we still have the transformed data.
2768          */
2769         if (zio->io_error == 0) {
2770                 while (zio->io_cksum_report != NULL) {
2771                         zio_cksum_report_t *zcr = zio->io_cksum_report;
2772                         uint64_t align = zcr->zcr_align;
2773                         uint64_t asize = P2ROUNDUP(zio->io_size, align);
2774                         char *abuf = zio->io_data;
2775
2776                         if (asize != zio->io_size) {
2777                                 abuf = zio_buf_alloc(asize);
2778                                 bcopy(zio->io_data, abuf, zio->io_size);
2779                                 bzero(abuf + zio->io_size, asize - zio->io_size);
2780                         }
2781
2782                         zio->io_cksum_report = zcr->zcr_next;
2783                         zcr->zcr_next = NULL;
2784                         zcr->zcr_finish(zcr, abuf);
2785                         zfs_ereport_free_checksum(zcr);
2786
2787                         if (asize != zio->io_size)
2788                                 zio_buf_free(abuf, asize);
2789                 }
2790         }
2791
2792         zio_pop_transforms(zio);        /* note: may set zio->io_error */
2793
2794         vdev_stat_update(zio, zio->io_size);
2795
2796         /*
2797          * If this I/O is attached to a particular vdev is slow, exeeding
2798          * 30 seconds to complete, post an error described the I/O delay.
2799          * We ignore these errors if the device is currently unavailable.
2800          */
2801         if (zio->io_delay >= zio_delay_max) {
2802                 if (zio->io_vd != NULL && !vdev_is_dead(zio->io_vd))
2803                         zfs_ereport_post(FM_EREPORT_ZFS_DELAY, zio->io_spa,
2804                                          zio->io_vd, zio, 0, 0);
2805         }
2806
2807         if (zio->io_error) {
2808                 /*
2809                  * If this I/O is attached to a particular vdev,
2810                  * generate an error message describing the I/O failure
2811                  * at the block level.  We ignore these errors if the
2812                  * device is currently unavailable.
2813                  */
2814                 if (zio->io_error != ECKSUM && zio->io_vd != NULL &&
2815                         !vdev_is_dead(zio->io_vd))
2816                         zfs_ereport_post(FM_EREPORT_ZFS_IO, zio->io_spa,
2817                                                 zio->io_vd, zio, 0, 0);
2818
2819                 if ((zio->io_error == EIO || !(zio->io_flags &
2820                     (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
2821                     zio == zio->io_logical) {
2822                         /*
2823                          * For logical I/O requests, tell the SPA to log the
2824                          * error and generate a logical data ereport.
2825                          */
2826                         spa_log_error(zio->io_spa, zio);
2827                         zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa, NULL, zio,
2828                             0, 0);
2829                 }
2830         }
2831
2832         if (zio->io_error && zio == zio->io_logical) {
2833                 /*
2834                  * Determine whether zio should be reexecuted.  This will
2835                  * propagate all the way to the root via zio_notify_parent().
2836                  */
2837                 ASSERT(zio->io_vd == NULL && zio->io_bp != NULL);
2838                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2839
2840                 if (IO_IS_ALLOCATING(zio) &&
2841                     !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
2842                         if (zio->io_error != ENOSPC)
2843                                 zio->io_reexecute |= ZIO_REEXECUTE_NOW;
2844                         else
2845                                 zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
2846                 }
2847
2848                 if ((zio->io_type == ZIO_TYPE_READ ||
2849                     zio->io_type == ZIO_TYPE_FREE) &&
2850                     !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
2851                     zio->io_error == ENXIO &&
2852                     spa_load_state(zio->io_spa) == SPA_LOAD_NONE &&
2853                     spa_get_failmode(zio->io_spa) != ZIO_FAILURE_MODE_CONTINUE)
2854                         zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
2855
2856                 if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
2857                         zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
2858
2859                 /*
2860                  * Here is a possibly good place to attempt to do
2861                  * either combinatorial reconstruction or error correction
2862                  * based on checksums.  It also might be a good place
2863                  * to send out preliminary ereports before we suspend
2864                  * processing.
2865                  */
2866         }
2867
2868         /*
2869          * If there were logical child errors, they apply to us now.
2870          * We defer this until now to avoid conflating logical child
2871          * errors with errors that happened to the zio itself when
2872          * updating vdev stats and reporting FMA events above.
2873          */
2874         zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
2875
2876         if ((zio->io_error || zio->io_reexecute) &&
2877             IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
2878             !(zio->io_flags & ZIO_FLAG_IO_REWRITE))
2879                 zio_dva_unallocate(zio, zio->io_gang_tree, zio->io_bp);
2880
2881         zio_gang_tree_free(&zio->io_gang_tree);
2882
2883         /*
2884          * Godfather I/Os should never suspend.
2885          */
2886         if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
2887             (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
2888                 zio->io_reexecute = 0;
2889
2890         if (zio->io_reexecute) {
2891                 /*
2892                  * This is a logical I/O that wants to reexecute.
2893                  *
2894                  * Reexecute is top-down.  When an i/o fails, if it's not
2895                  * the root, it simply notifies its parent and sticks around.
2896                  * The parent, seeing that it still has children in zio_done(),
2897                  * does the same.  This percolates all the way up to the root.
2898                  * The root i/o will reexecute or suspend the entire tree.
2899                  *
2900                  * This approach ensures that zio_reexecute() honors
2901                  * all the original i/o dependency relationships, e.g.
2902                  * parents not executing until children are ready.
2903                  */
2904                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2905
2906                 zio->io_gang_leader = NULL;
2907
2908                 mutex_enter(&zio->io_lock);
2909                 zio->io_state[ZIO_WAIT_DONE] = 1;
2910                 mutex_exit(&zio->io_lock);
2911
2912                 /*
2913                  * "The Godfather" I/O monitors its children but is
2914                  * not a true parent to them. It will track them through
2915                  * the pipeline but severs its ties whenever they get into
2916                  * trouble (e.g. suspended). This allows "The Godfather"
2917                  * I/O to return status without blocking.
2918                  */
2919                 for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
2920                         zio_link_t *zl = zio->io_walk_link;
2921                         pio_next = zio_walk_parents(zio);
2922
2923                         if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
2924                             (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
2925                                 zio_remove_child(pio, zio, zl);
2926                                 zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
2927                         }
2928                 }
2929
2930                 if ((pio = zio_unique_parent(zio)) != NULL) {
2931                         /*
2932                          * We're not a root i/o, so there's nothing to do
2933                          * but notify our parent.  Don't propagate errors
2934                          * upward since we haven't permanently failed yet.
2935                          */
2936                         ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
2937                         zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
2938                         zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
2939                 } else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
2940                         /*
2941                          * We'd fail again if we reexecuted now, so suspend
2942                          * until conditions improve (e.g. device comes online).
2943                          */
2944                         zio_suspend(zio->io_spa, zio);
2945                 } else {
2946                         /*
2947                          * Reexecution is potentially a huge amount of work.
2948                          * Hand it off to the otherwise-unused claim taskq.
2949                          */
2950                         (void) taskq_dispatch(
2951                             zio->io_spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
2952                             (task_func_t *)zio_reexecute, zio, TQ_SLEEP);
2953                 }
2954                 return (ZIO_PIPELINE_STOP);
2955         }
2956
2957         ASSERT(zio->io_child_count == 0);
2958         ASSERT(zio->io_reexecute == 0);
2959         ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
2960
2961         /*
2962          * Report any checksum errors, since the I/O is complete.
2963          */
2964         while (zio->io_cksum_report != NULL) {
2965                 zio_cksum_report_t *zcr = zio->io_cksum_report;
2966                 zio->io_cksum_report = zcr->zcr_next;
2967                 zcr->zcr_next = NULL;
2968                 zcr->zcr_finish(zcr, NULL);
2969                 zfs_ereport_free_checksum(zcr);
2970         }
2971
2972         /*
2973          * It is the responsibility of the done callback to ensure that this
2974          * particular zio is no longer discoverable for adoption, and as
2975          * such, cannot acquire any new parents.
2976          */
2977         if (zio->io_done)
2978                 zio->io_done(zio);
2979
2980         mutex_enter(&zio->io_lock);
2981         zio->io_state[ZIO_WAIT_DONE] = 1;
2982         mutex_exit(&zio->io_lock);
2983
2984         for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
2985                 zio_link_t *zl = zio->io_walk_link;
2986                 pio_next = zio_walk_parents(zio);
2987                 zio_remove_child(pio, zio, zl);
2988                 zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
2989         }
2990
2991         if (zio->io_waiter != NULL) {
2992                 mutex_enter(&zio->io_lock);
2993                 zio->io_executor = NULL;
2994                 cv_broadcast(&zio->io_cv);
2995                 mutex_exit(&zio->io_lock);
2996         } else {
2997                 zio_destroy(zio);
2998         }
2999
3000         return (ZIO_PIPELINE_STOP);
3001 }
3002
3003 /*
3004  * ==========================================================================
3005  * I/O pipeline definition
3006  * ==========================================================================
3007  */
3008 static zio_pipe_stage_t *zio_pipeline[] = {
3009         NULL,
3010         zio_read_bp_init,
3011         zio_free_bp_init,
3012         zio_issue_async,
3013         zio_write_bp_init,
3014         zio_checksum_generate,
3015         zio_ddt_read_start,
3016         zio_ddt_read_done,
3017         zio_ddt_write,
3018         zio_ddt_free,
3019         zio_gang_assemble,
3020         zio_gang_issue,
3021         zio_dva_allocate,
3022         zio_dva_free,
3023         zio_dva_claim,
3024         zio_ready,
3025         zio_vdev_io_start,
3026         zio_vdev_io_done,
3027         zio_vdev_io_assess,
3028         zio_checksum_verify,
3029         zio_done
3030 };
3031
3032 #if defined(_KERNEL) && defined(HAVE_SPL)
3033 /* Fault injection */
3034 EXPORT_SYMBOL(zio_injection_enabled);
3035 EXPORT_SYMBOL(zio_inject_fault);
3036 EXPORT_SYMBOL(zio_inject_list_next);
3037 EXPORT_SYMBOL(zio_clear_fault);
3038 EXPORT_SYMBOL(zio_handle_fault_injection);
3039 EXPORT_SYMBOL(zio_handle_device_injection);
3040 EXPORT_SYMBOL(zio_handle_label_injection);
3041 EXPORT_SYMBOL(zio_priority_table);
3042 EXPORT_SYMBOL(zio_type_name);
3043
3044 module_param(zio_bulk_flags, int, 0644);
3045 MODULE_PARM_DESC(zio_bulk_flags, "Additional flags to pass to bulk buffers");
3046
3047 module_param(zio_delay_max, int, 0644);
3048 MODULE_PARM_DESC(zio_delay_max, "Max zio millisec delay before posting event");
3049
3050 module_param(zio_requeue_io_start_cut_in_line, int, 0644);
3051 MODULE_PARM_DESC(zio_requeue_io_start_cut_in_line, "Prioritize requeued I/O");
3052 #endif