Illumos #2619 and #2747
[zfs.git] / module / zfs / dsl_dataset.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2012 by Delphix. All rights reserved.
24  * Copyright (c) 2012, Joyent, Inc. All rights reserved.
25  */
26
27 #include <sys/dmu_objset.h>
28 #include <sys/dsl_dataset.h>
29 #include <sys/dsl_dir.h>
30 #include <sys/dsl_prop.h>
31 #include <sys/dsl_synctask.h>
32 #include <sys/dmu_traverse.h>
33 #include <sys/dmu_impl.h>
34 #include <sys/dmu_tx.h>
35 #include <sys/arc.h>
36 #include <sys/zio.h>
37 #include <sys/zap.h>
38 #include <sys/zfeature.h>
39 #include <sys/unique.h>
40 #include <sys/zfs_context.h>
41 #include <sys/zfs_ioctl.h>
42 #include <sys/spa.h>
43 #include <sys/zfs_znode.h>
44 #include <sys/zfs_onexit.h>
45 #include <sys/zvol.h>
46 #include <sys/dsl_scan.h>
47 #include <sys/dsl_deadlist.h>
48
49 static char *dsl_reaper = "the grim reaper";
50
51 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
52 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
53 static dsl_syncfunc_t dsl_dataset_set_reservation_sync;
54
55 #define SWITCH64(x, y) \
56         { \
57                 uint64_t __tmp = (x); \
58                 (x) = (y); \
59                 (y) = __tmp; \
60         }
61
62 #define DS_REF_MAX      (1ULL << 62)
63
64 #define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE
65
66 #define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
67
68
69 /*
70  * Figure out how much of this delta should be propogated to the dsl_dir
71  * layer.  If there's a refreservation, that space has already been
72  * partially accounted for in our ancestors.
73  */
74 static int64_t
75 parent_delta(dsl_dataset_t *ds, int64_t delta)
76 {
77         uint64_t old_bytes, new_bytes;
78
79         if (ds->ds_reserved == 0)
80                 return (delta);
81
82         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
83         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
84
85         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
86         return (new_bytes - old_bytes);
87 }
88
89 void
90 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
91 {
92         int used, compressed, uncompressed;
93         int64_t delta;
94
95         used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
96         compressed = BP_GET_PSIZE(bp);
97         uncompressed = BP_GET_UCSIZE(bp);
98
99         dprintf_bp(bp, "ds=%p", ds);
100
101         ASSERT(dmu_tx_is_syncing(tx));
102         /* It could have been compressed away to nothing */
103         if (BP_IS_HOLE(bp))
104                 return;
105         ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
106         ASSERT(DMU_OT_IS_VALID(BP_GET_TYPE(bp)));
107         if (ds == NULL) {
108                 /*
109                  * Account for the meta-objset space in its placeholder
110                  * dsl_dir.
111                  */
112                 ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
113                 dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
114                     used, compressed, uncompressed, tx);
115                 dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
116                 return;
117         }
118         dmu_buf_will_dirty(ds->ds_dbuf, tx);
119
120         mutex_enter(&ds->ds_dir->dd_lock);
121         mutex_enter(&ds->ds_lock);
122         delta = parent_delta(ds, used);
123         ds->ds_phys->ds_referenced_bytes += used;
124         ds->ds_phys->ds_compressed_bytes += compressed;
125         ds->ds_phys->ds_uncompressed_bytes += uncompressed;
126         ds->ds_phys->ds_unique_bytes += used;
127         mutex_exit(&ds->ds_lock);
128         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
129             compressed, uncompressed, tx);
130         dsl_dir_transfer_space(ds->ds_dir, used - delta,
131             DD_USED_REFRSRV, DD_USED_HEAD, tx);
132         mutex_exit(&ds->ds_dir->dd_lock);
133 }
134
135 int
136 dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
137     boolean_t async)
138 {
139         int used, compressed, uncompressed;
140
141         if (BP_IS_HOLE(bp))
142                 return (0);
143
144         ASSERT(dmu_tx_is_syncing(tx));
145         ASSERT(bp->blk_birth <= tx->tx_txg);
146
147         used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
148         compressed = BP_GET_PSIZE(bp);
149         uncompressed = BP_GET_UCSIZE(bp);
150
151         ASSERT(used > 0);
152         if (ds == NULL) {
153                 /*
154                  * Account for the meta-objset space in its placeholder
155                  * dataset.
156                  */
157                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
158
159                 dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
160                     -used, -compressed, -uncompressed, tx);
161                 dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
162                 return (used);
163         }
164         ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
165
166         ASSERT(!dsl_dataset_is_snapshot(ds));
167         dmu_buf_will_dirty(ds->ds_dbuf, tx);
168
169         if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
170                 int64_t delta;
171
172                 dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
173                 dsl_free(tx->tx_pool, tx->tx_txg, bp);
174
175                 mutex_enter(&ds->ds_dir->dd_lock);
176                 mutex_enter(&ds->ds_lock);
177                 ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
178                     !DS_UNIQUE_IS_ACCURATE(ds));
179                 delta = parent_delta(ds, -used);
180                 ds->ds_phys->ds_unique_bytes -= used;
181                 mutex_exit(&ds->ds_lock);
182                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
183                     delta, -compressed, -uncompressed, tx);
184                 dsl_dir_transfer_space(ds->ds_dir, -used - delta,
185                     DD_USED_REFRSRV, DD_USED_HEAD, tx);
186                 mutex_exit(&ds->ds_dir->dd_lock);
187         } else {
188                 dprintf_bp(bp, "putting on dead list: %s", "");
189                 if (async) {
190                         /*
191                          * We are here as part of zio's write done callback,
192                          * which means we're a zio interrupt thread.  We can't
193                          * call dsl_deadlist_insert() now because it may block
194                          * waiting for I/O.  Instead, put bp on the deferred
195                          * queue and let dsl_pool_sync() finish the job.
196                          */
197                         bplist_append(&ds->ds_pending_deadlist, bp);
198                 } else {
199                         dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
200                 }
201                 ASSERT3U(ds->ds_prev->ds_object, ==,
202                     ds->ds_phys->ds_prev_snap_obj);
203                 ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
204                 /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
205                 if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
206                     ds->ds_object && bp->blk_birth >
207                     ds->ds_prev->ds_phys->ds_prev_snap_txg) {
208                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
209                         mutex_enter(&ds->ds_prev->ds_lock);
210                         ds->ds_prev->ds_phys->ds_unique_bytes += used;
211                         mutex_exit(&ds->ds_prev->ds_lock);
212                 }
213                 if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
214                         dsl_dir_transfer_space(ds->ds_dir, used,
215                             DD_USED_HEAD, DD_USED_SNAP, tx);
216                 }
217         }
218         mutex_enter(&ds->ds_lock);
219         ASSERT3U(ds->ds_phys->ds_referenced_bytes, >=, used);
220         ds->ds_phys->ds_referenced_bytes -= used;
221         ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
222         ds->ds_phys->ds_compressed_bytes -= compressed;
223         ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
224         ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
225         mutex_exit(&ds->ds_lock);
226
227         return (used);
228 }
229
230 uint64_t
231 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
232 {
233         uint64_t trysnap = 0;
234
235         if (ds == NULL)
236                 return (0);
237         /*
238          * The snapshot creation could fail, but that would cause an
239          * incorrect FALSE return, which would only result in an
240          * overestimation of the amount of space that an operation would
241          * consume, which is OK.
242          *
243          * There's also a small window where we could miss a pending
244          * snapshot, because we could set the sync task in the quiescing
245          * phase.  So this should only be used as a guess.
246          */
247         if (ds->ds_trysnap_txg >
248             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
249                 trysnap = ds->ds_trysnap_txg;
250         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
251 }
252
253 boolean_t
254 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
255     uint64_t blk_birth)
256 {
257         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
258                 return (B_FALSE);
259
260         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
261
262         return (B_TRUE);
263 }
264
265 /* ARGSUSED */
266 static void
267 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
268 {
269         dsl_dataset_t *ds = dsv;
270
271         ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
272
273         unique_remove(ds->ds_fsid_guid);
274
275         if (ds->ds_objset != NULL)
276                 dmu_objset_evict(ds->ds_objset);
277
278         if (ds->ds_prev) {
279                 dsl_dataset_drop_ref(ds->ds_prev, ds);
280                 ds->ds_prev = NULL;
281         }
282
283         bplist_destroy(&ds->ds_pending_deadlist);
284         if (db != NULL) {
285                 dsl_deadlist_close(&ds->ds_deadlist);
286         } else {
287                 ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
288                 ASSERT(!ds->ds_deadlist.dl_oldfmt);
289         }
290         if (ds->ds_dir)
291                 dsl_dir_close(ds->ds_dir, ds);
292
293         ASSERT(!list_link_active(&ds->ds_synced_link));
294
295         mutex_destroy(&ds->ds_lock);
296         mutex_destroy(&ds->ds_recvlock);
297         mutex_destroy(&ds->ds_opening_lock);
298         rw_destroy(&ds->ds_rwlock);
299         cv_destroy(&ds->ds_exclusive_cv);
300
301         kmem_free(ds, sizeof (dsl_dataset_t));
302 }
303
304 static int
305 dsl_dataset_get_snapname(dsl_dataset_t *ds)
306 {
307         dsl_dataset_phys_t *headphys;
308         int err;
309         dmu_buf_t *headdbuf;
310         dsl_pool_t *dp = ds->ds_dir->dd_pool;
311         objset_t *mos = dp->dp_meta_objset;
312
313         if (ds->ds_snapname[0])
314                 return (0);
315         if (ds->ds_phys->ds_next_snap_obj == 0)
316                 return (0);
317
318         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
319             FTAG, &headdbuf);
320         if (err)
321                 return (err);
322         headphys = headdbuf->db_data;
323         err = zap_value_search(dp->dp_meta_objset,
324             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
325         dmu_buf_rele(headdbuf, FTAG);
326         return (err);
327 }
328
329 static int
330 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
331 {
332         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
333         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
334         matchtype_t mt;
335         int err;
336
337         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
338                 mt = MT_FIRST;
339         else
340                 mt = MT_EXACT;
341
342         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
343             value, mt, NULL, 0, NULL);
344         if (err == ENOTSUP && mt == MT_FIRST)
345                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
346         return (err);
347 }
348
349 static int
350 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
351 {
352         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
353         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
354         matchtype_t mt;
355         int err;
356
357         dsl_dir_snap_cmtime_update(ds->ds_dir);
358
359         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
360                 mt = MT_FIRST;
361         else
362                 mt = MT_EXACT;
363
364         err = zap_remove_norm(mos, snapobj, name, mt, tx);
365         if (err == ENOTSUP && mt == MT_FIRST)
366                 err = zap_remove(mos, snapobj, name, tx);
367         return (err);
368 }
369
370 static int
371 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
372     dsl_dataset_t **dsp)
373 {
374         objset_t *mos = dp->dp_meta_objset;
375         dmu_buf_t *dbuf;
376         dsl_dataset_t *ds;
377         int err;
378         dmu_object_info_t doi;
379
380         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
381             dsl_pool_sync_context(dp));
382
383         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
384         if (err)
385                 return (err);
386
387         /* Make sure dsobj has the correct object type. */
388         dmu_object_info_from_db(dbuf, &doi);
389         if (doi.doi_type != DMU_OT_DSL_DATASET)
390                 return (EINVAL);
391
392         ds = dmu_buf_get_user(dbuf);
393         if (ds == NULL) {
394                 dsl_dataset_t *winner = NULL;
395
396                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_PUSHPAGE);
397                 ds->ds_dbuf = dbuf;
398                 ds->ds_object = dsobj;
399                 ds->ds_phys = dbuf->db_data;
400                 list_link_init(&ds->ds_synced_link);
401
402                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
403                 mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
404                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
405                 mutex_init(&ds->ds_sendstream_lock, NULL, MUTEX_DEFAULT, NULL);
406
407                 rw_init(&ds->ds_rwlock, NULL, RW_DEFAULT, NULL);
408                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
409
410                 bplist_create(&ds->ds_pending_deadlist);
411                 dsl_deadlist_open(&ds->ds_deadlist,
412                     mos, ds->ds_phys->ds_deadlist_obj);
413
414                 list_create(&ds->ds_sendstreams, sizeof (dmu_sendarg_t),
415                     offsetof(dmu_sendarg_t, dsa_link));
416
417                 if (err == 0) {
418                         err = dsl_dir_open_obj(dp,
419                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
420                 }
421                 if (err) {
422                         mutex_destroy(&ds->ds_lock);
423                         mutex_destroy(&ds->ds_recvlock);
424                         mutex_destroy(&ds->ds_opening_lock);
425                         rw_destroy(&ds->ds_rwlock);
426                         cv_destroy(&ds->ds_exclusive_cv);
427                         bplist_destroy(&ds->ds_pending_deadlist);
428                         dsl_deadlist_close(&ds->ds_deadlist);
429                         kmem_free(ds, sizeof (dsl_dataset_t));
430                         dmu_buf_rele(dbuf, tag);
431                         return (err);
432                 }
433
434                 if (!dsl_dataset_is_snapshot(ds)) {
435                         ds->ds_snapname[0] = '\0';
436                         if (ds->ds_phys->ds_prev_snap_obj) {
437                                 err = dsl_dataset_get_ref(dp,
438                                     ds->ds_phys->ds_prev_snap_obj,
439                                     ds, &ds->ds_prev);
440                         }
441                 } else {
442                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
443                                 err = dsl_dataset_get_snapname(ds);
444                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
445                                 err = zap_count(
446                                     ds->ds_dir->dd_pool->dp_meta_objset,
447                                     ds->ds_phys->ds_userrefs_obj,
448                                     &ds->ds_userrefs);
449                         }
450                 }
451
452                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
453                         /*
454                          * In sync context, we're called with either no lock
455                          * or with the write lock.  If we're not syncing,
456                          * we're always called with the read lock held.
457                          */
458                         boolean_t need_lock =
459                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
460                             dsl_pool_sync_context(dp);
461
462                         if (need_lock)
463                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
464
465                         err = dsl_prop_get_ds(ds,
466                             "refreservation", sizeof (uint64_t), 1,
467                             &ds->ds_reserved, NULL);
468                         if (err == 0) {
469                                 err = dsl_prop_get_ds(ds,
470                                     "refquota", sizeof (uint64_t), 1,
471                                     &ds->ds_quota, NULL);
472                         }
473
474                         if (need_lock)
475                                 rw_exit(&dp->dp_config_rwlock);
476                 } else {
477                         ds->ds_reserved = ds->ds_quota = 0;
478                 }
479
480                 if (err == 0) {
481                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
482                             dsl_dataset_evict);
483                 }
484                 if (err || winner) {
485                         bplist_destroy(&ds->ds_pending_deadlist);
486                         dsl_deadlist_close(&ds->ds_deadlist);
487                         if (ds->ds_prev)
488                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
489                         dsl_dir_close(ds->ds_dir, ds);
490                         mutex_destroy(&ds->ds_lock);
491                         mutex_destroy(&ds->ds_recvlock);
492                         mutex_destroy(&ds->ds_opening_lock);
493                         rw_destroy(&ds->ds_rwlock);
494                         cv_destroy(&ds->ds_exclusive_cv);
495                         kmem_free(ds, sizeof (dsl_dataset_t));
496                         if (err) {
497                                 dmu_buf_rele(dbuf, tag);
498                                 return (err);
499                         }
500                         ds = winner;
501                 } else {
502                         ds->ds_fsid_guid =
503                             unique_insert(ds->ds_phys->ds_fsid_guid);
504                 }
505         }
506         ASSERT3P(ds->ds_dbuf, ==, dbuf);
507         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
508         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
509             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
510             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
511         mutex_enter(&ds->ds_lock);
512         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
513                 mutex_exit(&ds->ds_lock);
514                 dmu_buf_rele(ds->ds_dbuf, tag);
515                 return (ENOENT);
516         }
517         mutex_exit(&ds->ds_lock);
518         *dsp = ds;
519         return (0);
520 }
521
522 static int
523 dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
524 {
525         dsl_pool_t *dp = ds->ds_dir->dd_pool;
526
527         /*
528          * In syncing context we don't want the rwlock lock: there
529          * may be an existing writer waiting for sync phase to
530          * finish.  We don't need to worry about such writers, since
531          * sync phase is single-threaded, so the writer can't be
532          * doing anything while we are active.
533          */
534         if (dsl_pool_sync_context(dp)) {
535                 ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
536                 return (0);
537         }
538
539         /*
540          * Normal users will hold the ds_rwlock as a READER until they
541          * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
542          * drop their READER lock after they set the ds_owner field.
543          *
544          * If the dataset is being destroyed, the destroy thread will
545          * obtain a WRITER lock for exclusive access after it's done its
546          * open-context work and then change the ds_owner to
547          * dsl_reaper once destruction is assured.  So threads
548          * may block here temporarily, until the "destructability" of
549          * the dataset is determined.
550          */
551         ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
552         mutex_enter(&ds->ds_lock);
553         while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
554                 rw_exit(&dp->dp_config_rwlock);
555                 cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
556                 if (DSL_DATASET_IS_DESTROYED(ds)) {
557                         mutex_exit(&ds->ds_lock);
558                         dsl_dataset_drop_ref(ds, tag);
559                         rw_enter(&dp->dp_config_rwlock, RW_READER);
560                         return (ENOENT);
561                 }
562                 /*
563                  * The dp_config_rwlock lives above the ds_lock. And
564                  * we need to check DSL_DATASET_IS_DESTROYED() while
565                  * holding the ds_lock, so we have to drop and reacquire
566                  * the ds_lock here.
567                  */
568                 mutex_exit(&ds->ds_lock);
569                 rw_enter(&dp->dp_config_rwlock, RW_READER);
570                 mutex_enter(&ds->ds_lock);
571         }
572         mutex_exit(&ds->ds_lock);
573         return (0);
574 }
575
576 int
577 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
578     dsl_dataset_t **dsp)
579 {
580         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
581
582         if (err)
583                 return (err);
584         return (dsl_dataset_hold_ref(*dsp, tag));
585 }
586
587 int
588 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
589     void *tag, dsl_dataset_t **dsp)
590 {
591         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
592         if (err)
593                 return (err);
594         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
595                 dsl_dataset_rele(*dsp, tag);
596                 *dsp = NULL;
597                 return (EBUSY);
598         }
599         return (0);
600 }
601
602 int
603 dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
604 {
605         dsl_dir_t *dd;
606         dsl_pool_t *dp;
607         const char *snapname;
608         uint64_t obj;
609         int err = 0;
610
611         err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
612         if (err)
613                 return (err);
614
615         dp = dd->dd_pool;
616         obj = dd->dd_phys->dd_head_dataset_obj;
617         rw_enter(&dp->dp_config_rwlock, RW_READER);
618         if (obj)
619                 err = dsl_dataset_get_ref(dp, obj, tag, dsp);
620         else
621                 err = ENOENT;
622         if (err)
623                 goto out;
624
625         err = dsl_dataset_hold_ref(*dsp, tag);
626
627         /* we may be looking for a snapshot */
628         if (err == 0 && snapname != NULL) {
629                 dsl_dataset_t *ds = NULL;
630
631                 if (*snapname++ != '@') {
632                         dsl_dataset_rele(*dsp, tag);
633                         err = ENOENT;
634                         goto out;
635                 }
636
637                 dprintf("looking for snapshot '%s'\n", snapname);
638                 err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
639                 if (err == 0)
640                         err = dsl_dataset_get_ref(dp, obj, tag, &ds);
641                 dsl_dataset_rele(*dsp, tag);
642
643                 ASSERT3U((err == 0), ==, (ds != NULL));
644
645                 if (ds) {
646                         mutex_enter(&ds->ds_lock);
647                         if (ds->ds_snapname[0] == 0)
648                                 (void) strlcpy(ds->ds_snapname, snapname,
649                                     sizeof (ds->ds_snapname));
650                         mutex_exit(&ds->ds_lock);
651                         err = dsl_dataset_hold_ref(ds, tag);
652                         *dsp = err ? NULL : ds;
653                 }
654         }
655 out:
656         rw_exit(&dp->dp_config_rwlock);
657         dsl_dir_close(dd, FTAG);
658         return (err);
659 }
660
661 int
662 dsl_dataset_own(const char *name, boolean_t inconsistentok,
663     void *tag, dsl_dataset_t **dsp)
664 {
665         int err = dsl_dataset_hold(name, tag, dsp);
666         if (err)
667                 return (err);
668         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
669                 dsl_dataset_rele(*dsp, tag);
670                 return (EBUSY);
671         }
672         return (0);
673 }
674
675 void
676 dsl_dataset_name(dsl_dataset_t *ds, char *name)
677 {
678         if (ds == NULL) {
679                 (void) strcpy(name, "mos");
680         } else {
681                 dsl_dir_name(ds->ds_dir, name);
682                 VERIFY(0 == dsl_dataset_get_snapname(ds));
683                 if (ds->ds_snapname[0]) {
684                         (void) strcat(name, "@");
685                         /*
686                          * We use a "recursive" mutex so that we
687                          * can call dprintf_ds() with ds_lock held.
688                          */
689                         if (!MUTEX_HELD(&ds->ds_lock)) {
690                                 mutex_enter(&ds->ds_lock);
691                                 (void) strcat(name, ds->ds_snapname);
692                                 mutex_exit(&ds->ds_lock);
693                         } else {
694                                 (void) strcat(name, ds->ds_snapname);
695                         }
696                 }
697         }
698 }
699
700 static int
701 dsl_dataset_namelen(dsl_dataset_t *ds)
702 {
703         int result;
704
705         if (ds == NULL) {
706                 result = 3;     /* "mos" */
707         } else {
708                 result = dsl_dir_namelen(ds->ds_dir);
709                 VERIFY(0 == dsl_dataset_get_snapname(ds));
710                 if (ds->ds_snapname[0]) {
711                         ++result;       /* adding one for the @-sign */
712                         if (!MUTEX_HELD(&ds->ds_lock)) {
713                                 mutex_enter(&ds->ds_lock);
714                                 result += strlen(ds->ds_snapname);
715                                 mutex_exit(&ds->ds_lock);
716                         } else {
717                                 result += strlen(ds->ds_snapname);
718                         }
719                 }
720         }
721
722         return (result);
723 }
724
725 void
726 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
727 {
728         dmu_buf_rele(ds->ds_dbuf, tag);
729 }
730
731 void
732 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
733 {
734         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
735                 rw_exit(&ds->ds_rwlock);
736         }
737         dsl_dataset_drop_ref(ds, tag);
738 }
739
740 void
741 dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
742 {
743         ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
744             (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
745
746         mutex_enter(&ds->ds_lock);
747         ds->ds_owner = NULL;
748         if (RW_WRITE_HELD(&ds->ds_rwlock)) {
749                 rw_exit(&ds->ds_rwlock);
750                 cv_broadcast(&ds->ds_exclusive_cv);
751         }
752         mutex_exit(&ds->ds_lock);
753         if (ds->ds_dbuf)
754                 dsl_dataset_drop_ref(ds, tag);
755         else
756                 dsl_dataset_evict(NULL, ds);
757 }
758
759 boolean_t
760 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
761 {
762         boolean_t gotit = FALSE;
763
764         mutex_enter(&ds->ds_lock);
765         if (ds->ds_owner == NULL &&
766             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
767                 ds->ds_owner = tag;
768                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
769                         rw_exit(&ds->ds_rwlock);
770                 gotit = TRUE;
771         }
772         mutex_exit(&ds->ds_lock);
773         return (gotit);
774 }
775
776 void
777 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
778 {
779         ASSERT3P(owner, ==, ds->ds_owner);
780         if (!RW_WRITE_HELD(&ds->ds_rwlock))
781                 rw_enter(&ds->ds_rwlock, RW_WRITER);
782 }
783
784 uint64_t
785 dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
786     uint64_t flags, dmu_tx_t *tx)
787 {
788         dsl_pool_t *dp = dd->dd_pool;
789         dmu_buf_t *dbuf;
790         dsl_dataset_phys_t *dsphys;
791         uint64_t dsobj;
792         objset_t *mos = dp->dp_meta_objset;
793
794         if (origin == NULL)
795                 origin = dp->dp_origin_snap;
796
797         ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
798         ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
799         ASSERT(dmu_tx_is_syncing(tx));
800         ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
801
802         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
803             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
804         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
805         dmu_buf_will_dirty(dbuf, tx);
806         dsphys = dbuf->db_data;
807         bzero(dsphys, sizeof (dsl_dataset_phys_t));
808         dsphys->ds_dir_obj = dd->dd_object;
809         dsphys->ds_flags = flags;
810         dsphys->ds_fsid_guid = unique_create();
811         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
812             sizeof (dsphys->ds_guid));
813         dsphys->ds_snapnames_zapobj =
814             zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
815             DMU_OT_NONE, 0, tx);
816         dsphys->ds_creation_time = gethrestime_sec();
817         dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
818
819         if (origin == NULL) {
820                 dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
821         } else {
822                 dsl_dataset_t *ohds;
823
824                 dsphys->ds_prev_snap_obj = origin->ds_object;
825                 dsphys->ds_prev_snap_txg =
826                     origin->ds_phys->ds_creation_txg;
827                 dsphys->ds_referenced_bytes =
828                     origin->ds_phys->ds_referenced_bytes;
829                 dsphys->ds_compressed_bytes =
830                     origin->ds_phys->ds_compressed_bytes;
831                 dsphys->ds_uncompressed_bytes =
832                     origin->ds_phys->ds_uncompressed_bytes;
833                 dsphys->ds_bp = origin->ds_phys->ds_bp;
834                 dsphys->ds_flags |= origin->ds_phys->ds_flags;
835
836                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
837                 origin->ds_phys->ds_num_children++;
838
839                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
840                     origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
841                 dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
842                     dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
843                 dsl_dataset_rele(ohds, FTAG);
844
845                 if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
846                         if (origin->ds_phys->ds_next_clones_obj == 0) {
847                                 origin->ds_phys->ds_next_clones_obj =
848                                     zap_create(mos,
849                                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
850                         }
851                         VERIFY(0 == zap_add_int(mos,
852                             origin->ds_phys->ds_next_clones_obj,
853                             dsobj, tx));
854                 }
855
856                 dmu_buf_will_dirty(dd->dd_dbuf, tx);
857                 dd->dd_phys->dd_origin_obj = origin->ds_object;
858                 if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
859                         if (origin->ds_dir->dd_phys->dd_clones == 0) {
860                                 dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
861                                 origin->ds_dir->dd_phys->dd_clones =
862                                     zap_create(mos,
863                                     DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
864                         }
865                         VERIFY3U(0, ==, zap_add_int(mos,
866                             origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
867                 }
868         }
869
870         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
871                 dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
872
873         dmu_buf_rele(dbuf, FTAG);
874
875         dmu_buf_will_dirty(dd->dd_dbuf, tx);
876         dd->dd_phys->dd_head_dataset_obj = dsobj;
877
878         return (dsobj);
879 }
880
881 uint64_t
882 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
883     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
884 {
885         dsl_pool_t *dp = pdd->dd_pool;
886         uint64_t dsobj, ddobj;
887         dsl_dir_t *dd;
888
889         ASSERT(lastname[0] != '@');
890
891         ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
892         VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
893
894         dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
895
896         dsl_deleg_set_create_perms(dd, tx, cr);
897
898         dsl_dir_close(dd, FTAG);
899
900         /*
901          * If we are creating a clone, make sure we zero out any stale
902          * data from the origin snapshots zil header.
903          */
904         if (origin != NULL) {
905                 dsl_dataset_t *ds;
906                 objset_t *os;
907
908                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
909                 VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
910                 bzero(&os->os_zil_header, sizeof (os->os_zil_header));
911                 dsl_dataset_dirty(ds, tx);
912                 dsl_dataset_rele(ds, FTAG);
913         }
914
915         return (dsobj);
916 }
917
918 /*
919  * The snapshots must all be in the same pool.
920  */
921 int
922 dmu_snapshots_destroy_nvl(nvlist_t *snaps, boolean_t defer, char *failed)
923 {
924         int err;
925         dsl_sync_task_t *dst;
926         spa_t *spa;
927         nvpair_t *pair;
928         dsl_sync_task_group_t *dstg;
929
930         pair = nvlist_next_nvpair(snaps, NULL);
931         if (pair == NULL)
932                 return (0);
933
934         err = spa_open(nvpair_name(pair), &spa, FTAG);
935         if (err)
936                 return (err);
937         dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
938
939         for (pair = nvlist_next_nvpair(snaps, NULL); pair != NULL;
940             pair = nvlist_next_nvpair(snaps, pair)) {
941                 dsl_dataset_t *ds;
942
943                 err = dsl_dataset_own(nvpair_name(pair), B_TRUE, dstg, &ds);
944                 if (err == 0) {
945                         struct dsl_ds_destroyarg *dsda;
946
947                         dsl_dataset_make_exclusive(ds, dstg);
948                         dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg),
949                             KM_SLEEP);
950                         dsda->ds = ds;
951                         dsda->defer = defer;
952                         dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
953                             dsl_dataset_destroy_sync, dsda, dstg, 0);
954                 } else if (err == ENOENT) {
955                         err = 0;
956                 } else {
957                         (void) strcpy(failed, nvpair_name(pair));
958                         break;
959                 }
960         }
961
962         if (err == 0)
963                 err = dsl_sync_task_group_wait(dstg);
964
965         for (dst = list_head(&dstg->dstg_tasks); dst;
966             dst = list_next(&dstg->dstg_tasks, dst)) {
967                 struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
968                 dsl_dataset_t *ds = dsda->ds;
969
970                 /*
971                  * Return the file system name that triggered the error
972                  */
973                 if (dst->dst_err) {
974                         dsl_dataset_name(ds, failed);
975                 }
976                 ASSERT3P(dsda->rm_origin, ==, NULL);
977                 dsl_dataset_disown(ds, dstg);
978                 kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
979         }
980
981         dsl_sync_task_group_destroy(dstg);
982         spa_close(spa, FTAG);
983         return (err);
984
985 }
986
987 static boolean_t
988 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
989 {
990         boolean_t might_destroy = B_FALSE;
991
992         mutex_enter(&ds->ds_lock);
993         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
994             DS_IS_DEFER_DESTROY(ds))
995                 might_destroy = B_TRUE;
996         mutex_exit(&ds->ds_lock);
997
998         return (might_destroy);
999 }
1000
1001 /*
1002  * If we're removing a clone, and these three conditions are true:
1003  *      1) the clone's origin has no other children
1004  *      2) the clone's origin has no user references
1005  *      3) the clone's origin has been marked for deferred destruction
1006  * Then, prepare to remove the origin as part of this sync task group.
1007  */
1008 static int
1009 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
1010 {
1011         dsl_dataset_t *ds = dsda->ds;
1012         dsl_dataset_t *origin = ds->ds_prev;
1013
1014         if (dsl_dataset_might_destroy_origin(origin)) {
1015                 char *name;
1016                 int namelen;
1017                 int error;
1018
1019                 namelen = dsl_dataset_namelen(origin) + 1;
1020                 name = kmem_alloc(namelen, KM_SLEEP);
1021                 dsl_dataset_name(origin, name);
1022 #ifdef _KERNEL
1023                 error = zfs_unmount_snap(name, NULL);
1024                 if (error) {
1025                         kmem_free(name, namelen);
1026                         return (error);
1027                 }
1028 #endif
1029                 error = dsl_dataset_own(name, B_TRUE, tag, &origin);
1030                 kmem_free(name, namelen);
1031                 if (error)
1032                         return (error);
1033                 dsda->rm_origin = origin;
1034                 dsl_dataset_make_exclusive(origin, tag);
1035         }
1036
1037         return (0);
1038 }
1039
1040 /*
1041  * ds must be opened as OWNER.  On return (whether successful or not),
1042  * ds will be closed and caller can no longer dereference it.
1043  */
1044 int
1045 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
1046 {
1047         int err;
1048         dsl_sync_task_group_t *dstg;
1049         objset_t *os;
1050         dsl_dir_t *dd;
1051         uint64_t obj;
1052         struct dsl_ds_destroyarg dsda = { 0 };
1053         dsl_dataset_t *dummy_ds;
1054
1055         dsda.ds = ds;
1056
1057         if (dsl_dataset_is_snapshot(ds)) {
1058                 /* Destroying a snapshot is simpler */
1059                 dsl_dataset_make_exclusive(ds, tag);
1060
1061                 dsda.defer = defer;
1062                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1063                     dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
1064                     &dsda, tag, 0);
1065                 ASSERT3P(dsda.rm_origin, ==, NULL);
1066                 goto out;
1067         } else if (defer) {
1068                 err = EINVAL;
1069                 goto out;
1070         }
1071
1072         dd = ds->ds_dir;
1073         dummy_ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
1074         dummy_ds->ds_dir = dd;
1075         dummy_ds->ds_object = ds->ds_object;
1076
1077         /*
1078          * Check for errors and mark this ds as inconsistent, in
1079          * case we crash while freeing the objects.
1080          */
1081         err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
1082             dsl_dataset_destroy_begin_sync, ds, NULL, 0);
1083         if (err)
1084                 goto out_free;
1085
1086         err = dmu_objset_from_ds(ds, &os);
1087         if (err)
1088                 goto out_free;
1089
1090         /*
1091          * If async destruction is not enabled try to remove all objects
1092          * while in the open context so that there is less work to do in
1093          * the syncing context.
1094          */
1095         if (!spa_feature_is_enabled(dsl_dataset_get_spa(ds),
1096             &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY])) {
1097                 for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
1098                     ds->ds_phys->ds_prev_snap_txg)) {
1099                         /*
1100                          * Ignore errors, if there is not enough disk space
1101                          * we will deal with it in dsl_dataset_destroy_sync().
1102                          */
1103                         (void) dmu_free_object(os, obj);
1104                 }
1105                 if (err != ESRCH)
1106                         goto out_free;
1107         }
1108
1109         /*
1110          * Only the ZIL knows how to free log blocks.
1111          */
1112         zil_destroy(dmu_objset_zil(os), B_FALSE);
1113
1114         /*
1115          * Sync out all in-flight IO.
1116          */
1117         txg_wait_synced(dd->dd_pool, 0);
1118
1119         /*
1120          * If we managed to free all the objects in open
1121          * context, the user space accounting should be zero.
1122          */
1123         if (ds->ds_phys->ds_bp.blk_fill == 0 &&
1124             dmu_objset_userused_enabled(os)) {
1125                 ASSERTV(uint64_t count);
1126                 ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
1127                     count == 0);
1128                 ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
1129                     count == 0);
1130         }
1131
1132         rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
1133         err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
1134         rw_exit(&dd->dd_pool->dp_config_rwlock);
1135
1136         if (err)
1137                 goto out_free;
1138
1139         /*
1140          * Blow away the dsl_dir + head dataset.
1141          */
1142         dsl_dataset_make_exclusive(ds, tag);
1143         /*
1144          * If we're removing a clone, we might also need to remove its
1145          * origin.
1146          */
1147         do {
1148                 dsda.need_prep = B_FALSE;
1149                 if (dsl_dir_is_clone(dd)) {
1150                         err = dsl_dataset_origin_rm_prep(&dsda, tag);
1151                         if (err) {
1152                                 dsl_dir_close(dd, FTAG);
1153                                 goto out_free;
1154                         }
1155                 }
1156
1157                 dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
1158                 dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
1159                     dsl_dataset_destroy_sync, &dsda, tag, 0);
1160                 dsl_sync_task_create(dstg, dsl_dir_destroy_check,
1161                     dsl_dir_destroy_sync, dummy_ds, FTAG, 0);
1162                 err = dsl_sync_task_group_wait(dstg);
1163                 dsl_sync_task_group_destroy(dstg);
1164
1165                 /*
1166                  * We could be racing against 'zfs release' or 'zfs destroy -d'
1167                  * on the origin snap, in which case we can get EBUSY if we
1168                  * needed to destroy the origin snap but were not ready to
1169                  * do so.
1170                  */
1171                 if (dsda.need_prep) {
1172                         ASSERT(err == EBUSY);
1173                         ASSERT(dsl_dir_is_clone(dd));
1174                         ASSERT(dsda.rm_origin == NULL);
1175                 }
1176         } while (dsda.need_prep);
1177
1178         if (dsda.rm_origin != NULL)
1179                 dsl_dataset_disown(dsda.rm_origin, tag);
1180
1181         /* if it is successful, dsl_dir_destroy_sync will close the dd */
1182         if (err)
1183                 dsl_dir_close(dd, FTAG);
1184
1185 out_free:
1186         kmem_free(dummy_ds, sizeof (dsl_dataset_t));
1187 out:
1188         dsl_dataset_disown(ds, tag);
1189         return (err);
1190 }
1191
1192 blkptr_t *
1193 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1194 {
1195         return (&ds->ds_phys->ds_bp);
1196 }
1197
1198 void
1199 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1200 {
1201         ASSERT(dmu_tx_is_syncing(tx));
1202         /* If it's the meta-objset, set dp_meta_rootbp */
1203         if (ds == NULL) {
1204                 tx->tx_pool->dp_meta_rootbp = *bp;
1205         } else {
1206                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1207                 ds->ds_phys->ds_bp = *bp;
1208         }
1209 }
1210
1211 spa_t *
1212 dsl_dataset_get_spa(dsl_dataset_t *ds)
1213 {
1214         return (ds->ds_dir->dd_pool->dp_spa);
1215 }
1216
1217 void
1218 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
1219 {
1220         dsl_pool_t *dp;
1221
1222         if (ds == NULL) /* this is the meta-objset */
1223                 return;
1224
1225         ASSERT(ds->ds_objset != NULL);
1226
1227         if (ds->ds_phys->ds_next_snap_obj != 0)
1228                 panic("dirtying snapshot!");
1229
1230         dp = ds->ds_dir->dd_pool;
1231
1232         if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
1233                 /* up the hold count until we can be written out */
1234                 dmu_buf_add_ref(ds->ds_dbuf, ds);
1235         }
1236 }
1237
1238 boolean_t
1239 dsl_dataset_is_dirty(dsl_dataset_t *ds)
1240 {
1241         int t;
1242
1243         for (t = 0; t < TXG_SIZE; t++) {
1244                 if (txg_list_member(&ds->ds_dir->dd_pool->dp_dirty_datasets,
1245                     ds, t))
1246                         return (B_TRUE);
1247         }
1248         return (B_FALSE);
1249 }
1250
1251 /*
1252  * The unique space in the head dataset can be calculated by subtracting
1253  * the space used in the most recent snapshot, that is still being used
1254  * in this file system, from the space currently in use.  To figure out
1255  * the space in the most recent snapshot still in use, we need to take
1256  * the total space used in the snapshot and subtract out the space that
1257  * has been freed up since the snapshot was taken.
1258  */
1259 static void
1260 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1261 {
1262         uint64_t mrs_used;
1263         uint64_t dlused, dlcomp, dluncomp;
1264
1265         ASSERT(!dsl_dataset_is_snapshot(ds));
1266
1267         if (ds->ds_phys->ds_prev_snap_obj != 0)
1268                 mrs_used = ds->ds_prev->ds_phys->ds_referenced_bytes;
1269         else
1270                 mrs_used = 0;
1271
1272         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1273
1274         ASSERT3U(dlused, <=, mrs_used);
1275         ds->ds_phys->ds_unique_bytes =
1276             ds->ds_phys->ds_referenced_bytes - (mrs_used - dlused);
1277
1278         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1279             SPA_VERSION_UNIQUE_ACCURATE)
1280                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1281 }
1282
1283 struct killarg {
1284         dsl_dataset_t *ds;
1285         dmu_tx_t *tx;
1286 };
1287
1288 /* ARGSUSED */
1289 static int
1290 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
1291     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
1292 {
1293         struct killarg *ka = arg;
1294         dmu_tx_t *tx = ka->tx;
1295
1296         if (bp == NULL)
1297                 return (0);
1298
1299         if (zb->zb_level == ZB_ZIL_LEVEL) {
1300                 ASSERT(zilog != NULL);
1301                 /*
1302                  * It's a block in the intent log.  It has no
1303                  * accounting, so just free it.
1304                  */
1305                 dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
1306         } else {
1307                 ASSERT(zilog == NULL);
1308                 ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
1309                 (void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
1310         }
1311
1312         return (0);
1313 }
1314
1315 /* ARGSUSED */
1316 static int
1317 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1318 {
1319         dsl_dataset_t *ds = arg1;
1320         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1321         uint64_t count;
1322         int err;
1323
1324         /*
1325          * Can't delete a head dataset if there are snapshots of it.
1326          * (Except if the only snapshots are from the branch we cloned
1327          * from.)
1328          */
1329         if (ds->ds_prev != NULL &&
1330             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1331                 return (EBUSY);
1332
1333         /*
1334          * This is really a dsl_dir thing, but check it here so that
1335          * we'll be less likely to leave this dataset inconsistent &
1336          * nearly destroyed.
1337          */
1338         err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1339         if (err)
1340                 return (err);
1341         if (count != 0)
1342                 return (EEXIST);
1343
1344         return (0);
1345 }
1346
1347 /* ARGSUSED */
1348 static void
1349 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1350 {
1351         dsl_dataset_t *ds = arg1;
1352         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1353
1354         /* Mark it as inconsistent on-disk, in case we crash */
1355         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1356         ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1357
1358         spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
1359             "dataset = %llu", ds->ds_object);
1360 }
1361
1362 static int
1363 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
1364     dmu_tx_t *tx)
1365 {
1366         dsl_dataset_t *ds = dsda->ds;
1367         dsl_dataset_t *ds_prev = ds->ds_prev;
1368
1369         if (dsl_dataset_might_destroy_origin(ds_prev)) {
1370                 struct dsl_ds_destroyarg ndsda = {0};
1371
1372                 /*
1373                  * If we're not prepared to remove the origin, don't remove
1374                  * the clone either.
1375                  */
1376                 if (dsda->rm_origin == NULL) {
1377                         dsda->need_prep = B_TRUE;
1378                         return (EBUSY);
1379                 }
1380
1381                 ndsda.ds = ds_prev;
1382                 ndsda.is_origin_rm = B_TRUE;
1383                 return (dsl_dataset_destroy_check(&ndsda, tag, tx));
1384         }
1385
1386         /*
1387          * If we're not going to remove the origin after all,
1388          * undo the open context setup.
1389          */
1390         if (dsda->rm_origin != NULL) {
1391                 dsl_dataset_disown(dsda->rm_origin, tag);
1392                 dsda->rm_origin = NULL;
1393         }
1394
1395         return (0);
1396 }
1397
1398 /*
1399  * If you add new checks here, you may need to add
1400  * additional checks to the "temporary" case in
1401  * snapshot_check() in dmu_objset.c.
1402  */
1403 /* ARGSUSED */
1404 int
1405 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1406 {
1407         struct dsl_ds_destroyarg *dsda = arg1;
1408         dsl_dataset_t *ds = dsda->ds;
1409
1410         /* we have an owner hold, so noone else can destroy us */
1411         ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
1412
1413         /*
1414          * Only allow deferred destroy on pools that support it.
1415          * NOTE: deferred destroy is only supported on snapshots.
1416          */
1417         if (dsda->defer) {
1418                 if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
1419                     SPA_VERSION_USERREFS)
1420                         return (ENOTSUP);
1421                 ASSERT(dsl_dataset_is_snapshot(ds));
1422                 return (0);
1423         }
1424
1425         /*
1426          * Can't delete a head dataset if there are snapshots of it.
1427          * (Except if the only snapshots are from the branch we cloned
1428          * from.)
1429          */
1430         if (ds->ds_prev != NULL &&
1431             ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1432                 return (EBUSY);
1433
1434         /*
1435          * If we made changes this txg, traverse_dsl_dataset won't find
1436          * them.  Try again.
1437          */
1438         if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1439                 return (EAGAIN);
1440
1441         if (dsl_dataset_is_snapshot(ds)) {
1442                 /*
1443                  * If this snapshot has an elevated user reference count,
1444                  * we can't destroy it yet.
1445                  */
1446                 if (ds->ds_userrefs > 0 && !dsda->releasing)
1447                         return (EBUSY);
1448
1449                 mutex_enter(&ds->ds_lock);
1450                 /*
1451                  * Can't delete a branch point. However, if we're destroying
1452                  * a clone and removing its origin due to it having a user
1453                  * hold count of 0 and having been marked for deferred destroy,
1454                  * it's OK for the origin to have a single clone.
1455                  */
1456                 if (ds->ds_phys->ds_num_children >
1457                     (dsda->is_origin_rm ? 2 : 1)) {
1458                         mutex_exit(&ds->ds_lock);
1459                         return (EEXIST);
1460                 }
1461                 mutex_exit(&ds->ds_lock);
1462         } else if (dsl_dir_is_clone(ds->ds_dir)) {
1463                 return (dsl_dataset_origin_check(dsda, arg2, tx));
1464         }
1465
1466         /* XXX we should do some i/o error checking... */
1467         return (0);
1468 }
1469
1470 struct refsarg {
1471         kmutex_t lock;
1472         boolean_t gone;
1473         kcondvar_t cv;
1474 };
1475
1476 /* ARGSUSED */
1477 static void
1478 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
1479 {
1480         struct refsarg *arg = argv;
1481
1482         mutex_enter(&arg->lock);
1483         arg->gone = TRUE;
1484         cv_signal(&arg->cv);
1485         mutex_exit(&arg->lock);
1486 }
1487
1488 static void
1489 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
1490 {
1491         struct refsarg arg;
1492
1493         mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
1494         cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
1495         arg.gone = FALSE;
1496         (void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
1497             dsl_dataset_refs_gone);
1498         dmu_buf_rele(ds->ds_dbuf, tag);
1499         mutex_enter(&arg.lock);
1500         while (!arg.gone)
1501                 cv_wait(&arg.cv, &arg.lock);
1502         ASSERT(arg.gone);
1503         mutex_exit(&arg.lock);
1504         ds->ds_dbuf = NULL;
1505         ds->ds_phys = NULL;
1506         mutex_destroy(&arg.lock);
1507         cv_destroy(&arg.cv);
1508 }
1509
1510 static void
1511 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
1512 {
1513         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1514         int err;
1515         ASSERTV(uint64_t count);
1516
1517         ASSERT(ds->ds_phys->ds_num_children >= 2);
1518         err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
1519         /*
1520          * The err should not be ENOENT, but a bug in a previous version
1521          * of the code could cause upgrade_clones_cb() to not set
1522          * ds_next_snap_obj when it should, leading to a missing entry.
1523          * If we knew that the pool was created after
1524          * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
1525          * ENOENT.  However, at least we can check that we don't have
1526          * too many entries in the next_clones_obj even after failing to
1527          * remove this one.
1528          */
1529         if (err != ENOENT) {
1530                 VERIFY3U(err, ==, 0);
1531         }
1532         ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
1533             &count));
1534         ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
1535 }
1536
1537 static void
1538 dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
1539 {
1540         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1541         zap_cursor_t zc;
1542         zap_attribute_t za;
1543
1544         /*
1545          * If it is the old version, dd_clones doesn't exist so we can't
1546          * find the clones, but deadlist_remove_key() is a no-op so it
1547          * doesn't matter.
1548          */
1549         if (ds->ds_dir->dd_phys->dd_clones == 0)
1550                 return;
1551
1552         for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
1553             zap_cursor_retrieve(&zc, &za) == 0;
1554             zap_cursor_advance(&zc)) {
1555                 dsl_dataset_t *clone;
1556
1557                 VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
1558                     za.za_first_integer, FTAG, &clone));
1559                 if (clone->ds_dir->dd_origin_txg > mintxg) {
1560                         dsl_deadlist_remove_key(&clone->ds_deadlist,
1561                             mintxg, tx);
1562                         dsl_dataset_remove_clones_key(clone, mintxg, tx);
1563                 }
1564                 dsl_dataset_rele(clone, FTAG);
1565         }
1566         zap_cursor_fini(&zc);
1567 }
1568
1569 struct process_old_arg {
1570         dsl_dataset_t *ds;
1571         dsl_dataset_t *ds_prev;
1572         boolean_t after_branch_point;
1573         zio_t *pio;
1574         uint64_t used, comp, uncomp;
1575 };
1576
1577 static int
1578 process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
1579 {
1580         struct process_old_arg *poa = arg;
1581         dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;
1582
1583         if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
1584                 dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
1585                 if (poa->ds_prev && !poa->after_branch_point &&
1586                     bp->blk_birth >
1587                     poa->ds_prev->ds_phys->ds_prev_snap_txg) {
1588                         poa->ds_prev->ds_phys->ds_unique_bytes +=
1589                             bp_get_dsize_sync(dp->dp_spa, bp);
1590                 }
1591         } else {
1592                 poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
1593                 poa->comp += BP_GET_PSIZE(bp);
1594                 poa->uncomp += BP_GET_UCSIZE(bp);
1595                 dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
1596         }
1597         return (0);
1598 }
1599
1600 static void
1601 process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
1602     dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
1603 {
1604         struct process_old_arg poa = { 0 };
1605         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1606         objset_t *mos = dp->dp_meta_objset;
1607
1608         ASSERT(ds->ds_deadlist.dl_oldfmt);
1609         ASSERT(ds_next->ds_deadlist.dl_oldfmt);
1610
1611         poa.ds = ds;
1612         poa.ds_prev = ds_prev;
1613         poa.after_branch_point = after_branch_point;
1614         poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1615         VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
1616             process_old_cb, &poa, tx));
1617         VERIFY3U(zio_wait(poa.pio), ==, 0);
1618         ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);
1619
1620         /* change snapused */
1621         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1622             -poa.used, -poa.comp, -poa.uncomp, tx);
1623
1624         /* swap next's deadlist to our deadlist */
1625         dsl_deadlist_close(&ds->ds_deadlist);
1626         dsl_deadlist_close(&ds_next->ds_deadlist);
1627         SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
1628             ds->ds_phys->ds_deadlist_obj);
1629         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1630         dsl_deadlist_open(&ds_next->ds_deadlist, mos,
1631             ds_next->ds_phys->ds_deadlist_obj);
1632 }
1633
1634 static int
1635 old_synchronous_dataset_destroy(dsl_dataset_t *ds, dmu_tx_t *tx)
1636 {
1637         int err;
1638         struct killarg ka;
1639
1640         /*
1641          * Free everything that we point to (that's born after
1642          * the previous snapshot, if we are a clone)
1643          *
1644          * NB: this should be very quick, because we already
1645          * freed all the objects in open context.
1646          */
1647         ka.ds = ds;
1648         ka.tx = tx;
1649         err = traverse_dataset(ds,
1650             ds->ds_phys->ds_prev_snap_txg, TRAVERSE_POST,
1651             kill_blkptr, &ka);
1652         ASSERT3U(err, ==, 0);
1653         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) || ds->ds_phys->ds_unique_bytes == 0);
1654
1655         return (err);
1656 }
1657
/*
 * Syncing-context half of dataset destruction.
 *
 * arg1 is a struct dsl_ds_destroyarg.  'tag' is forwarded to
 * dsl_dataset_drain_refs() and to the recursive call that removes
 * dsda->rm_origin.
 *
 * If this is a deferred destroy and the dataset still has user holds or
 * more than one child, the dataset is only flagged DS_FLAG_DEFER_DESTROY
 * and left in place.  Otherwise the dataset is unlinked from its
 * neighbours in the snapshot chain, its space accounting is adjusted, its
 * MOS objects are freed, and finally dsda->rm_origin (if set) is destroyed
 * by recursing into this function.
 */
1658 void
1659 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1660 {
1661         struct dsl_ds_destroyarg *dsda = arg1;
1662         dsl_dataset_t *ds = dsda->ds;
1663         int err = 0;
1664         int after_branch_point = FALSE;
1665         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1666         objset_t *mos = dp->dp_meta_objset;
1667         dsl_dataset_t *ds_prev = NULL;
1668         boolean_t wont_destroy;
1669         uint64_t obj;
1670
             /*
              * A deferred destroy is postponed while user holds or extra
              * children remain.
              */
1671         wont_destroy = (dsda->defer &&
1672             (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));
1673
1674         ASSERT(ds->ds_owner || wont_destroy);
1675         ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
1676         ASSERT(ds->ds_prev == NULL ||
1677             ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1678         ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1679
1680         if (wont_destroy) {
1681                 ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1682                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1683                 ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
1684                 return;
1685         }
1686
1687         /* signal any waiters that this dataset is going away */
1688         mutex_enter(&ds->ds_lock);
1689         ds->ds_owner = dsl_reaper;
1690         cv_broadcast(&ds->ds_exclusive_cv);
1691         mutex_exit(&ds->ds_lock);
1692
1693         /* Remove our reservation */
1694         if (ds->ds_reserved != 0) {
1695                 dsl_prop_setarg_t psa;
1696                 uint64_t value = 0;
1697
1698                 dsl_prop_setarg_init_uint64(&psa, "refreservation",
1699                     (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
1700                     &value);
1701                 psa.psa_effective_value = 0;    /* predict default value */
1702
1703                 dsl_dataset_set_reservation_sync(ds, &psa, tx);
1704                 ASSERT3U(ds->ds_reserved, ==, 0);
1705         }
1706
1707         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1708
1709         dsl_scan_ds_destroyed(ds, tx);
1710
             /* Remember our object number; ds->ds_dir is torn down below. */
1711         obj = ds->ds_object;
1712
             /*
              * Unlink from the previous snapshot.  ds->ds_prev is borrowed
              * when it is already set; otherwise an FTAG hold is taken here
              * and released near the end of this function.
              */
1713         if (ds->ds_phys->ds_prev_snap_obj != 0) {
1714                 if (ds->ds_prev) {
1715                         ds_prev = ds->ds_prev;
1716                 } else {
1717                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1718                             ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
1719                 }
                     /* True when prev does not name us as its next snapshot. */
1720                 after_branch_point =
1721                     (ds_prev->ds_phys->ds_next_snap_obj != obj);
1722
1723                 dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1724                 if (after_branch_point &&
1725                     ds_prev->ds_phys->ds_next_clones_obj != 0) {
1726                         remove_from_next_clones(ds_prev, obj, tx);
1727                         if (ds->ds_phys->ds_next_snap_obj != 0) {
1728                                 VERIFY(0 == zap_add_int(mos,
1729                                     ds_prev->ds_phys->ds_next_clones_obj,
1730                                     ds->ds_phys->ds_next_snap_obj, tx));
1731                         }
1732                 }
1733                 if (after_branch_point &&
1734                     ds->ds_phys->ds_next_snap_obj == 0) {
1735                         /* This clone is toast. */
1736                         ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1737                         ds_prev->ds_phys->ds_num_children--;
1738
1739                         /*
1740                          * If the clone's origin has no other clones, no
1741                          * user holds, and has been marked for deferred
1742                          * deletion, then we should have done the necessary
1743                          * destroy setup for it.
1744                          */
1745                         if (ds_prev->ds_phys->ds_num_children == 1 &&
1746                             ds_prev->ds_userrefs == 0 &&
1747                             DS_IS_DEFER_DESTROY(ds_prev)) {
1748                                 ASSERT3P(dsda->rm_origin, !=, NULL);
1749                         } else {
1750                                 ASSERT3P(dsda->rm_origin, ==, NULL);
1751                         }
1752                 } else if (!after_branch_point) {
                             /* Splice ourselves out of prev's forward link. */
1753                         ds_prev->ds_phys->ds_next_snap_obj =
1754                             ds->ds_phys->ds_next_snap_obj;
1755                 }
1756         }
1757
1758         if (dsl_dataset_is_snapshot(ds)) {
1759                 dsl_dataset_t *ds_next;
1760                 uint64_t old_unique;
1761                 uint64_t used = 0, comp = 0, uncomp = 0;
1762
1763                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1764                     ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
1765                 ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1766
1767                 old_unique = ds_next->ds_phys->ds_unique_bytes;
1768
                     /* Point next's back link past us, at our own prev. */
1769                 dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1770                 ds_next->ds_phys->ds_prev_snap_obj =
1771                     ds->ds_phys->ds_prev_snap_obj;
1772                 ds_next->ds_phys->ds_prev_snap_txg =
1773                     ds->ds_phys->ds_prev_snap_txg;
1774                 ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1775                     ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1776
1777
1778                 if (ds_next->ds_deadlist.dl_oldfmt) {
1779                         process_old_deadlist(ds, ds_prev, ds_next,
1780                             after_branch_point, tx);
1781                 } else {
1782                         /* Adjust prev's unique space. */
1783                         if (ds_prev && !after_branch_point) {
1784                                 dsl_deadlist_space_range(&ds_next->ds_deadlist,
1785                                     ds_prev->ds_phys->ds_prev_snap_txg,
1786                                     ds->ds_phys->ds_prev_snap_txg,
1787                                     &used, &comp, &uncomp);
1788                                 ds_prev->ds_phys->ds_unique_bytes += used;
1789                         }
1790
1791                         /* Adjust snapused. */
1792                         dsl_deadlist_space_range(&ds_next->ds_deadlist,
1793                             ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
1794                             &used, &comp, &uncomp);
1795                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
1796                             -used, -comp, -uncomp, tx);
1797
1798                         /* Move blocks to be freed to pool's free list. */
1799                         dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
1800                             &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
1801                             tx);
1802                         dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
1803                             DD_USED_HEAD, used, comp, uncomp, tx);
1804
1805                         /* Merge our deadlist into next's and free it. */
1806                         dsl_deadlist_merge(&ds_next->ds_deadlist,
1807                             ds->ds_phys->ds_deadlist_obj, tx);
1808                 }
1809                 dsl_deadlist_close(&ds->ds_deadlist);
1810                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1811
1812                 /* Collapse range in clone heads */
1813                 dsl_dataset_remove_clones_key(ds,
1814                     ds->ds_phys->ds_creation_txg, tx);
1815
1816                 if (dsl_dataset_is_snapshot(ds_next)) {
1817                         dsl_dataset_t *ds_nextnext;
1818                         dsl_dataset_t *hds;
1819
1820                         /*
1821                          * Update next's unique to include blocks which
1822                          * were previously shared by only this snapshot
1823                          * and it.  Those blocks will be born after the
1824                          * prev snap and before this snap, and will have
1825                          * died after the next snap and before the one
1826                          * after that (ie. be on the snap after next's
1827                          * deadlist).
1828                          */
1829                         VERIFY(0 == dsl_dataset_hold_obj(dp,
1830                             ds_next->ds_phys->ds_next_snap_obj,
1831                             FTAG, &ds_nextnext));
1832                         dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
1833                             ds->ds_phys->ds_prev_snap_txg,
1834                             ds->ds_phys->ds_creation_txg,
1835                             &used, &comp, &uncomp);
1836                         ds_next->ds_phys->ds_unique_bytes += used;
1837                         dsl_dataset_rele(ds_nextnext, FTAG);
1838                         ASSERT3P(ds_next->ds_prev, ==, NULL);
1839
1840                         /* Collapse range in this head. */
1841                         VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
1842                             ds->ds_dir->dd_phys->dd_head_dataset_obj,
1843                             FTAG, &hds));
1844                         dsl_deadlist_remove_key(&hds->ds_deadlist,
1845                             ds->ds_phys->ds_creation_txg, tx);
1846                         dsl_dataset_rele(hds, FTAG);
1847
1848                 } else {
                             /*
                              * next is not a snapshot and currently holds
                              * us as its ds_prev; re-point that reference
                              * at our prev (if there is one).
                              */
1849                         ASSERT3P(ds_next->ds_prev, ==, ds);
1850                         dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
1851                         ds_next->ds_prev = NULL;
1852                         if (ds_prev) {
1853                                 VERIFY(0 == dsl_dataset_get_ref(dp,
1854                                     ds->ds_phys->ds_prev_snap_obj,
1855                                     ds_next, &ds_next->ds_prev));
1856                         }
1857
1858                         dsl_dataset_recalc_head_uniq(ds_next);
1859
1860                         /*
1861                          * Reduce the amount of our unconsumed refreservation
1862                          * being charged to our parent by the amount of
1863                          * new unique data we have gained.
1864                          */
1865                         if (old_unique < ds_next->ds_reserved) {
1866                                 int64_t mrsdelta;
1867                                 uint64_t new_unique =
1868                                     ds_next->ds_phys->ds_unique_bytes;
1869
1870                                 ASSERT(old_unique <= new_unique);
1871                                 mrsdelta = MIN(new_unique - old_unique,
1872                                     ds_next->ds_reserved - old_unique);
1873                                 dsl_dir_diduse_space(ds->ds_dir,
1874                                     DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
1875                         }
1876                 }
1877                 dsl_dataset_rele(ds_next, FTAG);
1878         } else {
1879                 zfeature_info_t *async_destroy =
1880                     &spa_feature_table[SPA_FEATURE_ASYNC_DESTROY];
1881
1882                 /*
1883                  * There's no next snapshot, so this is a head dataset.
1884                  * Destroy the deadlist.  Unless it's a clone, the
1885                  * deadlist should be empty.  (If it's a clone, it's
1886                  * safe to ignore the deadlist contents.)
1887                  */
1888                 dsl_deadlist_close(&ds->ds_deadlist);
1889                 dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
1890                 ds->ds_phys->ds_deadlist_obj = 0;
1891
1892                 if (!spa_feature_is_enabled(dp->dp_spa, async_destroy)) {
1893                         err = old_synchronous_dataset_destroy(ds, tx);
1894                 } else {
1895                         /*
1896                          * Move the bptree into the pool's list of trees to
1897                          * clean up and update space accounting information.
1898                          */
1899                         uint64_t used, comp, uncomp;
1900
1901                         ASSERT(err == 0 || err == EBUSY);
                             /*
                              * First use of the feature: create the pool-wide
                              * bptree object and record it in the pool
                              * directory.
                              */
1902                         if (!spa_feature_is_active(dp->dp_spa, async_destroy)) {
1903                                 spa_feature_incr(dp->dp_spa, async_destroy, tx);
1904                                 dp->dp_bptree_obj = bptree_alloc(
1905                                     dp->dp_meta_objset, tx);
1906                                 VERIFY(zap_add(dp->dp_meta_objset,
1907                                     DMU_POOL_DIRECTORY_OBJECT,
1908                                     DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
1909                                     &dp->dp_bptree_obj, tx) == 0);
1910                         }
1911
1912                         used = ds->ds_dir->dd_phys->dd_used_bytes;
1913                         comp = ds->ds_dir->dd_phys->dd_compressed_bytes;
1914                         uncomp = ds->ds_dir->dd_phys->dd_uncompressed_bytes;
1915
1916                         ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
1917                             ds->ds_phys->ds_unique_bytes == used);
1918
                             /*
                              * Hand the block tree to the bptree and move its
                              * space charge from our dir to the free dir.
                              */
1919                         bptree_add(dp->dp_meta_objset, dp->dp_bptree_obj,
1920                             &ds->ds_phys->ds_bp, ds->ds_phys->ds_prev_snap_txg,
1921                             used, comp, uncomp, tx);
1922                         dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
1923                             -used, -comp, -uncomp, tx);
1924                         dsl_dir_diduse_space(dp->dp_free_dir, DD_USED_HEAD,
1925                             used, comp, uncomp, tx);
1926                 }
1927
1928                 if (ds->ds_prev != NULL) {
1929                         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
1930                                 VERIFY3U(0, ==, zap_remove_int(mos,
1931                                     ds->ds_prev->ds_dir->dd_phys->dd_clones,
1932                                     ds->ds_object, tx));
1933                         }
1934                         dsl_dataset_rele(ds->ds_prev, ds);
                             /*
                              * ds_prev is cleared as well so the FTAG release
                              * further down is skipped.
                              */
1935                         ds->ds_prev = ds_prev = NULL;
1936                 }
1937         }
1938
1939         /*
1940          * This must be done after the dsl_traverse(), because it will
1941          * re-open the objset.
1942          */
1943         if (ds->ds_objset) {
1944                 dmu_objset_evict(ds->ds_objset);
1945                 ds->ds_objset = NULL;
1946         }
1947
1948         if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1949                 /* Erase the link in the dir */
1950                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1951                 ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1952                 ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
1953                 err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1954                 ASSERT(err == 0);
1955         } else {
1956                 /* remove from snapshot namespace */
1957                 dsl_dataset_t *ds_head;
1958                 ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
1959                 VERIFY(0 == dsl_dataset_hold_obj(dp,
1960                     ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
1961                 VERIFY(0 == dsl_dataset_get_snapname(ds));
1962 #ifdef ZFS_DEBUG
1963                 {
1964                         uint64_t val;
1965
1966                         err = dsl_dataset_snap_lookup(ds_head,
1967                             ds->ds_snapname, &val);
1968                         ASSERT3U(err, ==, 0);
1969                         ASSERT3U(val, ==, obj);
1970                 }
1971 #endif
1972                 err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
1973                 ASSERT(err == 0);
1974                 dsl_dataset_rele(ds_head, FTAG);
1975         }
1976
             /*
              * Release the FTAG hold taken on prev above; skipped when
              * ds_prev was only borrowed from ds->ds_prev.
              */
1977         if (ds_prev && ds->ds_prev != ds_prev)
1978                 dsl_dataset_rele(ds_prev, FTAG);
1979
1980         spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
1981         spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
1982             "dataset = %llu", ds->ds_object);
1983
             /* Free the remaining per-dataset MOS objects. */
1984         if (ds->ds_phys->ds_next_clones_obj != 0) {
1985                 ASSERTV(uint64_t count);
1986                 ASSERT(0 == zap_count(mos,
1987                     ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
1988                 VERIFY(0 == dmu_object_free(mos,
1989                     ds->ds_phys->ds_next_clones_obj, tx));
1990         }
1991         if (ds->ds_phys->ds_props_obj != 0)
1992                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
1993         if (ds->ds_phys->ds_userrefs_obj != 0)
1994                 VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
1995         dsl_dir_close(ds->ds_dir, ds);
1996         ds->ds_dir = NULL;
1997         dsl_dataset_drain_refs(ds, tag);
1998         VERIFY(0 == dmu_object_free(mos, obj, tx));
1999
2000         if (dsda->rm_origin) {
2001                 /*
2002                  * Remove the origin of the clone we just destroyed.
                      * ndsda is zero-initialized, so its defer field is
                      * clear and the origin takes the full destroy path.
2003                  */
2004                 struct dsl_ds_destroyarg ndsda = {0};
2005
2006                 ndsda.ds = dsda->rm_origin;
2007                 dsl_dataset_destroy_sync(&ndsda, tag, tx);
2008         }
2009 }
2010
2011 static int
2012 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
2013 {
2014         uint64_t asize;
2015
2016         if (!dmu_tx_is_syncing(tx))
2017                 return (0);
2018
2019         /*
2020          * If there's an fs-only reservation, any blocks that might become
2021          * owned by the snapshot dataset must be accommodated by space
2022          * outside of the reservation.
2023          */
2024         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
2025         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2026         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2027                 return (ENOSPC);
2028
2029         /*
2030          * Propogate any reserved space for this snapshot to other
2031          * snapshot checks in this sync group.
2032          */
2033         if (asize > 0)
2034                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
2035
2036         return (0);
2037 }
2038
2039 int
2040 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
2041 {
2042         dsl_dataset_t *ds = arg1;
2043         const char *snapname = arg2;
2044         int err;
2045         uint64_t value;
2046
2047         /*
2048          * We don't allow multiple snapshots of the same txg.  If there
2049          * is already one, try again.
2050          */
2051         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
2052                 return (EAGAIN);
2053
2054         /*
2055          * Check for conflicting name snapshot name.
2056          */
2057         err = dsl_dataset_snap_lookup(ds, snapname, &value);
2058         if (err == 0)
2059                 return (EEXIST);
2060         if (err != ENOENT)
2061                 return (err);
2062
2063         /*
2064          * Check that the dataset's name is not too long.  Name consists
2065          * of the dataset's length + 1 for the @-sign + snapshot name's length
2066          */
2067         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2068                 return (ENAMETOOLONG);
2069
2070         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2071         if (err)
2072                 return (err);
2073
2074         ds->ds_trysnap_txg = tx->tx_txg;
2075         return (0);
2076 }
2077
2078 void
2079 dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2080 {
2081         dsl_dataset_t *ds = arg1;
2082         const char *snapname = arg2;
2083         dsl_pool_t *dp = ds->ds_dir->dd_pool;
2084         dmu_buf_t *dbuf;
2085         dsl_dataset_phys_t *dsphys;
2086         uint64_t dsobj, crtxg;
2087         objset_t *mos = dp->dp_meta_objset;
2088         int err;
2089
2090         ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
2091
2092         /*
2093          * The origin's ds_creation_txg has to be < TXG_INITIAL
2094          */
2095         if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
2096                 crtxg = 1;
2097         else
2098                 crtxg = tx->tx_txg;
2099
2100         dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
2101             DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
2102         VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
2103         dmu_buf_will_dirty(dbuf, tx);
2104         dsphys = dbuf->db_data;
2105         bzero(dsphys, sizeof (dsl_dataset_phys_t));
2106         dsphys->ds_dir_obj = ds->ds_dir->dd_object;
2107         dsphys->ds_fsid_guid = unique_create();
2108         (void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
2109             sizeof (dsphys->ds_guid));
2110         dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
2111         dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
2112         dsphys->ds_next_snap_obj = ds->ds_object;
2113         dsphys->ds_num_children = 1;
2114         dsphys->ds_creation_time = gethrestime_sec();
2115         dsphys->ds_creation_txg = crtxg;
2116         dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
2117         dsphys->ds_referenced_bytes = ds->ds_phys->ds_referenced_bytes;
2118         dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
2119         dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
2120         dsphys->ds_flags = ds->ds_phys->ds_flags;
2121         dsphys->ds_bp = ds->ds_phys->ds_bp;
2122         dmu_buf_rele(dbuf, FTAG);
2123
2124         ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
2125         if (ds->ds_prev) {
2126                 uint64_t next_clones_obj =
2127                     ds->ds_prev->ds_phys->ds_next_clones_obj;
2128                 ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
2129                     ds->ds_object ||
2130                     ds->ds_prev->ds_phys->ds_num_children > 1);
2131                 if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
2132                         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2133                         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
2134                             ds->ds_prev->ds_phys->ds_creation_txg);
2135                         ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
2136                 } else if (next_clones_obj != 0) {
2137                         remove_from_next_clones(ds->ds_prev,
2138                             dsphys->ds_next_snap_obj, tx);
2139                         VERIFY3U(0, ==, zap_add_int(mos,
2140                             next_clones_obj, dsobj, tx));
2141                 }
2142         }
2143
2144         /*
2145          * If we have a reference-reservation on this dataset, we will
2146          * need to increase the amount of refreservation being charged
2147          * since our unique space is going to zero.
2148          */
2149         if (ds->ds_reserved) {
2150                 int64_t delta;
2151                 ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
2152                 delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
2153                 dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
2154                     delta, 0, 0, tx);
2155         }
2156
2157         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2158         zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
2159             ds->ds_dir->dd_myname, snapname, dsobj,
2160             ds->ds_phys->ds_prev_snap_txg);
2161         ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
2162             UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
2163         dsl_deadlist_close(&ds->ds_deadlist);
2164         dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
2165         dsl_deadlist_add_key(&ds->ds_deadlist,
2166             ds->ds_phys->ds_prev_snap_txg, tx);
2167
2168         ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
2169         ds->ds_phys->ds_prev_snap_obj = dsobj;
2170         ds->ds_phys->ds_prev_snap_txg = crtxg;
2171         ds->ds_phys->ds_unique_bytes = 0;
2172         if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
2173                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
2174
2175         err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
2176             snapname, 8, 1, &dsobj, tx);
2177         ASSERT(err == 0);
2178
2179         if (ds->ds_prev)
2180                 dsl_dataset_drop_ref(ds->ds_prev, ds);
2181         VERIFY(0 == dsl_dataset_get_ref(dp,
2182             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
2183
2184         dsl_scan_ds_snapshotted(ds, tx);
2185
2186         dsl_dir_snap_cmtime_update(ds->ds_dir);
2187
2188         spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
2189             "dataset = %llu", dsobj);
2190 }
2191
2192 void
2193 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
2194 {
2195         ASSERT(dmu_tx_is_syncing(tx));
2196         ASSERT(ds->ds_objset != NULL);
2197         ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
2198
2199         /*
2200          * in case we had to change ds_fsid_guid when we opened it,
2201          * sync it out now.
2202          */
2203         dmu_buf_will_dirty(ds->ds_dbuf, tx);
2204         ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
2205
2206         dsl_dir_dirty(ds->ds_dir, tx);
2207         dmu_objset_sync(ds->ds_objset, zio, tx);
2208 }
2209
2210 static void
2211 get_clones_stat(dsl_dataset_t *ds, nvlist_t *nv)
2212 {
2213         uint64_t count = 0;
2214         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
2215         zap_cursor_t zc;
2216         zap_attribute_t za;
2217         nvlist_t *propval;
2218         nvlist_t *val;
2219
2220         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2221         VERIFY(nvlist_alloc(&propval, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2222         VERIFY(nvlist_alloc(&val, NV_UNIQUE_NAME, KM_SLEEP) == 0);
2223
2224         /*
2225          * There may me missing entries in ds_next_clones_obj
2226          * due to a bug in a previous version of the code.
2227          * Only trust it if it has the right number of entries.
2228          */
2229         if (ds->ds_phys->ds_next_clones_obj != 0) {
2230                 ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
2231                     &count));
2232         }
2233         if (count != ds->ds_phys->ds_num_children - 1) {
2234                 goto fail;
2235         }
2236         for (zap_cursor_init(&zc, mos, ds->ds_phys->ds_next_clones_obj);
2237             zap_cursor_retrieve(&zc, &za) == 0;
2238             zap_cursor_advance(&zc)) {
2239                 dsl_dataset_t *clone;
2240                 char buf[ZFS_MAXNAMELEN];
2241                 /*
2242                  * Even though we hold the dp_config_rwlock, the dataset
2243                  * may fail to open, returning ENOENT.  If there is a
2244                  * thread concurrently attempting to destroy this
2245                  * dataset, it will have the ds_rwlock held for
2246                  * RW_WRITER.  Our call to dsl_dataset_hold_obj() ->
2247                  * dsl_dataset_hold_ref() will fail its
2248                  * rw_tryenter(&ds->ds_rwlock, RW_READER), drop the
2249                  * dp_config_rwlock, and wait for the destroy progress
2250                  * and signal ds_exclusive_cv.  If the destroy was
2251                  * successful, we will see that
2252                  * DSL_DATASET_IS_DESTROYED(), and return ENOENT.
2253                  */
2254                 if (dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
2255                     za.za_first_integer, FTAG, &clone) != 0)
2256                         continue;
2257                 dsl_dir_name(clone->ds_dir, buf);
2258                 VERIFY(nvlist_add_boolean(val, buf) == 0);
2259                 dsl_dataset_rele(clone, FTAG);
2260         }
2261         zap_cursor_fini(&zc);
2262         VERIFY(nvlist_add_nvlist(propval, ZPROP_VALUE, val) == 0);
2263         VERIFY(nvlist_add_nvlist(nv, zfs_prop_to_name(ZFS_PROP_CLONES),
2264             propval) == 0);
2265 fail:
2266         nvlist_free(val);
2267         nvlist_free(propval);
2268         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2269 }
2270
2271 void
2272 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
2273 {
2274         uint64_t refd, avail, uobjs, aobjs, ratio;
2275
2276         dsl_dir_stats(ds->ds_dir, nv);
2277
2278         dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2279         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2280         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2281
2282         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2283             ds->ds_phys->ds_creation_time);
2284         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2285             ds->ds_phys->ds_creation_txg);
2286         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2287             ds->ds_quota);
2288         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2289             ds->ds_reserved);
2290         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2291             ds->ds_phys->ds_guid);
2292         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2293             ds->ds_phys->ds_unique_bytes);
2294         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2295             ds->ds_object);
2296         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
2297             ds->ds_userrefs);
2298         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2299             DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2300
2301         if (ds->ds_phys->ds_prev_snap_obj != 0) {
2302                 uint64_t written, comp, uncomp;
2303                 dsl_pool_t *dp = ds->ds_dir->dd_pool;
2304                 dsl_dataset_t *prev;
2305                 int err;
2306
2307                 rw_enter(&dp->dp_config_rwlock, RW_READER);
2308                 err = dsl_dataset_hold_obj(dp,
2309                     ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
2310                 rw_exit(&dp->dp_config_rwlock);
2311                 if (err == 0) {
2312                         err = dsl_dataset_space_written(prev, ds, &written,
2313                             &comp, &uncomp);
2314                         dsl_dataset_rele(prev, FTAG);
2315                         if (err == 0) {
2316                                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_WRITTEN,
2317                                     written);
2318                         }
2319                 }
2320         }
2321
2322         ratio = ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2323             (ds->ds_phys->ds_uncompressed_bytes * 100 /
2324             ds->ds_phys->ds_compressed_bytes);
2325         dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRATIO, ratio);
2326
2327         if (ds->ds_phys->ds_next_snap_obj) {
2328                 /*
2329                  * This is a snapshot; override the dd's space used with
2330                  * our unique space and compression ratio.
2331                  */
2332                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2333                     ds->ds_phys->ds_unique_bytes);
2334                 dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO, ratio);
2335
2336                 get_clones_stat(ds, nv);
2337         }
2338 }
2339
2340 void
2341 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2342 {
2343         stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2344         stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2345         stat->dds_guid = ds->ds_phys->ds_guid;
2346         if (ds->ds_phys->ds_next_snap_obj) {
2347                 stat->dds_is_snapshot = B_TRUE;
2348                 stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2349         } else {
2350                 stat->dds_is_snapshot = B_FALSE;
2351                 stat->dds_num_clones = 0;
2352         }
2353
2354         /* clone origin is really a dsl_dir thing... */
2355         rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2356         if (dsl_dir_is_clone(ds->ds_dir)) {
2357                 dsl_dataset_t *ods;
2358
2359                 VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2360                     ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2361                 dsl_dataset_name(ods, stat->dds_origin);
2362                 dsl_dataset_drop_ref(ods, FTAG);
2363         } else {
2364                 stat->dds_origin[0] = '\0';
2365         }
2366         rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2367 }
2368
2369 uint64_t
2370 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2371 {
2372         return (ds->ds_fsid_guid);
2373 }
2374
2375 void
2376 dsl_dataset_space(dsl_dataset_t *ds,
2377     uint64_t *refdbytesp, uint64_t *availbytesp,
2378     uint64_t *usedobjsp, uint64_t *availobjsp)
2379 {
2380         *refdbytesp = ds->ds_phys->ds_referenced_bytes;
2381         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2382         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2383                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2384         if (ds->ds_quota != 0) {
2385                 /*
2386                  * Adjust available bytes according to refquota
2387                  */
2388                 if (*refdbytesp < ds->ds_quota)
2389                         *availbytesp = MIN(*availbytesp,
2390                             ds->ds_quota - *refdbytesp);
2391                 else
2392                         *availbytesp = 0;
2393         }
2394         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2395         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2396 }
2397
2398 boolean_t
2399 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2400 {
2401         ASSERTV(dsl_pool_t *dp = ds->ds_dir->dd_pool);
2402
2403         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2404             dsl_pool_sync_context(dp));
2405         if (ds->ds_prev == NULL)
2406                 return (B_FALSE);
2407         if (ds->ds_phys->ds_bp.blk_birth >
2408             ds->ds_prev->ds_phys->ds_creation_txg) {
2409                 objset_t *os, *os_prev;
2410                 /*
2411                  * It may be that only the ZIL differs, because it was
2412                  * reset in the head.  Don't count that as being
2413                  * modified.
2414                  */
2415                 if (dmu_objset_from_ds(ds, &os) != 0)
2416                         return (B_TRUE);
2417                 if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
2418                         return (B_TRUE);
2419                 return (bcmp(&os->os_phys->os_meta_dnode,
2420                     &os_prev->os_phys->os_meta_dnode,
2421                     sizeof (os->os_phys->os_meta_dnode)) != 0);
2422         }
2423         return (B_FALSE);
2424 }
2425
2426 /* ARGSUSED */
2427 static int
2428 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2429 {
2430         dsl_dataset_t *ds = arg1;
2431         char *newsnapname = arg2;
2432         dsl_dir_t *dd = ds->ds_dir;
2433         dsl_dataset_t *hds;
2434         uint64_t val;
2435         int err;
2436
2437         err = dsl_dataset_hold_obj(dd->dd_pool,
2438             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2439         if (err)
2440                 return (err);
2441
2442         /* new name better not be in use */
2443         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2444         dsl_dataset_rele(hds, FTAG);
2445
2446         if (err == 0)
2447                 err = EEXIST;
2448         else if (err == ENOENT)
2449                 err = 0;
2450
2451         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2452         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2453                 err = ENAMETOOLONG;
2454
2455         return (err);
2456 }
2457
2458 static void
2459 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2460 {
2461         dsl_dataset_t *ds = arg1;
2462         const char *newsnapname = arg2;
2463         dsl_dir_t *dd = ds->ds_dir;
2464         objset_t *mos = dd->dd_pool->dp_meta_objset;
2465         dsl_dataset_t *hds;
2466         int err;
2467
2468         ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2469
2470         VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2471             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2472
2473         VERIFY(0 == dsl_dataset_get_snapname(ds));
2474         err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2475         ASSERT3U(err, ==, 0);
2476         mutex_enter(&ds->ds_lock);
2477         (void) strcpy(ds->ds_snapname, newsnapname);
2478         mutex_exit(&ds->ds_lock);
2479         err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2480             ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2481         ASSERT3U(err, ==, 0);
2482
2483         spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2484             "dataset = %llu", ds->ds_object);
2485         dsl_dataset_rele(hds, FTAG);
2486 }
2487
2488 struct renamesnaparg {
2489         dsl_sync_task_group_t *dstg;
2490         char failed[MAXPATHLEN];
2491         char *oldsnap;
2492         char *newsnap;
2493 };
2494
2495 static int
2496 dsl_snapshot_rename_one(const char *name, void *arg)
2497 {
2498         struct renamesnaparg *ra = arg;
2499         dsl_dataset_t *ds = NULL;
2500         char *snapname;
2501         int err;
2502
2503         snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
2504         (void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
2505
2506         /*
2507          * For recursive snapshot renames the parent won't be changing
2508          * so we just pass name for both the to/from argument.
2509          */
2510         err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
2511         if (err != 0) {
2512                 strfree(snapname);
2513                 return (err == ENOENT ? 0 : err);
2514         }
2515
2516 #ifdef _KERNEL
2517         /*
2518          * For all filesystems undergoing rename, we'll need to unmount it.
2519          */
2520         (void) zfs_unmount_snap(snapname, NULL);
2521 #endif
2522         err = dsl_dataset_hold(snapname, ra->dstg, &ds);
2523         strfree(snapname);
2524         if (err != 0)
2525                 return (err == ENOENT ? 0 : err);
2526
2527         dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2528             dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2529
2530         return (0);
2531 }
2532
2533 static int
2534 dsl_recursive_rename(char *oldname, const char *newname)
2535 {
2536         int err;
2537         struct renamesnaparg *ra;
2538         dsl_sync_task_t *dst;
2539         spa_t *spa;
2540         char *cp, *fsname = spa_strdup(oldname);
2541         int len = strlen(oldname) + 1;
2542
2543         /* truncate the snapshot name to get the fsname */
2544         cp = strchr(fsname, '@');
2545         *cp = '\0';
2546
2547         err = spa_open(fsname, &spa, FTAG);
2548         if (err) {
2549                 kmem_free(fsname, len);
2550                 return (err);
2551         }
2552         ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2553         ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2554
2555         ra->oldsnap = strchr(oldname, '@') + 1;
2556         ra->newsnap = strchr(newname, '@') + 1;
2557         *ra->failed = '\0';
2558
2559         err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2560             DS_FIND_CHILDREN);
2561         kmem_free(fsname, len);
2562
2563         if (err == 0) {
2564                 err = dsl_sync_task_group_wait(ra->dstg);
2565         }
2566
2567         for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2568             dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2569                 dsl_dataset_t *ds = dst->dst_arg1;
2570                 if (dst->dst_err) {
2571                         dsl_dir_name(ds->ds_dir, ra->failed);
2572                         (void) strlcat(ra->failed, "@", sizeof (ra->failed));
2573                         (void) strlcat(ra->failed, ra->newsnap,
2574                             sizeof (ra->failed));
2575                 }
2576                 dsl_dataset_rele(ds, ra->dstg);
2577         }
2578
2579         if (err)
2580                 (void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
2581
2582         dsl_sync_task_group_destroy(ra->dstg);
2583         kmem_free(ra, sizeof (struct renamesnaparg));
2584         spa_close(spa, FTAG);
2585         return (err);
2586 }
2587
2588 static int
2589 dsl_valid_rename(const char *oldname, void *arg)
2590 {
2591         int delta = *(int *)arg;
2592
2593         if (strlen(oldname) + delta >= MAXNAMELEN)
2594                 return (ENAMETOOLONG);
2595
2596         return (0);
2597 }
2598
2599 #pragma weak dmu_objset_rename = dsl_dataset_rename
2600 int
2601 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2602 {
2603         dsl_dir_t *dd;
2604         dsl_dataset_t *ds;
2605         const char *tail;
2606         int err;
2607
2608         err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2609         if (err)
2610                 return (err);
2611
2612         if (tail == NULL) {
2613                 int delta = strlen(newname) - strlen(oldname);
2614
2615                 /* if we're growing, validate child name lengths */
2616                 if (delta > 0)
2617                         err = dmu_objset_find(oldname, dsl_valid_rename,
2618                             &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2619
2620                 if (err == 0)
2621                         err = dsl_dir_rename(dd, newname);
2622                 dsl_dir_close(dd, FTAG);
2623                 return (err);
2624         }
2625
2626         if (tail[0] != '@') {
2627                 /* the name ended in a nonexistent component */
2628                 dsl_dir_close(dd, FTAG);
2629                 return (ENOENT);
2630         }
2631
2632         dsl_dir_close(dd, FTAG);
2633
2634         /* new name must be snapshot in same filesystem */
2635         tail = strchr(newname, '@');
2636         if (tail == NULL)
2637                 return (EINVAL);
2638         tail++;
2639         if (strncmp(oldname, newname, tail - newname) != 0)
2640                 return (EXDEV);
2641
2642         if (recursive) {
2643                 err = dsl_recursive_rename(oldname, newname);
2644         } else {
2645                 err = dsl_dataset_hold(oldname, FTAG, &ds);
2646                 if (err)
2647                         return (err);
2648
2649                 err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2650                     dsl_dataset_snapshot_rename_check,
2651                     dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2652
2653                 dsl_dataset_rele(ds, FTAG);
2654         }
2655
2656         return (err);
2657 }
2658
2659 struct promotenode {
2660         list_node_t link;
2661         dsl_dataset_t *ds;
2662 };
2663
2664 struct promotearg {
2665         list_t shared_snaps, origin_snaps, clone_snaps;
2666         dsl_dataset_t *origin_origin;
2667         uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2668         char *err_ds;
2669 };
2670
2671 static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2672
2673 static int
2674 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2675 {
2676         dsl_dataset_t *hds = arg1;
2677         struct promotearg *pa = arg2;
2678         struct promotenode *snap = list_head(&pa->shared_snaps);
2679         dsl_dataset_t *origin_ds = snap->ds;
2680         int err;
2681         uint64_t unused;
2682
2683         /* Check that it is a real clone */
2684         if (!dsl_dir_is_clone(hds->ds_dir))
2685                 return (EINVAL);
2686
2687         /* Since this is so expensive, don't do the preliminary check */
2688         if (!dmu_tx_is_syncing(tx))
2689                 return (0);
2690
2691         if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2692                 return (EXDEV);
2693
2694         /* compute origin's new unique space */
2695         snap = list_tail(&pa->clone_snaps);
2696         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2697         dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2698             origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2699             &pa->unique, &unused, &unused);
2700
2701         /*
2702          * Walk the snapshots that we are moving
2703          *
2704          * Compute space to transfer.  Consider the incremental changes
2705          * to used for each snapshot:
2706          * (my used) = (prev's used) + (blocks born) - (blocks killed)
2707          * So each snapshot gave birth to:
2708          * (blocks born) = (my used) - (prev's used) + (blocks killed)
2709          * So a sequence would look like:
2710          * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2711          * Which simplifies to:
2712          * uN + kN + kN-1 + ... + k1 + k0
2713          * Note however, if we stop before we reach the ORIGIN we get:
2714          * uN + kN + kN-1 + ... + kM - uM-1
2715          */
2716         pa->used = origin_ds->ds_phys->ds_referenced_bytes;
2717         pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2718         pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2719         for (snap = list_head(&pa->shared_snaps); snap;
2720             snap = list_next(&pa->shared_snaps, snap)) {
2721                 uint64_t val, dlused, dlcomp, dluncomp;
2722                 dsl_dataset_t *ds = snap->ds;
2723
2724                 /* Check that the snapshot name does not conflict */
2725                 VERIFY(0 == dsl_dataset_get_snapname(ds));
2726                 err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2727                 if (err == 0) {
2728                         err = EEXIST;
2729                         goto out;
2730                 }
2731                 if (err != ENOENT)
2732                         goto out;
2733
2734                 /* The very first snapshot does not have a deadlist */
2735                 if (ds->ds_phys->ds_prev_snap_obj == 0)
2736                         continue;
2737
2738                 dsl_deadlist_space(&ds->ds_deadlist,
2739                     &dlused, &dlcomp, &dluncomp);
2740                 pa->used += dlused;
2741                 pa->comp += dlcomp;
2742                 pa->uncomp += dluncomp;
2743         }
2744
2745         /*
2746          * If we are a clone of a clone then we never reached ORIGIN,
2747          * so we need to subtract out the clone origin's used space.
2748          */
2749         if (pa->origin_origin) {
2750                 pa->used -= pa->origin_origin->ds_phys->ds_referenced_bytes;
2751                 pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2752                 pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2753         }
2754
2755         /* Check that there is enough space here */
2756         err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2757             pa->used);
2758         if (err)
2759                 return (err);
2760
2761         /*
2762          * Compute the amounts of space that will be used by snapshots
2763          * after the promotion (for both origin and clone).  For each,
2764          * it is the amount of space that will be on all of their
2765          * deadlists (that was not born before their new origin).
2766          */
2767         if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2768                 uint64_t space;
2769
2770                 /*
2771                  * Note, typically this will not be a clone of a clone,
2772                  * so dd_origin_txg will be < TXG_INITIAL, so
2773                  * these snaplist_space() -> dsl_deadlist_space_range()
2774                  * calls will be fast because they do not have to
2775                  * iterate over all bps.
2776                  */
2777                 snap = list_head(&pa->origin_snaps);
2778                 err = snaplist_space(&pa->shared_snaps,
2779                     snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
2780                 if (err)
2781                         return (err);
2782
2783                 err = snaplist_space(&pa->clone_snaps,
2784                     snap->ds->ds_dir->dd_origin_txg, &space);
2785                 if (err)
2786                         return (err);
2787                 pa->cloneusedsnap += space;
2788         }
2789         if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2790                 err = snaplist_space(&pa->origin_snaps,
2791                     origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2792                 if (err)
2793                         return (err);
2794         }
2795
2796         return (0);
2797 out:
2798         pa->err_ds =  snap->ds->ds_snapname;
2799         return (err);
2800 }
2801
2802 static void
2803 dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
2804 {
2805         dsl_dataset_t *hds = arg1;
2806         struct promotearg *pa = arg2;
2807         struct promotenode *snap = list_head(&pa->shared_snaps);
2808         dsl_dataset_t *origin_ds = snap->ds;
2809         dsl_dataset_t *origin_head;
2810         dsl_dir_t *dd = hds->ds_dir;
2811         dsl_pool_t *dp = hds->ds_dir->dd_pool;
2812         dsl_dir_t *odd = NULL;
2813         uint64_t oldnext_obj;
2814         int64_t delta;
2815
2816         ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2817
2818         snap = list_head(&pa->origin_snaps);
2819         origin_head = snap->ds;
2820
2821         /*
2822          * We need to explicitly open odd, since origin_ds's dd will be
2823          * changing.
2824          */
2825         VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2826             NULL, FTAG, &odd));
2827
2828         /* change origin's next snap */
2829         dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2830         oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2831         snap = list_tail(&pa->clone_snaps);
2832         ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2833         origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2834
2835         /* change the origin's next clone */
2836         if (origin_ds->ds_phys->ds_next_clones_obj) {
2837                 remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2838                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2839                     origin_ds->ds_phys->ds_next_clones_obj,
2840                     oldnext_obj, tx));
2841         }
2842
2843         /* change origin */
2844         dmu_buf_will_dirty(dd->dd_dbuf, tx);
2845         ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2846         dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2847         dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
2848         dmu_buf_will_dirty(odd->dd_dbuf, tx);
2849         odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2850         origin_head->ds_dir->dd_origin_txg =
2851             origin_ds->ds_phys->ds_creation_txg;
2852
2853         /* change dd_clone entries */
2854         if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2855                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2856                     odd->dd_phys->dd_clones, hds->ds_object, tx));
2857                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2858                     pa->origin_origin->ds_dir->dd_phys->dd_clones,
2859                     hds->ds_object, tx));
2860
2861                 VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
2862                     pa->origin_origin->ds_dir->dd_phys->dd_clones,
2863                     origin_head->ds_object, tx));
2864                 if (dd->dd_phys->dd_clones == 0) {
2865                         dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
2866                             DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
2867                 }
2868                 VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2869                     dd->dd_phys->dd_clones, origin_head->ds_object, tx));
2870
2871         }
2872
2873         /* move snapshots to this dir */
2874         for (snap = list_head(&pa->shared_snaps); snap;
2875             snap = list_next(&pa->shared_snaps, snap)) {
2876                 dsl_dataset_t *ds = snap->ds;
2877
2878                 /* unregister props as dsl_dir is changing */
2879                 if (ds->ds_objset) {
2880                         dmu_objset_evict(ds->ds_objset);
2881                         ds->ds_objset = NULL;
2882                 }
2883                 /* move snap name entry */
2884                 VERIFY(0 == dsl_dataset_get_snapname(ds));
2885                 VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2886                     ds->ds_snapname, tx));
2887                 VERIFY(0 == zap_add(dp->dp_meta_objset,
2888                     hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2889                     8, 1, &ds->ds_object, tx));
2890
2891                 /* change containing dsl_dir */
2892                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
2893                 ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2894                 ds->ds_phys->ds_dir_obj = dd->dd_object;
2895                 ASSERT3P(ds->ds_dir, ==, odd);
2896                 dsl_dir_close(ds->ds_dir, ds);
2897                 VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2898                     NULL, ds, &ds->ds_dir));
2899
2900                 /* move any clone references */
2901                 if (ds->ds_phys->ds_next_clones_obj &&
2902                     spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
2903                         zap_cursor_t zc;
2904                         zap_attribute_t za;
2905
2906                         for (zap_cursor_init(&zc, dp->dp_meta_objset,
2907                             ds->ds_phys->ds_next_clones_obj);
2908                             zap_cursor_retrieve(&zc, &za) == 0;
2909                             zap_cursor_advance(&zc)) {
2910                                 dsl_dataset_t *cnds;
2911                                 uint64_t o;
2912
2913                                 if (za.za_first_integer == oldnext_obj) {
2914                                         /*
2915                                          * We've already moved the
2916                                          * origin's reference.
2917                                          */
2918                                         continue;
2919                                 }
2920
2921                                 VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
2922                                     za.za_first_integer, FTAG, &cnds));
2923                                 o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;
2924
2925                                 VERIFY3U(zap_remove_int(dp->dp_meta_objset,
2926                                     odd->dd_phys->dd_clones, o, tx), ==, 0);
2927                                 VERIFY3U(zap_add_int(dp->dp_meta_objset,
2928                                     dd->dd_phys->dd_clones, o, tx), ==, 0);
2929                                 dsl_dataset_rele(cnds, FTAG);
2930                         }
2931                         zap_cursor_fini(&zc);
2932                 }
2933
2934                 ASSERT3U(dsl_prop_numcb(ds), ==, 0);
2935         }
2936
2937         /*
2938          * Change space accounting.
2939          * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
2940          * both be valid, or both be 0 (resulting in delta == 0).  This
2941          * is true for each of {clone,origin} independently.
2942          */
2943
2944         delta = pa->cloneusedsnap -
2945             dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2946         ASSERT3S(delta, >=, 0);
2947         ASSERT3U(pa->used, >=, delta);
2948         dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
2949         dsl_dir_diduse_space(dd, DD_USED_HEAD,
2950             pa->used - delta, pa->comp, pa->uncomp, tx);
2951
2952         delta = pa->originusedsnap -
2953             odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2954         ASSERT3S(delta, <=, 0);
2955         ASSERT3U(pa->used, >=, -delta);
2956         dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
2957         dsl_dir_diduse_space(odd, DD_USED_HEAD,
2958             -pa->used - delta, -pa->comp, -pa->uncomp, tx);
2959
2960         origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2961
2962         /* log history record */
2963         spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
2964             "dataset = %llu", hds->ds_object);
2965
2966         dsl_dir_close(odd, FTAG);
2967 }
2968
2969 static char *snaplist_tag = "snaplist";
2970 /*
2971  * Make a list of dsl_dataset_t's for the snapshots between first_obj
2972  * (exclusive) and last_obj (inclusive).  The list will be in reverse
2973  * order (last_obj will be the list_head()).  If first_obj == 0, do all
2974  * snapshots back to this dataset's origin.
2975  */
2976 static int
2977 snaplist_make(dsl_pool_t *dp, boolean_t own,
2978     uint64_t first_obj, uint64_t last_obj, list_t *l)
2979 {
2980         uint64_t obj = last_obj;
2981
2982         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
2983
2984         list_create(l, sizeof (struct promotenode),
2985             offsetof(struct promotenode, link));
2986
2987         while (obj != first_obj) {
2988                 dsl_dataset_t *ds;
2989                 struct promotenode *snap;
2990                 int err;
2991
2992                 if (own) {
2993                         err = dsl_dataset_own_obj(dp, obj,
2994                             0, snaplist_tag, &ds);
2995                         if (err == 0)
2996                                 dsl_dataset_make_exclusive(ds, snaplist_tag);
2997                 } else {
2998                         err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
2999                 }
3000                 if (err == ENOENT) {
3001                         /* lost race with snapshot destroy */
3002                         struct promotenode *last = list_tail(l);
3003                         ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
3004                         obj = last->ds->ds_phys->ds_prev_snap_obj;
3005                         continue;
3006                 } else if (err) {
3007                         return (err);
3008                 }
3009
3010                 if (first_obj == 0)
3011                         first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
3012
3013                 snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
3014                 snap->ds = ds;
3015                 list_insert_tail(l, snap);
3016                 obj = ds->ds_phys->ds_prev_snap_obj;
3017         }
3018
3019         return (0);
3020 }
3021
3022 static int
3023 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
3024 {
3025         struct promotenode *snap;
3026
3027         *spacep = 0;
3028         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
3029                 uint64_t used, comp, uncomp;
3030                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
3031                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
3032                 *spacep += used;
3033         }
3034         return (0);
3035 }
3036
3037 static void
3038 snaplist_destroy(list_t *l, boolean_t own)
3039 {
3040         struct promotenode *snap;
3041
3042         if (!l || !list_link_active(&l->list_head))
3043                 return;
3044
3045         while ((snap = list_tail(l)) != NULL) {
3046                 list_remove(l, snap);
3047                 if (own)
3048                         dsl_dataset_disown(snap->ds, snaplist_tag);
3049                 else
3050                         dsl_dataset_rele(snap->ds, snaplist_tag);
3051                 kmem_free(snap, sizeof (struct promotenode));
3052         }
3053         list_destroy(l);
3054 }
3055
3056 /*
3057  * Promote a clone.  Nomenclature note:
3058  * "clone" or "cds": the original clone which is being promoted
3059  * "origin" or "ods": the snapshot which is originally clone's origin
3060  * "origin head" or "ohds": the dataset which is the head
3061  * (filesystem/volume) for the origin
3062  * "origin origin": the origin of the origin's filesystem (typically
3063  * NULL, indicating that the clone is not a clone of a clone).
3064  */
3065 int
3066 dsl_dataset_promote(const char *name, char *conflsnap)
3067 {
3068         dsl_dataset_t *ds;
3069         dsl_dir_t *dd;
3070         dsl_pool_t *dp;
3071         dmu_object_info_t doi;
3072         struct promotearg pa;
3073         struct promotenode *snap;
3074         int err;
3075
3076         bzero(&pa, sizeof(struct promotearg));
3077         err = dsl_dataset_hold(name, FTAG, &ds);
3078         if (err)
3079                 return (err);
3080         dd = ds->ds_dir;
3081         dp = dd->dd_pool;
3082
3083         err = dmu_object_info(dp->dp_meta_objset,
3084             ds->ds_phys->ds_snapnames_zapobj, &doi);
3085         if (err) {
3086                 dsl_dataset_rele(ds, FTAG);
3087                 return (err);
3088         }
3089
3090         if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
3091                 dsl_dataset_rele(ds, FTAG);
3092                 return (EINVAL);
3093         }
3094
3095         /*
3096          * We are going to inherit all the snapshots taken before our
3097          * origin (i.e., our new origin will be our parent's origin).
3098          * Take ownership of them so that we can rename them into our
3099          * namespace.
3100          */
3101         rw_enter(&dp->dp_config_rwlock, RW_READER);
3102
3103         err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
3104             &pa.shared_snaps);
3105         if (err != 0)
3106                 goto out;
3107
3108         err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
3109         if (err != 0)
3110                 goto out;
3111
3112         snap = list_head(&pa.shared_snaps);
3113         ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
3114         err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
3115             snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
3116         if (err != 0)
3117                 goto out;
3118
3119         if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
3120                 err = dsl_dataset_hold_obj(dp,
3121                     snap->ds->ds_dir->dd_phys->dd_origin_obj,
3122                     FTAG, &pa.origin_origin);
3123                 if (err != 0)
3124                         goto out;
3125         }
3126
3127 out:
3128         rw_exit(&dp->dp_config_rwlock);
3129
3130         /*
3131          * Add in 128x the snapnames zapobj size, since we will be moving
3132          * a bunch of snapnames to the promoted ds, and dirtying their
3133          * bonus buffers.
3134          */
3135         if (err == 0) {
3136                 err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
3137                     dsl_dataset_promote_sync, ds, &pa,
3138                     2 + 2 * doi.doi_physical_blocks_512);
3139                 if (err && pa.err_ds && conflsnap)
3140                         (void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
3141         }
3142
3143         snaplist_destroy(&pa.shared_snaps, B_TRUE);
3144         snaplist_destroy(&pa.clone_snaps, B_FALSE);
3145         snaplist_destroy(&pa.origin_snaps, B_FALSE);
3146         if (pa.origin_origin)
3147                 dsl_dataset_rele(pa.origin_origin, FTAG);
3148         dsl_dataset_rele(ds, FTAG);
3149         return (err);
3150 }
3151
3152 struct cloneswaparg {
3153         dsl_dataset_t *cds; /* clone dataset */
3154         dsl_dataset_t *ohds; /* origin's head dataset */
3155         boolean_t force;
3156         int64_t unused_refres_delta; /* change in unconsumed refreservation */
3157 };
3158
3159 /* ARGSUSED */
3160 static int
3161 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
3162 {
3163         struct cloneswaparg *csa = arg1;
3164
3165         /* they should both be heads */
3166         if (dsl_dataset_is_snapshot(csa->cds) ||
3167             dsl_dataset_is_snapshot(csa->ohds))
3168                 return (EINVAL);
3169
3170         /* the branch point should be just before them */
3171         if (csa->cds->ds_prev != csa->ohds->ds_prev)
3172                 return (EINVAL);
3173
3174         /* cds should be the clone (unless they are unrelated) */
3175         if (csa->cds->ds_prev != NULL &&
3176             csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
3177             csa->ohds->ds_object !=
3178             csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
3179                 return (EINVAL);
3180
3181         /* the clone should be a child of the origin */
3182         if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
3183                 return (EINVAL);
3184
3185         /* ohds shouldn't be modified unless 'force' */
3186         if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
3187                 return (ETXTBSY);
3188
3189         /* adjust amount of any unconsumed refreservation */
3190         csa->unused_refres_delta =
3191             (int64_t)MIN(csa->ohds->ds_reserved,
3192             csa->ohds->ds_phys->ds_unique_bytes) -
3193             (int64_t)MIN(csa->ohds->ds_reserved,
3194             csa->cds->ds_phys->ds_unique_bytes);
3195
3196         if (csa->unused_refres_delta > 0 &&
3197             csa->unused_refres_delta >
3198             dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
3199                 return (ENOSPC);
3200
3201         if (csa->ohds->ds_quota != 0 &&
3202             csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
3203                 return (EDQUOT);
3204
3205         return (0);
3206 }
3207
/*
 * Sync callback for the clone-swap sync task.
 *
 * arg1 is the struct cloneswaparg validated by
 * dsl_dataset_clone_swap_check(); arg2 is unused.  Exchanges the contents
 * of the clone (csa->cds) and the origin head (csa->ohds): block pointers,
 * ds_*_bytes counters and deadlist objects are swapped, and the space
 * charged to the two dsl_dirs is adjusted to match.  The space deltas are
 * computed from the pre-swap values, so the order of the steps below
 * matters.
 */
3208 /* ARGSUSED */
3209 static void
3210 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3211 {
3212         struct cloneswaparg *csa = arg1;
3213         dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
3214
3215         ASSERT(csa->cds->ds_reserved == 0);
3216         ASSERT(csa->ohds->ds_quota == 0 ||
3217             csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
3218
3219         dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
3220         dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
3221
             /* Evict any cached objset for either dataset and clear the pointer. */
3222         if (csa->cds->ds_objset != NULL) {
3223                 dmu_objset_evict(csa->cds->ds_objset);
3224                 csa->cds->ds_objset = NULL;
3225         }
3226
3227         if (csa->ohds->ds_objset != NULL) {
3228                 dmu_objset_evict(csa->ohds->ds_objset);
3229                 csa->ohds->ds_objset = NULL;
3230         }
3231
3232         /*
3233          * Reset origin's unique bytes, if it exists.
3234          */
3235         if (csa->cds->ds_prev) {
3236                 dsl_dataset_t *origin = csa->cds->ds_prev;
3237                 uint64_t comp, uncomp;
3238
3239                 dmu_buf_will_dirty(origin->ds_dbuf, tx);
                     /*
                      * The "used" result is written straight into the
                      * origin's ds_unique_bytes; comp and uncomp are only
                      * placeholders for the other two out-parameters.
                      */
3240                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3241                     origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
3242                     &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
3243         }
3244
3245         /* swap blkptrs */
3246         {
3247                 blkptr_t tmp;
3248                 tmp = csa->ohds->ds_phys->ds_bp;
3249                 csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
3250                 csa->cds->ds_phys->ds_bp = tmp;
3251         }
3252
3253         /* set dd_*_bytes */
3254         {
3255                 int64_t dused, dcomp, duncomp;
3256                 uint64_t cdl_used, cdl_comp, cdl_uncomp;
3257                 uint64_t odl_used, odl_comp, odl_uncomp;
3258
3259                 ASSERT3U(csa->cds->ds_dir->dd_phys->
3260                     dd_used_breakdown[DD_USED_SNAP], ==, 0);
3261
3262                 dsl_deadlist_space(&csa->cds->ds_deadlist,
3263                     &cdl_used, &cdl_comp, &cdl_uncomp);
3264                 dsl_deadlist_space(&csa->ohds->ds_deadlist,
3265                     &odl_used, &odl_comp, &odl_uncomp);
3266
                     /*
                      * Each delta is (clone's byte count + clone's deadlist
                      * space) minus the same sum for the origin head, using
                      * the values from before ds_*_bytes and the deadlists
                      * are swapped further down.
                      */
3267                 dused = csa->cds->ds_phys->ds_referenced_bytes + cdl_used -
3268                     (csa->ohds->ds_phys->ds_referenced_bytes + odl_used);
3269                 dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
3270                     (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
3271                 duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
3272                     cdl_uncomp -
3273                     (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
3274
                     /* The origin's dir gains the delta; the clone's dir loses it. */
3275                 dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
3276                     dused, dcomp, duncomp, tx);
3277                 dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
3278                     -dused, -dcomp, -duncomp, tx);
3279
3280                 /*
3281                  * The difference in the space used by snapshots is the
3282                  * difference in snapshot space due to the head's
3283                  * deadlist (since that's the only thing that's
3284                  * changing that affects the snapused).
3285                  */
3286                 dsl_deadlist_space_range(&csa->cds->ds_deadlist,
3287                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3288                     &cdl_used, &cdl_comp, &cdl_uncomp);
3289                 dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
3290                     csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
3291                     &odl_used, &odl_comp, &odl_uncomp);
3292                 dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
3293                     DD_USED_HEAD, DD_USED_SNAP, tx);
3294         }
3295
3296         /* swap ds_*_bytes */
3297         SWITCH64(csa->ohds->ds_phys->ds_referenced_bytes,
3298             csa->cds->ds_phys->ds_referenced_bytes);
3299         SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
3300             csa->cds->ds_phys->ds_compressed_bytes);
3301         SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
3302             csa->cds->ds_phys->ds_uncompressed_bytes);
3303         SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
3304             csa->cds->ds_phys->ds_unique_bytes);
3305
3306         /* apply any parent delta for change in unconsumed refreservation */
3307         dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
3308             csa->unused_refres_delta, 0, 0, tx);
3309
3310         /*
3311          * Swap deadlists.
3312          */
             /*
              * Close both in-core deadlists, exchange the on-disk object
              * numbers, then reopen each deadlist on its new object.
              */
3313         dsl_deadlist_close(&csa->cds->ds_deadlist);
3314         dsl_deadlist_close(&csa->ohds->ds_deadlist);
3315         SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
3316             csa->cds->ds_phys->ds_deadlist_obj);
3317         dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
3318             csa->cds->ds_phys->ds_deadlist_obj);
3319         dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
3320             csa->ohds->ds_phys->ds_deadlist_obj);
3321
3322         dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
3323 }
3324
3325 /*
3326  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
3327  * recv" into an existing fs to swizzle the file system to the new
3328  * version, and by "zfs rollback".  Can also be used to swap two
3329  * independent head datasets if neither has any snapshots.
      *
      * Both datasets must already be owned (asserted on ds_owner).  'force'
      * is passed through to the check callback, which otherwise rejects the
      * swap with ETXTBSY when origin_head was modified since its last
      * snapshot.
      *
      * Takes ds_rwlock as writer on both datasets unless it is already
      * write-held, and does not drop either lock before returning.
      *
      * Returns the result of dsl_sync_task_do(): 0 on success, otherwise the
      * error reported by dsl_dataset_clone_swap_check().
3330  */
3331 int
3332 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
3333     boolean_t force)
3334 {
3335         struct cloneswaparg csa;
3336         int error;
3337
3338         ASSERT(clone->ds_owner);
3339         ASSERT(origin_head->ds_owner);
3340 retry:
3341         /*
3342          * Need exclusive access for the swap. If we're swapping these
3343          * datasets back after an error, we already hold the locks.
3344          */
3345         if (!RW_WRITE_HELD(&clone->ds_rwlock))
3346                 rw_enter(&clone->ds_rwlock, RW_WRITER);
             /*
              * Never block on the second lock while holding the first: if
              * origin_head cannot be taken immediately, drop clone, block on
              * origin_head, then try clone without blocking.  If that fails
              * too, release everything and start over.
              */
3347         if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
3348             !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
3349                 rw_exit(&clone->ds_rwlock);
3350                 rw_enter(&origin_head->ds_rwlock, RW_WRITER);
3351                 if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
3352                         rw_exit(&origin_head->ds_rwlock);
3353                         goto retry;
3354                 }
3355         }
             /* csa.unused_refres_delta is filled in by the check callback. */
3356         csa.cds = clone;
3357         csa.ohds = origin_head;
3358         csa.force = force;
3359         error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3360             dsl_dataset_clone_swap_check,
3361             dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3362         return (error);
3363 }
3364
3365 /*
3366  * Given a pool name and a dataset object number in that pool,
3367  * return the name of that dataset.
3368  */
3369 int
3370 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3371 {
3372         spa_t *spa;
3373         dsl_pool_t *dp;
3374         dsl_dataset_t *ds;
3375         int error;
3376
3377         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3378                 return (error);
3379         dp = spa_get_dsl(spa);
3380         rw_enter(&dp->dp_config_rwlock, RW_READER);
3381         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3382                 dsl_dataset_name(ds, buf);
3383                 dsl_dataset_rele(ds, FTAG);
3384         }
3385         rw_exit(&dp->dp_config_rwlock);
3386         spa_close(spa, FTAG);
3387
3388         return (error);
3389 }
3390
3391 int
3392 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3393     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3394 {
3395         int error = 0;
3396
3397         ASSERT3S(asize, >, 0);
3398
3399         /*
3400          * *ref_rsrv is the portion of asize that will come from any
3401          * unconsumed refreservation space.
3402          */
3403         *ref_rsrv = 0;
3404
3405         mutex_enter(&ds->ds_lock);
3406         /*
3407          * Make a space adjustment for reserved bytes.
3408          */
3409         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3410                 ASSERT3U(*used, >=,
3411                     ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3412                 *used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3413                 *ref_rsrv =
3414                     asize - MIN(asize, parent_delta(ds, asize + inflight));
3415         }
3416
3417         if (!check_quota || ds->ds_quota == 0) {
3418                 mutex_exit(&ds->ds_lock);
3419                 return (0);
3420         }
3421         /*
3422          * If they are requesting more space, and our current estimate
3423          * is over quota, they get to try again unless the actual
3424          * on-disk is over quota and there are no pending changes (which
3425          * may free up space for us).
3426          */
3427         if (ds->ds_phys->ds_referenced_bytes + inflight >= ds->ds_quota) {
3428                 if (inflight > 0 ||
3429                     ds->ds_phys->ds_referenced_bytes < ds->ds_quota)
3430                         error = ERESTART;
3431                 else
3432                         error = EDQUOT;
3433
3434                 DMU_TX_STAT_BUMP(dmu_tx_quota);
3435         }
3436         mutex_exit(&ds->ds_lock);
3437
3438         return (error);
3439 }
3440
3441 /* ARGSUSED */
3442 static int
3443 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3444 {
3445         dsl_dataset_t *ds = arg1;
3446         dsl_prop_setarg_t *psa = arg2;
3447         int err;
3448
3449         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3450                 return (ENOTSUP);
3451
3452         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3453                 return (err);
3454
3455         if (psa->psa_effective_value == 0)
3456                 return (0);
3457
3458         if (psa->psa_effective_value < ds->ds_phys->ds_referenced_bytes ||
3459             psa->psa_effective_value < ds->ds_reserved)
3460                 return (ENOSPC);
3461
3462         return (0);
3463 }
3464
3465 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3466
3467 void
3468 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3469 {
3470         dsl_dataset_t *ds = arg1;
3471         dsl_prop_setarg_t *psa = arg2;
3472         uint64_t effective_value = psa->psa_effective_value;
3473
3474         dsl_prop_set_sync(ds, psa, tx);
3475         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3476
3477         if (ds->ds_quota != effective_value) {
3478                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3479                 ds->ds_quota = effective_value;
3480         }
3481 }
3482
3483 int
3484 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3485 {
3486         dsl_dataset_t *ds;
3487         dsl_prop_setarg_t psa;
3488         int err;
3489
3490         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3491
3492         err = dsl_dataset_hold(dsname, FTAG, &ds);
3493         if (err)
3494                 return (err);
3495
3496         /*
3497          * If someone removes a file, then tries to set the quota, we
3498          * want to make sure the file freeing takes effect.
3499          */
3500         txg_wait_open(ds->ds_dir->dd_pool, 0);
3501
3502         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3503             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3504             ds, &psa, 0);
3505
3506         dsl_dataset_rele(ds, FTAG);
3507         return (err);
3508 }
3509
3510 static int
3511 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3512 {
3513         dsl_dataset_t *ds = arg1;
3514         dsl_prop_setarg_t *psa = arg2;
3515         uint64_t effective_value;
3516         uint64_t unique;
3517         int err;
3518
3519         if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3520             SPA_VERSION_REFRESERVATION)
3521                 return (ENOTSUP);
3522
3523         if (dsl_dataset_is_snapshot(ds))
3524                 return (EINVAL);
3525
3526         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3527                 return (err);
3528
3529         effective_value = psa->psa_effective_value;
3530
3531         /*
3532          * If we are doing the preliminary check in open context, the
3533          * space estimates may be inaccurate.
3534          */
3535         if (!dmu_tx_is_syncing(tx))
3536                 return (0);
3537
3538         mutex_enter(&ds->ds_lock);
3539         if (!DS_UNIQUE_IS_ACCURATE(ds))
3540                 dsl_dataset_recalc_head_uniq(ds);
3541         unique = ds->ds_phys->ds_unique_bytes;
3542         mutex_exit(&ds->ds_lock);
3543
3544         if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
3545                 uint64_t delta = MAX(unique, effective_value) -
3546                     MAX(unique, ds->ds_reserved);
3547
3548                 if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3549                         return (ENOSPC);
3550                 if (ds->ds_quota > 0 &&
3551                     effective_value > ds->ds_quota)
3552                         return (ENOSPC);
3553         }
3554
3555         return (0);
3556 }
3557
3558 static void
3559 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3560 {
3561         dsl_dataset_t *ds = arg1;
3562         dsl_prop_setarg_t *psa = arg2;
3563         uint64_t effective_value = psa->psa_effective_value;
3564         uint64_t unique;
3565         int64_t delta;
3566
3567         dsl_prop_set_sync(ds, psa, tx);
3568         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3569
3570         dmu_buf_will_dirty(ds->ds_dbuf, tx);
3571
3572         mutex_enter(&ds->ds_dir->dd_lock);
3573         mutex_enter(&ds->ds_lock);
3574         ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
3575         unique = ds->ds_phys->ds_unique_bytes;
3576         delta = MAX(0, (int64_t)(effective_value - unique)) -
3577             MAX(0, (int64_t)(ds->ds_reserved - unique));
3578         ds->ds_reserved = effective_value;
3579         mutex_exit(&ds->ds_lock);
3580
3581         dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3582         mutex_exit(&ds->ds_dir->dd_lock);
3583 }
3584
3585 int
3586 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3587     uint64_t reservation)
3588 {
3589         dsl_dataset_t *ds;
3590         dsl_prop_setarg_t psa;
3591         int err;
3592
3593         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3594             &reservation);
3595
3596         err = dsl_dataset_hold(dsname, FTAG, &ds);
3597         if (err)
3598                 return (err);
3599
3600         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3601             dsl_dataset_set_reservation_check,
3602             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3603
3604         dsl_dataset_rele(ds, FTAG);
3605         return (err);
3606 }
3607
3608 typedef struct zfs_hold_cleanup_arg {
3609         dsl_pool_t *dp;
3610         uint64_t dsobj;
3611         char htag[MAXNAMELEN];
3612 } zfs_hold_cleanup_arg_t;
3613
3614 static void
3615 dsl_dataset_user_release_onexit(void *arg)
3616 {
3617         zfs_hold_cleanup_arg_t *ca = arg;
3618
3619         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3620             B_TRUE);
3621         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3622 }
3623
3624 void
3625 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3626     minor_t minor)
3627 {
3628         zfs_hold_cleanup_arg_t *ca;
3629
3630         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3631         ca->dp = ds->ds_dir->dd_pool;
3632         ca->dsobj = ds->ds_object;
3633         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3634         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3635             dsl_dataset_user_release_onexit, ca, NULL));
3636 }
3637
3638 /*
3639  * If you add new checks here, you may need to add
3640  * additional checks to the "temporary" case in
3641  * snapshot_check() in dmu_objset.c.
3642  */
3643 static int
3644 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3645 {
3646         dsl_dataset_t *ds = arg1;
3647         struct dsl_ds_holdarg *ha = arg2;
3648         char *htag = ha->htag;
3649         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3650         int error = 0;
3651
3652         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3653                 return (ENOTSUP);
3654
3655         if (!dsl_dataset_is_snapshot(ds))
3656                 return (EINVAL);
3657
3658         /* tags must be unique */
3659         mutex_enter(&ds->ds_lock);
3660         if (ds->ds_phys->ds_userrefs_obj) {
3661                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3662                     8, 1, tx);
3663                 if (error == 0)
3664                         error = EEXIST;
3665                 else if (error == ENOENT)
3666                         error = 0;
3667         }
3668         mutex_exit(&ds->ds_lock);
3669
3670         if (error == 0 && ha->temphold &&
3671             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3672                 error = E2BIG;
3673
3674         return (error);
3675 }
3676
3677 void
3678 dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3679 {
3680         dsl_dataset_t *ds = arg1;
3681         struct dsl_ds_holdarg *ha = arg2;
3682         char *htag = ha->htag;
3683         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3684         objset_t *mos = dp->dp_meta_objset;
3685         uint64_t now = gethrestime_sec();
3686         uint64_t zapobj;
3687
3688         mutex_enter(&ds->ds_lock);
3689         if (ds->ds_phys->ds_userrefs_obj == 0) {
3690                 /*
3691                  * This is the first user hold for this dataset.  Create
3692                  * the userrefs zap object.
3693                  */
3694                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3695                 zapobj = ds->ds_phys->ds_userrefs_obj =
3696                     zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3697         } else {
3698                 zapobj = ds->ds_phys->ds_userrefs_obj;
3699         }
3700         ds->ds_userrefs++;
3701         mutex_exit(&ds->ds_lock);
3702
3703         VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3704
3705         if (ha->temphold) {
3706                 VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3707                     htag, &now, tx));
3708         }
3709
3710         spa_history_log_internal(LOG_DS_USER_HOLD,
3711             dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
3712             (int)ha->temphold, ds->ds_object);
3713 }
3714
3715 static int
3716 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3717 {
3718         struct dsl_ds_holdarg *ha = arg;
3719         dsl_dataset_t *ds;
3720         int error;
3721         char *name;
3722
3723         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3724         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3725         error = dsl_dataset_hold(name, ha->dstg, &ds);
3726         strfree(name);
3727         if (error == 0) {
3728                 ha->gotone = B_TRUE;
3729                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3730                     dsl_dataset_user_hold_sync, ds, ha, 0);
3731         } else if (error == ENOENT && ha->recursive) {
3732                 error = 0;
3733         } else {
3734                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3735         }
3736         return (error);
3737 }
3738
3739 int
3740 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3741     boolean_t temphold)
3742 {
3743         struct dsl_ds_holdarg *ha;
3744         int error;
3745
3746         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3747         ha->htag = htag;
3748         ha->temphold = temphold;
3749         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3750             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3751             ds, ha, 0);
3752         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3753
3754         return (error);
3755 }
3756
3757 int
3758 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3759     boolean_t recursive, boolean_t temphold, int cleanup_fd)
3760 {
3761         struct dsl_ds_holdarg *ha;
3762         dsl_sync_task_t *dst;
3763         spa_t *spa;
3764         int error;
3765         minor_t minor = 0;
3766
3767         if (cleanup_fd != -1) {
3768                 /* Currently we only support cleanup-on-exit of tempholds. */
3769                 if (!temphold)
3770                         return (EINVAL);
3771                 error = zfs_onexit_fd_hold(cleanup_fd, &minor);
3772                 if (error)
3773                         return (error);
3774         }
3775
3776         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3777
3778         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3779
3780         error = spa_open(dsname, &spa, FTAG);
3781         if (error) {
3782                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3783                 if (cleanup_fd != -1)
3784                         zfs_onexit_fd_rele(cleanup_fd);
3785                 return (error);
3786         }
3787
3788         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3789         ha->htag = htag;
3790         ha->snapname = snapname;
3791         ha->recursive = recursive;
3792         ha->temphold = temphold;
3793
3794         if (recursive) {
3795                 error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3796                     ha, DS_FIND_CHILDREN);
3797         } else {
3798                 error = dsl_dataset_user_hold_one(dsname, ha);
3799         }
3800         if (error == 0)
3801                 error = dsl_sync_task_group_wait(ha->dstg);
3802
3803         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3804             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3805                 dsl_dataset_t *ds = dst->dst_arg1;
3806
3807                 if (dst->dst_err) {
3808                         dsl_dataset_name(ds, ha->failed);
3809                         *strchr(ha->failed, '@') = '\0';
3810                 } else if (error == 0 && minor != 0 && temphold) {
3811                         /*
3812                          * If this hold is to be released upon process exit,
3813                          * register that action now.
3814                          */
3815                         dsl_register_onexit_hold_cleanup(ds, htag, minor);
3816                 }
3817                 dsl_dataset_rele(ds, ha->dstg);
3818         }
3819
3820         if (error == 0 && recursive && !ha->gotone)
3821                 error = ENOENT;
3822
3823         if (error)
3824                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
3825
3826         dsl_sync_task_group_destroy(ha->dstg);
3827
3828         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3829         spa_close(spa, FTAG);
3830         if (cleanup_fd != -1)
3831                 zfs_onexit_fd_rele(cleanup_fd);
3832         return (error);
3833 }
3834
3835 struct dsl_ds_releasearg {
3836         dsl_dataset_t *ds;
3837         const char *htag;
3838         boolean_t own;          /* do we own or just hold ds? */
3839 };
3840
3841 static int
3842 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3843     boolean_t *might_destroy)
3844 {
3845         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3846         uint64_t zapobj;
3847         uint64_t tmp;
3848         int error;
3849
3850         *might_destroy = B_FALSE;
3851
3852         mutex_enter(&ds->ds_lock);
3853         zapobj = ds->ds_phys->ds_userrefs_obj;
3854         if (zapobj == 0) {
3855                 /* The tag can't possibly exist */
3856                 mutex_exit(&ds->ds_lock);
3857                 return (ESRCH);
3858         }
3859
3860         /* Make sure the tag exists */
3861         error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3862         if (error) {
3863                 mutex_exit(&ds->ds_lock);
3864                 if (error == ENOENT)
3865                         error = ESRCH;
3866                 return (error);
3867         }
3868
3869         if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3870             DS_IS_DEFER_DESTROY(ds))
3871                 *might_destroy = B_TRUE;
3872
3873         mutex_exit(&ds->ds_lock);
3874         return (0);
3875 }
3876
3877 static int
3878 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3879 {
3880         struct dsl_ds_releasearg *ra = arg1;
3881         dsl_dataset_t *ds = ra->ds;
3882         boolean_t might_destroy;
3883         int error;
3884
3885         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3886                 return (ENOTSUP);
3887
3888         error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3889         if (error)
3890                 return (error);
3891
3892         if (might_destroy) {
3893                 struct dsl_ds_destroyarg dsda = {0};
3894
3895                 if (dmu_tx_is_syncing(tx)) {
3896                         /*
3897                          * If we're not prepared to remove the snapshot,
3898                          * we can't allow the release to happen right now.
3899                          */
3900                         if (!ra->own)
3901                                 return (EBUSY);
3902                 }
3903                 dsda.ds = ds;
3904                 dsda.releasing = B_TRUE;
3905                 return (dsl_dataset_destroy_check(&dsda, tag, tx));
3906         }
3907
3908         return (0);
3909 }
3910
3911 static void
3912 dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
3913 {
3914         struct dsl_ds_releasearg *ra = arg1;
3915         dsl_dataset_t *ds = ra->ds;
3916         dsl_pool_t *dp = ds->ds_dir->dd_pool;
3917         objset_t *mos = dp->dp_meta_objset;
3918         uint64_t zapobj;
3919         uint64_t dsobj = ds->ds_object;
3920         uint64_t refs;
3921         int error;
3922
3923         mutex_enter(&ds->ds_lock);
3924         ds->ds_userrefs--;
3925         refs = ds->ds_userrefs;
3926         mutex_exit(&ds->ds_lock);
3927         error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3928         VERIFY(error == 0 || error == ENOENT);
3929         zapobj = ds->ds_phys->ds_userrefs_obj;
3930         VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3931
3932         spa_history_log_internal(LOG_DS_USER_RELEASE,
3933             dp->dp_spa, tx, "<%s> %lld dataset = %llu",
3934             ra->htag, (longlong_t)refs, dsobj);
3935
3936         if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3937             DS_IS_DEFER_DESTROY(ds)) {
3938                 struct dsl_ds_destroyarg dsda = {0};
3939
3940                 ASSERT(ra->own);
3941                 dsda.ds = ds;
3942                 dsda.releasing = B_TRUE;
3943                 /* We already did the destroy_check */
3944                 dsl_dataset_destroy_sync(&dsda, tag, tx);
3945         }
3946 }
3947
3948 static int
3949 dsl_dataset_user_release_one(const char *dsname, void *arg)
3950 {
3951         struct dsl_ds_holdarg *ha = arg;
3952         struct dsl_ds_releasearg *ra;
3953         dsl_dataset_t *ds;
3954         int error;
3955         void *dtag = ha->dstg;
3956         char *name;
3957         boolean_t own = B_FALSE;
3958         boolean_t might_destroy;
3959
3960         /* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
3961         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3962         error = dsl_dataset_hold(name, dtag, &ds);
3963         strfree(name);
3964         if (error == ENOENT && ha->recursive)
3965                 return (0);
3966         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3967         if (error)
3968                 return (error);
3969
3970         ha->gotone = B_TRUE;
3971
3972         ASSERT(dsl_dataset_is_snapshot(ds));
3973
3974         error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
3975         if (error) {
3976                 dsl_dataset_rele(ds, dtag);
3977                 return (error);
3978         }
3979
3980         if (might_destroy) {
3981 #ifdef _KERNEL
3982                 name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3983                 error = zfs_unmount_snap(name, NULL);
3984                 strfree(name);
3985                 if (error) {
3986                         dsl_dataset_rele(ds, dtag);
3987                         return (error);
3988                 }
3989 #endif
3990                 if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
3991                         dsl_dataset_rele(ds, dtag);
3992                         return (EBUSY);
3993                 } else {
3994                         own = B_TRUE;
3995                         dsl_dataset_make_exclusive(ds, dtag);
3996                 }
3997         }
3998
3999         ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
4000         ra->ds = ds;
4001         ra->htag = ha->htag;
4002         ra->own = own;
4003         dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
4004             dsl_dataset_user_release_sync, ra, dtag, 0);
4005
4006         return (0);
4007 }
4008
4009 int
4010 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
4011     boolean_t recursive)
4012 {
4013         struct dsl_ds_holdarg *ha;
4014         dsl_sync_task_t *dst;
4015         spa_t *spa;
4016         int error;
4017
4018 top:
4019         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
4020
4021         (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
4022
4023         error = spa_open(dsname, &spa, FTAG);
4024         if (error) {
4025                 kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4026                 return (error);
4027         }
4028
4029         ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
4030         ha->htag = htag;
4031         ha->snapname = snapname;
4032         ha->recursive = recursive;
4033         if (recursive) {
4034                 error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
4035                     ha, DS_FIND_CHILDREN);
4036         } else {
4037                 error = dsl_dataset_user_release_one(dsname, ha);
4038         }
4039         if (error == 0)
4040                 error = dsl_sync_task_group_wait(ha->dstg);
4041
4042         for (dst = list_head(&ha->dstg->dstg_tasks); dst;
4043             dst = list_next(&ha->dstg->dstg_tasks, dst)) {
4044                 struct dsl_ds_releasearg *ra = dst->dst_arg1;
4045                 dsl_dataset_t *ds = ra->ds;
4046
4047                 if (dst->dst_err)
4048                         dsl_dataset_name(ds, ha->failed);
4049
4050                 if (ra->own)
4051                         dsl_dataset_disown(ds, ha->dstg);
4052                 else
4053                         dsl_dataset_rele(ds, ha->dstg);
4054
4055                 kmem_free(ra, sizeof (struct dsl_ds_releasearg));
4056         }
4057
4058         if (error == 0 && recursive && !ha->gotone)
4059                 error = ENOENT;
4060
4061         if (error && error != EBUSY)
4062                 (void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
4063
4064         dsl_sync_task_group_destroy(ha->dstg);
4065         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
4066         spa_close(spa, FTAG);
4067
4068         /*
4069          * We can get EBUSY if we were racing with deferred destroy and
4070          * dsl_dataset_user_release_check() hadn't done the necessary
4071          * open context setup.  We can also get EBUSY if we're racing
4072          * with destroy and that thread is the ds_owner.  Either way
4073          * the busy condition should be transient, and we should retry
4074          * the release operation.
4075          */
4076         if (error == EBUSY)
4077                 goto top;
4078
4079         return (error);
4080 }
4081
4082 /*
4083  * Called at spa_load time (with retry == B_FALSE) to release a stale
4084  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
4085  */
4086 int
4087 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
4088     boolean_t retry)
4089 {
4090         dsl_dataset_t *ds;
4091         char *snap;
4092         char *name;
4093         int namelen;
4094         int error;
4095
4096         do {
4097                 rw_enter(&dp->dp_config_rwlock, RW_READER);
4098                 error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
4099                 rw_exit(&dp->dp_config_rwlock);
4100                 if (error)
4101                         return (error);
4102                 namelen = dsl_dataset_namelen(ds)+1;
4103                 name = kmem_alloc(namelen, KM_SLEEP);
4104                 dsl_dataset_name(ds, name);
4105                 dsl_dataset_rele(ds, FTAG);
4106
4107                 snap = strchr(name, '@');
4108                 *snap = '\0';
4109                 ++snap;
4110                 error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
4111                 kmem_free(name, namelen);
4112
4113                 /*
4114                  * The object can't have been destroyed because we have a hold,
4115                  * but it might have been renamed, resulting in ENOENT.  Retry
4116                  * if we've been requested to do so.
4117                  *
4118                  * It would be nice if we could use the dsobj all the way
4119                  * through and avoid ENOENT entirely.  But we might need to
4120                  * unmount the snapshot, and there's currently no way to lookup
4121                  * a vfsp using a ZFS object id.
4122                  */
4123         } while ((error == ENOENT) && retry);
4124
4125         return (error);
4126 }
4127
4128 int
4129 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
4130 {
4131         dsl_dataset_t *ds;
4132         int err;
4133
4134         err = dsl_dataset_hold(dsname, FTAG, &ds);
4135         if (err)
4136                 return (err);
4137
4138         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
4139         if (ds->ds_phys->ds_userrefs_obj != 0) {
4140                 zap_attribute_t *za;
4141                 zap_cursor_t zc;
4142
4143                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4144                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4145                     ds->ds_phys->ds_userrefs_obj);
4146                     zap_cursor_retrieve(&zc, za) == 0;
4147                     zap_cursor_advance(&zc)) {
4148                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4149                             za->za_first_integer));
4150                 }
4151                 zap_cursor_fini(&zc);
4152                 kmem_free(za, sizeof (zap_attribute_t));
4153         }
4154         dsl_dataset_rele(ds, FTAG);
4155         return (0);
4156 }
4157
4158 /*
4159  * Note, this function is used as the callback for dmu_objset_find().  We
4160  * always return 0 so that we will continue to find and process
4161  * inconsistent datasets, even if we encounter an error trying to
4162  * process one of them.
4163  */
4164 /* ARGSUSED */
4165 int
4166 dsl_destroy_inconsistent(const char *dsname, void *arg)
4167 {
4168         dsl_dataset_t *ds;
4169
4170         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4171                 if (DS_IS_INCONSISTENT(ds))
4172                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4173                 else
4174                         dsl_dataset_disown(ds, FTAG);
4175         }
4176         return (0);
4177 }
4178
4179
4180 /*
4181  * Return (in *usedp) the amount of space written in new that is not
4182  * present in oldsnap.  New may be a snapshot or the head.  Old must be
4183  * a snapshot before new, in new's filesystem (or its origin).  If not then
4184  * fail and return EINVAL.
4185  *
4186  * The written space is calculated by considering two components:  First, we
4187  * ignore any freed space, and calculate the written as new's used space
4188  * minus old's used space.  Next, we add in the amount of space that was freed
4189  * between the two snapshots, thus reducing new's used space relative to old's.
4190  * Specifically, this is the space that was born before old->ds_creation_txg,
4191  * and freed before new (ie. on new's deadlist or a previous deadlist).
4192  *
4193  * space freed                         [---------------------]
4194  * snapshots                       ---O-------O--------O-------O------
4195  *                                         oldsnap            new
4196  */
4197 int
4198 dsl_dataset_space_written(dsl_dataset_t *oldsnap, dsl_dataset_t *new,
4199     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4200 {
4201         int err = 0;
4202         uint64_t snapobj;
4203         dsl_pool_t *dp = new->ds_dir->dd_pool;
4204
4205         *usedp = 0;
4206         *usedp += new->ds_phys->ds_referenced_bytes;
4207         *usedp -= oldsnap->ds_phys->ds_referenced_bytes;
4208
4209         *compp = 0;
4210         *compp += new->ds_phys->ds_compressed_bytes;
4211         *compp -= oldsnap->ds_phys->ds_compressed_bytes;
4212
4213         *uncompp = 0;
4214         *uncompp += new->ds_phys->ds_uncompressed_bytes;
4215         *uncompp -= oldsnap->ds_phys->ds_uncompressed_bytes;
4216
4217         rw_enter(&dp->dp_config_rwlock, RW_READER);
4218         snapobj = new->ds_object;
4219         while (snapobj != oldsnap->ds_object) {
4220                 dsl_dataset_t *snap;
4221                 uint64_t used, comp, uncomp;
4222
4223                 if (snapobj == new->ds_object) {
4224                         snap = new;
4225                 } else {
4226                         err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &snap);
4227                         if (err != 0)
4228                                 break;
4229                 }
4230
4231                 if (snap->ds_phys->ds_prev_snap_txg ==
4232                     oldsnap->ds_phys->ds_creation_txg) {
4233                         /*
4234                          * The blocks in the deadlist can not be born after
4235                          * ds_prev_snap_txg, so get the whole deadlist space,
4236                          * which is more efficient (especially for old-format
4237                          * deadlists).  Unfortunately the deadlist code
4238                          * doesn't have enough information to make this
4239                          * optimization itself.
4240                          */
4241                         dsl_deadlist_space(&snap->ds_deadlist,
4242                             &used, &comp, &uncomp);
4243                 } else {
4244                         dsl_deadlist_space_range(&snap->ds_deadlist,
4245                             0, oldsnap->ds_phys->ds_creation_txg,
4246                             &used, &comp, &uncomp);
4247                 }
4248                 *usedp += used;
4249                 *compp += comp;
4250                 *uncompp += uncomp;
4251
4252                 /*
4253                  * If we get to the beginning of the chain of snapshots
4254                  * (ds_prev_snap_obj == 0) before oldsnap, then oldsnap
4255                  * was not a snapshot of/before new.
4256                  */
4257                 snapobj = snap->ds_phys->ds_prev_snap_obj;
4258                 if (snap != new)
4259                         dsl_dataset_rele(snap, FTAG);
4260                 if (snapobj == 0) {
4261                         err = EINVAL;
4262                         break;
4263                 }
4264
4265         }
4266         rw_exit(&dp->dp_config_rwlock);
4267         return (err);
4268 }
4269
4270 /*
4271  * Return (in *usedp) the amount of space that will be reclaimed if firstsnap,
4272  * lastsnap, and all snapshots in between are deleted.
4273  *
4274  * blocks that would be freed            [---------------------------]
4275  * snapshots                       ---O-------O--------O-------O--------O
4276  *                                        firstsnap        lastsnap
4277  *
4278  * This is the set of blocks that were born after the snap before firstsnap,
4279  * (birth > firstsnap->prev_snap_txg) and died before the snap after the
4280  * last snap (ie, is on lastsnap->ds_next->ds_deadlist or an earlier deadlist).
4281  * We calculate this by iterating over the relevant deadlists (from the snap
4282  * after lastsnap, backward to the snap after firstsnap), summing up the
4283  * space on the deadlist that was born after the snap before firstsnap.
4284  */
4285 int
4286 dsl_dataset_space_wouldfree(dsl_dataset_t *firstsnap,
4287     dsl_dataset_t *lastsnap,
4288     uint64_t *usedp, uint64_t *compp, uint64_t *uncompp)
4289 {
4290         int err = 0;
4291         uint64_t snapobj;
4292         dsl_pool_t *dp = firstsnap->ds_dir->dd_pool;
4293
4294         ASSERT(dsl_dataset_is_snapshot(firstsnap));
4295         ASSERT(dsl_dataset_is_snapshot(lastsnap));
4296
4297         /*
4298          * Check that the snapshots are in the same dsl_dir, and firstsnap
4299          * is before lastsnap.
4300          */
4301         if (firstsnap->ds_dir != lastsnap->ds_dir ||
4302             firstsnap->ds_phys->ds_creation_txg >
4303             lastsnap->ds_phys->ds_creation_txg)
4304                 return (EINVAL);
4305
4306         *usedp = *compp = *uncompp = 0;
4307
4308         rw_enter(&dp->dp_config_rwlock, RW_READER);
4309         snapobj = lastsnap->ds_phys->ds_next_snap_obj;
4310         while (snapobj != firstsnap->ds_object) {
4311                 dsl_dataset_t *ds;
4312                 uint64_t used, comp, uncomp;
4313
4314                 err = dsl_dataset_hold_obj(dp, snapobj, FTAG, &ds);
4315                 if (err != 0)
4316                         break;
4317
4318                 dsl_deadlist_space_range(&ds->ds_deadlist,
4319                     firstsnap->ds_phys->ds_prev_snap_txg, UINT64_MAX,
4320                     &used, &comp, &uncomp);
4321                 *usedp += used;
4322                 *compp += comp;
4323                 *uncompp += uncomp;
4324
4325                 snapobj = ds->ds_phys->ds_prev_snap_obj;
4326                 ASSERT3U(snapobj, !=, 0);
4327                 dsl_dataset_rele(ds, FTAG);
4328         }
4329         rw_exit(&dp->dp_config_rwlock);
4330         return (err);
4331 }
4332
/*
 * Symbols exported from this file.  Only compiled when both _KERNEL and
 * HAVE_SPL are defined.
 */
#if defined(_KERNEL) && defined(HAVE_SPL)
EXPORT_SYMBOL(dmu_snapshots_destroy_nvl);

/* hold / own / rele */
EXPORT_SYMBOL(dsl_dataset_hold);
EXPORT_SYMBOL(dsl_dataset_hold_obj);
EXPORT_SYMBOL(dsl_dataset_own);
EXPORT_SYMBOL(dsl_dataset_own_obj);
EXPORT_SYMBOL(dsl_dataset_name);
EXPORT_SYMBOL(dsl_dataset_rele);
EXPORT_SYMBOL(dsl_dataset_disown);
EXPORT_SYMBOL(dsl_dataset_drop_ref);
EXPORT_SYMBOL(dsl_dataset_tryown);
EXPORT_SYMBOL(dsl_dataset_make_exclusive);

/* create / destroy / snapshot / rename / promote / clone swap */
EXPORT_SYMBOL(dsl_dataset_create_sync);
EXPORT_SYMBOL(dsl_dataset_create_sync_dd);
EXPORT_SYMBOL(dsl_dataset_destroy);
EXPORT_SYMBOL(dsl_dataset_destroy_check);
EXPORT_SYMBOL(dsl_dataset_destroy_sync);
EXPORT_SYMBOL(dsl_dataset_snapshot_check);
EXPORT_SYMBOL(dsl_dataset_snapshot_sync);
EXPORT_SYMBOL(dsl_dataset_rename);
EXPORT_SYMBOL(dsl_dataset_promote);
EXPORT_SYMBOL(dsl_dataset_clone_swap);

/* user holds */
EXPORT_SYMBOL(dsl_dataset_user_hold);
EXPORT_SYMBOL(dsl_dataset_user_release);
EXPORT_SYMBOL(dsl_dataset_user_release_tmp);
EXPORT_SYMBOL(dsl_dataset_get_holds);

/* block pointer, spa and space accounting */
EXPORT_SYMBOL(dsl_dataset_get_blkptr);
EXPORT_SYMBOL(dsl_dataset_set_blkptr);
EXPORT_SYMBOL(dsl_dataset_get_spa);
EXPORT_SYMBOL(dsl_dataset_modified_since_lastsnap);
EXPORT_SYMBOL(dsl_dataset_space_written);
EXPORT_SYMBOL(dsl_dataset_space_wouldfree);
EXPORT_SYMBOL(dsl_dataset_sync);
EXPORT_SYMBOL(dsl_dataset_block_born);
EXPORT_SYMBOL(dsl_dataset_block_kill);
EXPORT_SYMBOL(dsl_dataset_block_freeable);
EXPORT_SYMBOL(dsl_dataset_prev_snap_txg);
EXPORT_SYMBOL(dsl_dataset_dirty);

/* stats, quota and reservation */
EXPORT_SYMBOL(dsl_dataset_stats);
EXPORT_SYMBOL(dsl_dataset_fast_stat);
EXPORT_SYMBOL(dsl_dataset_space);
EXPORT_SYMBOL(dsl_dataset_fsid_guid);
EXPORT_SYMBOL(dsl_dsobj_to_dsname);
EXPORT_SYMBOL(dsl_dataset_check_quota);
EXPORT_SYMBOL(dsl_dataset_set_quota);
EXPORT_SYMBOL(dsl_dataset_set_quota_sync);
EXPORT_SYMBOL(dsl_dataset_set_reservation);

EXPORT_SYMBOL(dsl_destroy_inconsistent);
#endif /* defined(_KERNEL) && defined(HAVE_SPL) */