Illumos #2619 and #2747
[zfs.git] / module / zfs / dsl_dataset.c
index 21fdd08..872d44a 100644 (file)
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011 by Delphix. All rights reserved.
+ * Copyright (c) 2012 by Delphix. All rights reserved.
  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  */
 
@@ -35,6 +35,7 @@
 #include <sys/arc.h>
 #include <sys/zio.h>
 #include <sys/zap.h>
+#include <sys/zfeature.h>
 #include <sys/unique.h>
 #include <sys/zfs_context.h>
 #include <sys/zfs_ioctl.h>
@@ -102,7 +103,7 @@ dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
        if (BP_IS_HOLE(bp))
                return;
        ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
-       ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
+       ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
        if (ds == NULL) {
                /*
                 * Account for the meta-objset space in its placeholder
@@ -119,7 +120,7 @@ dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
        mutex_enter(&ds->ds_dir->dd_lock);
        mutex_enter(&ds->ds_lock);
        delta = parent_delta(ds, used);
-       ds->ds_phys->ds_used_bytes += used;
+       ds->ds_phys->ds_referenced_bytes += used;
        ds->ds_phys->ds_compressed_bytes += compressed;
        ds->ds_phys->ds_uncompressed_bytes += uncompressed;
        ds->ds_phys->ds_unique_bytes += used;
@@ -215,8 +216,8 @@ dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
                }
        }
        mutex_enter(&ds->ds_lock);
-       ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
-       ds->ds_phys->ds_used_bytes -= used;
+       ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
+       ds->ds_phys->ds_referenced_bytes -= used;
        ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
        ds->ds_phys->ds_compressed_bytes -= compressed;
        ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
@@ -823,8 +824,8 @@ dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
                dsphys->ds_prev_snap_obj = origin->ds_object;
                dsphys->ds_prev_snap_txg =
                    origin->ds_phys->ds_creation_txg;
-               dsphys->ds_used_bytes =
-                   origin->ds_phys->ds_used_bytes;
+               dsphys->ds_referenced_bytes =
+                   origin->ds_phys->ds_referenced_bytes;
                dsphys->ds_compressed_bytes =
                    origin->ds_phys->ds_compressed_bytes;
                dsphys->ds_uncompressed_bytes =
@@ -938,7 +939,6 @@ dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer, char *failed)
        for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
            pair = nvlist_next_nvpair(snaps, pair)) {
                dsl_dataset_t *ds;
-               int err;
 
                err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
                if (err == 0) {
@@ -1088,19 +1088,23 @@ dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
                goto out_free;
 
        /*
-        * remove the objects in open context, so that we won't
-        * have too much to do in syncing context.
+        * If async destruction is not enabled try to remove all objects
+        * while in the open context so that there is less work to do in
+        * the syncing context.
         */
-       for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
-           ds->ds_phys->ds_prev_snap_txg)) {
-               /*
-                * Ignore errors, if there is not enough disk space
-                * we will deal with it in dsl_dataset_destroy_sync().
-                */
-               (void) dmu_free_object(os, obj);
+       if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
+           &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
+               for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
+                   ds->ds_phys->ds_prev_snap_txg)) {
+                       /*
+                        * Ignore errors, if there is not enough disk space
+                        * we will deal with it in dsl_dataset_destroy_sync().
+                        */
+                       (void) dmu_free_object(os, obj);
+               }
+               if (err != ESRCH)
+                       goto out_free;
        }
-       if (err != ESRCH)
-               goto out_free;
 
        /*
         * Only the ZIL knows how to free log blocks.
@@ -1261,7 +1265,7 @@ dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
        ASSERT(!dsl_dataset_is_snapshot(ds));
 
        if (ds->ds_phys->ds_prev_snap_obj != 0)
-               mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
+               mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
        else
                mrs_used = 0;
 
@@ -1269,7 +1273,7 @@ dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
 
        ASSERT3U(dlused, <=, mrs_used);
        ds->ds_phys->ds_unique_bytes =
-           ds->ds_phys->ds_used_bytes - (mrs_used - dlused);
+           ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
 
        if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
            SPA_VERSION_UNIQUE_ACCURATE)
@@ -1627,12 +1631,36 @@ process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
            ds_next->ds_phys->ds_deadlist_obj);
 }
 
+static int
+old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
+{
+       int err;
+       struct killarg ka;
+
+       /*
+        * Free everything that we point to (that's born after
+        * the previous snapshot, if we are a clone)
+        *
+        * NB: this should be very quick, because we already
+        * freed all the objects in open context.
+        */
+       ka.ds = ds;
+       ka.tx = tx;
+       err = traverse_dataset(ds,
+           ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
+           kill_blkptr, &ka);
+       ASSERT3U(err, ==, 0);
+       ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
+
+       return (err);
+}
+
 void
 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
 {
        struct dsl_ds_destroyarg *dsda = arg1;
        dsl_dataset_t *ds = dsda->ds;
-       int err;
+       int err = 0;
        int after_branch_point = FALSE;
        dsl_pool_t *dp = ds->ds_dir->dd_pool;
        objset_t *mos = dp->dp_meta_objset;
@@ -1773,7 +1801,6 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
                            tx);
                        dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
                            DD_USED_HEAD, used, comp, uncomp, tx);
-                       dsl_dir_dirty(tx->tx_pool->dp_free_dir, tx);
 
                        /* Merge our deadlist into next's and free it. */
                        dsl_deadlist_merge(&ds_next->ds_deadlist,
@@ -1849,32 +1876,54 @@ dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
                }
                dsl_dataset_rele(ds_next, FTAG);
        } else {
+               zfeature_info_t *async_destroy =
+                   &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
+
                /*
                 * There's no next snapshot, so this is a head dataset.
                 * Destroy the deadlist.  Unless it's a clone, the
                 * deadlist should be empty.  (If it's a clone, it's
                 * safe to ignore the deadlist contents.)
                 */
-               struct killarg ka;
-
                dsl_deadlist_close(&ds->ds_deadlist);
                dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
                ds->ds_phys->ds_deadlist_obj = 0;
 
-               /*
-                * Free everything that we point to (that's born after
-                * the previous snapshot, if we are a clone)
-                *
-                * NB: this should be very quick, because we already
-                * freed all the objects in open context.
-                */
-               ka.ds = ds;
-               ka.tx = tx;
-               err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
-                   TRAVERSE_POST, kill_blkptr, &ka);
-               ASSERT3U(err, ==, 0);
-               ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
-                   ds->ds_phys->ds_unique_bytes == 0);
+               if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
+                       err = old_synchronous_dataset_destroy(ds, tx);
+               } else {
+                       /*
+                        * Move the bptree into the pool's list of trees to
+                        * clean up and update space accounting information.
+                        */
+                       uint64_t used, comp, uncomp;
+
+                       ASSERT(err == 0 || err == EBUSY);
+                       if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
+                               spa_feature_incr(dp->dp_spa, async_destroy, tx);
+                               dp->dp_bptree_obj = bptree_alloc(
+                                   dp->dp_meta_objset, tx);
+                               VERIFY(zap_add(dp->dp_meta_objset,
+                                   DMU_POOL_DIRECTORY_OBJECT,
+                                   DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
+                                   &dp->dp_bptree_obj, tx) == 0);
+                       }
+
+                       used = ds->ds_dir->dd_phys->dd_used_bytes;
+                       comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
+                       uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
+
+                       ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
+                           ds->ds_phys->ds_unique_bytes == used);
+
+                       bptree_add(dp->dp_meta_objset, dp->dp_bptree_obj,
+                           &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
+                           used, comp, uncomp, tx);
+                       dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
+                           -used, -comp, -uncomp, tx);
+                       dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
+                           used, comp, uncomp, tx);
+               }
 
                if (ds->ds_prev != NULL) {
                        if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
@@ -2065,7 +2114,7 @@ dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
        dsphys->ds_creation_time = gethrestime_sec();
        dsphys->ds_creation_txg = crtxg;
        dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
-       dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
+       dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
        dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
        dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
        dsphys->ds_flags = ds->ds_phys->ds_flags;
@@ -2189,10 +2238,22 @@ get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
            zap_cursor_advance(&zc)) {
                dsl_dataset_t *clone;
                char buf[ZFS_MAXNAMELEN];
+               /*
+                * Even though we hold the dp_config_rwlock, the dataset
+                * may fail to open, returning ENOENT.  If there is a
+                * thread concurrently attempting to destroy this
+                * dataset, it will have the ds_rwlock held for
+                * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
+                * dsl_dataset_hold_ref() will fail its
+                * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
+                * dp_config_rwlock, and wait for the destroy progress
+                * and signal ds_exclusive_cv.  If the destroy was
+                * successful, we will see that
+                * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
+                */
                if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
-                   za.za_first_integer, FTAG, &clone) != 0) {
-                       goto fail;
-               }
+                   za.za_first_integer, FTAG, &clone) != 0)
+                       continue;
                dsl_dir_name(clone->ds_dir, buf);
                VERIFY(nvlist_add_boolean(val, buf) == 0);
                dsl_dataset_rele(clone, FTAG);
@@ -2316,7 +2377,7 @@ dsl_dataset_space(dsl_dataset_t *ds,
     uint64_t *refdbytesp, uint64_t *availbytesp,
     uint64_t *usedobjsp, uint64_t *availobjsp)
 {
-       *refdbytesp = ds->ds_phys->ds_used_bytes;
+       *refdbytesp = ds->ds_phys->ds_referenced_bytes;
        *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
        if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
                *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
@@ -2652,7 +2713,7 @@ dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
         * Note however, if we stop before we reach the ORIGIN we get:
         * uN + kN + kN-1 + ... + kM - uM-1
         */
-       pa->used = origin_ds->ds_phys->ds_used_bytes;
+       pa->used = origin_ds->ds_phys->ds_referenced_bytes;
        pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
        pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
        for (snap = list_head(&pa->shared_snaps); snap;
@@ -2686,7 +2747,7 @@ dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
         * so we need to subtract out the clone origin's used space.
         */
        if (pa->origin_origin) {
-               pa->used -= pa->origin_origin->ds_phys->ds_used_bytes;
+               pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
                pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
                pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
        }
@@ -3203,8 +3264,8 @@ dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
                dsl_deadlist_space(&csa->ohds->ds_deadlist,
                    &odl_used, &odl_comp, &odl_uncomp);
 
-               dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
-                   (csa->ohds->ds_phys->ds_used_bytes + odl_used);
+               dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
+                   (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
                dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
                    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
                duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
@@ -3233,8 +3294,8 @@ dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
        }
 
        /* swap ds_*_bytes */
-       SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
-           csa->cds->ds_phys->ds_used_bytes);
+       SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
+           csa->cds->ds_phys->ds_referenced_bytes);
        SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
            csa->cds->ds_phys->ds_compressed_bytes);
        SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
@@ -3363,8 +3424,9 @@ dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
         * on-disk is over quota and there are no pending changes (which
         * may free up space for us).
         */
-       if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
-               if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
+       if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
+               if (inflight > 0 ||
+                   ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
                        error = ERESTART;
                else
                        error = EDQUOT;
@@ -3393,7 +3455,7 @@ dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
        if (psa->psa_effective_value == 0)
                return (0);
 
-       if (psa->psa_effective_value < ds->ds_phys->ds_used_bytes ||
+       if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
            psa->psa_effective_value < ds->ds_reserved)
                return (ENOSPC);
 
@@ -4141,8 +4203,8 @@ dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
        dsl_pool_t *dp = new->ds_dir->dd_pool;
 
        *usedp = 0;
-       *usedp += new->ds_phys->ds_used_bytes;
-       *usedp -= oldsnap->ds_phys->ds_used_bytes;
+       *usedp += new->ds_phys->ds_referenced_bytes;
+       *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
 
        *compp = 0;
        *compp += new->ds_phys->ds_compressed_bytes;
@@ -4158,9 +4220,13 @@ dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
                dsl_dataset_t *snap;
                uint64_t used, comp, uncomp;
 
-               err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
-               if (err != 0)
-                       break;
+               if (snapobj == new->ds_object) {
+                       snap = new;
+               } else {
+                       err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
+                       if (err != 0)
+                               break;
+               }
 
                if (snap->ds_phys->ds_prev_snap_txg ==
                    oldsnap->ds_phys->ds_creation_txg) {
@@ -4189,7 +4255,8 @@ dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
                 * was not a snapshot of/before new.
                 */
                snapobj = snap->ds_phys->ds_prev_snap_obj;
-               dsl_dataset_rele(snap, FTAG);
+               if (snap != new)
+                       dsl_dataset_rele(snap, FTAG);
                if (snapobj == 0) {
                        err = EINVAL;
                        break;