Rebase master to b121
[zfs.git] / module / zfs / dsl_dataset.c
index e488b2b..edc36e7 100644 (file)
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -39,6 +39,7 @@
 #include <sys/spa.h>
 #include <sys/zfs_znode.h>
 #include <sys/sunddi.h>
+#include <sys/zvol.h>
 
 static char *dsl_reaper = "the grim reaper";
 
@@ -229,7 +230,7 @@ dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
        return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
 }
 
-int
+boolean_t
 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
 {
        return (blk_birth > dsl_dataset_prev_snap_txg(ds));
@@ -262,6 +263,7 @@ dsl_dataset_evict(dmu_buf_t *db, void *dsv)
        ASSERT(!list_link_active(&ds->ds_synced_link));
 
        mutex_destroy(&ds->ds_lock);
+       mutex_destroy(&ds->ds_recvlock);
        mutex_destroy(&ds->ds_opening_lock);
        mutex_destroy(&ds->ds_deadlist.bpl_lock);
        rw_destroy(&ds->ds_rwlock);
@@ -359,6 +361,7 @@ dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
                ds->ds_phys = dbuf->db_data;
 
                mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
+               mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
                mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
                mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
                    NULL);
@@ -377,6 +380,7 @@ dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
                         * just opened it.
                         */
                        mutex_destroy(&ds->ds_lock);
+                       mutex_destroy(&ds->ds_recvlock);
                        mutex_destroy(&ds->ds_opening_lock);
                        mutex_destroy(&ds->ds_deadlist.bpl_lock);
                        rw_destroy(&ds->ds_rwlock);
@@ -406,8 +410,15 @@ dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
                                        dsl_dataset_rele(origin, FTAG);
                                }
                        }
-               } else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
-                       err = dsl_dataset_get_snapname(ds);
+               } else {
+                       if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
+                               err = dsl_dataset_get_snapname(ds);
+                       if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
+                               err = zap_count(
+                                   ds->ds_dir->dd_pool->dp_meta_objset,
+                                   ds->ds_phys->ds_userrefs_obj,
+                                   &ds->ds_userrefs);
+                       }
                }
 
                if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
@@ -448,6 +459,7 @@ dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
                                dsl_dataset_drop_ref(ds->ds_prev, ds);
                        dsl_dir_close(ds->ds_dir, ds);
                        mutex_destroy(&ds->ds_lock);
+                       mutex_destroy(&ds->ds_recvlock);
                        mutex_destroy(&ds->ds_opening_lock);
                        mutex_destroy(&ds->ds_deadlist.bpl_lock);
                        rw_destroy(&ds->ds_rwlock);
@@ -548,6 +560,7 @@ dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, int flags, void *owner,
                return (err);
        if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
                dsl_dataset_rele(*dsp, owner);
+               *dsp = NULL;
                return (EBUSY);
        }
        return (0);
@@ -844,6 +857,7 @@ struct destroyarg {
        dsl_sync_task_group_t *dstg;
        char *snapname;
        char *failed;
+       boolean_t defer;
 };
 
 static int
@@ -851,23 +865,30 @@ dsl_snapshot_destroy_one(char *name, void *arg)
 {
        struct destroyarg *da = arg;
        dsl_dataset_t *ds;
-       char *cp;
        int err;
-
-       (void) strcat(name, "@");
-       (void) strcat(name, da->snapname);
-       err = dsl_dataset_own(name, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
+       char *dsname;
+       size_t buflen;
+
+       /* alloc a buffer to hold name@snapname, plus the terminating NULL */
+       buflen = strlen(name) + strlen(da->snapname) + 2;
+       dsname = kmem_alloc(buflen, KM_SLEEP);
+       (void) snprintf(dsname, buflen, "%s@%s", name, da->snapname);
+       err = dsl_dataset_own(dsname, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
            da->dstg, &ds);
-       cp = strchr(name, '@');
-       *cp = '\0';
+       kmem_free(dsname, buflen);
        if (err == 0) {
+               struct dsl_ds_destroyarg *dsda;
+
                dsl_dataset_make_exclusive(ds, da->dstg);
                if (ds->ds_user_ptr) {
                        ds->ds_user_evict_func(ds, ds->ds_user_ptr);
                        ds->ds_user_ptr = NULL;
                }
+               dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
+               dsda->ds = ds;
+               dsda->defer = da->defer;
                dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
-                   dsl_dataset_destroy_sync, ds, da->dstg, 0);
+                   dsl_dataset_destroy_sync, dsda, da->dstg, 0);
        } else if (err == ENOENT) {
                err = 0;
        } else {
@@ -881,7 +902,7 @@ dsl_snapshot_destroy_one(char *name, void *arg)
  */
 #pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
 int
-dsl_snapshots_destroy(char *fsname, char *snapname)
+dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
 {
        int err;
        struct destroyarg da;
@@ -894,6 +915,7 @@ dsl_snapshots_destroy(char *fsname, char *snapname)
        da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
        da.snapname = snapname;
        da.failed = fsname;
+       da.defer = defer;
 
        err = dmu_objset_find(fsname,
            dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
@@ -903,7 +925,9 @@ dsl_snapshots_destroy(char *fsname, char *snapname)
 
        for (dst = list_head(&da.dstg->dstg_tasks); dst;
            dst = list_next(&da.dstg->dstg_tasks, dst)) {
-               dsl_dataset_t *ds = dst->dst_arg1;
+               struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
+               dsl_dataset_t *ds = dsda->ds;
+
                /*
                 * Return the file system name that triggered the error
                 */
@@ -911,7 +935,9 @@ dsl_snapshots_destroy(char *fsname, char *snapname)
                        dsl_dataset_name(ds, fsname);
                        *strchr(fsname, '@') = '\0';
                }
+               ASSERT3P(dsda->rm_origin, ==, NULL);
                dsl_dataset_disown(ds, da.dstg);
+               kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
        }
 
        dsl_sync_task_group_destroy(da.dstg);
@@ -919,18 +945,100 @@ dsl_snapshots_destroy(char *fsname, char *snapname)
        return (err);
 }
 
+static boolean_t
+dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
+{
+       boolean_t might_destroy = B_FALSE;
+
+       mutex_enter(&ds->ds_lock);
+       if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
+           DS_IS_DEFER_DESTROY(ds))
+               might_destroy = B_TRUE;
+       mutex_exit(&ds->ds_lock);
+
+       return (might_destroy);
+}
+
+#ifdef _KERNEL
+static int
+dsl_dataset_zvol_cleanup(dsl_dataset_t *ds, const char *name)
+{
+       int error;
+       objset_t *os;
+
+       error = dmu_objset_open_ds(ds, DMU_OST_ANY, &os);
+       if (error)
+               return (error);
+
+       if (dmu_objset_type(os) == DMU_OST_ZVOL)
+               error = zvol_remove_minor(name);
+       dmu_objset_close(os);
+
+       return (error);
+}
+#endif
+
+/*
+ * If we're removing a clone, and these three conditions are true:
+ *     1) the clone's origin has no other children
+ *     2) the clone's origin has no user references
+ *     3) the clone's origin has been marked for deferred destruction
+ * Then, prepare to remove the origin as part of this sync task group.
+ */
+static int
+dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
+{
+       dsl_dataset_t *ds = dsda->ds;
+       dsl_dataset_t *origin = ds->ds_prev;
+
+       if (dsl_dataset_might_destroy_origin(origin)) {
+               char *name;
+               int namelen;
+               int error;
+
+               namelen = dsl_dataset_namelen(origin) + 1;
+               name = kmem_alloc(namelen, KM_SLEEP);
+               dsl_dataset_name(origin, name);
+#ifdef _KERNEL
+               error = zfs_unmount_snap(name, NULL);
+               if (error) {
+                       kmem_free(name, namelen);
+                       return (error);
+               }
+               error = dsl_dataset_zvol_cleanup(origin, name);
+               if (error) {
+                       kmem_free(name, namelen);
+                       return (error);
+               }
+#endif
+               error = dsl_dataset_own(name,
+                   DS_MODE_READONLY | DS_MODE_INCONSISTENT,
+                   tag, &origin);
+               kmem_free(name, namelen);
+               if (error)
+                       return (error);
+               dsda->rm_origin = origin;
+               dsl_dataset_make_exclusive(origin, tag);
+       }
+
+       return (0);
+}
+
 /*
  * ds must be opened as OWNER.  On return (whether successful or not),
  * ds will be closed and caller can no longer dereference it.
  */
 int
-dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
+dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
 {
        int err;
        dsl_sync_task_group_t *dstg;
        objset_t *os;
        dsl_dir_t *dd;
        uint64_t obj;
+       struct dsl_ds_destroyarg dsda = {0};
+
+       dsda.ds = ds;
 
        if (dsl_dataset_is_snapshot(ds)) {
                /* Destroying a snapshot is simpler */
@@ -940,9 +1048,12 @@ dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
                        ds->ds_user_evict_func(ds, ds->ds_user_ptr);
                        ds->ds_user_ptr = NULL;
                }
+               /* NOTE: defer is always B_FALSE for non-snapshots */
+               dsda.defer = defer;
                err = dsl_sync_task_do(ds->ds_dir->dd_pool,
                    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
-                   ds, tag, 0);
+                   &dsda, tag, 0);
+               ASSERT3P(dsda.rm_origin, ==, NULL);
                goto out;
        }
 
@@ -974,6 +1085,27 @@ dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
                (void) dmu_free_object(os, obj);
        }
 
+       /*
+        * We need to sync out all in-flight IO before we try to evict
+        * (the dataset evict func is trying to clear the cached entries
+        * for this dataset in the ARC).
+        */
+       txg_wait_synced(dd->dd_pool, 0);
+
+       /*
+        * If we managed to free all the objects in open
+        * context, the user space accounting should be zero.
+        */
+       if (ds->ds_phys->ds_bp.blk_fill == 0 &&
+           dmu_objset_userused_enabled(os->os)) {
+               uint64_t count;
+
+               ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
+                   count == 0);
+               ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
+                   count == 0);
+       }
+
        dmu_objset_close(os);
        if (err != ESRCH)
                goto out;
@@ -1002,13 +1134,45 @@ dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
                ds->ds_user_evict_func(ds, ds->ds_user_ptr);
                ds->ds_user_ptr = NULL;
        }
-       dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
-       dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
-           dsl_dataset_destroy_sync, ds, tag, 0);
-       dsl_sync_task_create(dstg, dsl_dir_destroy_check,
-           dsl_dir_destroy_sync, dd, FTAG, 0);
-       err = dsl_sync_task_group_wait(dstg);
-       dsl_sync_task_group_destroy(dstg);
+
+       /*
+        * If we're removing a clone, we might also need to remove its
+        * origin.
+        */
+       do {
+               dsda.need_prep = B_FALSE;
+               if (dsl_dir_is_clone(dd)) {
+                       err = dsl_dataset_origin_rm_prep(&dsda, tag);
+                       if (err) {
+                               dsl_dir_close(dd, FTAG);
+                               goto out;
+                       }
+               }
+
+               dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
+               dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
+                   dsl_dataset_destroy_sync, &dsda, tag, 0);
+               dsl_sync_task_create(dstg, dsl_dir_destroy_check,
+                   dsl_dir_destroy_sync, dd, FTAG, 0);
+               err = dsl_sync_task_group_wait(dstg);
+               dsl_sync_task_group_destroy(dstg);
+
+               /*
+                * We could be racing against 'zfs release' or 'zfs destroy -d'
+                * on the origin snap, in which case we can get EBUSY if we
+                * needed to destroy the origin snap but were not ready to
+                * do so.
+                */
+               if (dsda.need_prep) {
+                       ASSERT(err == EBUSY);
+                       ASSERT(dsl_dir_is_clone(dd));
+                       ASSERT(dsda.rm_origin == NULL);
+               }
+       } while (dsda.need_prep);
+
+       if (dsda.rm_origin != NULL)
+               dsl_dataset_disown(dsda.rm_origin, tag);
+
        /* if it is successful, dsl_dir_destroy_sync will close the dd */
        if (err)
                dsl_dir_close(dd, FTAG);
@@ -1058,7 +1222,6 @@ dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
        return (ds->ds_user_ptr);
 }
 
-
 blkptr_t *
 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
 {
@@ -1164,8 +1327,18 @@ kill_blkptr(spa_t *spa, blkptr_t *bp, const zbookmark_t *zb,
        if (bp == NULL)
                return (0);
 
-       ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
-       (void) dsl_dataset_block_kill(ka->ds, bp, ka->zio, ka->tx);
+       if ((zb->zb_level == -1ULL && zb->zb_blkid != 0) ||
+           (zb->zb_object != 0 && dnp == NULL)) {
+               /*
+                * It's a block in the intent log.  It has no
+                * accounting, so just free it.
+                */
+               VERIFY3U(0, ==, dsl_free(ka->zio, ka->tx->tx_pool,
+                   ka->tx->tx_txg, bp, NULL, NULL, ARC_NOWAIT));
+       } else {
+               ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
+               (void) dsl_dataset_block_kill(ka->ds, bp, ka->zio, ka->tx);
+       }
 
        return (0);
 }
@@ -1180,7 +1353,8 @@ dsl_dataset_rollback_check(void *arg1, void *arg2, dmu_tx_t *tx)
        /*
         * We can only roll back to emptyness if it is a ZPL objset.
         */
-       if (*ost != DMU_OST_ZFS && ds->ds_phys->ds_prev_snap_txg == 0)
+       if (*ost != DMU_OST_ZFS &&
+           ds->ds_phys->ds_prev_snap_txg < TXG_INITIAL)
                return (EINVAL);
 
        /*
@@ -1209,13 +1383,7 @@ dsl_dataset_rollback_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 
        dmu_buf_will_dirty(ds->ds_dbuf, tx);
 
-       /*
-        * Before the roll back destroy the zil.
-        */
        if (ds->ds_user_ptr != NULL) {
-               zil_rollback_destroy(
-                   ((objset_impl_t *)ds->ds_user_ptr)->os_zil, tx);
-
                /*
                 * We need to make sure that the objset_impl_t is reopened after
                 * we do the rollback, otherwise it will have the wrong
@@ -1248,7 +1416,10 @@ dsl_dataset_rollback_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
            ds->ds_phys->ds_deadlist_obj));
 
        {
-               /* Free blkptrs that we gave birth to */
+               /*
+                * Free blkptrs that we gave birth to - this covers
+                * claimed but not played log blocks too.
+                */
                zio_t *zio;
                struct killarg ka;
 
@@ -1262,8 +1433,7 @@ dsl_dataset_rollback_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
                (void) zio_wait(zio);
        }
 
-       ASSERT(!(ds->ds_phys->ds_flags & DS_FLAG_UNIQUE_ACCURATE) ||
-           ds->ds_phys->ds_unique_bytes == 0);
+       ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
 
        if (ds->ds_prev && ds->ds_prev != ds->ds_dir->dd_pool->dp_origin_snap) {
                /* Change our contents to that of the prev snapshot */
@@ -1289,6 +1459,7 @@ dsl_dataset_rollback_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
        } else {
                objset_impl_t *osi;
 
+               ASSERT(*ost != DMU_OST_ZVOL);
                ASSERT3U(ds->ds_phys->ds_used_bytes, ==, 0);
                ASSERT3U(ds->ds_phys->ds_compressed_bytes, ==, 0);
                ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, ==, 0);
@@ -1358,18 +1529,63 @@ dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
            cr, "dataset = %llu", ds->ds_object);
 }
 
+static int
+dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
+    dmu_tx_t *tx)
+{
+       dsl_dataset_t *ds = dsda->ds;
+       dsl_dataset_t *ds_prev = ds->ds_prev;
+
+       if (dsl_dataset_might_destroy_origin(ds_prev)) {
+               struct dsl_ds_destroyarg ndsda = {0};
+
+               /*
+                * If we're not prepared to remove the origin, don't remove
+                * the clone either.
+                */
+               if (dsda->rm_origin == NULL) {
+                       dsda->need_prep = B_TRUE;
+                       return (EBUSY);
+               }
+
+               ndsda.ds = ds_prev;
+               ndsda.is_origin_rm = B_TRUE;
+               return (dsl_dataset_destroy_check(&ndsda, tag, tx));
+       }
+
+       /*
+        * If we're not going to remove the origin after all,
+        * undo the open context setup.
+        */
+       if (dsda->rm_origin != NULL) {
+               dsl_dataset_disown(dsda->rm_origin, tag);
+               dsda->rm_origin = NULL;
+       }
+
+       return (0);
+}
+
 /* ARGSUSED */
 int
 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
-       dsl_dataset_t *ds = arg1;
+       struct dsl_ds_destroyarg *dsda = arg1;
+       dsl_dataset_t *ds = dsda->ds;
 
        /* we have an owner hold, so noone else can destroy us */
        ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
 
-       /* Can't delete a branch point. */
-       if (ds->ds_phys->ds_num_children > 1)
-               return (EEXIST);
+       /*
+        * Only allow deferred destroy on pools that support it.
+        * NOTE: deferred destroy is only supported on snapshots.
+        */
+       if (dsda->defer) {
+               if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
+                   SPA_VERSION_USERREFS)
+                       return (ENOTSUP);
+               ASSERT(dsl_dataset_is_snapshot(ds));
+               return (0);
+       }
 
        /*
         * Can't delete a head dataset if there are snapshots of it.
@@ -1387,6 +1603,31 @@ dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
        if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
                return (EAGAIN);
 
+       if (dsl_dataset_is_snapshot(ds)) {
+               /*
+                * If this snapshot has an elevated user reference count,
+                * we can't destroy it yet.
+                */
+               if (ds->ds_userrefs > 0 && !dsda->releasing)
+                       return (EBUSY);
+
+               mutex_enter(&ds->ds_lock);
+               /*
+                * Can't delete a branch point. However, if we're destroying
+                * a clone and removing its origin due to it having a user
+                * hold count of 0 and having been marked for deferred destroy,
+                * it's OK for the origin to have a single clone.
+                */
+               if (ds->ds_phys->ds_num_children >
+                   (dsda->is_origin_rm ? 2 : 1)) {
+                       mutex_exit(&ds->ds_lock);
+                       return (EEXIST);
+               }
+               mutex_exit(&ds->ds_lock);
+       } else if (dsl_dir_is_clone(ds->ds_dir)) {
+               return (dsl_dataset_origin_check(dsda, arg2, tx));
+       }
+
        /* XXX we should do some i/o error checking... */
        return (0);
 }
@@ -1434,7 +1675,8 @@ dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
 void
 dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
 {
-       dsl_dataset_t *ds = arg1;
+       struct dsl_ds_destroyarg *dsda = arg1;
+       dsl_dataset_t *ds = dsda->ds;
        zio_t *zio;
        int err;
        int after_branch_point = FALSE;
@@ -1444,11 +1686,20 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
        uint64_t obj;
 
        ASSERT(ds->ds_owner);
-       ASSERT3U(ds->ds_phys->ds_num_children, <=, 1);
+       ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
        ASSERT(ds->ds_prev == NULL ||
            ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
        ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
 
+       if (dsda->defer) {
+               ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
+               if (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1) {
+                       dmu_buf_will_dirty(ds->ds_dbuf, tx);
+                       ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
+                       return;
+               }
+       }
+
        /* signal any waiters that this dataset is going away */
        mutex_enter(&ds->ds_lock);
        ds->ds_owner = dsl_reaper;
@@ -1481,7 +1732,7 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
                dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
                if (after_branch_point &&
                    ds_prev->ds_phys->ds_next_clones_obj != 0) {
-                       VERIFY(0 == zap_remove_int(mos,
+                       VERIFY3U(0, ==, zap_remove_int(mos,
                            ds_prev->ds_phys->ds_next_clones_obj, obj, tx));
                        if (ds->ds_phys->ds_next_snap_obj != 0) {
                                VERIFY(0 == zap_add_int(mos,
@@ -1494,6 +1745,20 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
                        /* This clone is toast. */
                        ASSERT(ds_prev->ds_phys->ds_num_children > 1);
                        ds_prev->ds_phys->ds_num_children--;
+
+                       /*
+                        * If the clone's origin has no other clones, no
+                        * user holds, and has been marked for deferred
+                        * deletion, then we should have done the necessary
+                        * destroy setup for it.
+                        */
+                       if (ds_prev->ds_phys->ds_num_children == 1 &&
+                           ds_prev->ds_userrefs == 0 &&
+                           DS_IS_DEFER_DESTROY(ds_prev)) {
+                               ASSERT3P(dsda->rm_origin, !=, NULL);
+                       } else {
+                               ASSERT3P(dsda->rm_origin, ==, NULL);
+                       }
                } else if (!after_branch_point) {
                        ds_prev->ds_phys->ds_next_snap_obj =
                            ds->ds_phys->ds_next_snap_obj;
@@ -1654,7 +1919,7 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
                err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
                    TRAVERSE_POST, kill_blkptr, &ka);
                ASSERT3U(err, ==, 0);
-               ASSERT(spa_version(dp->dp_spa) < SPA_VERSION_UNIQUE_ACCURATE ||
+               ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
                    ds->ds_phys->ds_unique_bytes == 0);
        }
 
@@ -1706,10 +1971,32 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
        }
        if (ds->ds_phys->ds_props_obj != 0)
                VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
+       if (ds->ds_phys->ds_userrefs_obj != 0)
+               VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
        dsl_dir_close(ds->ds_dir, ds);
        ds->ds_dir = NULL;
        dsl_dataset_drain_refs(ds, tag);
        VERIFY(0 == dmu_object_free(mos, obj, tx));
+
+       if (dsda->rm_origin) {
+               /*
+                * Remove the origin of the clone we just destroyed.
+                */
+               dsl_dataset_t *origin = ds->ds_prev;
+               struct dsl_ds_destroyarg ndsda = {0};
+
+               ASSERT3P(origin, ==, dsda->rm_origin);
+               if (origin->ds_user_ptr) {
+                       origin->ds_user_evict_func(origin, origin->ds_user_ptr);
+                       origin->ds_user_ptr = NULL;
+               }
+
+               dsl_dataset_rele(origin, tag);
+               ds->ds_prev = NULL;
+
+               ndsda.ds = origin;
+               dsl_dataset_destroy_sync(&ndsda, tag, cr, tx);
+       }
 }
 
 static int
@@ -1924,6 +2211,9 @@ dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
            ds->ds_reserved);
        dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
            ds->ds_phys->ds_guid);
+       dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS, ds->ds_userrefs);
+       dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
+           DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
 
        if (ds->ds_phys->ds_next_snap_obj) {
                /*
@@ -2204,6 +2494,12 @@ dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
        err = dsl_dir_open(oldname, FTAG, &dd, &tail);
        if (err)
                return (err);
+       /*
+        * If there are more than 2 references there may be holds
+        * hanging around that haven't been cleared out yet.
+        */
+       if (dmu_buf_refcount(dd->dd_dbuf) > 2)
+               txg_wait_synced(dd->dd_pool, 0);
        if (tail == NULL) {
                int delta = strlen(newname) - strlen(oldname);
 
@@ -2577,7 +2873,7 @@ snaplist_destroy(list_t *l, boolean_t own)
 {
        struct promotenode *snap;
 
-       if (!list_link_active(&l->list_head))
+       if (!l || !list_link_active(&l->list_head))
                return;
 
        while ((snap = list_tail(l)) != NULL) {
@@ -2986,7 +3282,7 @@ dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
 
        ds->ds_quota = new_quota;
 
-       dsl_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
+       dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
 
        spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
            tx, cr, "%lld dataset = %llu ",
@@ -3024,12 +3320,8 @@ dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
        dsl_dataset_t *ds = arg1;
        uint64_t *reservationp = arg2;
        uint64_t new_reservation = *reservationp;
-       int64_t delta;
        uint64_t unique;
 
-       if (new_reservation > INT64_MAX)
-               return (EOVERFLOW);
-
        if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
            SPA_VERSION_REFRESERVATION)
                return (ENOTSUP);
@@ -3046,15 +3338,18 @@ dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
 
        mutex_enter(&ds->ds_lock);
        unique = dsl_dataset_unique(ds);
-       delta = MAX(unique, new_reservation) - MAX(unique, ds->ds_reserved);
        mutex_exit(&ds->ds_lock);
 
-       if (delta > 0 &&
-           delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
-               return (ENOSPC);
-       if (delta > 0 && ds->ds_quota > 0 &&
-           new_reservation > ds->ds_quota)
-               return (ENOSPC);
+       if (MAX(unique, new_reservation) > MAX(unique, ds->ds_reserved)) {
+               uint64_t delta = MAX(unique, new_reservation) -
+                   MAX(unique, ds->ds_reserved);
+
+               if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
+                       return (ENOSPC);
+               if (ds->ds_quota > 0 &&
+                   new_reservation > ds->ds_quota)
+                       return (ENOSPC);
+       }
 
        return (0);
 }
@@ -3082,7 +3377,7 @@ dsl_dataset_set_reservation_sync(void *arg1, void *arg2, cred_t *cr,
 
        dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
        mutex_exit(&ds->ds_dir->dd_lock);
-       dsl_prop_set_uint64_sync(ds->ds_dir, "refreservation",
+       dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refreservation",
            new_reservation, cr, tx);
 
        spa_history_internal_log(LOG_DS_REFRESERV,
@@ -3106,3 +3401,421 @@ dsl_dataset_set_reservation(const char *dsname, uint64_t reservation)
        dsl_dataset_rele(ds, FTAG);
        return (err);
 }
+
+static int
+dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+       dsl_dataset_t *ds = arg1;
+       char *htag = arg2;
+       objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+       int error = 0;
+
+       if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
+               return (ENOTSUP);
+
+       if (!dsl_dataset_is_snapshot(ds))
+               return (EINVAL);
+
+       if (strlen(htag) >= ZAP_MAXNAMELEN)
+               return (ENAMETOOLONG);
+
+       /* tags must be unique */
+       mutex_enter(&ds->ds_lock);
+       if (ds->ds_phys->ds_userrefs_obj) {
+               error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
+                   8, 1, tx);
+               if (error == 0)
+                       error = EEXIST;
+               else if (error == ENOENT)
+                       error = 0;
+       }
+       mutex_exit(&ds->ds_lock);
+
+       return (error);
+}
+
+static void
+dsl_dataset_user_hold_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
+{
+       dsl_dataset_t *ds = arg1;
+       char *htag = arg2;
+       objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+       time_t now = gethrestime_sec();
+       uint64_t zapobj;
+
+       mutex_enter(&ds->ds_lock);
+       if (ds->ds_phys->ds_userrefs_obj == 0) {
+               /*
+                * This is the first user hold for this dataset.  Create
+                * the userrefs zap object.
+                */
+               dmu_buf_will_dirty(ds->ds_dbuf, tx);
+               zapobj = ds->ds_phys->ds_userrefs_obj =
+                   zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
+       } else {
+               zapobj = ds->ds_phys->ds_userrefs_obj;
+       }
+       ds->ds_userrefs++;
+       mutex_exit(&ds->ds_lock);
+
+       VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
+
+       spa_history_internal_log(LOG_DS_USER_HOLD,
+           ds->ds_dir->dd_pool->dp_spa, tx, cr, "<%s> dataset = %llu",
+           htag, ds->ds_object);
+}
+
+struct dsl_ds_holdarg {
+       dsl_sync_task_group_t *dstg;
+       char *htag;
+       char *snapname;
+       boolean_t recursive;
+       char failed[MAXPATHLEN];
+};
+
+static int
+dsl_dataset_user_hold_one(char *dsname, void *arg)
+{
+       struct dsl_ds_holdarg *ha = arg;
+       dsl_dataset_t *ds;
+       int error;
+       char *name;
+       size_t buflen;
+
+       /* alloc a buffer to hold dsname@snapname plus terminating NULL */
+       buflen = strlen(dsname) + strlen(ha->snapname) + 2;
+       name = kmem_alloc(buflen, KM_SLEEP);
+       (void) snprintf(name, buflen, "%s@%s", dsname, ha->snapname);
+       error = dsl_dataset_hold(name, ha->dstg, &ds);
+       kmem_free(name, buflen);
+       if (error == 0) {
+               dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
+                   dsl_dataset_user_hold_sync, ds, ha->htag, 0);
+       } else if (error == ENOENT && ha->recursive) {
+               error = 0;
+       } else {
+               (void) strcpy(ha->failed, dsname);
+       }
+       return (error);
+}
+
+int
+dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
+    boolean_t recursive)
+{
+       struct dsl_ds_holdarg *ha;
+       dsl_sync_task_t *dst;
+       spa_t *spa;
+       int error;
+
+       ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
+
+       (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
+
+       error = spa_open(dsname, &spa, FTAG);
+       if (error) {
+               kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+               return (error);
+       }
+
+       ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
+       ha->htag = htag;
+       ha->snapname = snapname;
+       ha->recursive = recursive;
+       if (recursive) {
+               error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
+                   ha, DS_FIND_CHILDREN);
+       } else {
+               error = dsl_dataset_user_hold_one(dsname, ha);
+       }
+       if (error == 0)
+               error = dsl_sync_task_group_wait(ha->dstg);
+
+       for (dst = list_head(&ha->dstg->dstg_tasks); dst;
+           dst = list_next(&ha->dstg->dstg_tasks, dst)) {
+               dsl_dataset_t *ds = dst->dst_arg1;
+
+               if (dst->dst_err) {
+                       dsl_dataset_name(ds, ha->failed);
+                       *strchr(ha->failed, '@') = '\0';
+               }
+               dsl_dataset_rele(ds, ha->dstg);
+       }
+
+       if (error)
+               (void) strcpy(dsname, ha->failed);
+
+       dsl_sync_task_group_destroy(ha->dstg);
+       kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+       spa_close(spa, FTAG);
+       return (error);
+}
+
+struct dsl_ds_releasearg {
+       dsl_dataset_t *ds;
+       const char *htag;
+       boolean_t own;          /* do we own or just hold ds? */
+};
+
+static int
+dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
+    boolean_t *might_destroy)
+{
+       objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+       uint64_t zapobj;
+       uint64_t tmp;
+       int error;
+
+       *might_destroy = B_FALSE;
+
+       mutex_enter(&ds->ds_lock);
+       zapobj = ds->ds_phys->ds_userrefs_obj;
+       if (zapobj == 0) {
+               /* The tag can't possibly exist */
+               mutex_exit(&ds->ds_lock);
+               return (ESRCH);
+       }
+
+       /* Make sure the tag exists */
+       error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
+       if (error) {
+               mutex_exit(&ds->ds_lock);
+               if (error == ENOENT)
+                       error = ESRCH;
+               return (error);
+       }
+
+       if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
+           DS_IS_DEFER_DESTROY(ds))
+               *might_destroy = B_TRUE;
+
+       mutex_exit(&ds->ds_lock);
+       return (0);
+}
+
+static int
+dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
+{
+       struct dsl_ds_releasearg *ra = arg1;
+       dsl_dataset_t *ds = ra->ds;
+       boolean_t might_destroy;
+       int error;
+
+       if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
+               return (ENOTSUP);
+
+       error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
+       if (error)
+               return (error);
+
+       if (might_destroy) {
+               struct dsl_ds_destroyarg dsda = {0};
+
+               if (dmu_tx_is_syncing(tx)) {
+                       /*
+                        * If we're not prepared to remove the snapshot,
+                        * we can't allow the release to happen right now.
+                        */
+                       if (!ra->own)
+                               return (EBUSY);
+                       if (ds->ds_user_ptr) {
+                               ds->ds_user_evict_func(ds, ds->ds_user_ptr);
+                               ds->ds_user_ptr = NULL;
+                       }
+               }
+               dsda.ds = ds;
+               dsda.releasing = B_TRUE;
+               return (dsl_dataset_destroy_check(&dsda, tag, tx));
+       }
+
+       return (0);
+}
+
+static void
+dsl_dataset_user_release_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
+{
+       struct dsl_ds_releasearg *ra = arg1;
+       dsl_dataset_t *ds = ra->ds;
+       spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
+       objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+       uint64_t zapobj;
+       uint64_t dsobj = ds->ds_object;
+       uint64_t refs;
+
+       mutex_enter(&ds->ds_lock);
+       ds->ds_userrefs--;
+       refs = ds->ds_userrefs;
+       mutex_exit(&ds->ds_lock);
+       zapobj = ds->ds_phys->ds_userrefs_obj;
+       VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
+       if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
+           DS_IS_DEFER_DESTROY(ds)) {
+               struct dsl_ds_destroyarg dsda = {0};
+
+               ASSERT(ra->own);
+               dsda.ds = ds;
+               dsda.releasing = B_TRUE;
+               /* We already did the destroy_check */
+               dsl_dataset_destroy_sync(&dsda, tag, cr, tx);
+       }
+
+       spa_history_internal_log(LOG_DS_USER_RELEASE,
+           spa, tx, cr, "<%s> %lld dataset = %llu",
+           ra->htag, (longlong_t)refs, dsobj);
+}
+
+static int
+dsl_dataset_user_release_one(char *dsname, void *arg)
+{
+       struct dsl_ds_holdarg *ha = arg;
+       struct dsl_ds_releasearg *ra;
+       dsl_dataset_t *ds;
+       int error;
+       void *dtag = ha->dstg;
+       char *name;
+       size_t buflen;
+       boolean_t own = B_FALSE;
+       boolean_t might_destroy;
+
+       if (strlen(ha->htag) >= ZAP_MAXNAMELEN)
+               return (ENAMETOOLONG);
+
+       /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
+       buflen = strlen(dsname) + strlen(ha->snapname) + 2;
+       name = kmem_alloc(buflen, KM_SLEEP);
+       (void) snprintf(name, buflen, "%s@%s", dsname, ha->snapname);
+       error = dsl_dataset_hold(name, dtag, &ds);
+       kmem_free(name, buflen);
+       if (error == ENOENT && ha->recursive)
+               return (0);
+       (void) strcpy(ha->failed, dsname);
+       if (error)
+               return (error);
+
+       ASSERT(dsl_dataset_is_snapshot(ds));
+
+       error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
+       if (error) {
+               dsl_dataset_rele(ds, dtag);
+               return (error);
+       }
+
+       if (might_destroy) {
+#ifdef _KERNEL
+               error = zfs_unmount_snap(name, NULL);
+               if (error) {
+                       dsl_dataset_rele(ds, dtag);
+                       return (error);
+               }
+               error = dsl_dataset_zvol_cleanup(ds, name);
+               if (error) {
+                       dsl_dataset_rele(ds, dtag);
+                       return (error);
+               }
+#endif
+               if (!dsl_dataset_tryown(ds,
+                   DS_MODE_READONLY | DS_MODE_INCONSISTENT, dtag)) {
+                       dsl_dataset_rele(ds, dtag);
+                       return (EBUSY);
+               } else {
+                       own = B_TRUE;
+                       dsl_dataset_make_exclusive(ds, dtag);
+               }
+       }
+
+       ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
+       ra->ds = ds;
+       ra->htag = ha->htag;
+       ra->own = own;
+       dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
+           dsl_dataset_user_release_sync, ra, dtag, 0);
+
+       return (0);
+}
+
+int
+dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
+    boolean_t recursive)
+{
+       struct dsl_ds_holdarg *ha;
+       dsl_sync_task_t *dst;
+       spa_t *spa;
+       int error;
+
+       ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
+
+       (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
+
+       error = spa_open(dsname, &spa, FTAG);
+       if (error) {
+               kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+               return (error);
+       }
+
+       ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
+       ha->htag = htag;
+       ha->snapname = snapname;
+       ha->recursive = recursive;
+       if (recursive) {
+               error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
+                   ha, DS_FIND_CHILDREN);
+       } else {
+               error = dsl_dataset_user_release_one(dsname, ha);
+       }
+       if (error == 0)
+               error = dsl_sync_task_group_wait(ha->dstg);
+
+       for (dst = list_head(&ha->dstg->dstg_tasks); dst;
+           dst = list_next(&ha->dstg->dstg_tasks, dst)) {
+               struct dsl_ds_releasearg *ra = dst->dst_arg1;
+               dsl_dataset_t *ds = ra->ds;
+
+               if (dst->dst_err)
+                       dsl_dataset_name(ds, ha->failed);
+
+               if (ra->own)
+                       dsl_dataset_disown(ds, ha->dstg);
+               else
+                       dsl_dataset_rele(ds, ha->dstg);
+
+               kmem_free(ra, sizeof (struct dsl_ds_releasearg));
+       }
+
+       if (error)
+               (void) strcpy(dsname, ha->failed);
+
+       dsl_sync_task_group_destroy(ha->dstg);
+       kmem_free(ha, sizeof (struct dsl_ds_holdarg));
+       spa_close(spa, FTAG);
+       return (error);
+}
+
+int
+dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
+{
+       dsl_dataset_t *ds;
+       int err;
+
+       err = dsl_dataset_hold(dsname, FTAG, &ds);
+       if (err)
+               return (err);
+
+       VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
+       if (ds->ds_phys->ds_userrefs_obj != 0) {
+               zap_attribute_t *za;
+               zap_cursor_t zc;
+
+               za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
+               for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
+                   ds->ds_phys->ds_userrefs_obj);
+                   zap_cursor_retrieve(&zc, za) == 0;
+                   zap_cursor_advance(&zc)) {
+                       VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
+                           za->za_first_integer));
+               }
+               zap_cursor_fini(&zc);
+               kmem_free(za, sizeof (zap_attribute_t));
+       }
+       dsl_dataset_rele(ds, FTAG);
+       return (0);
+}