Illumos #3137 L2ARC compression
diff --git a/module/zfs/dbuf.c b/module/zfs/dbuf.c
index d046103..faa6cc3 100644
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
- * Use is subject to license terms.
+ * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
+#include <sys/arc.h>
 #include <sys/dmu.h>
 #include <sys/dmu_impl.h>
 #include <sys/dbuf.h>
 #include <sys/spa.h>
 #include <sys/zio.h>
 #include <sys/dmu_zfetch.h>
+#include <sys/sa.h>
+#include <sys/sa_impl.h>
+
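+/*
+ * Explicit call frame for __dbuf_hold_impl(): keeping its arguments and
+ * locals here rather than on the stack lets the recursion up the
+ * indirect-block tree run within the small stacks provided by the
+ * Linux kernel.
+ */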
+struct dbuf_hold_impl_data {
+       /* Function arguments */
+       dnode_t *dh_dn;
+       uint8_t dh_level;
+       uint64_t dh_blkid;
+       int dh_fail_sparse;
+       void *dh_tag;
+       dmu_buf_impl_t **dh_dbp;
+       /* Local variables */
+       dmu_buf_impl_t *dh_db;
+       dmu_buf_impl_t *dh_parent;
+       blkptr_t *dh_bp;
+       int dh_err;
+       dbuf_dirty_record_t *dh_dr;
+       arc_buf_contents_t dh_type;
+       int dh_depth;
+};
+
+static void __dbuf_hold_impl_init(struct dbuf_hold_impl_data *dh,
+    dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
+    void *tag, dmu_buf_impl_t **dbp, int depth);
+static int __dbuf_hold_impl(struct dbuf_hold_impl_data *dh);
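+/*
+ * A minimal sketch of how the public dbuf_hold_impl() (further down in
+ * this file, outside this diff) is expected to drive the iterative
+ * implementation; the exact allocation flag is an assumption:
+ *
+ *	struct dbuf_hold_impl_data *dh;
+ *	int err;
+ *
+ *	dh = kmem_zalloc(sizeof (*dh) * DBUF_HOLD_IMPL_MAX_DEPTH,
+ *	    KM_PUSHPAGE);
+ *	__dbuf_hold_impl_init(dh, dn, level, blkid, fail_sparse, tag, dbp, 0);
+ *	err = __dbuf_hold_impl(dh);
+ *	kmem_free(dh, sizeof (*dh) * DBUF_HOLD_IMPL_MAX_DEPTH);
+ */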
 
 static void dbuf_destroy(dmu_buf_impl_t *db);
 static int dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx);
 static void dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx);
-static arc_done_func_t dbuf_write_ready;
-static arc_done_func_t dbuf_write_done;
-static zio_done_func_t dbuf_skip_write_ready;
-static zio_done_func_t dbuf_skip_write_done;
 
 /*
  * Global data structures and functions for the dbuf cache.
@@ -58,6 +82,7 @@ dbuf_cons(void *vdb, void *unused, int kmflag)
        mutex_init(&db->db_mtx, NULL, MUTEX_DEFAULT, NULL);
        cv_init(&db->db_changed, NULL, CV_DEFAULT, NULL);
        refcount_create(&db->db_holds);
+       list_link_init(&db->db_link);
        return (0);
 }
 
@@ -109,12 +134,16 @@ dmu_buf_impl_t *
 dbuf_find(dnode_t *dn, uint8_t level, uint64_t blkid)
 {
        dbuf_hash_table_t *h = &dbuf_hash_table;
-       objset_impl_t *os = dn->dn_objset;
-       uint64_t obj = dn->dn_object;
-       uint64_t hv = DBUF_HASH(os, obj, level, blkid);
-       uint64_t idx = hv & h->hash_table_mask;
+       objset_t *os = dn->dn_objset;
+       uint64_t obj;
+       uint64_t hv;
+       uint64_t idx;
        dmu_buf_impl_t *db;
 
+       obj = dn->dn_object;
+       hv = DBUF_HASH(os, obj, level, blkid);
+       idx = hv & h->hash_table_mask;
+
        mutex_enter(DBUF_HASH_MUTEX(h, idx));
        for (db = h->hash_table[idx]; db != NULL; db = db->db_hash_next) {
                if (DBUF_EQUAL(db, os, obj, level, blkid)) {
@@ -140,14 +169,16 @@ static dmu_buf_impl_t *
 dbuf_hash_insert(dmu_buf_impl_t *db)
 {
        dbuf_hash_table_t *h = &dbuf_hash_table;
-       objset_impl_t *os = db->db_objset;
+       objset_t *os = db->db_objset;
        uint64_t obj = db->db.db_object;
        int level = db->db_level;
-       uint64_t blkid = db->db_blkid;
-       uint64_t hv = DBUF_HASH(os, obj, level, blkid);
-       uint64_t idx = hv & h->hash_table_mask;
+       uint64_t blkid, hv, idx;
        dmu_buf_impl_t *dbf;
 
+       blkid = db->db_blkid;
+       hv = DBUF_HASH(os, obj, level, blkid);
+       idx = hv & h->hash_table_mask;
+
        mutex_enter(DBUF_HASH_MUTEX(h, idx));
        for (dbf = h->hash_table[idx]; dbf != NULL; dbf = dbf->db_hash_next) {
                if (DBUF_EQUAL(dbf, os, obj, level, blkid)) {
@@ -177,11 +208,13 @@ static void
 dbuf_hash_remove(dmu_buf_impl_t *db)
 {
        dbuf_hash_table_t *h = &dbuf_hash_table;
-       uint64_t hv = DBUF_HASH(db->db_objset, db->db.db_object,
-           db->db_level, db->db_blkid);
-       uint64_t idx = hv & h->hash_table_mask;
+       uint64_t hv, idx;
        dmu_buf_impl_t *dbf, **dbp;
 
+       hv = DBUF_HASH(db->db_objset, db->db.db_object,
+           db->db_level, db->db_blkid);
+       idx = hv & h->hash_table_mask;
+
        /*
        * We mustn't hold db_mtx to maintain lock ordering:
         * DBUF_HASH_MUTEX > db_mtx.
@@ -220,6 +253,22 @@ dbuf_evict_user(dmu_buf_impl_t *db)
        db->db_evict_func = NULL;
 }
 
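+/*
+ * Indirect (level > 0) blocks are always metadata.  For level-0 blocks
+ * the answer depends on the dnode's type, so the dnode handle is
+ * entered to keep dnode_move() from relocating the dnode under us
+ * while we look.
+ */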
+boolean_t
+dbuf_is_metadata(dmu_buf_impl_t *db)
+{
+       if (db->db_level > 0) {
+               return (B_TRUE);
+       } else {
+               boolean_t is_metadata;
+
+               DB_DNODE_ENTER(db);
+               is_metadata = DMU_OT_IS_METADATA(DB_DNODE(db)->dn_type);
+               DB_DNODE_EXIT(db);
+
+               return (is_metadata);
+       }
+}
+
 void
 dbuf_evict(dmu_buf_impl_t *db)
 {
@@ -248,7 +297,13 @@ dbuf_init(void)
 
 retry:
        h->hash_table_mask = hsize - 1;
+#if defined(_KERNEL) && defined(HAVE_SPL)
+       /* Large allocations which do not require contiguous pages
+        * should be using vmem_alloc() in the linux kernel */
+       h->hash_table = vmem_zalloc(hsize * sizeof (void *), KM_PUSHPAGE);
+#else
        h->hash_table = kmem_zalloc(hsize * sizeof (void *), KM_NOSLEEP);
+#endif
        if (h->hash_table == NULL) {
                /* XXX - we should really return an error instead of assert */
                ASSERT(hsize > (1ULL << 10));
@@ -272,7 +327,13 @@ dbuf_fini(void)
 
        for (i = 0; i < DBUF_MUTEXES; i++)
                mutex_destroy(&h->hash_mutexes[i]);
+#if defined(_KERNEL) && defined(HAVE_SPL)
+       /* Large allocations which do not require contiguous pages
+        * should be using vmem_free() in the linux kernel */
+       vmem_free(h->hash_table, (h->hash_table_mask + 1) * sizeof (void *));
+#else
        kmem_free(h->hash_table, (h->hash_table_mask + 1) * sizeof (void *));
+#endif
        kmem_cache_destroy(dbuf_cache);
 }
 
@@ -284,7 +345,8 @@ dbuf_fini(void)
 static void
 dbuf_verify(dmu_buf_impl_t *db)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
+       dbuf_dirty_record_t *dr;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
 
@@ -292,6 +354,8 @@ dbuf_verify(dmu_buf_impl_t *db)
                return;
 
        ASSERT(db->db_objset != NULL);
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        if (dn == NULL) {
                ASSERT(db->db_parent == NULL);
                ASSERT(db->db_blkptr == NULL);
@@ -299,24 +363,35 @@ dbuf_verify(dmu_buf_impl_t *db)
                ASSERT3U(db->db.db_object, ==, dn->dn_object);
                ASSERT3P(db->db_objset, ==, dn->dn_objset);
                ASSERT3U(db->db_level, <, dn->dn_nlevels);
-               ASSERT(db->db_blkid == DB_BONUS_BLKID ||
-                   list_head(&dn->dn_dbufs));
+               ASSERT(db->db_blkid == DMU_BONUS_BLKID ||
+                   db->db_blkid == DMU_SPILL_BLKID ||
+                   !list_is_empty(&dn->dn_dbufs));
        }
-       if (db->db_blkid == DB_BONUS_BLKID) {
+       if (db->db_blkid == DMU_BONUS_BLKID) {
+               ASSERT(dn != NULL);
+               ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
+               ASSERT3U(db->db.db_offset, ==, DMU_BONUS_BLKID);
+       } else if (db->db_blkid == DMU_SPILL_BLKID) {
                ASSERT(dn != NULL);
                ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
-               ASSERT3U(db->db.db_offset, ==, DB_BONUS_BLKID);
+               ASSERT0(db->db.db_offset);
        } else {
                ASSERT3U(db->db.db_offset, ==, db->db_blkid * db->db.db_size);
        }
 
+       for (dr = db->db_data_pending; dr != NULL; dr = dr->dr_next)
+               ASSERT(dr->dr_dbuf == db);
+
+       for (dr = db->db_last_dirty; dr != NULL; dr = dr->dr_next)
+               ASSERT(dr->dr_dbuf == db);
+
        /*
         * We can't assert that db_size matches dn_datablksz because it
         * can be momentarily different when another thread is doing
         * dnode_set_blksz().
         */
        if (db->db_level == 0 && db->db.db_object == DMU_META_DNODE_OBJECT) {
-               dbuf_dirty_record_t *dr = db->db_data_pending;
+               dr = db->db_data_pending;
                /*
                 * It should only be modified in syncing context, so
                 * make sure we only have one copy of the data.
@@ -329,15 +404,17 @@ dbuf_verify(dmu_buf_impl_t *db)
                if (db->db_parent == dn->dn_dbuf) {
                        /* db is pointed to by the dnode */
                        /* ASSERT3U(db->db_blkid, <, dn->dn_nblkptr); */
-                       if (db->db.db_object == DMU_META_DNODE_OBJECT)
+                       if (DMU_OBJECT_IS_SPECIAL(db->db.db_object))
                                ASSERT(db->db_parent == NULL);
                        else
                                ASSERT(db->db_parent != NULL);
-                       ASSERT3P(db->db_blkptr, ==,
-                           &dn->dn_phys->dn_blkptr[db->db_blkid]);
+                       if (db->db_blkid != DMU_SPILL_BLKID)
+                               ASSERT3P(db->db_blkptr, ==,
+                                   &dn->dn_phys->dn_blkptr[db->db_blkid]);
                } else {
                        /* db is pointed to by an indirect block */
-                       int epb = db->db_parent->db.db_size >> SPA_BLKPTRSHIFT;
+                       ASSERTV(int epb = db->db_parent->db.db_size >>
+                               SPA_BLKPTRSHIFT);
                        ASSERT3U(db->db_parent->db_level, ==, db->db_level+1);
                        ASSERT3U(db->db_parent->db.db_object, ==,
                            db->db.db_object);
@@ -346,7 +423,7 @@ dbuf_verify(dmu_buf_impl_t *db)
                         * have the struct_rwlock.  XXX indblksz no longer
                         * grows.  safe to do this now?
                         */
-                       if (RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock)) {
+                       if (RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
                                ASSERT3P(db->db_blkptr, ==,
                                    ((blkptr_t *)db->db_parent->db.db_data +
                                    db->db_blkid % epb));
@@ -354,7 +431,8 @@ dbuf_verify(dmu_buf_impl_t *db)
                }
        }
        if ((db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr)) &&
-           db->db.db_data && db->db_blkid != DB_BONUS_BLKID &&
+           (db->db_buf == NULL || db->db_buf->b_data) &&
+           db->db.db_data && db->db_blkid != DMU_BONUS_BLKID &&
            db->db_state != DB_FILL && !dn->dn_free_txg) {
                /*
                 * If the blkptr isn't set but they have nonzero data,
@@ -362,7 +440,7 @@ dbuf_verify(dmu_buf_impl_t *db)
                 * data when we evict this buffer.
                 */
                if (db->db_dirtycnt == 0) {
-                       uint64_t *buf = db->db.db_data;
+                       ASSERTV(uint64_t *buf = db->db.db_data);
                        int i;
 
                        for (i = 0; i < db->db.db_size >> 3; i++) {
@@ -370,6 +448,7 @@ dbuf_verify(dmu_buf_impl_t *db)
                        }
                }
        }
+       DB_DNODE_EXIT(db);
 }
 #endif
 
@@ -403,6 +482,32 @@ dbuf_set_data(dmu_buf_impl_t *db, arc_buf_t *buf)
        }
 }
 
+/*
+ * Loan out an arc_buf for read.  Return the loaned arc_buf.
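+ * If the buffer has been released to the ARC or the dbuf has other
+ * holders, the caller receives a fresh loaned copy of the data;
+ * otherwise the dbuf's own buffer is loaned out and the dbuf is left
+ * temporarily without data.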
+ */
+arc_buf_t *
+dbuf_loan_arcbuf(dmu_buf_impl_t *db)
+{
+       arc_buf_t *abuf;
+
+       mutex_enter(&db->db_mtx);
+       if (arc_released(db->db_buf) || refcount_count(&db->db_holds) > 1) {
+               int blksz = db->db.db_size;
+               spa_t *spa;
+
+               mutex_exit(&db->db_mtx);
+               DB_GET_SPA(&spa, db);
+               abuf = arc_loan_buf(spa, blksz);
+               bcopy(db->db.db_data, abuf->b_data, blksz);
+       } else {
+               abuf = db->db_buf;
+               arc_loan_inuse_buf(abuf, db);
+               dbuf_set_data(db, NULL);
+               mutex_exit(&db->db_mtx);
+       }
+       return (abuf);
+}
+
 uint64_t
 dbuf_whichblock(dnode_t *dn, uint64_t offset)
 {
@@ -439,24 +544,25 @@ dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
                dbuf_set_data(db, buf);
                db->db_state = DB_CACHED;
        } else {
-               ASSERT(db->db_blkid != DB_BONUS_BLKID);
+               ASSERT(db->db_blkid != DMU_BONUS_BLKID);
                ASSERT3P(db->db_buf, ==, NULL);
                VERIFY(arc_buf_remove_ref(buf, db) == 1);
                db->db_state = DB_UNCACHED;
        }
        cv_broadcast(&db->db_changed);
-       mutex_exit(&db->db_mtx);
-       dbuf_rele(db, NULL);
+       dbuf_rele_and_unlock(db, NULL);
 }
 
 static void
 dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
+       spa_t *spa;
        zbookmark_t zb;
        uint32_t aflags = ARC_NOWAIT;
-       arc_buf_t *pbuf;
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        ASSERT(!refcount_is_zero(&db->db_holds));
        /* We need the struct_rwlock to prevent db_blkptr from changing. */
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
@@ -464,16 +570,17 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
        ASSERT(db->db_state == DB_UNCACHED);
        ASSERT(db->db_buf == NULL);
 
-       if (db->db_blkid == DB_BONUS_BLKID) {
-               int bonuslen = dn->dn_bonuslen;
+       if (db->db_blkid == DMU_BONUS_BLKID) {
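+               /* Never copy more bonus data than dn_phys actually holds. */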
+               int bonuslen = MIN(dn->dn_bonuslen, dn->dn_phys->dn_bonuslen);
 
                ASSERT3U(bonuslen, <=, db->db.db_size);
                db->db.db_data = zio_buf_alloc(DN_MAX_BONUSLEN);
-               arc_space_consume(DN_MAX_BONUSLEN);
+               arc_space_consume(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
                if (bonuslen < DN_MAX_BONUSLEN)
                        bzero(db->db.db_data, DN_MAX_BONUSLEN);
-               bcopy(DN_BONUS(dn->dn_phys), db->db.db_data,
-                   bonuslen);
+               if (bonuslen)
+                       bcopy(DN_BONUS(dn->dn_phys), db->db.db_data, bonuslen);
+               DB_DNODE_EXIT(db);
                dbuf_update_data(db);
                db->db_state = DB_CACHED;
                mutex_exit(&db->db_mtx);
@@ -492,6 +599,7 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
 
                dbuf_set_data(db, arc_buf_alloc(dn->dn_objset->os_spa,
                    db->db.db_size, db, type));
+               DB_DNODE_EXIT(db);
                bzero(db->db.db_data, db->db.db_size);
                db->db_state = DB_CACHED;
                *flags |= DB_RF_CACHED;
@@ -499,27 +607,24 @@ dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
                return;
        }
 
+       spa = dn->dn_objset->os_spa;
+       DB_DNODE_EXIT(db);
+
        db->db_state = DB_READ;
        mutex_exit(&db->db_mtx);
 
        if (DBUF_IS_L2CACHEABLE(db))
                aflags |= ARC_L2CACHE;
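+       /*
+        * Illumos #3137: additionally flag the buffer so the ARC keeps
+        * its L2ARC copy compressed; it is decompressed transparently
+        * when read back from the cache device.
+        */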
+       if (DBUF_IS_L2COMPRESSIBLE(db))
+               aflags |= ARC_L2COMPRESS;
 
-       zb.zb_objset = db->db_objset->os_dsl_dataset ?
-           db->db_objset->os_dsl_dataset->ds_object : 0;
-       zb.zb_object = db->db.db_object;
-       zb.zb_level = db->db_level;
-       zb.zb_blkid = db->db_blkid;
+       SET_BOOKMARK(&zb, db->db_objset->os_dsl_dataset ?
+           db->db_objset->os_dsl_dataset->ds_object : DMU_META_OBJSET,
+           db->db.db_object, db->db_level, db->db_blkid);
 
        dbuf_add_ref(db, NULL);
-       /* ZIO_FLAG_CANFAIL callers have to check the parent zio's error */
-
-       if (db->db_parent)
-               pbuf = db->db_parent->db_buf;
-       else
-               pbuf = db->db_objset->os_phys_buf;
 
-       (void) arc_read(zio, dn->dn_objset->os_spa, db->db_blkptr, pbuf,
+       (void) arc_read(zio, spa, db->db_blkptr,
            dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
            (*flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
            &aflags, &zb);
@@ -533,6 +638,7 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
        int err = 0;
        int havepzio = (zio != NULL);
        int prefetch;
+       dnode_t *dn;
 
        /*
         * We don't have to hold the mutex to check db_state because it
@@ -543,46 +649,51 @@ dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
        if (db->db_state == DB_NOFILL)
                return (EIO);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        if ((flags & DB_RF_HAVESTRUCT) == 0)
-               rw_enter(&db->db_dnode->dn_struct_rwlock, RW_READER);
+               rw_enter(&dn->dn_struct_rwlock, RW_READER);
 
-       prefetch = db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID &&
-           (flags & DB_RF_NOPREFETCH) == 0 && db->db_dnode != NULL &&
+       prefetch = db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
+           (flags & DB_RF_NOPREFETCH) == 0 && dn != NULL &&
            DBUF_IS_CACHEABLE(db);
 
        mutex_enter(&db->db_mtx);
        if (db->db_state == DB_CACHED) {
                mutex_exit(&db->db_mtx);
                if (prefetch)
-                       dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
+                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
                            db->db.db_size, TRUE);
                if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&db->db_dnode->dn_struct_rwlock);
+                       rw_exit(&dn->dn_struct_rwlock);
+               DB_DNODE_EXIT(db);
        } else if (db->db_state == DB_UNCACHED) {
-               if (zio == NULL) {
-                       zio = zio_root(db->db_dnode->dn_objset->os_spa,
-                           NULL, NULL, ZIO_FLAG_CANFAIL);
-               }
+               spa_t *spa = dn->dn_objset->os_spa;
+
+               if (zio == NULL)
+                       zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
                dbuf_read_impl(db, zio, &flags);
 
                /* dbuf_read_impl has dropped db_mtx for us */
 
                if (prefetch)
-                       dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
+                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
                            db->db.db_size, flags & DB_RF_CACHED);
 
                if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&db->db_dnode->dn_struct_rwlock);
+                       rw_exit(&dn->dn_struct_rwlock);
+               DB_DNODE_EXIT(db);
 
                if (!havepzio)
                        err = zio_wait(zio);
        } else {
                mutex_exit(&db->db_mtx);
                if (prefetch)
-                       dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
+                       dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
                            db->db.db_size, TRUE);
                if ((flags & DB_RF_HAVESTRUCT) == 0)
-                       rw_exit(&db->db_dnode->dn_struct_rwlock);
+                       rw_exit(&dn->dn_struct_rwlock);
+               DB_DNODE_EXIT(db);
 
                mutex_enter(&db->db_mtx);
                if ((flags & DB_RF_NEVERWAIT) == 0) {
@@ -606,17 +717,18 @@ static void
 dbuf_noread(dmu_buf_impl_t *db)
 {
        ASSERT(!refcount_is_zero(&db->db_holds));
-       ASSERT(db->db_blkid != DB_BONUS_BLKID);
+       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
        mutex_enter(&db->db_mtx);
        while (db->db_state == DB_READ || db->db_state == DB_FILL)
                cv_wait(&db->db_changed, &db->db_mtx);
        if (db->db_state == DB_UNCACHED) {
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
+               spa_t *spa;
 
                ASSERT(db->db_buf == NULL);
                ASSERT(db->db.db_data == NULL);
-               dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
-                   db->db.db_size, db, type));
+               DB_GET_SPA(&spa, db);
+               dbuf_set_data(db, arc_buf_alloc(spa, db->db.db_size, db, type));
                db->db_state = DB_FILL;
        } else if (db->db_state == DB_NOFILL) {
                dbuf_set_data(db, NULL);
@@ -651,27 +763,29 @@ dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
 
        if (dr == NULL ||
            (dr->dt.dl.dr_data !=
-           ((db->db_blkid  == DB_BONUS_BLKID) ? db->db.db_data : db->db_buf)))
+           ((db->db_blkid  == DMU_BONUS_BLKID) ? db->db.db_data : db->db_buf)))
                return;
 
        /*
         * If the last dirty record for this dbuf has not yet synced
         * and it's referencing the dbuf data, either:
-        *      reset the reference to point to a new copy,
+        *      reset the reference to point to a new copy,
         * or (if there are no active holders)
         *      just null out the current db_data pointer.
         */
        ASSERT(dr->dr_txg >= txg - 2);
-       if (db->db_blkid == DB_BONUS_BLKID) {
+       if (db->db_blkid == DMU_BONUS_BLKID) {
                /* Note that the data bufs here are zio_bufs */
                dr->dt.dl.dr_data = zio_buf_alloc(DN_MAX_BONUSLEN);
-               arc_space_consume(DN_MAX_BONUSLEN);
+               arc_space_consume(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
                bcopy(db->db.db_data, dr->dt.dl.dr_data, DN_MAX_BONUSLEN);
        } else if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
                int size = db->db.db_size;
                arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
-               dr->dt.dl.dr_data = arc_buf_alloc(
-                   db->db_dnode->dn_objset->os_spa, size, db, type);
+               spa_t *spa;
+
+               DB_GET_SPA(&spa, db);
+               dr->dt.dl.dr_data = arc_buf_alloc(spa, size, db, type);
                bcopy(db->db.db_data, dr->dt.dl.dr_data->b_data, size);
        } else {
                dbuf_set_data(db, NULL);
@@ -682,22 +796,25 @@ void
 dbuf_unoverride(dbuf_dirty_record_t *dr)
 {
        dmu_buf_impl_t *db = dr->dr_dbuf;
+       blkptr_t *bp = &dr->dt.dl.dr_overridden_by;
        uint64_t txg = dr->dr_txg;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
        ASSERT(dr->dt.dl.dr_override_state != DR_IN_DMU_SYNC);
        ASSERT(db->db_level == 0);
 
-       if (db->db_blkid == DB_BONUS_BLKID ||
+       if (db->db_blkid == DMU_BONUS_BLKID ||
            dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN)
                return;
 
+       ASSERT(db->db_data_pending != dr);
+
        /* free this block */
-       if (!BP_IS_HOLE(&dr->dt.dl.dr_overridden_by)) {
-               /* XXX can get silent EIO here */
-               (void) dsl_free(NULL,
-                   spa_get_dsl(db->db_dnode->dn_objset->os_spa),
-                   txg, &dr->dt.dl.dr_overridden_by, NULL, NULL, ARC_WAIT);
+       if (!BP_IS_HOLE(bp)) {
+               spa_t *spa;
+
+               DB_GET_SPA(&spa, db);
+               zio_free(spa, txg, bp);
        }
        dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
        /*
@@ -727,7 +844,7 @@ dbuf_free_range(dnode_t *dn, uint64_t start, uint64_t end, dmu_tx_t *tx)
        uint64_t first_l1 = start >> epbs;
        uint64_t last_l1 = end >> epbs;
 
-       if (end > dn->dn_maxblkid) {
+       if (end > dn->dn_maxblkid && (end != DMU_SPILL_BLKID)) {
                end = dn->dn_maxblkid;
                last_l1 = end >> epbs;
        }
@@ -735,7 +852,7 @@ dbuf_free_range(dnode_t *dn, uint64_t start, uint64_t end, dmu_tx_t *tx)
        mutex_enter(&dn->dn_dbufs_mtx);
        for (db = list_head(&dn->dn_dbufs); db; db = db_next) {
                db_next = list_next(&dn->dn_dbufs, db);
-               ASSERT(db->db_blkid != DB_BONUS_BLKID);
+               ASSERT(db->db_blkid != DMU_BONUS_BLKID);
 
                if (db->db_level == 1 &&
                    db->db_blkid >= first_l1 && db->db_blkid <= last_l1) {
@@ -791,7 +908,8 @@ dbuf_free_range(dnode_t *dn, uint64_t start, uint64_t end, dmu_tx_t *tx)
                                 * size to reflect that this buffer may
                                 * contain new data when we sync.
                                 */
-                               if (db->db_blkid > dn->dn_maxblkid)
+                               if (db->db_blkid != DMU_SPILL_BLKID &&
+                                   db->db_blkid > dn->dn_maxblkid)
                                        dn->dn_maxblkid = db->db_blkid;
                                dbuf_unoverride(dr);
                        } else {
@@ -834,10 +952,15 @@ dbuf_block_freeable(dmu_buf_impl_t *db)
        else if (db->db_blkptr)
                birth_txg = db->db_blkptr->blk_birth;
 
-       /* If we don't exist or are in a snapshot, we can't be freed */
+       /*
+        * If we don't exist or are in a snapshot, we can't be freed.
+        * Don't pass the bp to dsl_dataset_block_freeable() since we
+        * are holding the db_mtx lock and might deadlock if we are
+        * prefetching a dedup-ed block.
+        */
        if (birth_txg)
                return (ds == NULL ||
-                   dsl_dataset_block_freeable(ds, birth_txg));
+                   dsl_dataset_block_freeable(ds, NULL, birth_txg));
        else
                return (FALSE);
 }
@@ -848,11 +971,15 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        arc_buf_t *buf, *obuf;
        int osize = db->db.db_size;
        arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
+       dnode_t *dn;
 
-       ASSERT(db->db_blkid != DB_BONUS_BLKID);
+       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
+
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
 
        /* XXX does *this* func really need the lock? */
-       ASSERT(RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock));
+       ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
 
        /*
         * This call to dbuf_will_dirty() with the dn_struct_rwlock held
@@ -867,7 +994,7 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        dbuf_will_dirty(db, tx);
 
        /* create the data buffer for the new block */
-       buf = arc_buf_alloc(db->db_dnode->dn_objset->os_spa, size, db, type);
+       buf = arc_buf_alloc(dn->dn_objset->os_spa, size, db, type);
 
        /* copy old block data to the new block */
        obuf = db->db_buf;
@@ -887,14 +1014,29 @@ dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
        }
        mutex_exit(&db->db_mtx);
 
-       dnode_willuse_space(db->db_dnode, size-osize, tx);
+       dnode_willuse_space(dn, size-osize, tx);
+       DB_DNODE_EXIT(db);
+}
+
+void
+dbuf_release_bp(dmu_buf_impl_t *db)
+{
+       objset_t *os;
+
+       DB_GET_OBJSET(&os, db);
+       ASSERT(dsl_pool_sync_context(dmu_objset_pool(os)));
+       ASSERT(arc_released(os->os_phys_buf) ||
+           list_link_active(&os->os_dsl_dataset->ds_synced_link));
+       ASSERT(db->db_parent == NULL || arc_released(db->db_parent->db_buf));
+
+       (void) arc_release(db->db_buf, db);
 }
 
 dbuf_dirty_record_t *
 dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 {
-       dnode_t *dn = db->db_dnode;
-       objset_impl_t *os = dn->dn_objset;
+       dnode_t *dn;
+       objset_t *os;
        dbuf_dirty_record_t **drp, *dr;
        int drop_struct_lock = FALSE;
        boolean_t do_free_accounting = B_FALSE;
@@ -904,19 +1046,17 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        ASSERT(!refcount_is_zero(&db->db_holds));
        DMU_TX_DIRTY_BUF(tx, db);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
        /*
         * Shouldn't dirty a regular buffer in syncing context.  Private
         * objects may be dirtied in syncing context, but only if they
         * were already pre-dirtied in open context.
-        * XXX We may want to prohibit dirtying in syncing context even
-        * if they did pre-dirty.
         */
        ASSERT(!dmu_tx_is_syncing(tx) ||
            BP_IS_HOLE(dn->dn_objset->os_rootbp) ||
-           dn->dn_object == DMU_META_DNODE_OBJECT ||
-           dn->dn_objset->os_dsl_dataset == NULL ||
-           dsl_dir_is_private(dn->dn_objset->os_dsl_dataset->ds_dir));
-
+           DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
+           dn->dn_objset->os_dsl_dataset == NULL);
        /*
         * We make this assert for private objects as well, but after we
         * check if we're already dirty.  They are allowed to re-dirty
@@ -946,10 +1086,13 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                dn->dn_dirtyctx =
                    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN);
                ASSERT(dn->dn_dirtyctx_firstset == NULL);
-               dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_SLEEP);
+               dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_PUSHPAGE);
        }
        mutex_exit(&dn->dn_mtx);
 
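+       /* Note on the dnode that a spill block (SA overflow) is dirty. */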
+       if (db->db_blkid == DMU_SPILL_BLKID)
+               dn->dn_have_spill = B_TRUE;
+
        /*
         * If this buffer is already dirty, we're done.
         */
@@ -959,13 +1102,16 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        while ((dr = *drp) != NULL && dr->dr_txg > tx->tx_txg)
                drp = &dr->dr_next;
        if (dr && dr->dr_txg == tx->tx_txg) {
-               if (db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID) {
+               DB_DNODE_EXIT(db);
+
+               if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID) {
                        /*
                         * If this buffer has already been written out,
                         * we now need to reset its state.
                         */
                        dbuf_unoverride(dr);
-                       if (db->db.db_object != DMU_META_DNODE_OBJECT)
+                       if (db->db.db_object != DMU_META_DNODE_OBJECT &&
+                           db->db_state != DB_NOFILL)
                                arc_buf_thaw(db->db_buf);
                }
                mutex_exit(&db->db_mtx);
@@ -975,7 +1121,8 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        /*
         * Only valid if not already dirty.
         */
-       ASSERT(dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
+       ASSERT(dn->dn_object == 0 ||
+           dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
            (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
 
        ASSERT3U(dn->dn_nlevels, >, db->db_level);
@@ -987,25 +1134,24 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 
        /*
         * We should only be dirtying in syncing context if it's the
-        * mos, a spa os, or we're initializing the os.  However, we are
-        * allowed to dirty in syncing context provided we already
-        * dirtied it in open context.  Hence we must make this
-        * assertion only if we're not already dirty.
+        * mos or we're initializing the os or it's a special object.
+        * However, we are allowed to dirty in syncing context provided
+        * we already dirtied it in open context.  Hence we must make
+        * this assertion only if we're not already dirty.
         */
-       ASSERT(!dmu_tx_is_syncing(tx) ||
-           os->os_dsl_dataset == NULL ||
-           !dsl_dir_is_private(os->os_dsl_dataset->ds_dir) ||
-           !BP_IS_HOLE(os->os_rootbp));
+       os = dn->dn_objset;
+       ASSERT(!dmu_tx_is_syncing(tx) || DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
+           os->os_dsl_dataset == NULL || BP_IS_HOLE(os->os_rootbp));
        ASSERT(db->db.db_size != 0);
 
        dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
 
-       if (db->db_blkid != DB_BONUS_BLKID) {
+       if (db->db_blkid != DMU_BONUS_BLKID) {
                /*
                 * Update the accounting.
                 * Note: we delay "free accounting" until after we drop
                 * the db_mtx.  This keeps us from grabbing other locks
-                * (and possibly deadlocking) in bp_get_dasize() while
+                * (and possibly deadlocking) in bp_get_dsize() while
                 * also holding the db_mtx.
                 */
                dnode_willuse_space(dn, db->db.db_size, tx);
@@ -1017,12 +1163,13 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * to make a copy of it so that the changes we make in this
         * transaction group won't leak out when we sync the older txg.
         */
-       dr = kmem_zalloc(sizeof (dbuf_dirty_record_t), KM_SLEEP);
+       dr = kmem_zalloc(sizeof (dbuf_dirty_record_t), KM_PUSHPAGE);
+       list_link_init(&dr->dr_dirty_node);
        if (db->db_level == 0) {
                void *data_old = db->db_buf;
 
                if (db->db_state != DB_NOFILL) {
-                       if (db->db_blkid == DB_BONUS_BLKID) {
+                       if (db->db_blkid == DMU_BONUS_BLKID) {
                                dbuf_fix_old_data(db, tx->tx_txg);
                                data_old = db->db.db_data;
                        } else if (db->db.db_object != DMU_META_DNODE_OBJECT) {
@@ -1058,7 +1205,8 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
         * and dbuf_dirty.  We win, as though the dbuf_noread() had
         * happened after the free.
         */
-       if (db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID) {
+       if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
+           db->db_blkid != DMU_SPILL_BLKID) {
                mutex_enter(&dn->dn_mtx);
                dnode_clear_range(dn, db->db_blkid, 1, tx);
                mutex_exit(&dn->dn_mtx);
@@ -1074,17 +1222,19 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 
        mutex_exit(&db->db_mtx);
 
-       if (db->db_blkid == DB_BONUS_BLKID) {
+       if (db->db_blkid == DMU_BONUS_BLKID ||
+           db->db_blkid == DMU_SPILL_BLKID) {
                mutex_enter(&dn->dn_mtx);
                ASSERT(!list_link_active(&dr->dr_dirty_node));
                list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
                mutex_exit(&dn->dn_mtx);
                dnode_setdirty(dn, tx);
+               DB_DNODE_EXIT(db);
                return (dr);
        } else if (do_free_accounting) {
                blkptr_t *bp = db->db_blkptr;
                int64_t willfree = (bp && !BP_IS_HOLE(bp)) ?
-                   bp_get_dasize(os->os_spa, bp) : db->db.db_size;
+                   bp_get_dsize(os->os_spa, bp) : db->db.db_size;
                /*
                 * This is only a guess -- if the dbuf is dirty
                 * in a previous txg, we don't know how much
@@ -1093,6 +1243,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                 * db_blkptr, but since this is just a guess,
                 * it's OK if we get an odd answer.
                 */
+               ddt_prefetch(os->os_spa, bp);
                dnode_willuse_space(dn, -willfree, tx);
        }
 
@@ -1116,6 +1267,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 
                        parent = dbuf_hold_level(dn, db->db_level+1,
                            db->db_blkid >> epbs, FTAG);
+                       ASSERT(parent != NULL);
                        parent_held = TRUE;
                }
                if (drop_struct_lock)
@@ -1140,8 +1292,7 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        } else {
                ASSERT(db->db_level+1 == dn->dn_nlevels);
                ASSERT(db->db_blkid < dn->dn_nblkptr);
-               ASSERT(db->db_parent == NULL ||
-                   db->db_parent == db->db_dnode->dn_dbuf);
+               ASSERT(db->db_parent == NULL || db->db_parent == dn->dn_dbuf);
                mutex_enter(&dn->dn_mtx);
                ASSERT(!list_link_active(&dr->dr_dirty_node));
                list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
@@ -1151,21 +1302,21 @@ dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        }
 
        dnode_setdirty(dn, tx);
+       DB_DNODE_EXIT(db);
        return (dr);
 }
 
 static int
 dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        uint64_t txg = tx->tx_txg;
        dbuf_dirty_record_t *dr, **drp;
 
        ASSERT(txg != 0);
-       ASSERT(db->db_blkid != DB_BONUS_BLKID);
+       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
 
        mutex_enter(&db->db_mtx);
-
        /*
         * If this buffer is not dirty, we're done.
         */
@@ -1177,19 +1328,28 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
                return (0);
        }
        ASSERT(dr->dr_txg == txg);
+       ASSERT(dr->dr_dbuf == db);
+
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
 
        /*
         * If this buffer is currently held, we cannot undirty
         * it, since one of the current holders may be in the
         * middle of an update.  Note that users of dbuf_undirty()
         * should not place a hold on the dbuf before the call.
+        * test for that similarly to how dbuf_dirty() does.
+        * test for that similar to how dbuf_dirty does.
         */
        if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
                mutex_exit(&db->db_mtx);
                /* Make sure we don't toss this buffer at sync phase */
-               mutex_enter(&dn->dn_mtx);
-               dnode_clear_range(dn, db->db_blkid, 1, tx);
-               mutex_exit(&dn->dn_mtx);
+               if (db->db_blkid != DMU_SPILL_BLKID) {
+                       mutex_enter(&dn->dn_mtx);
+                       dnode_clear_range(dn, db->db_blkid, 1, tx);
+                       mutex_exit(&dn->dn_mtx);
+               }
+               DB_DNODE_EXIT(db);
                return (0);
        }
 
@@ -1201,16 +1361,24 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
 
        *drp = dr->dr_next;
 
+       /*
+        * Note that there are three places in dbuf_dirty()
+        * where this dirty record may be put on a list.
+        * Make sure to do a list_remove corresponding to
+        * every one of those list_insert calls.
+        */
        if (dr->dr_parent) {
                mutex_enter(&dr->dr_parent->dt.di.dr_mtx);
                list_remove(&dr->dr_parent->dt.di.dr_children, dr);
                mutex_exit(&dr->dr_parent->dt.di.dr_mtx);
-       } else if (db->db_level+1 == dn->dn_nlevels) {
+       } else if (db->db_blkid == DMU_SPILL_BLKID ||
+           db->db_level+1 == dn->dn_nlevels) {
                ASSERT(db->db_blkptr == NULL || db->db_parent == dn->dn_dbuf);
                mutex_enter(&dn->dn_mtx);
                list_remove(&dn->dn_dirty_records[txg & TXG_MASK], dr);
                mutex_exit(&dn->dn_mtx);
        }
+       DB_DNODE_EXIT(db);
 
        if (db->db_level == 0) {
                if (db->db_state != DB_NOFILL) {
@@ -1236,7 +1404,7 @@ dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        if (refcount_remove(&db->db_holds, (void *)(uintptr_t)txg) == 0) {
                arc_buf_t *buf = db->db_buf;
 
-               ASSERT(arc_released(buf));
+               ASSERT(db->db_state == DB_NOFILL || arc_released(buf));
                dbuf_set_data(db, NULL);
                VERIFY(arc_buf_remove_ref(buf, db) == 1);
                dbuf_evict(db);
@@ -1256,8 +1424,10 @@ dbuf_will_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
        ASSERT(tx->tx_txg != 0);
        ASSERT(!refcount_is_zero(&db->db_holds));
 
-       if (RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock))
+       DB_DNODE_ENTER(db);
+       if (RW_WRITE_HELD(&DB_DNODE(db)->dn_struct_rwlock))
                rf |= DB_RF_HAVESTRUCT;
+       DB_DNODE_EXIT(db);
        (void) dbuf_read(db, NULL, rf);
        (void) dbuf_dirty(db, tx);
 }
@@ -1277,7 +1447,7 @@ dmu_buf_will_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
 
-       ASSERT(db->db_blkid != DB_BONUS_BLKID);
+       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
        ASSERT(tx->tx_txg != 0);
        ASSERT(db->db_level == 0);
        ASSERT(!refcount_is_zero(&db->db_holds));
@@ -1299,7 +1469,7 @@ dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
 
        if (db->db_state == DB_FILL) {
                if (db->db_level == 0 && db->db_freed_in_flight) {
-                       ASSERT(db->db_blkid != DB_BONUS_BLKID);
+                       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
                        /* we were freed while filling */
                        /* XXX dbuf_undirty? */
                        bzero(db->db.db_data, db->db.db_size);
@@ -1312,6 +1482,69 @@ dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
 }
 
 /*
+ * Directly assign a provided arc buf to a given dbuf if it's not referenced
+ * by anybody except our caller. Otherwise copy arcbuf's contents to dbuf.
+ */
+void
+dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
+{
+       ASSERT(!refcount_is_zero(&db->db_holds));
+       ASSERT(db->db_blkid != DMU_BONUS_BLKID);
+       ASSERT(db->db_level == 0);
+       ASSERT(DBUF_GET_BUFC_TYPE(db) == ARC_BUFC_DATA);
+       ASSERT(buf != NULL);
+       ASSERT(arc_buf_size(buf) == db->db.db_size);
+       ASSERT(tx->tx_txg != 0);
+
+       arc_return_buf(buf, db);
+       ASSERT(arc_released(buf));
+
+       mutex_enter(&db->db_mtx);
+
+       while (db->db_state == DB_READ || db->db_state == DB_FILL)
+               cv_wait(&db->db_changed, &db->db_mtx);
+
+       ASSERT(db->db_state == DB_CACHED || db->db_state == DB_UNCACHED);
+
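+       /*
+        * Holders other than ours may be using the current data, so
+        * dirty the dbuf and copy the new contents in rather than
+        * swapping buffers.
+        */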
+       if (db->db_state == DB_CACHED &&
+           refcount_count(&db->db_holds) - 1 > db->db_dirtycnt) {
+               mutex_exit(&db->db_mtx);
+               (void) dbuf_dirty(db, tx);
+               bcopy(buf->b_data, db->db.db_data, db->db.db_size);
+               VERIFY(arc_buf_remove_ref(buf, db) == 1);
+               xuio_stat_wbuf_copied();
+               return;
+       }
+
+       xuio_stat_wbuf_nocopy();
+       if (db->db_state == DB_CACHED) {
+               dbuf_dirty_record_t *dr = db->db_last_dirty;
+
+               ASSERT(db->db_buf != NULL);
+               if (dr != NULL && dr->dr_txg == tx->tx_txg) {
+                       ASSERT(dr->dt.dl.dr_data == db->db_buf);
+                       if (!arc_released(db->db_buf)) {
+                               ASSERT(dr->dt.dl.dr_override_state ==
+                                   DR_OVERRIDDEN);
+                               arc_release(db->db_buf, db);
+                       }
+                       dr->dt.dl.dr_data = buf;
+                       VERIFY(arc_buf_remove_ref(db->db_buf, db) == 1);
+               } else if (dr == NULL || dr->dt.dl.dr_data != db->db_buf) {
+                       arc_release(db->db_buf, db);
+                       VERIFY(arc_buf_remove_ref(db->db_buf, db) == 1);
+               }
+               db->db_buf = NULL;
+       }
+       ASSERT(db->db_buf == NULL);
+       dbuf_set_data(db, buf);
+       db->db_state = DB_FILL;
+       mutex_exit(&db->db_mtx);
+       (void) dbuf_dirty(db, tx);
+       dbuf_fill_done(db, tx);
+}
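+
+/*
+ * Note: the expected caller is dmu_assign_arcbuf(), passing in a buffer
+ * loaned via dmu_request_arcbuf() as part of a zero-copy write.
+ */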
+
+/*
  * "Clear" the contents of this dbuf.  This will mark the dbuf
  * EVICTING and clear *most* of its references.  Unfortunately,
  * when we are not holding the dn_dbufs_mtx, we can't clear the
@@ -1319,7 +1552,7 @@ dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
  * in this case.  For callers from the DMU we will usually see:
  *     dbuf_clear()->arc_buf_evict()->dbuf_do_evict()->dbuf_destroy()
  * For the arc callback, we will usually see:
- *     dbuf_do_evict()->dbuf_clear();dbuf_destroy()
+ *     dbuf_do_evict()->dbuf_clear();dbuf_destroy()
  * Sometimes, though, we will get a mix of these two:
  *     DMU: dbuf_clear()->arc_buf_evict()
  *     ARC: dbuf_do_evict()->dbuf_destroy()
@@ -1327,9 +1560,9 @@ dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
 void
 dbuf_clear(dmu_buf_impl_t *db)
 {
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        dmu_buf_impl_t *parent = db->db_parent;
-       dmu_buf_impl_t *dndb = dn->dn_dbuf;
+       dmu_buf_impl_t *dndb;
        int dbuf_gone = FALSE;
 
        ASSERT(MUTEX_HELD(&db->db_mtx));
@@ -1339,9 +1572,9 @@ dbuf_clear(dmu_buf_impl_t *db)
 
        if (db->db_state == DB_CACHED) {
                ASSERT(db->db.db_data != NULL);
-               if (db->db_blkid == DB_BONUS_BLKID) {
+               if (db->db_blkid == DMU_BONUS_BLKID) {
                        zio_buf_free(db->db.db_data, DN_MAX_BONUSLEN);
-                       arc_space_return(DN_MAX_BONUSLEN);
+                       arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
                }
                db->db.db_data = NULL;
                db->db_state = DB_UNCACHED;
@@ -1353,10 +1586,26 @@ dbuf_clear(dmu_buf_impl_t *db)
        db->db_state = DB_EVICTING;
        db->db_blkptr = NULL;
 
-       if (db->db_blkid != DB_BONUS_BLKID && MUTEX_HELD(&dn->dn_dbufs_mtx)) {
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       dndb = dn->dn_dbuf;
+       if (db->db_blkid != DMU_BONUS_BLKID && MUTEX_HELD(&dn->dn_dbufs_mtx)) {
                list_remove(&dn->dn_dbufs, db);
+               (void) atomic_dec_32_nv(&dn->dn_dbufs_count);
+               membar_producer();
+               DB_DNODE_EXIT(db);
+               /*
+                * Decrementing the dbuf count means that the hold corresponding
+                * to the removed dbuf is no longer discounted in dnode_move(),
+                * so the dnode cannot be moved until after we release the hold.
+                * The membar_producer() ensures visibility of the decremented
+                * value in dnode_move(), since DB_DNODE_EXIT doesn't actually
+                * release any lock.
+                */
                dnode_rele(dn, db);
-               db->db_dnode = NULL;
+               db->db_dnode_handle = NULL;
+       } else {
+               DB_DNODE_EXIT(db);
        }
 
        if (db->db_buf)
@@ -1366,23 +1615,37 @@ dbuf_clear(dmu_buf_impl_t *db)
                mutex_exit(&db->db_mtx);
 
        /*
-        * If this dbuf is referened from an indirect dbuf,
+        * If this dbuf is referenced from an indirect dbuf,
         * decrement the ref count on the indirect dbuf.
         */
        if (parent && parent != dndb)
                dbuf_rele(parent, db);
 }
 
-static int
+__attribute__((always_inline))
+static inline int
 dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
-    dmu_buf_impl_t **parentp, blkptr_t **bpp)
+    dmu_buf_impl_t **parentp, blkptr_t **bpp, struct dbuf_hold_impl_data *dh)
 {
        int nlevels, epbs;
 
        *parentp = NULL;
        *bpp = NULL;
 
-       ASSERT(blkid != DB_BONUS_BLKID);
+       ASSERT(blkid != DMU_BONUS_BLKID);
+
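+       /*
+        * The spill block, when present, is referenced directly from
+        * the dnode (dn_spill), so the dnode's own dbuf is its parent.
+        */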
+       if (blkid == DMU_SPILL_BLKID) {
+               mutex_enter(&dn->dn_mtx);
+               if (dn->dn_have_spill &&
+                   (dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR))
+                       *bpp = &dn->dn_phys->dn_spill;
+               else
+                       *bpp = NULL;
+               dbuf_add_ref(dn->dn_dbuf, NULL);
+               *parentp = dn->dn_dbuf;
+               mutex_exit(&dn->dn_mtx);
+               return (0);
+       }
 
        if (dn->dn_phys->dn_nlevels == 0)
                nlevels = 1;
@@ -1399,8 +1662,17 @@ dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
                return (ENOENT);
        } else if (level < nlevels-1) {
                /* this block is referenced from an indirect block */
-               int err = dbuf_hold_impl(dn, level+1,
-                   blkid >> epbs, fail_sparse, NULL, parentp);
+               int err;
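+               /*
+                * Ascend one level.  When a dbuf_hold_impl_data chain is
+                * available, step into the next preallocated frame rather
+                * than recursing on the kernel stack.
+                */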
+               if (dh == NULL) {
+                       err = dbuf_hold_impl(dn, level+1, blkid >> epbs,
+                           fail_sparse, NULL, parentp);
+               } else {
+                       __dbuf_hold_impl_init(dh + 1, dn, dh->dh_level + 1,
+                           blkid >> epbs, fail_sparse, NULL,
+                           parentp, dh->dh_depth + 1);
+                       err = __dbuf_hold_impl(dh + 1);
+               }
                if (err)
                        return (err);
                err = dbuf_read(*parentp, NULL,
@@ -1431,13 +1703,13 @@ static dmu_buf_impl_t *
 dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
     dmu_buf_impl_t *parent, blkptr_t *blkptr)
 {
-       objset_impl_t *os = dn->dn_objset;
+       objset_t *os = dn->dn_objset;
        dmu_buf_impl_t *db, *odb;
 
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
        ASSERT(dn->dn_type != DMU_OT_NONE);
 
-       db = kmem_cache_alloc(dbuf_cache, KM_SLEEP);
+       db = kmem_cache_alloc(dbuf_cache, KM_PUSHPAGE);
 
        db->db_objset = os;
        db->db.db_object = dn->dn_object;
@@ -1445,7 +1717,7 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        db->db_blkid = blkid;
        db->db_last_dirty = NULL;
        db->db_dirtycnt = 0;
-       db->db_dnode = dn;
+       db->db_dnode_handle = dn->dn_handle;
        db->db_parent = parent;
        db->db_blkptr = blkptr;
 
@@ -1455,16 +1727,20 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        db->db_immediate_evict = 0;
        db->db_freed_in_flight = 0;
 
-       if (blkid == DB_BONUS_BLKID) {
+       if (blkid == DMU_BONUS_BLKID) {
                ASSERT3P(parent, ==, dn->dn_dbuf);
                db->db.db_size = DN_MAX_BONUSLEN -
                    (dn->dn_nblkptr-1) * sizeof (blkptr_t);
                ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
-               db->db.db_offset = DB_BONUS_BLKID;
+               db->db.db_offset = DMU_BONUS_BLKID;
                db->db_state = DB_UNCACHED;
                /* the bonus dbuf is not placed in the hash table */
-               arc_space_consume(sizeof (dmu_buf_impl_t));
+               arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
                return (db);
+       } else if (blkid == DMU_SPILL_BLKID) {
+               db->db.db_size = (blkptr != NULL) ?
+                   BP_GET_LSIZE(blkptr) : SPA_MINBLOCKSIZE;
+               db->db.db_offset = 0;
        } else {
                int blocksize =
                    db->db_level ? 1<<dn->dn_indblkshift :  dn->dn_datablksz;
@@ -1490,7 +1766,7 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        list_insert_head(&dn->dn_dbufs, db);
        db->db_state = DB_UNCACHED;
        mutex_exit(&dn->dn_dbufs_mtx);
-       arc_space_consume(sizeof (dmu_buf_impl_t));
+       arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
 
        if (parent && parent != dn->dn_dbuf)
                dbuf_add_ref(parent, db);
@@ -1498,6 +1774,7 @@ dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
        ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
            refcount_count(&dn->dn_holds) > 0);
        (void) refcount_add(&dn->dn_holds, db);
+       (void) atomic_inc_32_nv(&dn->dn_dbufs_count);
 
        dprintf_dbuf(db, "db=%p\n", db);
 
@@ -1532,20 +1809,29 @@ dbuf_destroy(dmu_buf_impl_t *db)
 {
        ASSERT(refcount_is_zero(&db->db_holds));
 
-       if (db->db_blkid != DB_BONUS_BLKID) {
+       if (db->db_blkid != DMU_BONUS_BLKID) {
                /*
                 * If this dbuf is still on the dn_dbufs list,
                 * remove it from that list.
                 */
-               if (db->db_dnode) {
-                       dnode_t *dn = db->db_dnode;
+               if (db->db_dnode_handle != NULL) {
+                       dnode_t *dn;
 
+                       DB_DNODE_ENTER(db);
+                       dn = DB_DNODE(db);
                        mutex_enter(&dn->dn_dbufs_mtx);
                        list_remove(&dn->dn_dbufs, db);
+                       (void) atomic_dec_32_nv(&dn->dn_dbufs_count);
                        mutex_exit(&dn->dn_dbufs_mtx);
-
+                       DB_DNODE_EXIT(db);
+                       /*
+                        * Decrementing the dbuf count means that the hold
+                        * corresponding to the removed dbuf is no longer
+                        * discounted in dnode_move(), so the dnode cannot be
+                        * moved until after we release the hold.
+                        */
                        dnode_rele(dn, db);
-                       db->db_dnode = NULL;
+                       db->db_dnode_handle = NULL;
                }
                dbuf_hash_remove(db);
        }
@@ -1559,7 +1845,7 @@ dbuf_destroy(dmu_buf_impl_t *db)
        ASSERT(db->db_data_pending == NULL);
 
        kmem_cache_free(dbuf_cache, db);
-       arc_space_return(sizeof (dmu_buf_impl_t));
+       arc_space_return(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
 }
 
 void
@@ -1568,45 +1854,36 @@ dbuf_prefetch(dnode_t *dn, uint64_t blkid)
        dmu_buf_impl_t *db = NULL;
        blkptr_t *bp = NULL;
 
-       ASSERT(blkid != DB_BONUS_BLKID);
+       ASSERT(blkid != DMU_BONUS_BLKID);
        ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
 
        if (dnode_block_freed(dn, blkid))
                return;
 
        /* dbuf_find() returns with db_mtx held */
-       if (db = dbuf_find(dn, 0, blkid)) {
-               if (refcount_count(&db->db_holds) > 0) {
-                       /*
-                        * This dbuf is active.  We assume that it is
-                        * already CACHED, or else about to be either
-                        * read or filled.
-                        */
-                       mutex_exit(&db->db_mtx);
-                       return;
-               }
+       if ((db = dbuf_find(dn, 0, blkid))) {
+               /*
+                * This dbuf is already in the cache.  We assume that
+                * it is already CACHED, or else about to be either
+                * read or filled.
+                */
                mutex_exit(&db->db_mtx);
-               db = NULL;
+               return;
        }
 
-       if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp) == 0) {
+       if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp, NULL) == 0) {
                if (bp && !BP_IS_HOLE(bp)) {
-                       arc_buf_t *pbuf;
+                       int priority = dn->dn_type == DMU_OT_DDT_ZAP ?
+                           ZIO_PRIORITY_DDT_PREFETCH : ZIO_PRIORITY_ASYNC_READ;
+                       dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
                        uint32_t aflags = ARC_NOWAIT | ARC_PREFETCH;
                        zbookmark_t zb;
-                       zb.zb_objset = dn->dn_objset->os_dsl_dataset ?
-                           dn->dn_objset->os_dsl_dataset->ds_object : 0;
-                       zb.zb_object = dn->dn_object;
-                       zb.zb_level = 0;
-                       zb.zb_blkid = blkid;
-
-                       if (db)
-                               pbuf = db->db_buf;
-                       else
-                               pbuf = dn->dn_objset->os_phys_buf;
+
+                       SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
+                           dn->dn_object, 0, blkid);
 
                        (void) arc_read(NULL, dn->dn_objset->os_spa,
-                           bp, pbuf, NULL, NULL, ZIO_PRIORITY_ASYNC_READ,
+                           bp, NULL, NULL, priority,
                            ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
                            &aflags, &zb);
                }
@@ -1615,98 +1892,142 @@ dbuf_prefetch(dnode_t *dn, uint64_t blkid)
        }
 }
 
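+/*
+ * Number of __dbuf_hold_impl() frames preallocated per hold; the walk
+ * is bounded by the dnode's indirection depth (dn_nlevels), which stays
+ * well below this.
+ */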
+#define DBUF_HOLD_IMPL_MAX_DEPTH       20
+
 /*
  * Returns with db_holds incremented, and db_mtx not held.
  * Note: dn_struct_rwlock must be held.
  */
-int
-dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
-    void *tag, dmu_buf_impl_t **dbp)
+static int
+__dbuf_hold_impl(struct dbuf_hold_impl_data *dh)
 {
-       dmu_buf_impl_t *db, *parent = NULL;
+       ASSERT3S(dh->dh_depth, <, DBUF_HOLD_IMPL_MAX_DEPTH);
+       dh->dh_parent = NULL;
 
-       ASSERT(blkid != DB_BONUS_BLKID);
-       ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
-       ASSERT3U(dn->dn_nlevels, >, level);
+       ASSERT(dh->dh_blkid != DMU_BONUS_BLKID);
+       ASSERT(RW_LOCK_HELD(&dh->dh_dn->dn_struct_rwlock));
+       ASSERT3U(dh->dh_dn->dn_nlevels, >, dh->dh_level);
 
-       *dbp = NULL;
+       *(dh->dh_dbp) = NULL;
 top:
        /* dbuf_find() returns with db_mtx held */
-       db = dbuf_find(dn, level, blkid);
-
-       if (db == NULL) {
-               blkptr_t *bp = NULL;
-               int err;
-
-               ASSERT3P(parent, ==, NULL);
-               err = dbuf_findbp(dn, level, blkid, fail_sparse, &parent, &bp);
-               if (fail_sparse) {
-                       if (err == 0 && bp && BP_IS_HOLE(bp))
-                               err = ENOENT;
-                       if (err) {
-                               if (parent)
-                                       dbuf_rele(parent, NULL);
-                               return (err);
+       dh->dh_db = dbuf_find(dh->dh_dn, dh->dh_level, dh->dh_blkid);
+
+       if (dh->dh_db == NULL) {
+               dh->dh_bp = NULL;
+
+               ASSERT3P(dh->dh_parent, ==, NULL);
+               dh->dh_err = dbuf_findbp(dh->dh_dn, dh->dh_level, dh->dh_blkid,
+                                       dh->dh_fail_sparse, &dh->dh_parent,
+                                       &dh->dh_bp, dh);
+               if (dh->dh_fail_sparse) {
+                       if (dh->dh_err == 0 && dh->dh_bp && BP_IS_HOLE(dh->dh_bp))
+                               dh->dh_err = ENOENT;
+                       if (dh->dh_err) {
+                               if (dh->dh_parent)
+                                       dbuf_rele(dh->dh_parent, NULL);
+                               return (dh->dh_err);
                        }
                }
-               if (err && err != ENOENT)
-                       return (err);
-               db = dbuf_create(dn, level, blkid, parent, bp);
-       }
-
-       if (db->db_buf && refcount_is_zero(&db->db_holds)) {
-               arc_buf_add_ref(db->db_buf, db);
-               if (db->db_buf->b_data == NULL) {
-                       dbuf_clear(db);
-                       if (parent) {
-                               dbuf_rele(parent, NULL);
-                               parent = NULL;
+               if (dh->dh_err && dh->dh_err != ENOENT)
+                       return (dh->dh_err);
+               dh->dh_db = dbuf_create(dh->dh_dn, dh->dh_level, dh->dh_blkid,
+                                       dh->dh_parent, dh->dh_bp);
+       }
+
+       if (dh->dh_db->db_buf && refcount_is_zero(&dh->dh_db->db_holds)) {
+               arc_buf_add_ref(dh->dh_db->db_buf, dh->dh_db);
+               if (dh->dh_db->db_buf->b_data == NULL) {
+                       dbuf_clear(dh->dh_db);
+                       if (dh->dh_parent) {
+                               dbuf_rele(dh->dh_parent, NULL);
+                               dh->dh_parent = NULL;
                        }
                        goto top;
                }
-               ASSERT3P(db->db.db_data, ==, db->db_buf->b_data);
+               ASSERT3P(dh->dh_db->db.db_data, ==, dh->dh_db->db_buf->b_data);
        }
 
-       ASSERT(db->db_buf == NULL || arc_referenced(db->db_buf));
+       ASSERT(dh->dh_db->db_buf == NULL || arc_referenced(dh->dh_db->db_buf));
 
        /*
         * If this buffer is currently syncing out, and we are
         * still referencing it from db_data, we need to make a copy
         * of it in case we decide we want to dirty it again in this txg.
         */
-       if (db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID &&
-           dn->dn_object != DMU_META_DNODE_OBJECT &&
-           db->db_state == DB_CACHED && db->db_data_pending) {
-               dbuf_dirty_record_t *dr = db->db_data_pending;
-
-               if (dr->dt.dl.dr_data == db->db_buf) {
-                       arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
-
-                       dbuf_set_data(db,
-                           arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
-                           db->db.db_size, db, type));
-                       bcopy(dr->dt.dl.dr_data->b_data, db->db.db_data,
-                           db->db.db_size);
+       if (dh->dh_db->db_level == 0 &&
+           dh->dh_db->db_blkid != DMU_BONUS_BLKID &&
+           dh->dh_dn->dn_object != DMU_META_DNODE_OBJECT &&
+           dh->dh_db->db_state == DB_CACHED && dh->dh_db->db_data_pending) {
+               dh->dh_dr = dh->dh_db->db_data_pending;
+
+               if (dh->dh_dr->dt.dl.dr_data == dh->dh_db->db_buf) {
+                       dh->dh_type = DBUF_GET_BUFC_TYPE(dh->dh_db);
+
+                       dbuf_set_data(dh->dh_db,
+                           arc_buf_alloc(dh->dh_dn->dn_objset->os_spa,
+                           dh->dh_db->db.db_size, dh->dh_db, dh->dh_type));
+                       bcopy(dh->dh_dr->dt.dl.dr_data->b_data,
+                           dh->dh_db->db.db_data, dh->dh_db->db.db_size);
                }
        }
 
-       (void) refcount_add(&db->db_holds, tag);
-       dbuf_update_data(db);
-       DBUF_VERIFY(db);
-       mutex_exit(&db->db_mtx);
+       (void) refcount_add(&dh->dh_db->db_holds, dh->dh_tag);
+       dbuf_update_data(dh->dh_db);
+       DBUF_VERIFY(dh->dh_db);
+       mutex_exit(&dh->dh_db->db_mtx);
 
        /* NOTE: we can't rele the parent until after we drop the db_mtx */
-       if (parent)
-               dbuf_rele(parent, NULL);
+       if (dh->dh_parent)
+               dbuf_rele(dh->dh_parent, NULL);
 
-       ASSERT3P(db->db_dnode, ==, dn);
-       ASSERT3U(db->db_blkid, ==, blkid);
-       ASSERT3U(db->db_level, ==, level);
-       *dbp = db;
+       ASSERT3P(DB_DNODE(dh->dh_db), ==, dh->dh_dn);
+       ASSERT3U(dh->dh_db->db_blkid, ==, dh->dh_blkid);
+       ASSERT3U(dh->dh_db->db_level, ==, dh->dh_level);
+       *(dh->dh_dbp) = dh->dh_db;
 
        return (0);
 }
 
+/*
+ * The following code preserves the recursive function dbuf_hold_impl()
+ * but moves the local variables AND function arguments to the heap to
+ * minimize the stack frame size.  Enough space is allocated up front,
+ * in a single heap allocation, for 20 levels of recursion.
+ */
+int
+dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
+    void *tag, dmu_buf_impl_t **dbp)
+{
+       struct dbuf_hold_impl_data *dh;
+       int error;
+
+       dh = kmem_zalloc(sizeof(struct dbuf_hold_impl_data) *
+           DBUF_HOLD_IMPL_MAX_DEPTH, KM_PUSHPAGE);
+       __dbuf_hold_impl_init(dh, dn, level, blkid, fail_sparse, tag, dbp, 0);
+
+       error = __dbuf_hold_impl(dh);
+
+       kmem_free(dh, sizeof(struct dbuf_hold_impl_data) *
+           DBUF_HOLD_IMPL_MAX_DEPTH);
+
+       return (error);
+}
+
+static void
+__dbuf_hold_impl_init(struct dbuf_hold_impl_data *dh,
+    dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
+    void *tag, dmu_buf_impl_t **dbp, int depth)
+{
+       dh->dh_dn = dn;
+       dh->dh_level = level;
+       dh->dh_blkid = blkid;
+       dh->dh_fail_sparse = fail_sparse;
+       dh->dh_tag = tag;
+       dh->dh_dbp = dbp;
+       dh->dh_depth = depth;
+}
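
The recursion itself lives in dbuf_findbp(), whose matching hunk is not shown
here. Sketched under that assumption (err, epbs, and parentp belong to
dbuf_findbp()'s scope): each deeper level initializes the next preallocated
frame instead of growing the kernel stack.

	if (dh == NULL) {
		/* no frame array available: use the public wrapper */
		err = dbuf_hold_impl(dn, level + 1, blkid >> epbs,
		    fail_sparse, NULL, parentp);
	} else {
		/* recursive path: reuse the next preallocated frame */
		__dbuf_hold_impl_init(dh + 1, dn, dh->dh_level + 1,
		    dh->dh_blkid >> epbs, dh->dh_fail_sparse, NULL,
		    parentp, dh->dh_depth + 1);
		err = __dbuf_hold_impl(dh + 1);
	}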
+
 dmu_buf_impl_t *
 dbuf_hold(dnode_t *dn, uint64_t blkid, void *tag)
 {
@@ -1729,26 +2050,79 @@ dbuf_create_bonus(dnode_t *dn)
        ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
 
        ASSERT(dn->dn_bonus == NULL);
-       dn->dn_bonus = dbuf_create(dn, 0, DB_BONUS_BLKID, dn->dn_dbuf, NULL);
+       dn->dn_bonus = dbuf_create(dn, 0, DMU_BONUS_BLKID, dn->dn_dbuf, NULL);
+}
+
+int
+dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
+{
+       dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
+       dnode_t *dn;
+
+       if (db->db_blkid != DMU_SPILL_BLKID)
+               return (ENOTSUP);
+       if (blksz == 0)
+               blksz = SPA_MINBLOCKSIZE;
+       if (blksz > SPA_MAXBLOCKSIZE)
+               blksz = SPA_MAXBLOCKSIZE;
+       else
+               blksz = P2ROUNDUP(blksz, SPA_MINBLOCKSIZE);
+
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
+       dbuf_new_size(db, blksz, tx);
+       rw_exit(&dn->dn_struct_rwlock);
+       DB_DNODE_EXIT(db);
+
+       return (0);
+}
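
A worked example of the clamping above, assuming the usual SPA_MINBLOCKSIZE
of 512 and SPA_MAXBLOCKSIZE of 128K:

	/*
	 * blksz = 0        ->    512  (raised to the minimum)
	 * blksz = 3000     ->   3072  (P2ROUNDUP to a 512-byte multiple)
	 * blksz = 1 << 20  -> 131072  (clamped to the maximum)
	 */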
+
+void
+dbuf_rm_spill(dnode_t *dn, dmu_tx_t *tx)
+{
+       dbuf_free_range(dn, DMU_SPILL_BLKID, DMU_SPILL_BLKID, tx);
 }
 
 #pragma weak dmu_buf_add_ref = dbuf_add_ref
 void
 dbuf_add_ref(dmu_buf_impl_t *db, void *tag)
 {
-       int64_t holds = refcount_add(&db->db_holds, tag);
-       ASSERT(holds > 1);
+       VERIFY(refcount_add(&db->db_holds, tag) > 1);
 }
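
The switch to VERIFY() above is deliberate: ASSERT() compiles away entirely
when ZFS_DEBUG is not defined, whereas VERIFY() always evaluates and checks
its expression, so the sanity check now survives production builds.

	/*
	 * Equivalent spelling of the line above, for illustration:
	 * a dbuf_add_ref() can never be the first hold on a dbuf.
	 */
	int64_t holds = refcount_add(&db->db_holds, tag);
	VERIFY(holds > 1);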
 
+/*
+ * If you call dbuf_rele() you had better not be referencing the dnode handle
+ * unless you have some other direct or indirect hold on the dnode. (An indirect
+ * hold is a hold on one of the dnode's dbufs, including the bonus buffer.)
+ * Without that, the dbuf_rele() could lead to a dnode_rele() followed by the
+ * dnode's parent dbuf evicting its dnode handles.
+ */
 #pragma weak dmu_buf_rele = dbuf_rele
 void
 dbuf_rele(dmu_buf_impl_t *db, void *tag)
 {
+       mutex_enter(&db->db_mtx);
+       dbuf_rele_and_unlock(db, tag);
+}
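
A hedged sketch of the ordering rule described above; dnode_use() stands in
for any hypothetical code that touches the dnode.

	/* Wrong: db may hold the only (indirect) reference to dn. */
	dbuf_rele(db, tag);
	dnode_use(dn);		/* dn's handle may already be evicted */

	/* Right: finish with the dnode (or take a direct hold) first. */
	dnode_use(dn);
	dbuf_rele(db, tag);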
+
+/*
+ * dbuf_rele() for an already-locked dbuf.  This is necessary to allow
+ * db_dirtycnt and db_holds to be updated atomically.
+ */
+void
+dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag)
+{
        int64_t holds;
 
-       mutex_enter(&db->db_mtx);
+       ASSERT(MUTEX_HELD(&db->db_mtx));
        DBUF_VERIFY(db);
 
+       /*
+        * Remove the reference to the dbuf before removing its hold on the
+        * dnode so we can guarantee in dnode_move() that a referenced bonus
+        * buffer has a corresponding dnode hold.
+        */
        holds = refcount_remove(&db->db_holds, tag);
        ASSERT(holds >= 0);
 
@@ -1764,9 +2138,22 @@ dbuf_rele(dmu_buf_impl_t *db, void *tag)
                dbuf_evict_user(db);
 
        if (holds == 0) {
-               if (db->db_blkid == DB_BONUS_BLKID) {
+               if (db->db_blkid == DMU_BONUS_BLKID) {
                        mutex_exit(&db->db_mtx);
-                       dnode_rele(db->db_dnode, db);
+
+                       /*
+                        * If the dnode moves here, we cannot cross this barrier
+                        * until the move completes.
+                        */
+                       DB_DNODE_ENTER(db);
+                       (void) atomic_dec_32_nv(&DB_DNODE(db)->dn_dbufs_count);
+                       DB_DNODE_EXIT(db);
+                       /*
+                        * The bonus buffer's dnode hold is no longer discounted
+                        * in dnode_move(). The dnode cannot move until after
+                        * the dnode_rele().
+                        */
+                       dnode_rele(DB_DNODE(db), db);
                } else if (db->db_buf == NULL) {
                        /*
                         * This is a special case: we never associated this
@@ -1785,7 +2172,24 @@ dbuf_rele(dmu_buf_impl_t *db, void *tag)
                        dbuf_evict(db);
                } else {
                        VERIFY(arc_buf_remove_ref(db->db_buf, db) == 0);
-                       if (!DBUF_IS_CACHEABLE(db))
+
+                       /*
+                        * A dbuf is eligible for eviction if either the
+                        * 'primarycache' property excludes it from the
+                        * cache or a duplicate copy of this buffer is
+                        * already cached in the arc.
+                        *
+                        * In the case of the 'primarycache' property, a
+                        * buffer is evicted when it does not match the
+                        * caching criteria the property sets (see
+                        * DBUF_IS_CACHEABLE()).
+                        *
+                        * To decide if our buffer is a duplicate, we must
+                        * call into the arc to determine if multiple
+                        * buffers are referencing the same block on-disk.
+                        * If so, then we simply evict ourselves.
+                        */
+                       if (!DBUF_IS_CACHEABLE(db) ||
+                           arc_buf_eviction_needed(db->db_buf))
                                dbuf_clear(db);
                        else
                                mutex_exit(&db->db_mtx);
@@ -1855,6 +2259,19 @@ dmu_buf_get_user(dmu_buf_t *db_fake)
        return (db->db_user_ptr);
 }
 
+boolean_t
+dmu_buf_freeable(dmu_buf_t *dbuf)
+{
+       boolean_t res = B_FALSE;
+       dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbuf;
+
+       if (db->db_blkptr)
+               res = dsl_dataset_block_freeable(db->db_objset->os_dsl_dataset,
+                   db->db_blkptr, db->db_blkptr->blk_birth);
+
+       return (res);
+}
+
 static void
 dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
 {
@@ -1864,6 +2281,11 @@ dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
        if (db->db_blkptr != NULL)
                return;
 
+       if (db->db_blkid == DMU_SPILL_BLKID) {
+               db->db_blkptr = &dn->dn_phys->dn_spill;
+               BP_ZERO(db->db_blkptr);
+               return;
+       }
        if (db->db_level == dn->dn_phys->dn_nlevels-1) {
                /*
                 * This buffer was allocated at a time when there was
@@ -1895,11 +2317,15 @@ dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
        }
 }
 
-static void
+/*
+ * dbuf_sync_indirect() is called recursively from dbuf_sync_list(), so it
+ * is critical that we not allow the compiler to inline this function into
+ * dbuf_sync_list() and thereby drastically bloat the stack usage.
+ */
+noinline static void
 dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
 {
        dmu_buf_impl_t *db = dr->dr_dbuf;
-       dnode_t *dn = db->db_dnode;
+       dnode_t *dn;
        zio_t *zio;
 
        ASSERT(dmu_tx_is_syncing(tx));
@@ -1917,10 +2343,13 @@ dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                mutex_enter(&db->db_mtx);
        }
        ASSERT3U(db->db_state, ==, DB_CACHED);
-       ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
        ASSERT(db->db_buf != NULL);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
        dbuf_check_blkptr(dn, db);
+       DB_DNODE_EXIT(db);
 
        db->db_data_pending = dr;
 
@@ -1935,15 +2364,18 @@ dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        zio_nowait(zio);
 }
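
For context, a likely definition of the noinline qualifier used on these two
sync functions; in ZFS on Linux it comes from the SPL/kernel headers, and the
exact spelling shown here is an assumption.

	#ifndef noinline
	#define	noinline	__attribute__((noinline))
	#endif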
 
-static void
+/*
+ * dbuf_sync_leaf() is called recursively from dbuf_sync_list(), so it is
+ * critical that we not allow the compiler to inline this function into
+ * dbuf_sync_list() and thereby drastically bloat the stack usage.
+ */
+noinline static void
 dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
 {
        arc_buf_t **datap = &dr->dt.dl.dr_data;
        dmu_buf_impl_t *db = dr->dr_dbuf;
-       dnode_t *dn = db->db_dnode;
-       objset_impl_t *os = dn->dn_objset;
+       dnode_t *dn;
+       objset_t *os;
        uint64_t txg = tx->tx_txg;
-       int blksz;
 
        ASSERT(dmu_tx_is_syncing(tx));
 
@@ -1965,37 +2397,54 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        }
        DBUF_VERIFY(db);
 
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+
+       if (db->db_blkid == DMU_SPILL_BLKID) {
+               mutex_enter(&dn->dn_mtx);
+               dn->dn_phys->dn_flags |= DNODE_FLAG_SPILL_BLKPTR;
+               mutex_exit(&dn->dn_mtx);
+       }
+
        /*
         * If this is a bonus buffer, simply copy the bonus data into the
         * dnode.  It will be written out when the dnode is synced (and it
         * will be synced, since it must have been dirty for dbuf_sync to
         * be called).
         */
-       if (db->db_blkid == DB_BONUS_BLKID) {
+       if (db->db_blkid == DMU_BONUS_BLKID) {
                dbuf_dirty_record_t **drp;
 
                ASSERT(*datap != NULL);
-               ASSERT3U(db->db_level, ==, 0);
+               ASSERT0(db->db_level);
                ASSERT3U(dn->dn_phys->dn_bonuslen, <=, DN_MAX_BONUSLEN);
                bcopy(*datap, DN_BONUS(dn->dn_phys), dn->dn_phys->dn_bonuslen);
+               DB_DNODE_EXIT(db);
+
                if (*datap != db->db.db_data) {
                        zio_buf_free(*datap, DN_MAX_BONUSLEN);
-                       arc_space_return(DN_MAX_BONUSLEN);
+                       arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
                }
                db->db_data_pending = NULL;
                drp = &db->db_last_dirty;
                while (*drp != dr)
                        drp = &(*drp)->dr_next;
                ASSERT(dr->dr_next == NULL);
+               ASSERT(dr->dr_dbuf == db);
                *drp = dr->dr_next;
+               if (dr->dr_dbuf->db_level != 0) {
+                       mutex_destroy(&dr->dt.di.dr_mtx);
+                       list_destroy(&dr->dt.di.dr_children);
+               }
                kmem_free(dr, sizeof (dbuf_dirty_record_t));
                ASSERT(db->db_dirtycnt > 0);
                db->db_dirtycnt -= 1;
-               mutex_exit(&db->db_mtx);
-               dbuf_rele(db, (void *)(uintptr_t)txg);
+               dbuf_rele_and_unlock(db, (void *)(uintptr_t)txg);
                return;
        }
 
+       os = dn->dn_objset;
+
        /*
         * This function may have dropped the db_mtx lock allowing a dmu_sync
         * operation to sneak in. As a result, we need to ensure that we
@@ -2005,7 +2454,7 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        dbuf_check_blkptr(dn, db);
 
        /*
-        * If this buffer is in the middle of an immdiate write,
+        * If this buffer is in the middle of an immediate write,
         * wait for the synchronous IO to complete.
         */
        while (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
@@ -2014,67 +2463,26 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
                ASSERT(dr->dt.dl.dr_override_state != DR_NOT_OVERRIDDEN);
        }
 
-       /*
-        * If this dbuf has already been written out via an immediate write,
-        * just complete the write by copying over the new block pointer and
-        * updating the accounting via the write-completion functions.
-        */
-       if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
-               zio_t zio_fake;
-
-               zio_fake.io_private = &db;
-               zio_fake.io_error = 0;
-               zio_fake.io_bp = db->db_blkptr;
-               zio_fake.io_bp_orig = *db->db_blkptr;
-               zio_fake.io_txg = txg;
-               zio_fake.io_flags = 0;
-
-               *db->db_blkptr = dr->dt.dl.dr_overridden_by;
-               dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
-               db->db_data_pending = dr;
-               dr->dr_zio = &zio_fake;
-               mutex_exit(&db->db_mtx);
-
-               ASSERT(!DVA_EQUAL(BP_IDENTITY(zio_fake.io_bp),
-                   BP_IDENTITY(&zio_fake.io_bp_orig)) ||
-                   BP_IS_HOLE(zio_fake.io_bp));
-
-               if (BP_IS_OLDER(&zio_fake.io_bp_orig, txg))
-                       (void) dsl_dataset_block_kill(os->os_dsl_dataset,
-                           &zio_fake.io_bp_orig, dn->dn_zio, tx);
-
-               dbuf_write_ready(&zio_fake, db->db_buf, db);
-               dbuf_write_done(&zio_fake, db->db_buf, db);
-
-               return;
-       }
-
-       if (db->db_state != DB_NOFILL) {
-               blksz = arc_buf_size(*datap);
-
-               if (dn->dn_object != DMU_META_DNODE_OBJECT) {
-                       /*
-                        * If this buffer is currently "in use" (i.e., there
-                        * are active holds and db_data still references it),
-                        * then make a copy before we start the write so that
-                        * any modifications from the open txg will not leak
-                        * into this write.
-                        *
-                        * NOTE: this copy does not need to be made for
-                        * objects only modified in the syncing context (e.g.
-                        * DNONE_DNODE blocks).
-                        */
-                       if (refcount_count(&db->db_holds) > 1 &&
-                           *datap == db->db_buf) {
-                               arc_buf_contents_t type =
-                                   DBUF_GET_BUFC_TYPE(db);
-                               *datap =
-                                   arc_buf_alloc(os->os_spa, blksz, db, type);
-                               bcopy(db->db.db_data, (*datap)->b_data, blksz);
-                       }
-               }
-
-               ASSERT(*datap != NULL);
+       if (db->db_state != DB_NOFILL &&
+           dn->dn_object != DMU_META_DNODE_OBJECT &&
+           refcount_count(&db->db_holds) > 1 &&
+           dr->dt.dl.dr_override_state != DR_OVERRIDDEN &&
+           *datap == db->db_buf) {
+               /*
+                * If this buffer is currently "in use" (i.e., there
+                * are active holds and db_data still references it),
+                * then make a copy before we start the write so that
+                * any modifications from the open txg will not leak
+                * into this write.
+                *
+                * NOTE: this copy does not need to be made for
+                * objects only modified in the syncing context (e.g.
+                * DMU_META_DNODE blocks).
+                */
+               int blksz = arc_buf_size(*datap);
+               arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
+               *datap = arc_buf_alloc(os->os_spa, blksz, db, type);
+               bcopy(db->db.db_data, (*datap)->b_data, blksz);
        }
        db->db_data_pending = dr;
 
@@ -2083,10 +2491,20 @@ dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
        dbuf_write(dr, *datap, tx);
 
        ASSERT(!list_link_active(&dr->dr_dirty_node));
-       if (dn->dn_object == DMU_META_DNODE_OBJECT)
+       if (dn->dn_object == DMU_META_DNODE_OBJECT) {
                list_insert_tail(&dn->dn_dirty_records[txg&TXG_MASK], dr);
-       else
+               DB_DNODE_EXIT(db);
+       } else {
+               /*
+                * Although zio_nowait() does not "wait for an IO", it does
+                * initiate the IO. If this is an empty write it is plausible
+                * that the IO could actually complete before zio_nowait()
+                * returns. We need to DB_DNODE_EXIT() first in case
+                * zio_nowait() invalidates the dbuf.
+                */
+               DB_DNODE_EXIT(db);
                zio_nowait(dr->dr_zio);
+       }
 }
 
 void
@@ -2094,7 +2512,7 @@ dbuf_sync_list(list_t *list, dmu_tx_t *tx)
 {
        dbuf_dirty_record_t *dr;
 
-       while (dr = list_head(list)) {
+       while ((dr = list_head(list))) {
                if (dr->dr_zio != NULL) {
                        /*
                         * If we find an already initialized zio then we
@@ -2115,141 +2533,53 @@ dbuf_sync_list(list_t *list, dmu_tx_t *tx)
        }
 }
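
The doubled parentheses in the loop condition are deliberate: gcc's
-Wparentheses treats "while (a = b)" as a probable typo for "==", and the
extra pair both silences the warning and documents that the assignment is
intended.

	while ((dr = list_head(list))) {	/* assignment, not comparison */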
 
-static void
-dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
-{
-       dmu_buf_impl_t *db = dr->dr_dbuf;
-       dnode_t *dn = db->db_dnode;
-       objset_impl_t *os = dn->dn_objset;
-       dmu_buf_impl_t *parent = db->db_parent;
-       uint64_t txg = tx->tx_txg;
-       zbookmark_t zb;
-       writeprops_t wp = { 0 };
-       zio_t *zio;
-
-       if (!BP_IS_HOLE(db->db_blkptr) &&
-           (db->db_level > 0 || dn->dn_type == DMU_OT_DNODE)) {
-               /*
-                * Private object buffers are released here rather
-                * than in dbuf_dirty() since they are only modified
-                * in the syncing context and we don't want the
-                * overhead of making multiple copies of the data.
-                */
-               arc_release(data, db);
-       } else if (db->db_state != DB_NOFILL) {
-               ASSERT(arc_released(data));
-               /* XXX why do we need to thaw here? */
-               arc_buf_thaw(data);
-       }
-
-       if (parent != dn->dn_dbuf) {
-               ASSERT(parent && parent->db_data_pending);
-               ASSERT(db->db_level == parent->db_level-1);
-               ASSERT(arc_released(parent->db_buf));
-               zio = parent->db_data_pending->dr_zio;
-       } else {
-               ASSERT(db->db_level == dn->dn_phys->dn_nlevels-1);
-               ASSERT3P(db->db_blkptr, ==,
-                   &dn->dn_phys->dn_blkptr[db->db_blkid]);
-               zio = dn->dn_zio;
-       }
-
-       ASSERT(db->db_level == 0 || data == db->db_buf);
-       ASSERT3U(db->db_blkptr->blk_birth, <=, txg);
-       ASSERT(zio);
-
-       zb.zb_objset = os->os_dsl_dataset ? os->os_dsl_dataset->ds_object : 0;
-       zb.zb_object = db->db.db_object;
-       zb.zb_level = db->db_level;
-       zb.zb_blkid = db->db_blkid;
-
-       wp.wp_type = dn->dn_type;
-       wp.wp_level = db->db_level;
-       wp.wp_copies = os->os_copies;
-       wp.wp_dncompress = dn->dn_compress;
-       wp.wp_oscompress = os->os_compress;
-       wp.wp_dnchecksum = dn->dn_checksum;
-       wp.wp_oschecksum = os->os_checksum;
-
-       if (BP_IS_OLDER(db->db_blkptr, txg))
-               (void) dsl_dataset_block_kill(
-                   os->os_dsl_dataset, db->db_blkptr, zio, tx);
-
-       if (db->db_state == DB_NOFILL) {
-               zio_prop_t zp = { 0 };
-
-               write_policy(os->os_spa, &wp, &zp);
-               dr->dr_zio = zio_write(zio, os->os_spa,
-                   txg, db->db_blkptr, NULL,
-                   db->db.db_size, &zp, dbuf_skip_write_ready,
-                   dbuf_skip_write_done, db, ZIO_PRIORITY_ASYNC_WRITE,
-                   ZIO_FLAG_MUSTSUCCEED, &zb);
-       } else {
-               dr->dr_zio = arc_write(zio, os->os_spa, &wp,
-                   DBUF_IS_L2CACHEABLE(db), txg, db->db_blkptr,
-                   data, dbuf_write_ready, dbuf_write_done, db,
-                   ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
-       }
-}
-
-/* wrapper function for dbuf_write_ready bypassing ARC */
-static void
-dbuf_skip_write_ready(zio_t *zio)
-{
-       blkptr_t *bp = zio->io_bp;
-
-       if (!BP_IS_GANG(bp))
-               zio_skip_write(zio);
-
-       dbuf_write_ready(zio, NULL, zio->io_private);
-}
-
-/* wrapper function for dbuf_write_done bypassing ARC */
-static void
-dbuf_skip_write_done(zio_t *zio)
-{
-       dbuf_write_done(zio, NULL, zio->io_private);
-}
-
 /* ARGSUSED */
 static void
 dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
 {
        dmu_buf_impl_t *db = vdb;
-       dnode_t *dn = db->db_dnode;
-       objset_impl_t *os = dn->dn_objset;
+       dnode_t *dn;
        blkptr_t *bp = zio->io_bp;
        blkptr_t *bp_orig = &zio->io_bp_orig;
+       spa_t *spa = zio->io_spa;
+       int64_t delta;
        uint64_t fill = 0;
-       int old_size, new_size, i;
+       int i;
 
        ASSERT(db->db_blkptr == bp);
 
-       dprintf_dbuf_bp(db, bp_orig, "bp_orig: %s", "");
-
-       old_size = bp_get_dasize(os->os_spa, bp_orig);
-       new_size = bp_get_dasize(os->os_spa, bp);
-
-       dnode_diduse_space(dn, new_size - old_size);
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       delta = bp_get_dsize_sync(spa, bp) - bp_get_dsize_sync(spa, bp_orig);
+       dnode_diduse_space(dn, delta - zio->io_prev_space_delta);
+       zio->io_prev_space_delta = delta;
 
        if (BP_IS_HOLE(bp)) {
-               dsl_dataset_t *ds = os->os_dsl_dataset;
-               dmu_tx_t *tx = os->os_synctx;
-
-               if (bp_orig->blk_birth == tx->tx_txg)
-                       (void) dsl_dataset_block_kill(ds, bp_orig, zio, tx);
-               ASSERT3U(bp->blk_fill, ==, 0);
+               ASSERT(bp->blk_fill == 0);
+               DB_DNODE_EXIT(db);
                return;
        }
 
-       ASSERT(BP_GET_TYPE(bp) == dn->dn_type);
+       ASSERT((db->db_blkid != DMU_SPILL_BLKID &&
+           BP_GET_TYPE(bp) == dn->dn_type) ||
+           (db->db_blkid == DMU_SPILL_BLKID &&
+           BP_GET_TYPE(bp) == dn->dn_bonustype));
        ASSERT(BP_GET_LEVEL(bp) == db->db_level);
 
        mutex_enter(&db->db_mtx);
 
+#ifdef ZFS_DEBUG
+       if (db->db_blkid == DMU_SPILL_BLKID) {
+               ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
+               ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
+                   db->db_blkptr == &dn->dn_phys->dn_spill);
+       }
+#endif
+
        if (db->db_level == 0) {
                mutex_enter(&dn->dn_mtx);
-               if (db->db_blkid > dn->dn_phys->dn_maxblkid)
+               if (db->db_blkid > dn->dn_phys->dn_maxblkid &&
+                   db->db_blkid != DMU_SPILL_BLKID)
                        dn->dn_phys->dn_maxblkid = db->db_blkid;
                mutex_exit(&dn->dn_mtx);
 
@@ -2269,27 +2599,14 @@ dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
                for (i = db->db.db_size >> SPA_BLKPTRSHIFT; i > 0; i--, ibp++) {
                        if (BP_IS_HOLE(ibp))
                                continue;
-                       ASSERT3U(BP_GET_LSIZE(ibp), ==,
-                           db->db_level == 1 ? dn->dn_datablksz :
-                           (1<<dn->dn_phys->dn_indblkshift));
                        fill += ibp->blk_fill;
                }
        }
+       DB_DNODE_EXIT(db);
 
        bp->blk_fill = fill;
 
        mutex_exit(&db->db_mtx);
-
-       if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
-               ASSERT(DVA_EQUAL(BP_IDENTITY(bp), BP_IDENTITY(bp_orig)));
-       } else {
-               dsl_dataset_t *ds = os->os_dsl_dataset;
-               dmu_tx_t *tx = os->os_synctx;
-
-               if (bp_orig->blk_birth == tx->tx_txg)
-                       (void) dsl_dataset_block_kill(ds, bp_orig, zio, tx);
-               dsl_dataset_block_born(ds, bp, tx);
-       }
 }
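
A summary of the fill accounting above (the level-0 dnode-counting branch
falls in the elided part of this hunk, so this is partly an assumption):

	/*
	 * Invariant maintained by dbuf_write_ready():
	 *
	 *   level 0, data blocks  : blk_fill = 1
	 *   level 0, dnode blocks : blk_fill = number of allocated dnodes
	 *   level > 0 (indirect)  : blk_fill = sum of children's blk_fill,
	 *                           with holes contributing 0
	 */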
 
 /* ARGSUSED */
@@ -2297,48 +2614,82 @@ static void
 dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
 {
        dmu_buf_impl_t *db = vdb;
+       blkptr_t *bp = zio->io_bp;
+       blkptr_t *bp_orig = &zio->io_bp_orig;
        uint64_t txg = zio->io_txg;
        dbuf_dirty_record_t **drp, *dr;
 
-       ASSERT3U(zio->io_error, ==, 0);
+       ASSERT0(zio->io_error);
+       ASSERT(db->db_blkptr == bp);
+
+       if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
+               ASSERT(BP_EQUAL(bp, bp_orig));
+       } else {
+               objset_t *os;
+               dsl_dataset_t *ds;
+               dmu_tx_t *tx;
+
+               DB_GET_OBJSET(&os, db);
+               ds = os->os_dsl_dataset;
+               tx = os->os_synctx;
+
+               (void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
+               dsl_dataset_block_born(ds, bp, tx);
+       }
 
        mutex_enter(&db->db_mtx);
 
+       DBUF_VERIFY(db);
+
        drp = &db->db_last_dirty;
        while ((dr = *drp) != db->db_data_pending)
                drp = &dr->dr_next;
        ASSERT(!list_link_active(&dr->dr_dirty_node));
        ASSERT(dr->dr_txg == txg);
+       ASSERT(dr->dr_dbuf == db);
        ASSERT(dr->dr_next == NULL);
        *drp = dr->dr_next;
 
+#ifdef ZFS_DEBUG
+       if (db->db_blkid == DMU_SPILL_BLKID) {
+               dnode_t *dn;
+
+               DB_DNODE_ENTER(db);
+               dn = DB_DNODE(db);
+               ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
+               ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
+                   db->db_blkptr == &dn->dn_phys->dn_spill);
+               DB_DNODE_EXIT(db);
+       }
+#endif
+
        if (db->db_level == 0) {
-               ASSERT(db->db_blkid != DB_BONUS_BLKID);
+               ASSERT(db->db_blkid != DMU_BONUS_BLKID);
                ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
-
                if (db->db_state != DB_NOFILL) {
                        if (dr->dt.dl.dr_data != db->db_buf)
                                VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data,
                                    db) == 1);
-                       else if (!BP_IS_HOLE(db->db_blkptr))
+                       else if (!arc_released(db->db_buf))
                                arc_set_callback(db->db_buf, dbuf_do_evict, db);
-                       else
-                               ASSERT(arc_released(db->db_buf));
                }
        } else {
-               dnode_t *dn = db->db_dnode;
+               dnode_t *dn;
 
+               DB_DNODE_ENTER(db);
+               dn = DB_DNODE(db);
                ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
                ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
                if (!BP_IS_HOLE(db->db_blkptr)) {
-                       int epbs =
-                           dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
+                       ASSERTV(int epbs = dn->dn_phys->dn_indblkshift -
+                           SPA_BLKPTRSHIFT);
                        ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
                            db->db.db_size);
                        ASSERT3U(dn->dn_phys->dn_maxblkid
                            >> (db->db_level * epbs), >=, db->db_blkid);
                        arc_set_callback(db->db_buf, dbuf_do_evict, db);
                }
+               DB_DNODE_EXIT(db);
                mutex_destroy(&dr->dt.di.dr_mtx);
                list_destroy(&dr->dt.di.dr_children);
        }
@@ -2348,9 +2699,173 @@ dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
        ASSERT(db->db_dirtycnt > 0);
        db->db_dirtycnt -= 1;
        db->db_data_pending = NULL;
+       dbuf_rele_and_unlock(db, (void *)(uintptr_t)txg);
+}
+
+static void
+dbuf_write_nofill_ready(zio_t *zio)
+{
+       dbuf_write_ready(zio, NULL, zio->io_private);
+}
+
+static void
+dbuf_write_nofill_done(zio_t *zio)
+{
+       dbuf_write_done(zio, NULL, zio->io_private);
+}
+
+static void
+dbuf_write_override_ready(zio_t *zio)
+{
+       dbuf_dirty_record_t *dr = zio->io_private;
+       dmu_buf_impl_t *db = dr->dr_dbuf;
+
+       dbuf_write_ready(zio, NULL, db);
+}
+
+static void
+dbuf_write_override_done(zio_t *zio)
+{
+       dbuf_dirty_record_t *dr = zio->io_private;
+       dmu_buf_impl_t *db = dr->dr_dbuf;
+       blkptr_t *obp = &dr->dt.dl.dr_overridden_by;
+
+       mutex_enter(&db->db_mtx);
+       if (!BP_EQUAL(zio->io_bp, obp)) {
+               if (!BP_IS_HOLE(obp))
+                       dsl_free(spa_get_dsl(zio->io_spa), zio->io_txg, obp);
+               arc_release(dr->dt.dl.dr_data, db);
+       }
        mutex_exit(&db->db_mtx);
 
-       dprintf_dbuf_bp(db, zio->io_bp, "bp: %s", "");
+       dbuf_write_done(zio, NULL, db);
+}
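
The override callbacks exist for blocks that dmu_sync() already wrote out of
band on behalf of the ZIL; a timeline sketch, as I read the code above:

	/*
	 * open txg : dmu_sync() writes the data and saves the resulting
	 *            bp in dr->dt.dl.dr_overridden_by (DR_OVERRIDDEN).
	 * sync txg : dbuf_write() issues the write zio and immediately
	 *            stamps it with the saved bp via zio_write_override().
	 * done     : if the final bp differs from the saved one, the
	 *            out-of-band copy is freed with dsl_free() and the
	 *            in-core buffer is arc_release()d.
	 */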
+
+static void
+dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
+{
+       dmu_buf_impl_t *db = dr->dr_dbuf;
+       dnode_t *dn;
+       objset_t *os;
+       dmu_buf_impl_t *parent = db->db_parent;
+       uint64_t txg = tx->tx_txg;
+       zbookmark_t zb;
+       zio_prop_t zp;
+       zio_t *zio;
+       int wp_flag = 0;
+
+       DB_DNODE_ENTER(db);
+       dn = DB_DNODE(db);
+       os = dn->dn_objset;
+
+       if (db->db_state != DB_NOFILL) {
+               if (db->db_level > 0 || dn->dn_type == DMU_OT_DNODE) {
+                       /*
+                        * Private object buffers are released here rather
+                        * than in dbuf_dirty() since they are only modified
+                        * in the syncing context and we don't want the
+                        * overhead of making multiple copies of the data.
+                        */
+                       if (BP_IS_HOLE(db->db_blkptr)) {
+                               arc_buf_thaw(data);
+                       } else {
+                               dbuf_release_bp(db);
+                       }
+               }
+       }
+
+       if (parent != dn->dn_dbuf) {
+               ASSERT(parent && parent->db_data_pending);
+               ASSERT(db->db_level == parent->db_level-1);
+               ASSERT(arc_released(parent->db_buf));
+               zio = parent->db_data_pending->dr_zio;
+       } else {
+               ASSERT((db->db_level == dn->dn_phys->dn_nlevels-1 &&
+                   db->db_blkid != DMU_SPILL_BLKID) ||
+                   (db->db_blkid == DMU_SPILL_BLKID && db->db_level == 0));
+               if (db->db_blkid != DMU_SPILL_BLKID)
+                       ASSERT3P(db->db_blkptr, ==,
+                           &dn->dn_phys->dn_blkptr[db->db_blkid]);
+               zio = dn->dn_zio;
+       }
+
+       ASSERT(db->db_level == 0 || data == db->db_buf);
+       ASSERT3U(db->db_blkptr->blk_birth, <=, txg);
+       ASSERT(zio);
+
+       SET_BOOKMARK(&zb, os->os_dsl_dataset ?
+           os->os_dsl_dataset->ds_object : DMU_META_OBJSET,
+           db->db.db_object, db->db_level, db->db_blkid);
+
+       if (db->db_blkid == DMU_SPILL_BLKID)
+               wp_flag = WP_SPILL;
+       wp_flag |= (db->db_state == DB_NOFILL) ? WP_NOFILL : 0;
 
-       dbuf_rele(db, (void *)(uintptr_t)txg);
+       dmu_write_policy(os, dn, db->db_level, wp_flag, &zp);
+       DB_DNODE_EXIT(db);
+
+       if (db->db_level == 0 && dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
+               ASSERT(db->db_state != DB_NOFILL);
+               dr->dr_zio = zio_write(zio, os->os_spa, txg,
+                   db->db_blkptr, data->b_data, arc_buf_size(data), &zp,
+                   dbuf_write_override_ready, dbuf_write_override_done, dr,
+                   ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
+               mutex_enter(&db->db_mtx);
+               dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
+               zio_write_override(dr->dr_zio, &dr->dt.dl.dr_overridden_by,
+                   dr->dt.dl.dr_copies);
+               mutex_exit(&db->db_mtx);
+       } else if (db->db_state == DB_NOFILL) {
+               ASSERT(zp.zp_checksum == ZIO_CHECKSUM_OFF);
+               dr->dr_zio = zio_write(zio, os->os_spa, txg,
+                   db->db_blkptr, NULL, db->db.db_size, &zp,
+                   dbuf_write_nofill_ready, dbuf_write_nofill_done, db,
+                   ZIO_PRIORITY_ASYNC_WRITE,
+                   ZIO_FLAG_MUSTSUCCEED | ZIO_FLAG_NODATA, &zb);
+       } else {
+               ASSERT(arc_released(data));
+               dr->dr_zio = arc_write(zio, os->os_spa, txg,
+                   db->db_blkptr, data, DBUF_IS_L2CACHEABLE(db),
+                   DBUF_IS_L2COMPRESSIBLE(db), &zp, dbuf_write_ready,
+                   dbuf_write_done, db, ZIO_PRIORITY_ASYNC_WRITE,
+                   ZIO_FLAG_MUSTSUCCEED, &zb);
+       }
 }
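
DBUF_IS_L2COMPRESSIBLE() is the knob this change (Illumos #3137) passes to
arc_write(), deciding whether a buffer's L2ARC copy may be stored compressed.
Its definition lives in dbuf.h; my recollection of it, offered as an
assumption:

	#define	DBUF_IS_L2COMPRESSIBLE(_db)	\
	    ((_db)->db_objset->os_compress != ZIO_COMPRESS_OFF)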
+
+#if defined(_KERNEL) && defined(HAVE_SPL)
+EXPORT_SYMBOL(dbuf_find);
+EXPORT_SYMBOL(dbuf_is_metadata);
+EXPORT_SYMBOL(dbuf_evict);
+EXPORT_SYMBOL(dbuf_loan_arcbuf);
+EXPORT_SYMBOL(dbuf_whichblock);
+EXPORT_SYMBOL(dbuf_read);
+EXPORT_SYMBOL(dbuf_unoverride);
+EXPORT_SYMBOL(dbuf_free_range);
+EXPORT_SYMBOL(dbuf_new_size);
+EXPORT_SYMBOL(dbuf_release_bp);
+EXPORT_SYMBOL(dbuf_dirty);
+EXPORT_SYMBOL(dmu_buf_will_dirty);
+EXPORT_SYMBOL(dmu_buf_will_not_fill);
+EXPORT_SYMBOL(dmu_buf_will_fill);
+EXPORT_SYMBOL(dmu_buf_fill_done);
+EXPORT_SYMBOL(dmu_buf_rele);
+EXPORT_SYMBOL(dbuf_assign_arcbuf);
+EXPORT_SYMBOL(dbuf_clear);
+EXPORT_SYMBOL(dbuf_prefetch);
+EXPORT_SYMBOL(dbuf_hold_impl);
+EXPORT_SYMBOL(dbuf_hold);
+EXPORT_SYMBOL(dbuf_hold_level);
+EXPORT_SYMBOL(dbuf_create_bonus);
+EXPORT_SYMBOL(dbuf_spill_set_blksz);
+EXPORT_SYMBOL(dbuf_rm_spill);
+EXPORT_SYMBOL(dbuf_add_ref);
+EXPORT_SYMBOL(dbuf_rele);
+EXPORT_SYMBOL(dbuf_rele_and_unlock);
+EXPORT_SYMBOL(dbuf_refcount);
+EXPORT_SYMBOL(dbuf_sync_list);
+EXPORT_SYMBOL(dmu_buf_set_user);
+EXPORT_SYMBOL(dmu_buf_set_user_ie);
+EXPORT_SYMBOL(dmu_buf_update_user);
+EXPORT_SYMBOL(dmu_buf_get_user);
+EXPORT_SYMBOL(dmu_buf_freeable);
+#endif