Rebase to OpenSolaris b103, in the process we are removing any code which did not...
[zfs.git] / zfs / lib / libzpool / dbuf.c
index 08d17fb..d046103 100644 (file)
@@ -23,8 +23,6 @@
  * Use is subject to license terms.
  */
 
-#pragma ident  "@(#)dbuf.c     1.32    08/03/20 SMI"
-
 #include <sys/zfs_context.h>
 #include <sys/dmu.h>
 #include <sys/dmu_impl.h>
 
 static void dbuf_destroy(dmu_buf_impl_t *db);
 static int dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx);
-static void dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, int checksum,
-    int compress, dmu_tx_t *tx);
+static void dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx);
 static arc_done_func_t dbuf_write_ready;
 static arc_done_func_t dbuf_write_done;
-
-int zfs_mdcomp_disable = 0;
+static zio_done_func_t dbuf_skip_write_ready;
+static zio_done_func_t dbuf_skip_write_done;
 
 /*
  * Global data structures and functions for the dbuf cache.
@@ -313,20 +310,18 @@ dbuf_verify(dmu_buf_impl_t *db)
                ASSERT3U(db->db.db_offset, ==, db->db_blkid * db->db.db_size);
        }
 
-       if (db->db_level == 0) {
-               /* we can be momentarily larger in dnode_set_blksz() */
-               if (db->db_blkid != DB_BONUS_BLKID && dn) {
-                       ASSERT3U(db->db.db_size, >=, dn->dn_datablksz);
-               }
-               if (db->db.db_object == DMU_META_DNODE_OBJECT) {
-                       dbuf_dirty_record_t *dr = db->db_data_pending;
-                       /*
-                        * it should only be modified in syncing
-                        * context, so make sure we only have
-                        * one copy of the data.
-                        */
-                       ASSERT(dr == NULL || dr->dt.dl.dr_data == db->db_buf);
-               }
+       /*
+        * We can't assert that db_size matches dn_datablksz because it
+        * can be momentarily different when another thread is doing
+        * dnode_set_blksz().
+        */
+       if (db->db_level == 0 && db->db.db_object == DMU_META_DNODE_OBJECT) {
+               dbuf_dirty_record_t *dr = db->db_data_pending;
+               /*
+                * It should only be modified in syncing context, so
+                * make sure we only have one copy of the data.
+                */
+               ASSERT(dr == NULL || dr->dt.dl.dr_data == db->db_buf);
        }
 
        /* verify db->db_blkptr */
@@ -403,7 +398,8 @@ dbuf_set_data(dmu_buf_impl_t *db, arc_buf_t *buf)
        } else {
                dbuf_evict_user(db);
                db->db.db_data = NULL;
-               db->db_state = DB_UNCACHED;
+               if (db->db_state != DB_NOFILL)
+                       db->db_state = DB_UNCACHED;
        }
 }
 
@@ -456,26 +452,27 @@ dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
 static void
 dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 {
-       blkptr_t *bp;
+       dnode_t *dn = db->db_dnode;
        zbookmark_t zb;
        uint32_t aflags = ARC_NOWAIT;
+       arc_buf_t *pbuf;
 
        ASSERT(!refcount_is_zero(&db->db_holds));
        /* We need the struct_rwlock to prevent db_blkptr from changing. */
-       ASSERT(RW_LOCK_HELD(&db->db_dnode->dn_struct_rwlock));
+       ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
        ASSERT(MUTEX_HELD(&db->db_mtx));
        ASSERT(db->db_state == DB_UNCACHED);
        ASSERT(db->db_buf == NULL);
 
        if (db->db_blkid == DB_BONUS_BLKID) {
-               int bonuslen = db->db_dnode->dn_bonuslen;
+               int bonuslen = dn->dn_bonuslen;
 
                ASSERT3U(bonuslen, <=, db->db.db_size);
                db->db.db_data = zio_buf_alloc(DN_MAX_BONUSLEN);
                arc_space_consume(DN_MAX_BONUSLEN);
                if (bonuslen < DN_MAX_BONUSLEN)
                        bzero(db->db.db_data, DN_MAX_BONUSLEN);
-               bcopy(DN_BONUS(db->db_dnode->dn_phys), db->db.db_data,
+               bcopy(DN_BONUS(dn->dn_phys), db->db.db_data,
                    bonuslen);
                dbuf_update_data(db);
                db->db_state = DB_CACHED;
@@ -483,21 +480,17 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
                return;
        }
 
-       if (db->db_level == 0 && dnode_block_freed(db->db_dnode, db->db_blkid))
-               bp = NULL;
-       else
-               bp = db->db_blkptr;
-
-       if (bp == NULL)
-               dprintf_dbuf(db, "blkptr: %s\n", "NULL");
-       else
-               dprintf_dbuf_bp(db, bp, "%s", "blkptr:");
-
-       if (bp == NULL || BP_IS_HOLE(bp)) {
+       /*
+        * Recheck BP_IS_HOLE() after dnode_block_freed() in case dnode_sync()
+        * processes the delete record and clears the bp while we are waiting
+        * for the dn_mtx (resulting in a "no" from block_freed).
+        */
+       if (db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr) ||
+           (db->db_level == 0 && (dnode_block_freed(dn, db->db_blkid) ||
+           BP_IS_HOLE(db->db_blkptr)))) {
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
 
-               ASSERT(bp == NULL || BP_IS_HOLE(bp));
-               dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
+               dbuf_set_data(db, arc_buf_alloc(dn->dn_objset->os_spa,
                    db->db.db_size, db, type));
                bzero(db->db.db_data, db->db.db_size);
                db->db_state = DB_CACHED;
@@ -509,6 +502,9 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
        db->db_state = DB_READ;
        mutex_exit(&db->db_mtx);
 
+       if (DBUF_IS_L2CACHEABLE(db))
+               aflags |= ARC_L2CACHE;
+
        zb.zb_objset = db->db_objset->os_dsl_dataset ?
            db->db_objset->os_dsl_dataset->ds_object : 0;
        zb.zb_object = db->db.db_object;
@@ -517,10 +513,13 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 
        dbuf_add_ref(db, NULL);
        /* ZIO_FLAG_CANFAIL callers have to check the parent zio's error */
-       ASSERT3U(db->db_dnode->dn_type, <, DMU_OT_NUMTYPES);
-       (void) arc_read(zio, db->db_dnode->dn_objset->os_spa, bp,
-           db->db_level > 0 ? byteswap_uint64_array :
-           dmu_ot[db->db_dnode->dn_type].ot_byteswap,
+
+       if (db->db_parent)
+               pbuf = db->db_parent->db_buf;
+       else
+               pbuf = db->db_objset->os_phys_buf;
+
+       (void) arc_read(zio, dn->dn_objset->os_spa, db->db_blkptr, pbuf,
            dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
            (*flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
            &aflags, &zb);
@@ -541,11 +540,15 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
         */
        ASSERT(!refcount_is_zero(&db->db_holds));
 
+       if (db->db_state == DB_NOFILL)
+               return (EIO);
+
        if ((flags & DB_RF_HAVESTRUCT) == 0)
                rw_enter(&db->db_dnode->dn_struct_rwlock, RW_READER);
 
        prefetch = db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID &&
-           (flags & DB_RF_NOPREFETCH) == 0 && db->db_dnode != NULL;
+           (flags & DB_RF_NOPREFETCH) == 0 && db->db_dnode != NULL &&
+           DBUF_IS_CACHEABLE(db);
 
        mutex_enter(&db->db_mtx);
        if (db->db_state == DB_CACHED) {
@@ -615,6 +618,8 @@ dbuf_noread(dmu_buf_impl_t *db)
                dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
                    db->db.db_size, db, type));
                db->db_state = DB_FILL;
+       } else if (db->db_state == DB_NOFILL) {
+               dbuf_set_data(db, NULL);
        } else {
                ASSERT3U(db->db_state, ==, DB_CACHED);
        }
@@ -690,7 +695,8 @@ dbuf_unoverride(dbuf_dirty_record_t *dr)
        /* free this block */
        if (!BP_IS_HOLE(&dr->dt.dl.dr_overridden_by)) {
                /* XXX can get silent EIO here */
-               (void) arc_free(NULL, db->db_dnode->dn_objset->os_spa,
+               (void) dsl_free(NULL,
+                   spa_get_dsl(db->db_dnode->dn_objset->os_spa),
                    txg, &dr->dt.dl.dr_overridden_by, NULL, NULL, ARC_WAIT);
        }
        dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
@@ -705,22 +711,50 @@ dbuf_unoverride(dbuf_dirty_record_t *dr)
        arc_release(dr->dt.dl.dr_data, db);
 }
 
+/*
+ * Evict (if it's unreferenced) or clear (if it's referenced) any level-0
+ * data blocks in the free range, so that any future readers will find
+ * empty blocks.  Also, if we happen across any level-1 dbufs in the
+ * range that have not already been marked dirty, mark them dirty so
+ * they stay in memory.
+ */
 void
-dbuf_free_range(dnode_t *dn, uint64_t blkid, uint64_t nblks, dmu_tx_t *tx)
+dbuf_free_range(dnode_t *dn, uint64_t start, uint64_t end, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db, *db_next;
        uint64_t txg = tx->tx_txg;
+       int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
+       uint64_t first_l1 = start >> epbs;
+       uint64_t last_l1 = end >> epbs;
 
-       dprintf_dnode(dn, "blkid=%llu nblks=%llu\n", blkid, nblks);
+       if (end > dn->dn_maxblkid) {
+               end = dn->dn_maxblkid;
+               last_l1 = end >> epbs;
+       }
+       dprintf_dnode(dn, "start=%llu end=%llu\n", start, end);
        mutex_enter(&dn->dn_dbufs_mtx);
        for (db = list_head(&dn->dn_dbufs); db; db = db_next) {
                db_next = list_next(&dn->dn_dbufs, db);
                ASSERT(db->db_blkid != DB_BONUS_BLKID);
+
+               if (db->db_level == 1 &&
+                   db->db_blkid >= first_l1 && db->db_blkid <= last_l1) {
+                       mutex_enter(&db->db_mtx);
+                       if (db->db_last_dirty &&
+                           db->db_last_dirty->dr_txg < txg) {
+                               dbuf_add_ref(db, FTAG);
+                               mutex_exit(&db->db_mtx);
+                               dbuf_will_dirty(db, tx);
+                               dbuf_rele(db, FTAG);
+                       } else {
+                               mutex_exit(&db->db_mtx);
+                       }
+               }
+
                if (db->db_level != 0)
                        continue;
                dprintf_dbuf(db, "found buf %s\n", "");
-               if (db->db_blkid < blkid ||
-                   db->db_blkid >= blkid+nblks)
+               if (db->db_blkid < start || db->db_blkid > end)
                        continue;
 
                /* found a level 0 buffer in the range */
@@ -729,6 +763,7 @@ dbuf_free_range(dnode_t *dn, uint64_t blkid, uint64_t nblks, dmu_tx_t *tx)
 
                mutex_enter(&db->db_mtx);
                if (db->db_state == DB_UNCACHED ||
+                   db->db_state == DB_NOFILL ||
                    db->db_state == DB_EVICTING) {
                        ASSERT(db->db.db_data == NULL);
                        mutex_exit(&db->db_mtx);
@@ -862,6 +897,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        objset_impl_t *os = dn->dn_objset;
        dbuf_dirty_record_t **drp, *dr;
        int drop_struct_lock = FALSE;
+       boolean_t do_free_accounting = B_FALSE;
        int txgoff = tx->tx_txg & TXG_MASK;
 
        ASSERT(tx->tx_txg != 0);
@@ -897,7 +933,8 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * syncing context don't bother holding ahead.
         */
        ASSERT(db->db_level != 0 ||
-           db->db_state == DB_CACHED || db->db_state == DB_FILL);
+           db->db_state == DB_CACHED || db->db_state == DB_FILL ||
+           db->db_state == DB_NOFILL);
 
        mutex_enter(&dn->dn_mtx);
        /*
@@ -966,22 +1003,13 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        if (db->db_blkid != DB_BONUS_BLKID) {
                /*
                 * Update the accounting.
+                * Note: we delay "free accounting" until after we drop
+                * the db_mtx.  This keeps us from grabbing other locks
+                * (and possibly deadlocking) in bp_get_dasize() while
+                * also holding the db_mtx.
                 */
-               if (dbuf_block_freeable(db)) {
-                       blkptr_t *bp = db->db_blkptr;
-                       int64_t willfree = (bp && !BP_IS_HOLE(bp)) ?
-                           bp_get_dasize(os->os_spa, bp) : db->db.db_size;
-                       /*
-                        * This is only a guess -- if the dbuf is dirty
-                        * in a previous txg, we don't know how much
-                        * space it will use on disk yet.  We should
-                        * really have the struct_rwlock to access
-                        * db_blkptr, but since this is just a guess,
-                        * it's OK if we get an odd answer.
-                        */
-                       dnode_willuse_space(dn, -willfree, tx);
-               }
                dnode_willuse_space(dn, db->db.db_size, tx);
+               do_free_accounting = dbuf_block_freeable(db);
        }
 
        /*
@@ -993,22 +1021,26 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        if (db->db_level == 0) {
                void *data_old = db->db_buf;
 
-               if (db->db_blkid == DB_BONUS_BLKID) {
-                       dbuf_fix_old_data(db, tx->tx_txg);
-                       data_old = db->db.db_data;
-               } else if (db->db.db_object != DMU_META_DNODE_OBJECT) {
-                       /*
-                        * Release the data buffer from the cache so that we
-                        * can modify it without impacting possible other users
-                        * of this cached data block.  Note that indirect
-                        * blocks and private objects are not released until the
-                        * syncing state (since they are only modified then).
-                        */
-                       arc_release(db->db_buf, db);
-                       dbuf_fix_old_data(db, tx->tx_txg);
-                       data_old = db->db_buf;
+               if (db->db_state != DB_NOFILL) {
+                       if (db->db_blkid == DB_BONUS_BLKID) {
+                               dbuf_fix_old_data(db, tx->tx_txg);
+                               data_old = db->db.db_data;
+                       } else if (db->db.db_object != DMU_META_DNODE_OBJECT) {
+                               /*
+                                * Release the data buffer from the cache so
+                                * that we can modify it without impacting
+                                * possible other users of this cached data
+                                * block.  Note that indirect blocks and
+                                * private objects are not released until the
+                                * syncing state (since they are only modified
+                                * then).
+                                */
+                               arc_release(db->db_buf, db);
+                               dbuf_fix_old_data(db, tx->tx_txg);
+                               data_old = db->db_buf;
+                       }
+                       ASSERT(data_old != NULL);
                }
-               ASSERT(data_old != NULL);
                dr->dt.dl.dr_data = data_old;
        } else {
                mutex_init(&dr->dt.di.dr_mtx, NULL, MUTEX_DEFAULT, NULL);
@@ -1049,11 +1081,19 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                mutex_exit(&dn->dn_mtx);
                dnode_setdirty(dn, tx);
                return (dr);
-       }
-
-       if (db->db_level == 0) {
-               dnode_new_blkid(dn, db->db_blkid, tx);
-               ASSERT(dn->dn_maxblkid >= db->db_blkid);
+       } else if (do_free_accounting) {
+               blkptr_t *bp = db->db_blkptr;
+               int64_t willfree = (bp && !BP_IS_HOLE(bp)) ?
+                   bp_get_dasize(os->os_spa, bp) : db->db.db_size;
+               /*
+                * This is only a guess -- if the dbuf is dirty
+                * in a previous txg, we don't know how much
+                * space it will use on disk yet.  We should
+                * really have the struct_rwlock to access
+                * db_blkptr, but since this is just a guess,
+                * it's OK if we get an odd answer.
+                */
+               dnode_willuse_space(dn, -willfree, tx);
        }
 
        if (!RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
@@ -1061,6 +1101,11 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                drop_struct_lock = TRUE;
        }
 
+       if (db->db_level == 0) {
+               dnode_new_blkid(dn, db->db_blkid, tx, drop_struct_lock);
+               ASSERT(dn->dn_maxblkid >= db->db_blkid);
+       }
+
        if (db->db_level+1 < dn->dn_nlevels) {
                dmu_buf_impl_t *parent = db->db_parent;
                dbuf_dirty_record_t *di;
@@ -1161,19 +1206,22 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                list_remove(&dr->dr_parent->dt.di.dr_children, dr);
                mutex_exit(&dr->dr_parent->dt.di.dr_mtx);
        } else if (db->db_level+1 == dn->dn_nlevels) {
-               ASSERT3P(db->db_parent, ==, dn->dn_dbuf);
+               ASSERT(db->db_blkptr == NULL || db->db_parent == dn->dn_dbuf);
                mutex_enter(&dn->dn_mtx);
                list_remove(&dn->dn_dirty_records[txg & TXG_MASK], dr);
                mutex_exit(&dn->dn_mtx);
        }
 
        if (db->db_level == 0) {
-               dbuf_unoverride(dr);
+               if (db->db_state != DB_NOFILL) {
+                       dbuf_unoverride(dr);
 
-               ASSERT(db->db_buf != NULL);
-               ASSERT(dr->dt.dl.dr_data != NULL);
-               if (dr->dt.dl.dr_data != db->db_buf)
-                       VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data, db) == 1);
+                       ASSERT(db->db_buf != NULL);
+                       ASSERT(dr->dt.dl.dr_data != NULL);
+                       if (dr->dt.dl.dr_data != db->db_buf)
+                               VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data,
+                                   db) == 1);
+               }
        } else {
                ASSERT(db->db_buf != NULL);
                ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
@@ -1215,6 +1263,16 @@ dbuf_will_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 }
 
 void
+dmu_buf_will_not_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
+{
+       dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
+
+       db->db_state = DB_NOFILL;
+
+       dmu_buf_will_fill(db_fake, tx);
+}
+
+void
 dmu_buf_will_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
@@ -1289,7 +1347,7 @@ dbuf_clear(dmu_buf_impl_t *db)
                db->db_state = DB_UNCACHED;
        }
 
-       ASSERT3U(db->db_state, ==, DB_UNCACHED);
+       ASSERT(db->db_state == DB_UNCACHED || db->db_state == DB_NOFILL);
        ASSERT(db->db_data_pending == NULL);
 
        db->db_state = DB_EVICTING;
@@ -1533,6 +1591,7 @@ dbuf_prefetch(dnode_t *dn, uint64_t blkid)
 
        if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp) == 0) {
                if (bp && !BP_IS_HOLE(bp)) {
+                       arc_buf_t *pbuf;
                        uint32_t aflags = ARC_NOWAIT | ARC_PREFETCH;
                        zbookmark_t zb;
                        zb.zb_objset = dn->dn_objset->os_dsl_dataset ?
@@ -1541,9 +1600,13 @@ dbuf_prefetch(dnode_t *dn, uint64_t blkid)
                        zb.zb_level = 0;
                        zb.zb_blkid = blkid;
 
-                       (void) arc_read(NULL, dn->dn_objset->os_spa, bp,
-                           dmu_ot[dn->dn_type].ot_byteswap,
-                           NULL, NULL, ZIO_PRIORITY_ASYNC_READ,
+                       if (db)
+                               pbuf = db->db_buf;
+                       else
+                               pbuf = dn->dn_objset->os_phys_buf;
+
+                       (void) arc_read(NULL, dn->dn_objset->os_spa,
+                           bp, pbuf, NULL, NULL, ZIO_PRIORITY_ASYNC_READ,
                            ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
                            &aflags, &zb);
                }
@@ -1709,7 +1772,8 @@ dbuf_rele(dmu_buf_impl_t *db, void *tag)
                         * This is a special case: we never associated this
                         * dbuf with any data allocated from the ARC.
                         */
-                       ASSERT3U(db->db_state, ==, DB_UNCACHED);
+                       ASSERT(db->db_state == DB_UNCACHED ||
+                           db->db_state == DB_NOFILL);
                        dbuf_evict(db);
                } else if (arc_released(db->db_buf)) {
                        arc_buf_t *buf = db->db_buf;
@@ -1721,7 +1785,10 @@ dbuf_rele(dmu_buf_impl_t *db, void *tag)
                        dbuf_evict(db);
                } else {
                        VERIFY(arc_buf_remove_ref(db->db_buf, db) == 0);
-                       mutex_exit(&db->db_mtx);
+                       if (!DBUF_IS_CACHEABLE(db))
+                               dbuf_clear(db);
+                       else
+                               mutex_exit(&db->db_mtx);
                }
        } else {
                mutex_exit(&db->db_mtx);
@@ -1857,15 +1924,8 @@ dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
 
        db->db_data_pending = dr;
 
-       arc_release(db->db_buf, db);
        mutex_exit(&db->db_mtx);
-
-       /*
-        * XXX -- we should design a compression algorithm
-        * that specializes in arrays of bps.
-        */
-       dbuf_write(dr, db->db_buf, ZIO_CHECKSUM_FLETCHER_4,
-           zfs_mdcomp_disable ? ZIO_COMPRESS_EMPTY : ZIO_COMPRESS_LZJB, tx);
+       dbuf_write(dr, db->db_buf, tx);
 
        zio = dr->dr_zio;
        mutex_enter(&dr->dt.di.dr_mtx);
@@ -1883,7 +1943,6 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        dnode_t *dn = db->db_dnode;
        objset_impl_t *os = dn->dn_objset;
        uint64_t txg = tx->tx_txg;
-       int checksum, compress;
        int blksz;
 
        ASSERT(dmu_tx_is_syncing(tx));
@@ -1902,7 +1961,7 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                /* This buffer was freed and is now being re-filled */
                ASSERT(db->db.db_data != dr->dt.dl.dr_data);
        } else {
-               ASSERT3U(db->db_state, ==, DB_CACHED);
+               ASSERT(db->db_state == DB_CACHED || db->db_state == DB_NOFILL);
        }
        DBUF_VERIFY(db);
 
@@ -1968,6 +2027,7 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                zio_fake.io_bp = db->db_blkptr;
                zio_fake.io_bp_orig = *db->db_blkptr;
                zio_fake.io_txg = txg;
+               zio_fake.io_flags = 0;
 
                *db->db_blkptr = dr->dt.dl.dr_overridden_by;
                dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
@@ -1975,8 +2035,12 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                dr->dr_zio = &zio_fake;
                mutex_exit(&db->db_mtx);
 
+               ASSERT(!DVA_EQUAL(BP_IDENTITY(zio_fake.io_bp),
+                   BP_IDENTITY(&zio_fake.io_bp_orig)) ||
+                   BP_IS_HOLE(zio_fake.io_bp));
+
                if (BP_IS_OLDER(&zio_fake.io_bp_orig, txg))
-                       dsl_dataset_block_kill(os->os_dsl_dataset,
+                       (void) dsl_dataset_block_kill(os->os_dsl_dataset,
                            &zio_fake.io_bp_orig, dn->dn_zio, tx);
 
                dbuf_write_ready(&zio_fake, db->db_buf, db);
@@ -1985,54 +2049,38 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                return;
        }
 
-       blksz = arc_buf_size(*datap);
+       if (db->db_state != DB_NOFILL) {
+               blksz = arc_buf_size(*datap);
 
-       if (dn->dn_object != DMU_META_DNODE_OBJECT) {
-               /*
-                * If this buffer is currently "in use" (i.e., there are
-                * active holds and db_data still references it), then make
-                * a copy before we start the write so that any modifications
-                * from the open txg will not leak into this write.
-                *
-                * NOTE: this copy does not need to be made for objects only
-                * modified in the syncing context (e.g. DNONE_DNODE blocks).
-                */
-               if (refcount_count(&db->db_holds) > 1 && *datap == db->db_buf) {
-                       arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
-                       *datap = arc_buf_alloc(os->os_spa, blksz, db, type);
-                       bcopy(db->db.db_data, (*datap)->b_data, blksz);
+               if (dn->dn_object != DMU_META_DNODE_OBJECT) {
+                       /*
+                        * If this buffer is currently "in use" (i.e., there
+                        * are active holds and db_data still references it),
+                        * then make a copy before we start the write so that
+                        * any modifications from the open txg will not leak
+                        * into this write.
+                        *
+                        * NOTE: this copy does not need to be made for
+                        * objects only modified in the syncing context (e.g.
+                        * DMU_META_DNODE blocks).
+                        */
+                       if (refcount_count(&db->db_holds) > 1 &&
+                           *datap == db->db_buf) {
+                               arc_buf_contents_t type =
+                                   DBUF_GET_BUFC_TYPE(db);
+                               *datap =
+                                   arc_buf_alloc(os->os_spa, blksz, db, type);
+                               bcopy(db->db.db_data, (*datap)->b_data, blksz);
+                       }
                }
-       } else {
-               /*
-                * Private object buffers are released here rather
-                * than in dbuf_dirty() since they are only modified
-                * in the syncing context and we don't want the
-                * overhead of making multiple copies of the data.
-                */
-               arc_release(db->db_buf, db);
-       }
 
-       ASSERT(*datap != NULL);
+               ASSERT(*datap != NULL);
+       }
        db->db_data_pending = dr;
 
        mutex_exit(&db->db_mtx);
 
-       /*
-        * Allow dnode settings to override objset settings,
-        * except for metadata checksums.
-        */
-       if (dmu_ot[dn->dn_type].ot_metadata) {
-               checksum = os->os_md_checksum;
-               compress = zio_compress_select(dn->dn_compress,
-                   os->os_md_compress);
-       } else {
-               checksum = zio_checksum_select(dn->dn_checksum,
-                   os->os_checksum);
-               compress = zio_compress_select(dn->dn_compress,
-                   os->os_compress);
-       }
-
-       dbuf_write(dr, *datap, checksum, compress, tx);
+       dbuf_write(dr, *datap, tx);
 
        ASSERT(!list_link_active(&dr->dr_dirty_node));
        if (dn->dn_object == DMU_META_DNODE_OBJECT)
@@ -2068,8 +2116,7 @@ dbuf_sync_list(list_t *list, dmu_tx_t *tx)
 }
 
 static void
-dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, int checksum,
-    int compress, dmu_tx_t *tx)
+dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = dr->dr_dbuf;
        dnode_t *dn = db->db_dnode;
@@ -2077,8 +2124,23 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, int checksum,
        dmu_buf_impl_t *parent = db->db_parent;
        uint64_t txg = tx->tx_txg;
        zbookmark_t zb;
+       writeprops_t wp = { 0 };
        zio_t *zio;
-       int zio_flags;
+
+       if (!BP_IS_HOLE(db->db_blkptr) &&
+           (db->db_level > 0 || dn->dn_type == DMU_OT_DNODE)) {
+               /*
+                * Private object buffers are released here rather
+                * than in dbuf_dirty() since they are only modified
+                * in the syncing context and we don't want the
+                * overhead of making multiple copies of the data.
+                */
+               arc_release(data, db);
+       } else if (db->db_state != DB_NOFILL) {
+               ASSERT(arc_released(data));
+               /* XXX why do we need to thaw here? */
+               arc_buf_thaw(data);
+       }
 
        if (parent != dn->dn_dbuf) {
                ASSERT(parent && parent->db_data_pending);
@@ -2101,17 +2163,52 @@ dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, int checksum,
        zb.zb_level = db->db_level;
        zb.zb_blkid = db->db_blkid;
 
-       zio_flags = ZIO_FLAG_MUSTSUCCEED;
-       if (dmu_ot[dn->dn_type].ot_metadata || zb.zb_level != 0)
-               zio_flags |= ZIO_FLAG_METADATA;
+       wp.wp_type = dn->dn_type;
+       wp.wp_level = db->db_level;
+       wp.wp_copies = os->os_copies;
+       wp.wp_dncompress = dn->dn_compress;
+       wp.wp_oscompress = os->os_compress;
+       wp.wp_dnchecksum = dn->dn_checksum;
+       wp.wp_oschecksum = os->os_checksum;
+
        if (BP_IS_OLDER(db->db_blkptr, txg))
-               dsl_dataset_block_kill(
+               (void) dsl_dataset_block_kill(
                    os->os_dsl_dataset, db->db_blkptr, zio, tx);
 
-       dr->dr_zio = arc_write(zio, os->os_spa, checksum, compress,
-           dmu_get_replication_level(os, &zb, dn->dn_type), txg,
-           db->db_blkptr, data, dbuf_write_ready, dbuf_write_done, db,
-           ZIO_PRIORITY_ASYNC_WRITE, zio_flags, &zb);
+       if (db->db_state == DB_NOFILL) {
+               zio_prop_t zp = { 0 };
+
+               write_policy(os->os_spa, &wp, &zp);
+               dr->dr_zio = zio_write(zio, os->os_spa,
+                   txg, db->db_blkptr, NULL,
+                   db->db.db_size, &zp, dbuf_skip_write_ready,
+                   dbuf_skip_write_done, db, ZIO_PRIORITY_ASYNC_WRITE,
+                   ZIO_FLAG_MUSTSUCCEED, &zb);
+       } else {
+               dr->dr_zio = arc_write(zio, os->os_spa, &wp,
+                   DBUF_IS_L2CACHEABLE(db), txg, db->db_blkptr,
+                   data, dbuf_write_ready, dbuf_write_done, db,
+                   ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
+       }
+}
+
+/* wrapper function for dbuf_write_ready bypassing ARC */
+static void
+dbuf_skip_write_ready(zio_t *zio)
+{
+       blkptr_t *bp = zio->io_bp;
+
+       if (!BP_IS_GANG(bp))
+               zio_skip_write(zio);
+
+       dbuf_write_ready(zio, NULL, zio->io_private);
+}
+
+/* wrapper function for dbuf_write_done bypassing ARC */
+static void
+dbuf_skip_write_done(zio_t *zio)
+{
+       dbuf_write_done(zio, NULL, zio->io_private);
 }
 
 /* ARGSUSED */
@@ -2121,27 +2218,33 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
        dmu_buf_impl_t *db = vdb;
        dnode_t *dn = db->db_dnode;
        objset_impl_t *os = dn->dn_objset;
+       blkptr_t *bp = zio->io_bp;
        blkptr_t *bp_orig = &zio->io_bp_orig;
        uint64_t fill = 0;
        int old_size, new_size, i;
 
+       ASSERT(db->db_blkptr == bp);
+
        dprintf_dbuf_bp(db, bp_orig, "bp_orig: %s", "");
 
        old_size = bp_get_dasize(os->os_spa, bp_orig);
-       new_size = bp_get_dasize(os->os_spa, zio->io_bp);
+       new_size = bp_get_dasize(os->os_spa, bp);
 
-       dnode_diduse_space(dn, new_size-old_size);
+       dnode_diduse_space(dn, new_size - old_size);
 
-       if (BP_IS_HOLE(zio->io_bp)) {
+       if (BP_IS_HOLE(bp)) {
                dsl_dataset_t *ds = os->os_dsl_dataset;
                dmu_tx_t *tx = os->os_synctx;
 
                if (bp_orig->blk_birth == tx->tx_txg)
-                       dsl_dataset_block_kill(ds, bp_orig, NULL, tx);
-               ASSERT3U(db->db_blkptr->blk_fill, ==, 0);
+                       (void) dsl_dataset_block_kill(ds, bp_orig, zio, tx);
+               ASSERT3U(bp->blk_fill, ==, 0);
                return;
        }
 
+       ASSERT(BP_GET_TYPE(bp) == dn->dn_type);
+       ASSERT(BP_GET_LEVEL(bp) == db->db_level);
+
        mutex_enter(&db->db_mtx);
 
        if (db->db_level == 0) {
@@ -2161,32 +2264,31 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
                        fill = 1;
                }
        } else {
-               blkptr_t *bp = db->db.db_data;
+               blkptr_t *ibp = db->db.db_data;
                ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
-               for (i = db->db.db_size >> SPA_BLKPTRSHIFT; i > 0; i--, bp++) {
-                       if (BP_IS_HOLE(bp))
+               for (i = db->db.db_size >> SPA_BLKPTRSHIFT; i > 0; i--, ibp++) {
+                       if (BP_IS_HOLE(ibp))
                                continue;
-                       ASSERT3U(BP_GET_LSIZE(bp), ==,
+                       ASSERT3U(BP_GET_LSIZE(ibp), ==,
                            db->db_level == 1 ? dn->dn_datablksz :
                            (1<<dn->dn_phys->dn_indblkshift));
-                       fill += bp->blk_fill;
+                       fill += ibp->blk_fill;
                }
        }
 
-       db->db_blkptr->blk_fill = fill;
-       BP_SET_TYPE(db->db_blkptr, dn->dn_type);
-       BP_SET_LEVEL(db->db_blkptr, db->db_level);
+       bp->blk_fill = fill;
 
        mutex_exit(&db->db_mtx);
 
-       /* We must do this after we've set the bp's type and level */
-       if (!DVA_EQUAL(BP_IDENTITY(zio->io_bp), BP_IDENTITY(bp_orig))) {
+       if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
+               ASSERT(DVA_EQUAL(BP_IDENTITY(bp), BP_IDENTITY(bp_orig)));
+       } else {
                dsl_dataset_t *ds = os->os_dsl_dataset;
                dmu_tx_t *tx = os->os_synctx;
 
                if (bp_orig->blk_birth == tx->tx_txg)
-                       dsl_dataset_block_kill(ds, bp_orig, NULL, tx);
-               dsl_dataset_block_born(ds, zio->io_bp, tx);
+                       (void) dsl_dataset_block_kill(ds, bp_orig, zio, tx);
+               dsl_dataset_block_born(ds, bp, tx);
        }
 }
 
@@ -2214,12 +2316,15 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                ASSERT(db->db_blkid != DB_BONUS_BLKID);
                ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
 
-               if (dr->dt.dl.dr_data != db->db_buf)
-                       VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data, db) == 1);
-               else if (!BP_IS_HOLE(db->db_blkptr))
-                       arc_set_callback(db->db_buf, dbuf_do_evict, db);
-               else
-                       ASSERT(arc_released(db->db_buf));
+               if (db->db_state != DB_NOFILL) {
+                       if (dr->dt.dl.dr_data != db->db_buf)
+                               VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data,
+                                   db) == 1);
+                       else if (!BP_IS_HOLE(db->db_blkptr))
+                               arc_set_callback(db->db_buf, dbuf_do_evict, db);
+                       else
+                               ASSERT(arc_released(db->db_buf));
+               }
        } else {
                dnode_t *dn = db->db_dnode;