Rebase master to b117
[zfs.git] / module / zfs / zio.c
index d347920..a2bdab9 100644 (file)
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -69,6 +69,7 @@ char *zio_type_name[ZIO_TYPES] = {
  * ==========================================================================
  */
 kmem_cache_t *zio_cache;
+kmem_cache_t *zio_link_cache;
 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
 
@@ -92,8 +93,10 @@ zio_init(void)
 #ifdef _KERNEL
        data_alloc_arena = zio_alloc_arena;
 #endif
-       zio_cache = kmem_cache_create("zio_cache", sizeof (zio_t), 0,
-           NULL, NULL, NULL, NULL, NULL, 0);
+       zio_cache = kmem_cache_create("zio_cache",
+           sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
+       zio_link_cache = kmem_cache_create("zio_link_cache",
+           sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
 
        /*
         * For small buffers, we want a cache for each multiple of
@@ -164,6 +167,7 @@ zio_fini(void)
                zio_data_buf_cache[c] = NULL;
        }
 
+       kmem_cache_destroy(zio_link_cache);
        kmem_cache_destroy(zio_cache);
 
        zio_inject_fini();
@@ -298,41 +302,102 @@ zio_decompress(zio_t *zio, void *data, uint64_t size)
  * I/O parent/child relationships and pipeline interlocks
  * ==========================================================================
  */
+/*
+ * NOTE - Callers to zio_walk_parents() and zio_walk_children must
+ *        continue calling these functions until they return NULL.
+ *        Otherwise, the next caller will pick up the list walk in
+ *        some indeterminate state.  (Otherwise every caller would
+ *        have to pass in a cookie to keep the state represented by
+ *        io_walk_link, which gets annoying.)
+ */
+zio_t *
+zio_walk_parents(zio_t *cio)
+{
+       zio_link_t *zl = cio->io_walk_link;
+       list_t *pl = &cio->io_parent_list;
 
-static void
-zio_add_child(zio_t *pio, zio_t *zio)
+       zl = (zl == NULL) ? list_head(pl) : list_next(pl, zl);
+       cio->io_walk_link = zl;
+
+       if (zl == NULL)
+               return (NULL);
+
+       ASSERT(zl->zl_child == cio);
+       return (zl->zl_parent);
+}
+
+zio_t *
+zio_walk_children(zio_t *pio)
+{
+       zio_link_t *zl = pio->io_walk_link;
+       list_t *cl = &pio->io_child_list;
+
+       zl = (zl == NULL) ? list_head(cl) : list_next(cl, zl);
+       pio->io_walk_link = zl;
+
+       if (zl == NULL)
+               return (NULL);
+
+       ASSERT(zl->zl_parent == pio);
+       return (zl->zl_child);
+}
+
+zio_t *
+zio_unique_parent(zio_t *cio)
 {
+       zio_t *pio = zio_walk_parents(cio);
+
+       VERIFY(zio_walk_parents(cio) == NULL);
+       return (pio);
+}
+
+void
+zio_add_child(zio_t *pio, zio_t *cio)
+{
+       zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
+
+       /*
+        * Logical I/Os can have logical, gang, or vdev children.
+        * Gang I/Os can have gang or vdev children.
+        * Vdev I/Os can only have vdev children.
+        * The following ASSERT captures all of these constraints.
+        */
+       ASSERT(cio->io_child_type <= pio->io_child_type);
+
+       zl->zl_parent = pio;
+       zl->zl_child = cio;
+
+       mutex_enter(&cio->io_lock);
        mutex_enter(&pio->io_lock);
-       if (zio->io_stage < ZIO_STAGE_READY)
-               pio->io_children[zio->io_child_type][ZIO_WAIT_READY]++;
-       if (zio->io_stage < ZIO_STAGE_DONE)
-               pio->io_children[zio->io_child_type][ZIO_WAIT_DONE]++;
-       zio->io_sibling_prev = NULL;
-       zio->io_sibling_next = pio->io_child;
-       if (pio->io_child != NULL)
-               pio->io_child->io_sibling_prev = zio;
-       pio->io_child = zio;
-       zio->io_parent = pio;
+
+       ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
+
+       for (int w = 0; w < ZIO_WAIT_TYPES; w++)
+               pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
+
+       list_insert_head(&pio->io_child_list, zl);
+       list_insert_head(&cio->io_parent_list, zl);
+
        mutex_exit(&pio->io_lock);
+       mutex_exit(&cio->io_lock);
 }
 
 static void
-zio_remove_child(zio_t *pio, zio_t *zio)
+zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
 {
-       zio_t *next, *prev;
-
-       ASSERT(zio->io_parent == pio);
+       ASSERT(zl->zl_parent == pio);
+       ASSERT(zl->zl_child == cio);
 
+       mutex_enter(&cio->io_lock);
        mutex_enter(&pio->io_lock);
-       next = zio->io_sibling_next;
-       prev = zio->io_sibling_prev;
-       if (next != NULL)
-               next->io_sibling_prev = prev;
-       if (prev != NULL)
-               prev->io_sibling_next = next;
-       if (pio->io_child == zio)
-               pio->io_child = next;
+
+       list_remove(&pio->io_child_list, zl);
+       list_remove(&cio->io_parent_list, zl);
+
        mutex_exit(&pio->io_lock);
+       mutex_exit(&cio->io_lock);
+
+       kmem_cache_free(zio_link_cache, zl);
 }
 
 static boolean_t
@@ -407,6 +472,11 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
        mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
 
+       list_create(&zio->io_parent_list, sizeof (zio_link_t),
+           offsetof(zio_link_t, zl_parent_node));
+       list_create(&zio->io_child_list, sizeof (zio_link_t),
+           offsetof(zio_link_t, zl_child_node));
+
        if (vd != NULL)
                zio->io_child_type = ZIO_CHILD_VDEV;
        else if (flags & ZIO_FLAG_GANG_CHILD)
@@ -420,11 +490,10 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
                zio->io_bp_orig = *bp;
                if (type != ZIO_TYPE_WRITE)
                        zio->io_bp = &zio->io_bp_copy;  /* so caller can free */
-               if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
-                       if (BP_IS_GANG(bp))
-                               pipeline |= ZIO_GANG_STAGES;
+               if (zio->io_child_type == ZIO_CHILD_LOGICAL)
                        zio->io_logical = zio;
-               }
+               if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
+                       pipeline |= ZIO_GANG_STAGES;
        }
 
        zio->io_spa = spa;
@@ -441,19 +510,17 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
        zio->io_orig_stage = zio->io_stage = stage;
        zio->io_orig_pipeline = zio->io_pipeline = pipeline;
 
+       zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
+       zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
+
        if (zb != NULL)
                zio->io_bookmark = *zb;
 
        if (pio != NULL) {
-               /*
-                * Logical I/Os can have logical, gang, or vdev children.
-                * Gang I/Os can have gang or vdev children.
-                * Vdev I/Os can only have vdev children.
-                * The following ASSERT captures all of these constraints.
-                */
-               ASSERT(zio->io_child_type <= pio->io_child_type);
                if (zio->io_logical == NULL)
                        zio->io_logical = pio->io_logical;
+               if (zio->io_child_type == ZIO_CHILD_GANG)
+                       zio->io_gang_leader = pio->io_gang_leader;
                zio_add_child(pio, zio);
        }
 
@@ -463,29 +530,21 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
 static void
 zio_destroy(zio_t *zio)
 {
-       spa_t *spa = zio->io_spa;
-       uint8_t async_root = zio->io_async_root;
-
+       list_destroy(&zio->io_parent_list);
+       list_destroy(&zio->io_child_list);
        mutex_destroy(&zio->io_lock);
        cv_destroy(&zio->io_cv);
        kmem_cache_free(zio_cache, zio);
-
-       if (async_root) {
-               mutex_enter(&spa->spa_async_root_lock);
-               if (--spa->spa_async_root_count == 0)
-                       cv_broadcast(&spa->spa_async_root_cv);
-               mutex_exit(&spa->spa_async_root_lock);
-       }
 }
 
 zio_t *
-zio_null(zio_t *pio, spa_t *spa, zio_done_func_t *done, void *private,
-       int flags)
+zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
+    void *private, int flags)
 {
        zio_t *zio;
 
        zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
-           ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, NULL, 0, NULL,
+           ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
            ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
 
        return (zio);
@@ -494,7 +553,7 @@ zio_null(zio_t *pio, spa_t *spa, zio_done_func_t *done, void *private,
 zio_t *
 zio_root(spa_t *spa, zio_done_func_t *done, void *private, int flags)
 {
-       return (zio_null(NULL, spa, done, private, flags));
+       return (zio_null(NULL, spa, NULL, done, private, flags));
 }
 
 zio_t *
@@ -573,12 +632,12 @@ zio_free(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
        ASSERT(!BP_IS_HOLE(bp));
 
        if (bp->blk_fill == BLK_FILL_ALREADY_FREED)
-               return (zio_null(pio, spa, NULL, NULL, flags));
+               return (zio_null(pio, spa, NULL, NULL, NULL, flags));
 
        if (txg == spa->spa_syncing_txg &&
            spa_sync_pass(spa) > SYNC_PASS_DEFERRED_FREE) {
                bplist_enqueue_deferred(&spa->spa_sync_bplist, bp);
-               return (zio_null(pio, spa, NULL, NULL, flags));
+               return (zio_null(pio, spa, NULL, NULL, NULL, flags));
        }
 
        zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
@@ -629,7 +688,7 @@ zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
 
                zio->io_cmd = cmd;
        } else {
-               zio = zio_null(pio, spa, NULL, NULL, flags);
+               zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
 
                for (c = 0; c < vd->vdev_children; c++)
                        zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
@@ -767,7 +826,9 @@ zio_read_bp_init(zio_t *zio)
 {
        blkptr_t *bp = zio->io_bp;
 
-       if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF && zio->io_logical == zio) {
+       if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
+           zio->io_child_type == ZIO_CHILD_LOGICAL &&
+           !(zio->io_flags & ZIO_FLAG_RAW)) {
                uint64_t csize = BP_GET_PSIZE(bp);
                void *cbuf = zio_buf_alloc(csize);
 
@@ -816,16 +877,10 @@ zio_write_bp_init(zio_t *zio)
                 * few passes, stop compressing to ensure convergence.
                 */
                pass = spa_sync_pass(zio->io_spa);
-               ASSERT(pass > 1);
 
                if (pass > SYNC_PASS_DONT_COMPRESS)
                        compress = ZIO_COMPRESS_OFF;
 
-               /*
-                * Only MOS (objset 0) data should need to be rewritten.
-                */
-               ASSERT(zio->io_logical->io_bookmark.zb_objset == 0);
-
                /* Make sure someone doesn't change their mind on overwrites */
                ASSERT(MIN(zp->zp_ndvas + BP_IS_GANG(bp),
                    spa_max_replication(zio->io_spa)) == BP_GET_NDVAS(bp));
@@ -887,11 +942,11 @@ zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q)
        zio_type_t t = zio->io_type;
 
        /*
-        * If we're a config writer, the normal issue and interrupt threads
-        * may all be blocked waiting for the config lock.  In this case,
-        * select the otherwise-unused taskq for ZIO_TYPE_NULL.
+        * If we're a config writer or a probe, the normal issue and
+        * interrupt threads may all be blocked waiting for the config lock.
+        * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
         */
-       if (zio->io_flags & ZIO_FLAG_CONFIG_WRITER)
+       if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
                t = ZIO_TYPE_NULL;
 
        /*
@@ -1019,17 +1074,16 @@ zio_nowait(zio_t *zio)
 {
        ASSERT(zio->io_executor == NULL);
 
-       if (zio->io_parent == NULL && zio->io_child_type == ZIO_CHILD_LOGICAL) {
+       if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
+           zio_unique_parent(zio) == NULL) {
                /*
                 * This is a logical async I/O with no parent to wait for it.
-                * Attach it to the pool's global async root zio so that
-                * spa_unload() has a way of waiting for async I/O to finish.
+                * We add it to the spa_async_root_zio "Godfather" I/O which
+                * will ensure they complete prior to unloading the pool.
                 */
                spa_t *spa = zio->io_spa;
-               zio->io_async_root = B_TRUE;
-               mutex_enter(&spa->spa_async_root_lock);
-               spa->spa_async_root_count++;
-               mutex_exit(&spa->spa_async_root_lock);
+
+               zio_add_child(spa->spa_async_zio_root, zio);
        }
 
        zio_execute(zio);
@@ -1044,13 +1098,20 @@ zio_nowait(zio_t *zio)
 static void
 zio_reexecute(zio_t *pio)
 {
-       zio_t *zio, *zio_next;
+       zio_t *cio, *cio_next;
+
+       ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
+       ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
+       ASSERT(pio->io_gang_leader == NULL);
+       ASSERT(pio->io_gang_tree == NULL);
 
        pio->io_flags = pio->io_orig_flags;
        pio->io_stage = pio->io_orig_stage;
        pio->io_pipeline = pio->io_orig_pipeline;
        pio->io_reexecute = 0;
        pio->io_error = 0;
+       for (int w = 0; w < ZIO_WAIT_TYPES; w++)
+               pio->io_state[w] = 0;
        for (int c = 0; c < ZIO_CHILD_TYPES; c++)
                pio->io_child_error[c] = 0;
 
@@ -1070,24 +1131,27 @@ zio_reexecute(zio_t *pio)
 
        /*
         * As we reexecute pio's children, new children could be created.
-        * New children go to the head of the io_child list, however,
+        * New children go to the head of pio's io_child_list, however,
         * so we will (correctly) not reexecute them.  The key is that
-        * the remainder of the io_child list, from 'zio_next' onward,
-        * cannot be affected by any side effects of reexecuting 'zio'.
+        * the remainder of pio's io_child_list, from 'cio_next' onward,
+        * cannot be affected by any side effects of reexecuting 'cio'.
         */
-       for (zio = pio->io_child; zio != NULL; zio = zio_next) {
-               zio_next = zio->io_sibling_next;
+       for (cio = zio_walk_children(pio); cio != NULL; cio = cio_next) {
+               cio_next = zio_walk_children(pio);
                mutex_enter(&pio->io_lock);
-               pio->io_children[zio->io_child_type][ZIO_WAIT_READY]++;
-               pio->io_children[zio->io_child_type][ZIO_WAIT_DONE]++;
+               for (int w = 0; w < ZIO_WAIT_TYPES; w++)
+                       pio->io_children[cio->io_child_type][w]++;
                mutex_exit(&pio->io_lock);
-               zio_reexecute(zio);
+               zio_reexecute(cio);
        }
 
        /*
         * Now that all children have been reexecuted, execute the parent.
+        * We don't reexecute "The Godfather" I/O here as it's the
+        * responsibility of the caller to wait on him.
         */
-       zio_execute(pio);
+       if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
+               zio_execute(pio);
 }
 
 void
@@ -1103,14 +1167,17 @@ zio_suspend(spa_t *spa, zio_t *zio)
        mutex_enter(&spa->spa_suspend_lock);
 
        if (spa->spa_suspend_zio_root == NULL)
-               spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL, 0);
+               spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
+                   ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
+                   ZIO_FLAG_GODFATHER);
 
        spa->spa_suspended = B_TRUE;
 
        if (zio != NULL) {
+               ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
                ASSERT(zio != spa->spa_suspend_zio_root);
                ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
-               ASSERT(zio->io_parent == NULL);
+               ASSERT(zio_unique_parent(zio) == NULL);
                ASSERT(zio->io_stage == ZIO_STAGE_DONE);
                zio_add_child(spa->spa_suspend_zio_root, zio);
        }
@@ -1118,10 +1185,10 @@ zio_suspend(spa_t *spa, zio_t *zio)
        mutex_exit(&spa->spa_suspend_lock);
 }
 
-void
+int
 zio_resume(spa_t *spa)
 {
-       zio_t *pio, *zio;
+       zio_t *pio;
 
        /*
         * Reexecute all previously suspended i/o.
@@ -1134,17 +1201,10 @@ zio_resume(spa_t *spa)
        mutex_exit(&spa->spa_suspend_lock);
 
        if (pio == NULL)
-               return;
+               return (0);
 
-       while ((zio = pio->io_child) != NULL) {
-               zio_remove_child(pio, zio);
-               zio->io_parent = NULL;
-               zio_reexecute(zio);
-       }
-
-       ASSERT(pio->io_children[ZIO_CHILD_LOGICAL][ZIO_WAIT_DONE] == 0);
-
-       (void) zio_wait(pio);
+       zio_reexecute(pio);
+       return (zio_wait(pio));
 }
 
 void
@@ -1251,7 +1311,7 @@ zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
                 * (Presently, nothing actually uses interior data checksums;
                 * this is just good hygiene.)
                 */
-               if (gn != pio->io_logical->io_gang_tree) {
+               if (gn != pio->io_gang_leader->io_gang_tree) {
                        zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
                            data, BP_GET_PSIZE(bp));
                }
@@ -1333,27 +1393,27 @@ zio_gang_tree_free(zio_gang_node_t **gnpp)
 }
 
 static void
-zio_gang_tree_assemble(zio_t *lio, blkptr_t *bp, zio_gang_node_t **gnpp)
+zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
 {
        zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
 
-       ASSERT(lio->io_logical == lio);
+       ASSERT(gio->io_gang_leader == gio);
        ASSERT(BP_IS_GANG(bp));
 
-       zio_nowait(zio_read(lio, lio->io_spa, bp, gn->gn_gbh,
+       zio_nowait(zio_read(gio, gio->io_spa, bp, gn->gn_gbh,
            SPA_GANGBLOCKSIZE, zio_gang_tree_assemble_done, gn,
-           lio->io_priority, ZIO_GANG_CHILD_FLAGS(lio), &lio->io_bookmark));
+           gio->io_priority, ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
 }
 
 static void
 zio_gang_tree_assemble_done(zio_t *zio)
 {
-       zio_t *lio = zio->io_logical;
+       zio_t *gio = zio->io_gang_leader;
        zio_gang_node_t *gn = zio->io_private;
        blkptr_t *bp = zio->io_bp;
 
-       ASSERT(zio->io_parent == lio);
-       ASSERT(zio->io_child == NULL);
+       ASSERT(gio == zio_unique_parent(zio));
+       ASSERT(zio_walk_children(zio) == NULL);
 
        if (zio->io_error)
                return;
@@ -1369,25 +1429,25 @@ zio_gang_tree_assemble_done(zio_t *zio)
                blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
                if (!BP_IS_GANG(gbp))
                        continue;
-               zio_gang_tree_assemble(lio, gbp, &gn->gn_child[g]);
+               zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
        }
 }
 
 static void
 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
 {
-       zio_t *lio = pio->io_logical;
+       zio_t *gio = pio->io_gang_leader;
        zio_t *zio;
 
        ASSERT(BP_IS_GANG(bp) == !!gn);
-       ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(lio->io_bp));
-       ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == lio->io_gang_tree);
+       ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
+       ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
 
        /*
         * If you're a gang header, your data is in gn->gn_gbh.
         * If you're a gang member, your data is in 'data' and gn == NULL.
         */
-       zio = zio_gang_issue_func[lio->io_type](pio, bp, gn, data);
+       zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data);
 
        if (gn != NULL) {
                ASSERT(gn->gn_gbh->zg_tail.zbt_magic == ZBT_MAGIC);
@@ -1401,8 +1461,8 @@ zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
                }
        }
 
-       if (gn == lio->io_gang_tree)
-               ASSERT3P((char *)lio->io_data + lio->io_size, ==, data);
+       if (gn == gio->io_gang_tree)
+               ASSERT3P((char *)gio->io_data + gio->io_size, ==, data);
 
        if (zio != pio)
                zio_nowait(zio);
@@ -1413,7 +1473,10 @@ zio_gang_assemble(zio_t *zio)
 {
        blkptr_t *bp = zio->io_bp;
 
-       ASSERT(BP_IS_GANG(bp) && zio == zio->io_logical);
+       ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
+       ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
+
+       zio->io_gang_leader = zio;
 
        zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
 
@@ -1423,18 +1486,18 @@ zio_gang_assemble(zio_t *zio)
 static int
 zio_gang_issue(zio_t *zio)
 {
-       zio_t *lio = zio->io_logical;
        blkptr_t *bp = zio->io_bp;
 
        if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
                return (ZIO_PIPELINE_STOP);
 
-       ASSERT(BP_IS_GANG(bp) && zio == lio);
+       ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
+       ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
 
        if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
-               zio_gang_tree_issue(lio, lio->io_gang_tree, bp, lio->io_data);
+               zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_data);
        else
-               zio_gang_tree_free(&lio->io_gang_tree);
+               zio_gang_tree_free(&zio->io_gang_tree);
 
        zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
@@ -1444,8 +1507,8 @@ zio_gang_issue(zio_t *zio)
 static void
 zio_write_gang_member_ready(zio_t *zio)
 {
-       zio_t *pio = zio->io_parent;
-       zio_t *lio = zio->io_logical;
+       zio_t *pio = zio_unique_parent(zio);
+       zio_t *gio = zio->io_gang_leader;
        dva_t *cdva = zio->io_bp->blk_dva;
        dva_t *pdva = pio->io_bp->blk_dva;
        uint64_t asize;
@@ -1456,7 +1519,7 @@ zio_write_gang_member_ready(zio_t *zio)
        ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
 
        ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
-       ASSERT3U(zio->io_prop.zp_ndvas, ==, lio->io_prop.zp_ndvas);
+       ASSERT3U(zio->io_prop.zp_ndvas, ==, gio->io_prop.zp_ndvas);
        ASSERT3U(zio->io_prop.zp_ndvas, <=, BP_GET_NDVAS(zio->io_bp));
        ASSERT3U(pio->io_prop.zp_ndvas, <=, BP_GET_NDVAS(pio->io_bp));
        ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
@@ -1476,28 +1539,28 @@ zio_write_gang_block(zio_t *pio)
 {
        spa_t *spa = pio->io_spa;
        blkptr_t *bp = pio->io_bp;
-       zio_t *lio = pio->io_logical;
+       zio_t *gio = pio->io_gang_leader;
        zio_t *zio;
        zio_gang_node_t *gn, **gnpp;
        zio_gbh_phys_t *gbh;
        uint64_t txg = pio->io_txg;
        uint64_t resid = pio->io_size;
        uint64_t lsize;
-       int ndvas = lio->io_prop.zp_ndvas;
+       int ndvas = gio->io_prop.zp_ndvas;
        int gbh_ndvas = MIN(ndvas + 1, spa_max_replication(spa));
        zio_prop_t zp;
        int error;
 
        error = metaslab_alloc(spa, spa->spa_normal_class, SPA_GANGBLOCKSIZE,
-           bp, gbh_ndvas, txg, pio == lio ? NULL : lio->io_bp,
+           bp, gbh_ndvas, txg, pio == gio ? NULL : gio->io_bp,
            METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER);
        if (error) {
                pio->io_error = error;
                return (ZIO_PIPELINE_CONTINUE);
        }
 
-       if (pio == lio) {
-               gnpp = &lio->io_gang_tree;
+       if (pio == gio) {
+               gnpp = &gio->io_gang_tree;
        } else {
                gnpp = pio->io_private;
                ASSERT(pio->io_ready == zio_write_gang_member_ready);
@@ -1521,11 +1584,11 @@ zio_write_gang_block(zio_t *pio)
                    SPA_MINBLOCKSIZE);
                ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
 
-               zp.zp_checksum = lio->io_prop.zp_checksum;
+               zp.zp_checksum = gio->io_prop.zp_checksum;
                zp.zp_compress = ZIO_COMPRESS_OFF;
                zp.zp_type = DMU_OT_NONE;
                zp.zp_level = 0;
-               zp.zp_ndvas = lio->io_prop.zp_ndvas;
+               zp.zp_ndvas = gio->io_prop.zp_ndvas;
 
                zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
                    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
@@ -1558,6 +1621,11 @@ zio_dva_allocate(zio_t *zio)
        blkptr_t *bp = zio->io_bp;
        int error;
 
+       if (zio->io_gang_leader == NULL) {
+               ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
+               zio->io_gang_leader = zio;
+       }
+
        ASSERT(BP_IS_HOLE(bp));
        ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
        ASSERT3U(zio->io_prop.zp_ndvas, >, 0);
@@ -1689,72 +1757,6 @@ zio_free_blk(spa_t *spa, blkptr_t *bp, uint64_t txg)
  * Read and write to physical devices
  * ==========================================================================
  */
-
-static void
-zio_vdev_io_probe_done(zio_t *zio)
-{
-       zio_t *dio;
-       vdev_t *vd = zio->io_private;
-
-       mutex_enter(&vd->vdev_probe_lock);
-       ASSERT(vd->vdev_probe_zio == zio);
-       vd->vdev_probe_zio = NULL;
-       mutex_exit(&vd->vdev_probe_lock);
-
-       while ((dio = zio->io_delegate_list) != NULL) {
-               zio->io_delegate_list = dio->io_delegate_next;
-               dio->io_delegate_next = NULL;
-               if (!vdev_accessible(vd, dio))
-                       dio->io_error = ENXIO;
-               zio_execute(dio);
-       }
-}
-
-/*
- * Probe the device to determine whether I/O failure is specific to this
- * zio (e.g. a bad sector) or affects the entire vdev (e.g. unplugged).
- */
-static int
-zio_vdev_io_probe(zio_t *zio)
-{
-       vdev_t *vd = zio->io_vd;
-       zio_t *pio = NULL;
-       boolean_t created_pio = B_FALSE;
-
-       /*
-        * Don't probe the probe.
-        */
-       if (zio->io_flags & ZIO_FLAG_PROBE)
-               return (ZIO_PIPELINE_CONTINUE);
-
-       /*
-        * To prevent 'probe storms' when a device fails, we create
-        * just one probe i/o at a time.  All zios that want to probe
-        * this vdev will join the probe zio's io_delegate_list.
-        */
-       mutex_enter(&vd->vdev_probe_lock);
-
-       if ((pio = vd->vdev_probe_zio) == NULL) {
-               vd->vdev_probe_zio = pio = zio_root(zio->io_spa,
-                   zio_vdev_io_probe_done, vd, ZIO_FLAG_CANFAIL);
-               created_pio = B_TRUE;
-               vd->vdev_probe_wanted = B_TRUE;
-               spa_async_request(zio->io_spa, SPA_ASYNC_PROBE);
-       }
-
-       zio->io_delegate_next = pio->io_delegate_list;
-       pio->io_delegate_list = zio;
-
-       mutex_exit(&vd->vdev_probe_lock);
-
-       if (created_pio) {
-               zio_nowait(vdev_probe(vd, pio));
-               zio_nowait(pio);
-       }
-
-       return (ZIO_PIPELINE_STOP);
-}
-
 static int
 zio_vdev_io_start(zio_t *zio)
 {
@@ -1790,13 +1792,35 @@ zio_vdev_io_start(zio_t *zio)
 
        ASSERT(P2PHASE(zio->io_offset, align) == 0);
        ASSERT(P2PHASE(zio->io_size, align) == 0);
-       ASSERT(zio->io_type != ZIO_TYPE_WRITE || (spa_mode & FWRITE));
+       ASSERT(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
+
+       /*
+        * If this is a repair I/O, and there's no self-healing involved --
+        * that is, we're just resilvering what we expect to resilver --
+        * then don't do the I/O unless zio's txg is actually in vd's DTL.
+        * This prevents spurious resilvering with nested replication.
+        * For example, given a mirror of mirrors, (A+B)+(C+D), if only
+        * A is out of date, we'll read from C+D, then use the data to
+        * resilver A+B -- but we don't actually want to resilver B, just A.
+        * The top-level mirror has no way to know this, so instead we just
+        * discard unnecessary repairs as we work our way down the vdev tree.
+        * The same logic applies to any form of nested replication:
+        * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
+        */
+       if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
+           !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
+           zio->io_txg != 0 && /* not a delegated i/o */
+           !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
+               ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+               zio_vdev_io_bypass(zio);
+               return (ZIO_PIPELINE_CONTINUE);
+       }
 
        if (vd->vdev_ops->vdev_op_leaf &&
            (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
 
                if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio) == 0)
-                       return (ZIO_PIPELINE_STOP);
+                       return (ZIO_PIPELINE_CONTINUE);
 
                if ((zio = vdev_queue_io(zio)) == NULL)
                        return (ZIO_PIPELINE_STOP);
@@ -1806,7 +1830,6 @@ zio_vdev_io_start(zio_t *zio)
                        zio_interrupt(zio);
                        return (ZIO_PIPELINE_STOP);
                }
-
        }
 
        return (vd->vdev_ops->vdev_op_io_start(zio));
@@ -1832,7 +1855,8 @@ zio_vdev_io_done(zio_t *zio)
                        vdev_cache_write(zio);
 
                if (zio_injection_enabled && zio->io_error == 0)
-                       zio->io_error = zio_handle_device_injection(vd, EIO);
+                       zio->io_error = zio_handle_device_injection(vd,
+                           zio, EIO);
 
                if (zio_injection_enabled && zio->io_error == 0)
                        zio->io_error = zio_handle_label_injection(zio, EIO);
@@ -1849,7 +1873,7 @@ zio_vdev_io_done(zio_t *zio)
        ops->vdev_op_io_done(zio);
 
        if (unexpected_error)
-               return (zio_vdev_io_probe(zio));
+               VERIFY(vdev_probe(vd, zio) == NULL);
 
        return (ZIO_PIPELINE_CONTINUE);
 }
@@ -2045,13 +2069,12 @@ static int
 zio_ready(zio_t *zio)
 {
        blkptr_t *bp = zio->io_bp;
-       zio_t *pio = zio->io_parent;
+       zio_t *pio, *pio_next;
 
-       if (zio->io_ready) {
-               if (BP_IS_GANG(bp) &&
-                   zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY))
-                       return (ZIO_PIPELINE_STOP);
+       if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY))
+               return (ZIO_PIPELINE_STOP);
 
+       if (zio->io_ready) {
                ASSERT(IO_IS_ALLOCATING(zio));
                ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
                ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
@@ -2065,8 +2088,22 @@ zio_ready(zio_t *zio)
        if (zio->io_error)
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
-       if (pio != NULL)
+       mutex_enter(&zio->io_lock);
+       zio->io_state[ZIO_WAIT_READY] = 1;
+       pio = zio_walk_parents(zio);
+       mutex_exit(&zio->io_lock);
+
+       /*
+        * As we notify zio's parents, new parents could be added.
+        * New parents go to the head of zio's io_parent_list, however,
+        * so we will (correctly) not notify them.  The remainder of zio's
+        * io_parent_list, from 'pio_next' onward, cannot change because
+        * all parents must wait for us to be done before they can be done.
+        */
+       for (; pio != NULL; pio = pio_next) {
+               pio_next = zio_walk_parents(zio);
                zio_notify_parent(pio, zio, ZIO_WAIT_READY);
+       }
 
        return (ZIO_PIPELINE_CONTINUE);
 }
@@ -2075,14 +2112,14 @@ static int
 zio_done(zio_t *zio)
 {
        spa_t *spa = zio->io_spa;
-       zio_t *pio = zio->io_parent;
        zio_t *lio = zio->io_logical;
        blkptr_t *bp = zio->io_bp;
        vdev_t *vd = zio->io_vd;
        uint64_t psize = zio->io_size;
+       zio_t *pio, *pio_next;
 
        /*
-        * If our of children haven't all completed,
+        * If our children haven't all completed,
         * wait for them and then repeat this pipeline stage.
         */
        if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
@@ -2099,7 +2136,7 @@ zio_done(zio_t *zio)
                ASSERT(bp->blk_pad[1] == 0);
                ASSERT(bp->blk_pad[2] == 0);
                ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
-                   (pio != NULL && bp == pio->io_bp));
+                   (bp == zio_unique_parent(zio)->io_bp));
                if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
                    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
                        ASSERT(!BP_SHOULD_BYTESWAP(bp));
@@ -2157,6 +2194,7 @@ zio_done(zio_t *zio)
                if ((zio->io_type == ZIO_TYPE_READ ||
                    zio->io_type == ZIO_TYPE_FREE) &&
                    zio->io_error == ENXIO &&
+                   spa->spa_load_state == SPA_LOAD_NONE &&
                    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
                        zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
 
@@ -2172,6 +2210,21 @@ zio_done(zio_t *zio)
         */
        zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
 
+       if ((zio->io_error || zio->io_reexecute) && IO_IS_ALLOCATING(zio) &&
+           zio->io_child_type == ZIO_CHILD_LOGICAL) {
+               ASSERT(zio->io_child_type != ZIO_CHILD_GANG);
+               zio_dva_unallocate(zio, zio->io_gang_tree, bp);
+       }
+
+       zio_gang_tree_free(&zio->io_gang_tree);
+
+       /*
+        * Godfather I/Os should never suspend.
+        */
+       if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
+           (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
+               zio->io_reexecute = 0;
+
        if (zio->io_reexecute) {
                /*
                 * This is a logical I/O that wants to reexecute.
@@ -2188,17 +2241,37 @@ zio_done(zio_t *zio)
                 */
                ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
 
-               if (IO_IS_ALLOCATING(zio))
-                       zio_dva_unallocate(zio, zio->io_gang_tree, bp);
+               zio->io_gang_leader = NULL;
 
-               zio_gang_tree_free(&zio->io_gang_tree);
+               mutex_enter(&zio->io_lock);
+               zio->io_state[ZIO_WAIT_DONE] = 1;
+               mutex_exit(&zio->io_lock);
+
+               /*
+                * "The Godfather" I/O monitors its children but is
+                * not a true parent to them. It will track them through
+                * the pipeline but severs its ties whenever they get into
+                * trouble (e.g. suspended). This allows "The Godfather"
+                * I/O to return status without blocking.
+                */
+               for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
+                       zio_link_t *zl = zio->io_walk_link;
+                       pio_next = zio_walk_parents(zio);
+
+                       if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
+                           (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
+                               zio_remove_child(pio, zio, zl);
+                               zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
+                       }
+               }
 
-               if (pio != NULL) {
+               if ((pio = zio_unique_parent(zio)) != NULL) {
                        /*
                         * We're not a root i/o, so there's nothing to do
                         * but notify our parent.  Don't propagate errors
                         * upward since we haven't permanently failed yet.
                         */
+                       ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
                        zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
                        zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
                } else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
@@ -2219,20 +2292,26 @@ zio_done(zio_t *zio)
                return (ZIO_PIPELINE_STOP);
        }
 
-       ASSERT(zio->io_child == NULL);
+       ASSERT(zio_walk_children(zio) == NULL);
        ASSERT(zio->io_reexecute == 0);
        ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
 
+       /*
+        * It is the responsibility of the done callback to ensure that this
+        * particular zio is no longer discoverable for adoption, and as
+        * such, cannot acquire any new parents.
+        */
        if (zio->io_done)
                zio->io_done(zio);
 
-       zio_gang_tree_free(&zio->io_gang_tree);
-
-       ASSERT(zio->io_delegate_list == NULL);
-       ASSERT(zio->io_delegate_next == NULL);
+       mutex_enter(&zio->io_lock);
+       zio->io_state[ZIO_WAIT_DONE] = 1;
+       mutex_exit(&zio->io_lock);
 
-       if (pio != NULL) {
-               zio_remove_child(pio, zio);
+       for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
+               zio_link_t *zl = zio->io_walk_link;
+               pio_next = zio_walk_parents(zio);
+               zio_remove_child(pio, zio, zl);
                zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
        }