Fix gcc uninitialized variable warnings
[zfs.git] / module / zfs / dsl_dataset.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24
25 #include <sys/dmu_objset.h>
26 #include <sys/dsl_dataset.h>
27 #include <sys/dsl_dir.h>
28 #include <sys/dsl_prop.h>
29 #include <sys/dsl_synctask.h>
30 #include <sys/dmu_traverse.h>
31 #include <sys/dmu_tx.h>
32 #include <sys/arc.h>
33 #include <sys/zio.h>
34 #include <sys/zap.h>
35 #include <sys/unique.h>
36 #include <sys/zfs_context.h>
37 #include <sys/zfs_ioctl.h>
38 #include <sys/spa.h>
39 #include <sys/zfs_znode.h>
40 #include <sys/zfs_onexit.h>
41 #include <sys/zvol.h>
42 #include <sys/dsl_scan.h>
43 #include <sys/dsl_deadlist.h>
44
/*
 * Sentinel stored in ds_owner while a dataset is being destroyed; any
 * thread seeing this tag knows the dataset is past the point of no return.
 */
static char *dsl_reaper = "the grim reaper";

static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

/* Swap two uint64_t lvalues in place. */
#define SWITCH64(x, y) \
        { \
                uint64_t __tmp = (x); \
                (x) = (y); \
                (y) = __tmp; \
        }

/* Upper bound on dataset reference counts. */
#define DS_REF_MAX      (1ULL << 62)

#define DSL_DEADLIST_BLOCKSIZE  SPA_MAXBLOCKSIZE

/* True if the dataset's owner slot holds the destruction sentinel above. */
#define DSL_DATASET_IS_DESTROYED(ds)    ((ds)->ds_owner == dsl_reaper)
63
64
65 /*
66  * Figure out how much of this delta should be propogated to the dsl_dir
67  * layer.  If there's a refreservation, that space has already been
68  * partially accounted for in our ancestors.
69  */
70 static int64_t
71 parent_delta(dsl_dataset_t *ds, int64_t delta)
72 {
73         uint64_t old_bytes, new_bytes;
74
75         if (ds->ds_reserved == 0)
76                 return (delta);
77
78         old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
79         new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
80
81         ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
82         return (new_bytes - old_bytes);
83 }
84
/*
 * Charge a newly written block to dataset ds (or, if ds is NULL, to the
 * MOS placeholder dsl_dir).  Updates the dataset's used/compressed/
 * uncompressed/unique byte counts and propagates the space change to the
 * dsl_dir layer.  Must be called from syncing context.
 */
void
dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
{
        int used, compressed, uncompressed;
        int64_t delta;

        used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
        compressed = BP_GET_PSIZE(bp);
        uncompressed = BP_GET_UCSIZE(bp);

        dprintf_bp(bp, "ds=%p", ds);

        ASSERT(dmu_tx_is_syncing(tx));
        /* It could have been compressed away to nothing */
        if (BP_IS_HOLE(bp))
                return;
        ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
        ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
        if (ds == NULL) {
                /*
                 * Account for the meta-objset space in its placeholder
                 * dsl_dir.
                 */
                ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
                dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
                    used, compressed, uncompressed, tx);
                dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
                return;
        }
        dmu_buf_will_dirty(ds->ds_dbuf, tx);

        /* Locks are taken dd_lock then ds_lock, matching block_kill below. */
        mutex_enter(&ds->ds_dir->dd_lock);
        mutex_enter(&ds->ds_lock);
        /* Compute dir-level delta before ds_unique_bytes is updated. */
        delta = parent_delta(ds, used);
        ds->ds_phys->ds_used_bytes += used;
        ds->ds_phys->ds_compressed_bytes += compressed;
        ds->ds_phys->ds_uncompressed_bytes += uncompressed;
        ds->ds_phys->ds_unique_bytes += used;
        mutex_exit(&ds->ds_lock);
        dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
            compressed, uncompressed, tx);
        /*
         * Whatever part of `used' was absorbed by the refreservation is
         * moved from the refreservation bucket to the head bucket.
         */
        dsl_dir_transfer_space(ds->ds_dir, used - delta,
            DD_USED_REFRSRV, DD_USED_HEAD, tx);
        mutex_exit(&ds->ds_dir->dd_lock);
}
130
/*
 * Account for (and possibly free) a block being removed from dataset ds.
 * If the block was born after the most recent snapshot it is freed
 * immediately; otherwise it is still referenced by a snapshot and is
 * placed on the dataset's deadlist instead.  When `async' is set the
 * deadlist insertion is deferred (see comment below).  Returns the number
 * of bytes of space accounting affected (0 for a hole).  Must be called
 * from syncing context.
 */
int
dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
    boolean_t async)
{
        int used, compressed, uncompressed;

        if (BP_IS_HOLE(bp))
                return (0);

        ASSERT(dmu_tx_is_syncing(tx));
        ASSERT(bp->blk_birth <= tx->tx_txg);

        used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
        compressed = BP_GET_PSIZE(bp);
        uncompressed = BP_GET_UCSIZE(bp);

        ASSERT(used > 0);
        if (ds == NULL) {
                /*
                 * Account for the meta-objset space in its placeholder
                 * dataset.
                 */
                dsl_free(tx->tx_pool, tx->tx_txg, bp);

                dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
                    -used, -compressed, -uncompressed, tx);
                dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
                return (used);
        }
        ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

        ASSERT(!dsl_dataset_is_snapshot(ds));
        dmu_buf_will_dirty(ds->ds_dbuf, tx);

        if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
                int64_t delta;

                /* Block is unique to this dataset: free it now. */
                dprintf_bp(bp, "freeing ds=%llu", ds->ds_object);
                dsl_free(tx->tx_pool, tx->tx_txg, bp);

                mutex_enter(&ds->ds_dir->dd_lock);
                mutex_enter(&ds->ds_lock);
                ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
                    !DS_UNIQUE_IS_ACCURATE(ds));
                delta = parent_delta(ds, -used);
                ds->ds_phys->ds_unique_bytes -= used;
                mutex_exit(&ds->ds_lock);
                dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
                    delta, -compressed, -uncompressed, tx);
                /* Return the refreservation-covered portion to that bucket. */
                dsl_dir_transfer_space(ds->ds_dir, -used - delta,
                    DD_USED_REFRSRV, DD_USED_HEAD, tx);
                mutex_exit(&ds->ds_dir->dd_lock);
        } else {
                /* Block is shared with a snapshot: defer actual freeing. */
                dprintf_bp(bp, "putting on dead list: %s", "");
                if (async) {
                        /*
                         * We are here as part of zio's write done callback,
                         * which means we're a zio interrupt thread.  We can't
                         * call dsl_deadlist_insert() now because it may block
                         * waiting for I/O.  Instead, put bp on the deferred
                         * queue and let dsl_pool_sync() finish the job.
                         */
                        bplist_append(&ds->ds_pending_deadlist, bp);
                } else {
                        dsl_deadlist_insert(&ds->ds_deadlist, bp, tx);
                }
                ASSERT3U(ds->ds_prev->ds_object, ==,
                    ds->ds_phys->ds_prev_snap_obj);
                ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
                /* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
                if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
                    ds->ds_object && bp->blk_birth >
                    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
                        dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
                        mutex_enter(&ds->ds_prev->ds_lock);
                        ds->ds_prev->ds_phys->ds_unique_bytes += used;
                        mutex_exit(&ds->ds_prev->ds_lock);
                }
                if (bp->blk_birth > ds->ds_dir->dd_origin_txg) {
                        /* Space moves from the head to the snapshot bucket. */
                        dsl_dir_transfer_space(ds->ds_dir, used,
                            DD_USED_HEAD, DD_USED_SNAP, tx);
                }
        }
        /* In both paths the dataset no longer references the block. */
        mutex_enter(&ds->ds_lock);
        ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
        ds->ds_phys->ds_used_bytes -= used;
        ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
        ds->ds_phys->ds_compressed_bytes -= compressed;
        ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
        ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
        mutex_exit(&ds->ds_lock);

        return (used);
}
225
226 uint64_t
227 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
228 {
229         uint64_t trysnap = 0;
230
231         if (ds == NULL)
232                 return (0);
233         /*
234          * The snapshot creation could fail, but that would cause an
235          * incorrect FALSE return, which would only result in an
236          * overestimation of the amount of space that an operation would
237          * consume, which is OK.
238          *
239          * There's also a small window where we could miss a pending
240          * snapshot, because we could set the sync task in the quiescing
241          * phase.  So this should only be used as a guess.
242          */
243         if (ds->ds_trysnap_txg >
244             spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
245                 trysnap = ds->ds_trysnap_txg;
246         return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
247 }
248
249 boolean_t
250 dsl_dataset_block_freeable(dsl_dataset_t *ds, const blkptr_t *bp,
251     uint64_t blk_birth)
252 {
253         if (blk_birth <= dsl_dataset_prev_snap_txg(ds))
254                 return (B_FALSE);
255
256         ddt_prefetch(dsl_dataset_get_spa(ds), bp);
257
258         return (B_TRUE);
259 }
260
/*
 * dbuf-eviction callback: tear down the in-core dsl_dataset_t when its
 * bonus buffer is evicted.  Also called directly with db == NULL from
 * dsl_dataset_disown() for datasets whose dbuf is already gone.
 */
/* ARGSUSED */
static void
dsl_dataset_evict(dmu_buf_t *db, void *dsv)
{
        dsl_dataset_t *ds = dsv;

        /* Only unowned or already-destroyed datasets may be evicted. */
        ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));

        unique_remove(ds->ds_fsid_guid);

        if (ds->ds_objset != NULL)
                dmu_objset_evict(ds->ds_objset);

        /* Drop the reference we hold on our previous snapshot, if any. */
        if (ds->ds_prev) {
                dsl_dataset_drop_ref(ds->ds_prev, ds);
                ds->ds_prev = NULL;
        }

        bplist_destroy(&ds->ds_pending_deadlist);
        if (db != NULL) {
                dsl_deadlist_close(&ds->ds_deadlist);
        } else {
                /* db == NULL: the deadlist must already be closed. */
                ASSERT(ds->ds_deadlist.dl_dbuf == NULL);
                ASSERT(!ds->ds_deadlist.dl_oldfmt);
        }
        if (ds->ds_dir)
                dsl_dir_close(ds->ds_dir, ds);

        ASSERT(!list_link_active(&ds->ds_synced_link));

        /* Destroy synchronization primitives last, then free the struct. */
        mutex_destroy(&ds->ds_lock);
        mutex_destroy(&ds->ds_recvlock);
        mutex_destroy(&ds->ds_opening_lock);
        rw_destroy(&ds->ds_rwlock);
        cv_destroy(&ds->ds_exclusive_cv);

        kmem_free(ds, sizeof (dsl_dataset_t));
}
299
300 static int
301 dsl_dataset_get_snapname(dsl_dataset_t *ds)
302 {
303         dsl_dataset_phys_t *headphys;
304         int err;
305         dmu_buf_t *headdbuf;
306         dsl_pool_t *dp = ds->ds_dir->dd_pool;
307         objset_t *mos = dp->dp_meta_objset;
308
309         if (ds->ds_snapname[0])
310                 return (0);
311         if (ds->ds_phys->ds_next_snap_obj == 0)
312                 return (0);
313
314         err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
315             FTAG, &headdbuf);
316         if (err)
317                 return (err);
318         headphys = headdbuf->db_data;
319         err = zap_value_search(dp->dp_meta_objset,
320             headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
321         dmu_buf_rele(headdbuf, FTAG);
322         return (err);
323 }
324
325 static int
326 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
327 {
328         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
329         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
330         matchtype_t mt;
331         int err;
332
333         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
334                 mt = MT_FIRST;
335         else
336                 mt = MT_EXACT;
337
338         err = zap_lookup_norm(mos, snapobj, name, 8, 1,
339             value, mt, NULL, 0, NULL);
340         if (err == ENOTSUP && mt == MT_FIRST)
341                 err = zap_lookup(mos, snapobj, name, 8, 1, value);
342         return (err);
343 }
344
345 static int
346 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
347 {
348         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
349         uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
350         matchtype_t mt;
351         int err;
352
353         dsl_dir_snap_cmtime_update(ds->ds_dir);
354
355         if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
356                 mt = MT_FIRST;
357         else
358                 mt = MT_EXACT;
359
360         err = zap_remove_norm(mos, snapobj, name, mt, tx);
361         if (err == ENOTSUP && mt == MT_FIRST)
362                 err = zap_remove(mos, snapobj, name, tx);
363         return (err);
364 }
365
366 static int
367 dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
368     dsl_dataset_t **dsp)
369 {
370         objset_t *mos = dp->dp_meta_objset;
371         dmu_buf_t *dbuf;
372         dsl_dataset_t *ds;
373         int err;
374         dmu_object_info_t doi;
375
376         ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
377             dsl_pool_sync_context(dp));
378
379         err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
380         if (err)
381                 return (err);
382
383         /* Make sure dsobj has the correct object type. */
384         dmu_object_info_from_db(dbuf, &doi);
385         if (doi.doi_type != DMU_OT_DSL_DATASET)
386                 return (EINVAL);
387
388         ds = dmu_buf_get_user(dbuf);
389         if (ds == NULL) {
390                 dsl_dataset_t *winner = NULL;
391
392                 ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
393                 ds->ds_dbuf = dbuf;
394                 ds->ds_object = dsobj;
395                 ds->ds_phys = dbuf->db_data;
396
397                 mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
398                 mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
399                 mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
400                 rw_init(&ds->ds_rwlock, 0, 0, 0);
401                 cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
402
403                 bplist_create(&ds->ds_pending_deadlist);
404                 dsl_deadlist_open(&ds->ds_deadlist,
405                     mos, ds->ds_phys->ds_deadlist_obj);
406
407                 if (err == 0) {
408                         err = dsl_dir_open_obj(dp,
409                             ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
410                 }
411                 if (err) {
412                         mutex_destroy(&ds->ds_lock);
413                         mutex_destroy(&ds->ds_recvlock);
414                         mutex_destroy(&ds->ds_opening_lock);
415                         rw_destroy(&ds->ds_rwlock);
416                         cv_destroy(&ds->ds_exclusive_cv);
417                         bplist_destroy(&ds->ds_pending_deadlist);
418                         dsl_deadlist_close(&ds->ds_deadlist);
419                         kmem_free(ds, sizeof (dsl_dataset_t));
420                         dmu_buf_rele(dbuf, tag);
421                         return (err);
422                 }
423
424                 if (!dsl_dataset_is_snapshot(ds)) {
425                         ds->ds_snapname[0] = '\0';
426                         if (ds->ds_phys->ds_prev_snap_obj) {
427                                 err = dsl_dataset_get_ref(dp,
428                                     ds->ds_phys->ds_prev_snap_obj,
429                                     ds, &ds->ds_prev);
430                         }
431                 } else {
432                         if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
433                                 err = dsl_dataset_get_snapname(ds);
434                         if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
435                                 err = zap_count(
436                                     ds->ds_dir->dd_pool->dp_meta_objset,
437                                     ds->ds_phys->ds_userrefs_obj,
438                                     &ds->ds_userrefs);
439                         }
440                 }
441
442                 if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
443                         /*
444                          * In sync context, we're called with either no lock
445                          * or with the write lock.  If we're not syncing,
446                          * we're always called with the read lock held.
447                          */
448                         boolean_t need_lock =
449                             !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
450                             dsl_pool_sync_context(dp);
451
452                         if (need_lock)
453                                 rw_enter(&dp->dp_config_rwlock, RW_READER);
454
455                         err = dsl_prop_get_ds(ds,
456                             "refreservation", sizeof (uint64_t), 1,
457                             &ds->ds_reserved, NULL);
458                         if (err == 0) {
459                                 err = dsl_prop_get_ds(ds,
460                                     "refquota", sizeof (uint64_t), 1,
461                                     &ds->ds_quota, NULL);
462                         }
463
464                         if (need_lock)
465                                 rw_exit(&dp->dp_config_rwlock);
466                 } else {
467                         ds->ds_reserved = ds->ds_quota = 0;
468                 }
469
470                 if (err == 0) {
471                         winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
472                             dsl_dataset_evict);
473                 }
474                 if (err || winner) {
475                         bplist_destroy(&ds->ds_pending_deadlist);
476                         dsl_deadlist_close(&ds->ds_deadlist);
477                         if (ds->ds_prev)
478                                 dsl_dataset_drop_ref(ds->ds_prev, ds);
479                         dsl_dir_close(ds->ds_dir, ds);
480                         mutex_destroy(&ds->ds_lock);
481                         mutex_destroy(&ds->ds_recvlock);
482                         mutex_destroy(&ds->ds_opening_lock);
483                         rw_destroy(&ds->ds_rwlock);
484                         cv_destroy(&ds->ds_exclusive_cv);
485                         kmem_free(ds, sizeof (dsl_dataset_t));
486                         if (err) {
487                                 dmu_buf_rele(dbuf, tag);
488                                 return (err);
489                         }
490                         ds = winner;
491                 } else {
492                         ds->ds_fsid_guid =
493                             unique_insert(ds->ds_phys->ds_fsid_guid);
494                 }
495         }
496         ASSERT3P(ds->ds_dbuf, ==, dbuf);
497         ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
498         ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
499             spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
500             dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
501         mutex_enter(&ds->ds_lock);
502         if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
503                 mutex_exit(&ds->ds_lock);
504                 dmu_buf_rele(ds->ds_dbuf, tag);
505                 return (ENOENT);
506         }
507         mutex_exit(&ds->ds_lock);
508         *dsp = ds;
509         return (0);
510 }
511
/*
 * Upgrade a bare reference (from dsl_dataset_get_ref()) to a held
 * dataset by acquiring ds_rwlock as READER.  Returns 0 on success, or
 * ENOENT -- with the reference dropped -- if the dataset is destroyed
 * while we wait.  In syncing context no lock is taken at all.
 */
static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
        dsl_pool_t *dp = ds->ds_dir->dd_pool;

        /*
         * In syncing context we don't want the rwlock lock: there
         * may be an existing writer waiting for sync phase to
         * finish.  We don't need to worry about such writers, since
         * sync phase is single-threaded, so the writer can't be
         * doing anything while we are active.
         */
        if (dsl_pool_sync_context(dp)) {
                ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
                return (0);
        }

        /*
         * Normal users will hold the ds_rwlock as a READER until they
         * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
         * drop their READER lock after they set the ds_owner field.
         *
         * If the dataset is being destroyed, the destroy thread will
         * obtain a WRITER lock for exclusive access after it's done its
         * open-context work and then change the ds_owner to
         * dsl_reaper once destruction is assured.  So threads
         * may block here temporarily, until the "destructability" of
         * the dataset is determined.
         */
        ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
        mutex_enter(&ds->ds_lock);
        while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
                /* A writer holds ds_rwlock; wait for its broadcast. */
                rw_exit(&dp->dp_config_rwlock);
                cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
                if (DSL_DATASET_IS_DESTROYED(ds)) {
                        mutex_exit(&ds->ds_lock);
                        dsl_dataset_drop_ref(ds, tag);
                        rw_enter(&dp->dp_config_rwlock, RW_READER);
                        return (ENOENT);
                }
                /*
                 * The dp_config_rwlock lives above the ds_lock. And
                 * we need to check DSL_DATASET_IS_DESTROYED() while
                 * holding the ds_lock, so we have to drop and reacquire
                 * the ds_lock here.
                 */
                mutex_exit(&ds->ds_lock);
                rw_enter(&dp->dp_config_rwlock, RW_READER);
                mutex_enter(&ds->ds_lock);
        }
        mutex_exit(&ds->ds_lock);
        return (0);
}
565
566 int
567 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
568     dsl_dataset_t **dsp)
569 {
570         int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
571
572         if (err)
573                 return (err);
574         return (dsl_dataset_hold_ref(*dsp, tag));
575 }
576
577 int
578 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
579     void *tag, dsl_dataset_t **dsp)
580 {
581         int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
582         if (err)
583                 return (err);
584         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
585                 dsl_dataset_rele(*dsp, tag);
586                 *dsp = NULL;
587                 return (EBUSY);
588         }
589         return (0);
590 }
591
/*
 * Hold the dataset (filesystem head or snapshot) given by `name'.
 * Resolves the dsl_dir, holds the head dataset, and -- if the name has an
 * "@snapname" component -- exchanges that for a hold on the snapshot.
 * Returns 0 with *dsp held, or an errno (ENOENT if the name does not
 * resolve).
 */
int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
        dsl_dir_t *dd;
        dsl_pool_t *dp;
        const char *snapname;
        uint64_t obj;
        int err = 0;

        /* snapname is set to the "@..." remainder of the name, if any. */
        err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
        if (err)
                return (err);

        dp = dd->dd_pool;
        obj = dd->dd_phys->dd_head_dataset_obj;
        rw_enter(&dp->dp_config_rwlock, RW_READER);
        if (obj)
                err = dsl_dataset_get_ref(dp, obj, tag, dsp);
        else
                err = ENOENT;
        if (err)
                goto out;

        err = dsl_dataset_hold_ref(*dsp, tag);

        /* we may be looking for a snapshot */
        if (err == 0 && snapname != NULL) {
                dsl_dataset_t *ds = NULL;

                if (*snapname++ != '@') {
                        dsl_dataset_rele(*dsp, tag);
                        err = ENOENT;
                        goto out;
                }

                dprintf("looking for snapshot '%s'\n", snapname);
                err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
                if (err == 0)
                        err = dsl_dataset_get_ref(dp, obj, tag, &ds);
                /* The head's hold is traded for the snapshot's below. */
                dsl_dataset_rele(*dsp, tag);

                ASSERT3U((err == 0), ==, (ds != NULL));

                if (ds) {
                        mutex_enter(&ds->ds_lock);
                        /* Cache the snapshot name while we have it. */
                        if (ds->ds_snapname[0] == 0)
                                (void) strlcpy(ds->ds_snapname, snapname,
                                    sizeof (ds->ds_snapname));
                        mutex_exit(&ds->ds_lock);
                        err = dsl_dataset_hold_ref(ds, tag);
                        *dsp = err ? NULL : ds;
                }
        }
out:
        rw_exit(&dp->dp_config_rwlock);
        dsl_dir_close(dd, FTAG);
        return (err);
}
650
651 int
652 dsl_dataset_own(const char *name, boolean_t inconsistentok,
653     void *tag, dsl_dataset_t **dsp)
654 {
655         int err = dsl_dataset_hold(name, tag, dsp);
656         if (err)
657                 return (err);
658         if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
659                 dsl_dataset_rele(*dsp, tag);
660                 return (EBUSY);
661         }
662         return (0);
663 }
664
665 void
666 dsl_dataset_name(dsl_dataset_t *ds, char *name)
667 {
668         if (ds == NULL) {
669                 (void) strcpy(name, "mos");
670         } else {
671                 dsl_dir_name(ds->ds_dir, name);
672                 VERIFY(0 == dsl_dataset_get_snapname(ds));
673                 if (ds->ds_snapname[0]) {
674                         (void) strcat(name, "@");
675                         /*
676                          * We use a "recursive" mutex so that we
677                          * can call dprintf_ds() with ds_lock held.
678                          */
679                         if (!MUTEX_HELD(&ds->ds_lock)) {
680                                 mutex_enter(&ds->ds_lock);
681                                 (void) strcat(name, ds->ds_snapname);
682                                 mutex_exit(&ds->ds_lock);
683                         } else {
684                                 (void) strcat(name, ds->ds_snapname);
685                         }
686                 }
687         }
688 }
689
690 static int
691 dsl_dataset_namelen(dsl_dataset_t *ds)
692 {
693         int result;
694
695         if (ds == NULL) {
696                 result = 3;     /* "mos" */
697         } else {
698                 result = dsl_dir_namelen(ds->ds_dir);
699                 VERIFY(0 == dsl_dataset_get_snapname(ds));
700                 if (ds->ds_snapname[0]) {
701                         ++result;       /* adding one for the @-sign */
702                         if (!MUTEX_HELD(&ds->ds_lock)) {
703                                 mutex_enter(&ds->ds_lock);
704                                 result += strlen(ds->ds_snapname);
705                                 mutex_exit(&ds->ds_lock);
706                         } else {
707                                 result += strlen(ds->ds_snapname);
708                         }
709                 }
710         }
711
712         return (result);
713 }
714
/*
 * Release a bare reference taken with dsl_dataset_get_ref(); the dataset's
 * lifetime is tied to its bonus dbuf, so this is just a dbuf release.
 */
void
dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
{
        dmu_buf_rele(ds->ds_dbuf, tag);
}
720
721 void
722 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
723 {
724         if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
725                 rw_exit(&ds->ds_rwlock);
726         }
727         dsl_dataset_drop_ref(ds, tag);
728 }
729
/*
 * Relinquish ownership of ds.  Clears ds_owner, wakes any threads blocked
 * in dsl_dataset_hold_ref() waiting on the exclusive (writer) hold, and
 * drops our reference.  If the dbuf is already gone (dataset destroyed),
 * finish the teardown directly via dsl_dataset_evict().
 */
void
dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
{
        ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
            (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

        mutex_enter(&ds->ds_lock);
        ds->ds_owner = NULL;
        if (RW_WRITE_HELD(&ds->ds_rwlock)) {
                /* Release the exclusive hold and wake waiters. */
                rw_exit(&ds->ds_rwlock);
                cv_broadcast(&ds->ds_exclusive_cv);
        }
        mutex_exit(&ds->ds_lock);
        if (ds->ds_dbuf)
                dsl_dataset_drop_ref(ds, tag);
        else
                dsl_dataset_evict(NULL, ds);
}
748
749 boolean_t
750 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
751 {
752         boolean_t gotit = FALSE;
753
754         mutex_enter(&ds->ds_lock);
755         if (ds->ds_owner == NULL &&
756             (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
757                 ds->ds_owner = tag;
758                 if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
759                         rw_exit(&ds->ds_rwlock);
760                 gotit = TRUE;
761         }
762         mutex_exit(&ds->ds_lock);
763         return (gotit);
764 }
765
/*
 * Upgrade the owner's access to exclusive by taking ds_rwlock as WRITER
 * (blocking out new reader holds).  Caller must already own the dataset.
 */
void
dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
{
        ASSERT3P(owner, ==, ds->ds_owner);
        if (!RW_WRITE_HELD(&ds->ds_rwlock))
                rw_enter(&ds->ds_rwlock, RW_WRITER);
}
773
/*
 * Create the on-disk state for a new head dataset in dsl_dir "dd" and
 * return its object number.  If "origin" is non-NULL (or the pool has an
 * origin snapshot), the new dataset is a clone: it inherits the origin's
 * root bp, space totals and flags, bumps the origin's child count, and is
 * registered in the next-clones/clones tracking objects where the pool
 * version supports them.  Must be called in syncing context, and "dd"
 * must not already have a head dataset.
 */
uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	/* default to cloning the pool's origin snapshot (may be NULL) */
	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	/* the dataset's phys data lives in the MOS object's bonus buffer */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	/* normalized ZAP (U8_TEXTPREP_TOUPPER) for snapshot name lookups */
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;

	if (origin == NULL) {
		dsphys->ds_deadlist_obj = dsl_deadlist_alloc(mos, tx);
	} else {
		dsl_dataset_t *ohds;

		/* start from the origin's contents and space accounting */
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		/* derive our deadlist from the origin's head dataset */
		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
		    origin->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ohds));
		dsphys->ds_deadlist_obj = dsl_deadlist_clone(&ohds->ds_deadlist,
		    dsphys->ds_prev_snap_txg, dsphys->ds_prev_snap_obj, tx);
		dsl_dataset_rele(ohds, FTAG);

		/* track us in the origin's next-clones ZAP (lazily created) */
		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
		/* also track us in the origin dir's clones ZAP */
		if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
			if (origin->ds_dir->dd_phys->dd_clones == 0) {
				dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
				origin->ds_dir->dd_phys->dd_clones =
				    zap_create(mos,
				    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY3U(0, ==, zap_add_int(mos,
			    origin->ds_dir->dd_phys->dd_clones, dsobj, tx));
		}
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	/* finally, install the new dataset as the dir's head */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}
870
/*
 * Create a new dataset (a clone, if "origin" is non-NULL) named
 * "lastname" under parent dir "pdd": creates the dsl_dir, the dataset
 * itself, and the delegation ("zfs allow") create-time permissions.
 * Returns the new dataset's object number.  Must be called in syncing
 * context.
 */
uint64_t
dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
{
	dsl_pool_t *dp = pdd->dd_pool;
	uint64_t dsobj, ddobj;
	dsl_dir_t *dd;

	/* "lastname" is a single filesystem component, not a snapshot */
	ASSERT(lastname[0] != '@');

	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));

	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);

	dsl_deleg_set_create_perms(dd, tx, cr);

	dsl_dir_close(dd, FTAG);

	/*
	 * If we are creating a clone, make sure we zero out any stale
	 * data from the origin snapshots zil header.
	 */
	if (origin != NULL) {
		dsl_dataset_t *ds;
		objset_t *os;

		VERIFY3U(0, ==, dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
		VERIFY3U(0, ==, dmu_objset_from_ds(ds, &os));
		bzero(&os->os_zil_header, sizeof (os->os_zil_header));
		dsl_dataset_dirty(ds, tx);
		dsl_dataset_rele(ds, FTAG);
	}

	return (dsobj);
}
907
/*
 * Context passed to dsl_snapshot_destroy_one() by dsl_snapshots_destroy()
 * while iterating over all descendant filesystems.
 */
struct destroyarg {
	dsl_sync_task_group_t *dstg;	/* group accumulating destroy tasks */
	char *snapname;			/* snapshot name (part after '@') */
	char *failed;			/* out: fs name that hit an error */
	boolean_t defer;		/* deferred ("zfs destroy -d") mode */
};
914
/*
 * dmu_objset_find() callback: if snapshot "<name>@<da->snapname>" exists,
 * take ownership of it and queue a destroy sync task for it.  ENOENT
 * (this fs has no such snapshot) is not an error; on any other failure
 * the fs name is copied into da->failed for the caller to report.
 */
static int
dsl_snapshot_destroy_one(const char *name, void *arg)
{
	struct destroyarg *da = arg;
	dsl_dataset_t *ds;
	int err;
	char *dsname;

	dsname = kmem_asprintf("%s@%s", name, da->snapname);
	err = dsl_dataset_own(dsname, B_TRUE, da->dstg, &ds);
	strfree(dsname);
	if (err == 0) {
		struct dsl_ds_destroyarg *dsda;

		dsl_dataset_make_exclusive(ds, da->dstg);
		/* freed by dsl_snapshots_destroy() after the tasks run */
		dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
		dsda->ds = ds;
		dsda->defer = da->defer;
		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, dsda, da->dstg, 0);
	} else if (err == ENOENT) {
		err = 0;
	} else {
		(void) strcpy(da->failed, name);
	}
	return (err);
}
942
/*
 * Destroy 'snapname' in all descendants of 'fsname'.
 *
 * Gathers one destroy sync task per descendant that has the snapshot,
 * runs them as a group, then disowns each snapshot and frees the
 * per-task destroy args.  On error, 'fsname' is overwritten with the
 * name of the filesystem whose task failed.
 */
#pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
int
dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
{
	int err;
	struct destroyarg da;
	dsl_sync_task_t *dst;
	spa_t *spa;

	err = spa_open(fsname, &spa, FTAG);
	if (err)
		return (err);
	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	da.snapname = snapname;
	da.failed = fsname;	/* overwritten with failing fs on error */
	da.defer = defer;

	err = dmu_objset_find(fsname,
	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);

	if (err == 0)
		err = dsl_sync_task_group_wait(da.dstg);

	/* clean up every queued task, whether or not it succeeded */
	for (dst = list_head(&da.dstg->dstg_tasks); dst;
	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
		dsl_dataset_t *ds = dsda->ds;

		/*
		 * Return the file system name that triggered the error
		 */
		if (dst->dst_err) {
			dsl_dataset_name(ds, fsname);
			*strchr(fsname, '@') = '\0';
		}
		ASSERT3P(dsda->rm_origin, ==, NULL);
		dsl_dataset_disown(ds, da.dstg);
		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
	}

	dsl_sync_task_group_destroy(da.dstg);
	spa_close(spa, FTAG);
	return (err);
}
990
991 static boolean_t
992 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
993 {
994         boolean_t might_destroy = B_FALSE;
995
996         mutex_enter(&ds->ds_lock);
997         if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
998             DS_IS_DEFER_DESTROY(ds))
999                 might_destroy = B_TRUE;
1000         mutex_exit(&ds->ds_lock);
1001
1002         return (might_destroy);
1003 }
1004
/*
 * If we're removing a clone, and these three conditions are true:
 *	1) the clone's origin has no other children
 *	2) the clone's origin has no user references
 *	3) the clone's origin has been marked for deferred destruction
 * Then, prepare to remove the origin as part of this sync task group.
 *
 * On success the owned origin (if any) is recorded in dsda->rm_origin;
 * the caller is responsible for disowning it.
 */
static int
dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *origin = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(origin)) {
		char *name;
		int namelen;
		int error;

		namelen = dsl_dataset_namelen(origin) + 1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(origin, name);
#ifdef _KERNEL
		/* the origin snapshot cannot remain mounted */
		error = zfs_unmount_snap(name, NULL);
		if (error) {
			kmem_free(name, namelen);
			return (error);
		}
#endif
		/* re-hold the origin by name, this time as owner */
		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
		kmem_free(name, namelen);
		if (error)
			return (error);
		dsda->rm_origin = origin;
		dsl_dataset_make_exclusive(origin, tag);
	}

	return (0);
}
1043
/*
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 *
 * Snapshots are destroyed with a single sync task.  For a head dataset
 * we first mark it inconsistent and free its objects in open context,
 * then destroy the dsl_dir and dataset together; if it is a clone we may
 * also need to own and destroy its deferred-destroy origin, retrying
 * once the origin has been prepared (see dsl_dataset_origin_rm_prep()).
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;
	struct dsl_ds_destroyarg dsda = { 0 };
	/* stack stand-in carrying just the fields dsl_dir_destroy_* need */
	dsl_dataset_t dummy_ds = { 0 };

	dsda.ds = ds;

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		dsl_dataset_make_exclusive(ds, tag);

		dsda.defer = defer;
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    &dsda, tag, 0);
		ASSERT3P(dsda.rm_origin, ==, NULL);
		goto out;
	} else if (defer) {
		/* deferred destroy is only supported on snapshots */
		err = EINVAL;
		goto out;
	}

	dd = ds->ds_dir;
	dummy_ds.ds_dir = dd;
	dummy_ds.ds_object = ds->ds_object;

	/*
	 * Check for errors and mark this ds as inconsistent, in
	 * case we crash while freeing the objects.
	 */
	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
	if (err)
		goto out;

	err = dmu_objset_from_ds(ds, &os);
	if (err)
		goto out;

	/*
	 * remove the objects in open context, so that we won't
	 * have too much to do in syncing context.
	 */
	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
	    ds->ds_phys->ds_prev_snap_txg)) {
		/*
		 * Ignore errors, if there is not enough disk space
		 * we will deal with it in dsl_dataset_destroy_sync().
		 */
		(void) dmu_free_object(os, obj);
	}
	/* ESRCH here means the object walk completed normally */
	if (err != ESRCH)
		goto out;

	/*
	 * Only the ZIL knows how to free log blocks.
	 */
	zil_destroy(dmu_objset_zil(os), B_FALSE);

	/*
	 * Sync out all in-flight IO.
	 */
	txg_wait_synced(dd->dd_pool, 0);

	/*
	 * If we managed to free all the objects in open
	 * context, the user space accounting should be zero.
	 */
	if (ds->ds_phys->ds_bp.blk_fill == 0 &&
	    dmu_objset_userused_enabled(os)) {
		/* count exists only for the ASSERTs (debug builds) */
		ASSERTV(uint64_t count);
		ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
		    count == 0);
		ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
		    count == 0);
	}

	/* reopen dd; this reference is released below or by the sync task */
	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dsl_dataset_make_exclusive(ds, tag);
	/*
	 * If we're removing a clone, we might also need to remove its
	 * origin.
	 */
	do {
		dsda.need_prep = B_FALSE;
		if (dsl_dir_is_clone(dd)) {
			err = dsl_dataset_origin_rm_prep(&dsda, tag);
			if (err) {
				dsl_dir_close(dd, FTAG);
				goto out;
			}
		}

		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, &dsda, tag, 0);
		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
		err = dsl_sync_task_group_wait(dstg);
		dsl_sync_task_group_destroy(dstg);

		/*
		 * We could be racing against 'zfs release' or 'zfs destroy -d'
		 * on the origin snap, in which case we can get EBUSY if we
		 * needed to destroy the origin snap but were not ready to
		 * do so.
		 */
		if (dsda.need_prep) {
			ASSERT(err == EBUSY);
			ASSERT(dsl_dir_is_clone(dd));
			ASSERT(dsda.rm_origin == NULL);
		}
	} while (dsda.need_prep);

	/* drop the ownership taken by dsl_dataset_origin_rm_prep() */
	if (dsda.rm_origin != NULL)
		dsl_dataset_disown(dsda.rm_origin, tag);

	/* if it is successful, dsl_dir_destroy_sync will close the dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	dsl_dataset_disown(ds, tag);
	return (err);
}
1187
1188 blkptr_t *
1189 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
1190 {
1191         return (&ds->ds_phys->ds_bp);
1192 }
1193
1194 void
1195 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
1196 {
1197         ASSERT(dmu_tx_is_syncing(tx));
1198         /* If it's the meta-objset, set dp_meta_rootbp */
1199         if (ds == NULL) {
1200                 tx->tx_pool->dp_meta_rootbp = *bp;
1201         } else {
1202                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
1203                 ds->ds_phys->ds_bp = *bp;
1204         }
1205 }
1206
1207 spa_t *
1208 dsl_dataset_get_spa(dsl_dataset_t *ds)
1209 {
1210         return (ds->ds_dir->dd_pool->dp_spa);
1211 }
1212
/*
 * Mark dataset "ds" dirty in tx's txg so dsl_pool_sync() will write it
 * out.  A NULL ds (the meta-objset) is a no-op.  Snapshots (datasets
 * with a next-snapshot) must never be dirtied.
 */
void
dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	dsl_pool_t *dp;

	if (ds == NULL) /* this is the meta-objset */
		return;

	ASSERT(ds->ds_objset != NULL);

	if (ds->ds_phys->ds_next_snap_obj != 0)
		panic("dirtying snapshot!");

	dp = ds->ds_dir->dd_pool;

	/* txg_list_add returns 0 only the first time per txg */
	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
		/* up the hold count until we can be written out */
		dmu_buf_add_ref(ds->ds_dbuf, ds);
	}
}
1233
1234 /*
1235  * The unique space in the head dataset can be calculated by subtracting
1236  * the space used in the most recent snapshot, that is still being used
1237  * in this file system, from the space currently in use.  To figure out
1238  * the space in the most recent snapshot still in use, we need to take
1239  * the total space used in the snapshot and subtract out the space that
1240  * has been freed up since the snapshot was taken.
1241  */
1242 static void
1243 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1244 {
1245         uint64_t mrs_used;
1246         uint64_t dlused, dlcomp, dluncomp;
1247
1248         ASSERT(!dsl_dataset_is_snapshot(ds));
1249
1250         if (ds->ds_phys->ds_prev_snap_obj != 0)
1251                 mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
1252         else
1253                 mrs_used = 0;
1254
1255         dsl_deadlist_space(&ds->ds_deadlist, &dlused, &dlcomp, &dluncomp);
1256
1257         ASSERT3U(dlused, <=, mrs_used);
1258         ds->ds_phys->ds_unique_bytes =
1259             ds->ds_phys->ds_used_bytes - (mrs_used - dlused);
1260
1261         if (spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1262             SPA_VERSION_UNIQUE_ACCURATE)
1263                 ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1264 }
1265
/* Context for kill_blkptr(): the dataset being freed and the open tx. */
struct killarg {
	dsl_dataset_t *ds;	/* dataset whose blocks are being killed */
	dmu_tx_t *tx;		/* transaction to charge the frees to */
};
1270
/*
 * Traversal callback that frees one block pointer belonging to ka->ds.
 * ZIL blocks have no dataset space accounting and are freed directly;
 * all other blocks (which must be born after the previous snapshot) go
 * through dsl_dataset_block_kill() so accounting stays correct.
 */
/* ARGSUSED */
static int
kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, arc_buf_t *pbuf,
    const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct killarg *ka = arg;
	dmu_tx_t *tx = ka->tx;

	if (bp == NULL)
		return (0);

	if (zb->zb_level == ZB_ZIL_LEVEL) {
		ASSERT(zilog != NULL);
		/*
		 * It's a block in the intent log.  It has no
		 * accounting, so just free it.
		 */
		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
	} else {
		ASSERT(zilog == NULL);
		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
	}

	return (0);
}
1297
/*
 * Check half of the destroy-begin sync task: refuse to start destroying
 * a head dataset that still has its own snapshots (EBUSY) or whose
 * dsl_dir still has child dirs (EEXIST).
 */
/* ARGSUSED */
static int
dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * This is really a dsl_dir thing, but check it here so that
	 * we'll be less likely to leave this dataset inconsistent &
	 * nearly destroyed.
	 */
	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
	if (err)
		return (err);
	if (count != 0)
		return (EEXIST);

	return (0);
}
1329
/*
 * Sync half of the destroy-begin task: mark the dataset inconsistent on
 * disk so that a crash during open-context object freeing can be
 * detected, and record the action in pool history.
 */
/* ARGSUSED */
static void
dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* Mark it as inconsistent on-disk, in case we crash */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;

	spa_history_log_internal(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
	    "dataset = %llu", ds->ds_object);
}
1344
/*
 * Called from dsl_dataset_destroy_check() when the dataset being
 * destroyed is a clone.  If the origin snapshot would also be destroyed
 * but hasn't been prepared (owned) yet, fail with EBUSY and set
 * dsda->need_prep so the caller can retry after
 * dsl_dataset_origin_rm_prep(); otherwise recursively check the origin's
 * destroy, or undo a now-unneeded preparation.
 */
static int
dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
    dmu_tx_t *tx)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *ds_prev = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(ds_prev)) {
		struct dsl_ds_destroyarg ndsda = {0};

		/*
		 * If we're not prepared to remove the origin, don't remove
		 * the clone either.
		 */
		if (dsda->rm_origin == NULL) {
			dsda->need_prep = B_TRUE;
			return (EBUSY);
		}

		ndsda.ds = ds_prev;
		ndsda.is_origin_rm = B_TRUE;
		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
	}

	/*
	 * If we're not going to remove the origin after all,
	 * undo the open context setup.
	 */
	if (dsda->rm_origin != NULL) {
		dsl_dataset_disown(dsda->rm_origin, tag);
		dsda->rm_origin = NULL;
	}

	return (0);
}
1380
/*
 * Check half of the dataset-destroy sync task: verify that the dataset
 * described by dsda can be destroyed in this txg, returning ENOTSUP,
 * EBUSY, EAGAIN or EEXIST as appropriate.
 *
 * If you add new checks here, you may need to add
 * additional checks to the "temporary" case in
 * snapshot_check() in dmu_objset.c.
 */
/* ARGSUSED */
int
dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;

	/* we have an owner hold, so noone else can destroy us */
	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));

	/*
	 * Only allow deferred destroy on pools that support it.
	 * NOTE: deferred destroy is only supported on snapshots.
	 */
	if (dsda->defer) {
		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
		    SPA_VERSION_USERREFS)
			return (ENOTSUP);
		ASSERT(dsl_dataset_is_snapshot(ds));
		return (0);
	}

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * If we made changes this txg, traverse_dsl_dataset won't find
	 * them.  Try again.
	 */
	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
		return (EAGAIN);

	if (dsl_dataset_is_snapshot(ds)) {
		/*
		 * If this snapshot has an elevated user reference count,
		 * we can't destroy it yet.
		 */
		if (ds->ds_userrefs > 0 && !dsda->releasing)
			return (EBUSY);

		mutex_enter(&ds->ds_lock);
		/*
		 * Can't delete a branch point. However, if we're destroying
		 * a clone and removing its origin due to it having a user
		 * hold count of 0 and having been marked for deferred destroy,
		 * it's OK for the origin to have a single clone.
		 */
		if (ds->ds_phys->ds_num_children >
		    (dsda->is_origin_rm ? 2 : 1)) {
			mutex_exit(&ds->ds_lock);
			return (EEXIST);
		}
		mutex_exit(&ds->ds_lock);
	} else if (dsl_dir_is_clone(ds->ds_dir)) {
		/* a clone may take its deferred-destroy origin with it */
		return (dsl_dataset_origin_check(dsda, arg2, tx));
	}

	/* XXX we should do some i/o error checking... */
	return (0);
}
1452
/* Rendezvous between dsl_dataset_drain_refs() and its eviction callback. */
struct refsarg {
	kmutex_t lock;		/* protects "gone" and the cv */
	boolean_t gone;		/* set once the dbuf user is evicted */
	kcondvar_t cv;		/* signaled by dsl_dataset_refs_gone() */
};
1458
/*
 * dmu_buf user-eviction callback: wake the thread blocked in
 * dsl_dataset_drain_refs() once the dataset's dbuf user is evicted.
 */
/* ARGSUSED */
static void
dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
{
	struct refsarg *arg = argv;

	mutex_enter(&arg->lock);
	arg->gone = TRUE;
	cv_signal(&arg->cv);
	mutex_exit(&arg->lock);
}
1470
/*
 * Wait until all holds on the dataset's dbuf are released: install a
 * temporary dbuf user whose eviction callback (dsl_dataset_refs_gone)
 * signals us, drop our own hold, then block until the callback fires.
 * On return ds_dbuf and ds_phys are NULL and must not be used.
 */
static void
dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
{
	struct refsarg arg;

	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
	arg.gone = FALSE;
	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
	    dsl_dataset_refs_gone);
	dmu_buf_rele(ds->ds_dbuf, tag);
	mutex_enter(&arg.lock);
	while (!arg.gone)
		cv_wait(&arg.cv, &arg.lock);
	ASSERT(arg.gone);
	mutex_exit(&arg.lock);
	ds->ds_dbuf = NULL;
	ds->ds_phys = NULL;
	mutex_destroy(&arg.lock);
	cv_destroy(&arg.cv);
}
1492
/*
 * Remove clone "obj" from branch-point snapshot ds's next-clones ZAP.
 * ds must have at least two children (head + the clone being removed).
 */
static void
remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	int err;
	/* count exists only for the trailing ASSERTs (debug builds) */
	ASSERTV(uint64_t count);

	ASSERT(ds->ds_phys->ds_num_children >= 2);
	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
	/*
	 * The err should not be ENOENT, but a bug in a previous version
	 * of the code could cause upgrade_clones_cb() to not set
	 * ds_next_snap_obj when it should, leading to a missing entry.
	 * If we knew that the pool was created after
	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
	 * ENOENT.  However, at least we can check that we don't have
	 * too many entries in the next_clones_obj even after failing to
	 * remove this one.
	 */
	if (err != ENOENT) {
		VERIFY3U(err, ==, 0);
	}
	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
	    &count));
	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
}
1519
/*
 * Recursively remove the deadlist key "mintxg" from every clone of "ds"
 * (and from their clones) whose dir origin txg is newer than mintxg.
 */
static void
dsl_dataset_remove_clones_key(dsl_dataset_t *ds, uint64_t mintxg, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	zap_cursor_t zc;
	zap_attribute_t za;

	/*
	 * If it is the old version, dd_clones doesn't exist so we can't
	 * find the clones, but deadlist_remove_key() is a no-op so it
	 * doesn't matter.
	 */
	if (ds->ds_dir->dd_phys->dd_clones == 0)
		return;

	for (zap_cursor_init(&zc, mos, ds->ds_dir->dd_phys->dd_clones);
	    zap_cursor_retrieve(&zc, &za) == 0;
	    zap_cursor_advance(&zc)) {
		dsl_dataset_t *clone;

		/* za_first_integer is the clone's dataset object number */
		VERIFY3U(0, ==, dsl_dataset_hold_obj(ds->ds_dir->dd_pool,
		    za.za_first_integer, FTAG, &clone));
		if (clone->ds_dir->dd_origin_txg > mintxg) {
			dsl_deadlist_remove_key(&clone->ds_deadlist,
			    mintxg, tx);
			dsl_dataset_remove_clones_key(clone, mintxg, tx);
		}
		dsl_dataset_rele(clone, FTAG);
	}
	zap_cursor_fini(&zc);
}
1551
/* Context for process_old_cb(), set up by process_old_deadlist(). */
struct process_old_arg {
	dsl_dataset_t *ds;		/* snapshot being destroyed */
	dsl_dataset_t *ds_prev;		/* its previous snapshot (or NULL) */
	boolean_t after_branch_point;	/* ds_prev is a branch point */
	zio_t *pio;			/* root zio for the async frees */
	uint64_t used, comp, uncomp;	/* space totals of freed blocks */
};
1559
/*
 * bpobj_iterate() callback for process_old_deadlist().  Blocks born at
 * or before ds's previous snapshot stay dead: move them onto ds's
 * deadlist (crediting ds_prev's unique bytes when the block was born
 * after ds_prev's own previous snapshot).  Younger blocks are freed,
 * and their space is tallied into poa->used/comp/uncomp.
 */
static int
process_old_cb(void *arg, const blkptr_t *bp, dmu_tx_t *tx)
{
	struct process_old_arg *poa = arg;
	dsl_pool_t *dp = poa->ds->ds_dir->dd_pool;

	if (bp->blk_birth <= poa->ds->ds_phys->ds_prev_snap_txg) {
		dsl_deadlist_insert(&poa->ds->ds_deadlist, bp, tx);
		if (poa->ds_prev && !poa->after_branch_point &&
		    bp->blk_birth >
		    poa->ds_prev->ds_phys->ds_prev_snap_txg) {
			poa->ds_prev->ds_phys->ds_unique_bytes +=
			    bp_get_dsize_sync(dp->dp_spa, bp);
		}
	} else {
		poa->used += bp_get_dsize_sync(dp->dp_spa, bp);
		poa->comp += BP_GET_PSIZE(bp);
		poa->uncomp += BP_GET_UCSIZE(bp);
		dsl_free_sync(poa->pio, dp, tx->tx_txg, bp);
	}
	return (0);
}
1582
/*
 * Process ds_next's old-format deadlist when snapshot "ds" goes away:
 * free the blocks that were unique to ds (adjusting the snapused space
 * accounting), keep the still-dead ones on ds's deadlist, then swap the
 * two datasets' deadlist objects so ds_next inherits the merged list.
 * Both deadlists must be in the pre-SPA_VERSION_DEADLISTS format.
 */
static void
process_old_deadlist(dsl_dataset_t *ds, dsl_dataset_t *ds_prev,
    dsl_dataset_t *ds_next, boolean_t after_branch_point, dmu_tx_t *tx)
{
	struct process_old_arg poa = { 0 };
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;

	ASSERT(ds->ds_deadlist.dl_oldfmt);
	ASSERT(ds_next->ds_deadlist.dl_oldfmt);

	poa.ds = ds;
	poa.ds_prev = ds_prev;
	poa.after_branch_point = after_branch_point;
	poa.pio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
	VERIFY3U(0, ==, bpobj_iterate(&ds_next->ds_deadlist.dl_bpobj,
	    process_old_cb, &poa, tx));
	/* wait for the frees issued by process_old_cb() to complete */
	VERIFY3U(zio_wait(poa.pio), ==, 0);
	ASSERT3U(poa.used, ==, ds->ds_phys->ds_unique_bytes);

	/* change snapused */
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
	    -poa.used, -poa.comp, -poa.uncomp, tx);

	/* swap next's deadlist to our deadlist */
	dsl_deadlist_close(&ds->ds_deadlist);
	dsl_deadlist_close(&ds_next->ds_deadlist);
	SWITCH64(ds_next->ds_phys->ds_deadlist_obj,
	    ds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_open(&ds_next->ds_deadlist, mos,
	    ds_next->ds_phys->ds_deadlist_obj);
}
1616
/*
 * Syncing-context half of dataset destruction: unlink the dataset from
 * its neighbors (prev/next snapshots and clones), dispose of its
 * deadlist and blocks, remove it from the namespace, and free the
 * dataset object itself.  arg1 is a struct dsl_ds_destroyarg; tag is
 * the hold/owner tag used to drain outstanding references.
 */
void
dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;
	int err;
	int after_branch_point = FALSE;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	dsl_dataset_t *ds_prev = NULL;
	boolean_t wont_destroy;
	uint64_t obj;

	/*
	 * A deferred destroy with outstanding user holds or clones only
	 * marks the dataset; actual destruction happens later when the
	 * last hold/clone goes away.
	 */
	wont_destroy = (dsda->defer &&
	    (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1));

	ASSERT(ds->ds_owner || wont_destroy);
	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
	ASSERT(ds->ds_prev == NULL ||
	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);

	if (wont_destroy) {
		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
		return;
	}

	/* signal any waiters that this dataset is going away */
	mutex_enter(&ds->ds_lock);
	ds->ds_owner = dsl_reaper;
	cv_broadcast(&ds->ds_exclusive_cv);
	mutex_exit(&ds->ds_lock);

	/* Remove our reservation */
	if (ds->ds_reserved != 0) {
		dsl_prop_setarg_t psa;
		uint64_t value = 0;

		dsl_prop_setarg_init_uint64(&psa, "refreservation",
		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
		    &value);
		psa.psa_effective_value = 0;	/* predict default value */

		dsl_dataset_set_reservation_sync(ds, &psa, tx);
		ASSERT3U(ds->ds_reserved, ==, 0);
	}

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	dsl_scan_ds_destroyed(ds, tx);

	obj = ds->ds_object;

	/*
	 * Detach from the previous snapshot, fixing up its
	 * next-snapshot and next-clones linkage as needed.
	 */
	if (ds->ds_phys->ds_prev_snap_obj != 0) {
		if (ds->ds_prev) {
			ds_prev = ds->ds_prev;
		} else {
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
		}
		/* True when ds is a clone branched off ds_prev. */
		after_branch_point =
		    (ds_prev->ds_phys->ds_next_snap_obj != obj);

		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
		if (after_branch_point &&
		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
			remove_from_next_clones(ds_prev, obj, tx);
			if (ds->ds_phys->ds_next_snap_obj != 0) {
				VERIFY(0 == zap_add_int(mos,
				    ds_prev->ds_phys->ds_next_clones_obj,
				    ds->ds_phys->ds_next_snap_obj, tx));
			}
		}
		if (after_branch_point &&
		    ds->ds_phys->ds_next_snap_obj == 0) {
			/* This clone is toast. */
			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
			ds_prev->ds_phys->ds_num_children--;

			/*
			 * If the clone's origin has no other clones, no
			 * user holds, and has been marked for deferred
			 * deletion, then we should have done the necessary
			 * destroy setup for it.
			 */
			if (ds_prev->ds_phys->ds_num_children == 1 &&
			    ds_prev->ds_userrefs == 0 &&
			    DS_IS_DEFER_DESTROY(ds_prev)) {
				ASSERT3P(dsda->rm_origin, !=, NULL);
			} else {
				ASSERT3P(dsda->rm_origin, ==, NULL);
			}
		} else if (!after_branch_point) {
			ds_prev->ds_phys->ds_next_snap_obj =
			    ds->ds_phys->ds_next_snap_obj;
		}
	}

	if (dsl_dataset_is_snapshot(ds)) {
		/*
		 * Destroying a snapshot: merge our deadlist into the
		 * next snapshot's and fix up unique/used accounting.
		 */
		dsl_dataset_t *ds_next;
		uint64_t old_unique;
		uint64_t used = 0, comp = 0, uncomp = 0;

		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);

		old_unique = ds_next->ds_phys->ds_unique_bytes;

		/* Splice ds out of the snapshot chain. */
		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
		ds_next->ds_phys->ds_prev_snap_obj =
		    ds->ds_phys->ds_prev_snap_obj;
		ds_next->ds_phys->ds_prev_snap_txg =
		    ds->ds_phys->ds_prev_snap_txg;
		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);


		if (ds_next->ds_deadlist.dl_oldfmt) {
			process_old_deadlist(ds, ds_prev, ds_next,
			    after_branch_point, tx);
		} else {
			/* Adjust prev's unique space. */
			if (ds_prev && !after_branch_point) {
				dsl_deadlist_space_range(&ds_next->ds_deadlist,
				    ds_prev->ds_phys->ds_prev_snap_txg,
				    ds->ds_phys->ds_prev_snap_txg,
				    &used, &comp, &uncomp);
				ds_prev->ds_phys->ds_unique_bytes += used;
			}

			/* Adjust snapused. */
			dsl_deadlist_space_range(&ds_next->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
			    &used, &comp, &uncomp);
			dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
			    -used, -comp, -uncomp, tx);

			/* Move blocks to be freed to pool's free list. */
			dsl_deadlist_move_bpobj(&ds_next->ds_deadlist,
			    &dp->dp_free_bpobj, ds->ds_phys->ds_prev_snap_txg,
			    tx);
			dsl_dir_diduse_space(tx->tx_pool->dp_free_dir,
			    DD_USED_HEAD, used, comp, uncomp, tx);
			dsl_dir_dirty(tx->tx_pool->dp_free_dir, tx);

			/* Merge our deadlist into next's and free it. */
			dsl_deadlist_merge(&ds_next->ds_deadlist,
			    ds->ds_phys->ds_deadlist_obj, tx);
		}
		dsl_deadlist_close(&ds->ds_deadlist);
		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);

		/* Collapse range in clone heads */
		dsl_dataset_remove_clones_key(ds,
		    ds->ds_phys->ds_creation_txg, tx);

		if (dsl_dataset_is_snapshot(ds_next)) {
			dsl_dataset_t *ds_nextnext;
			dsl_dataset_t *hds;

			/*
			 * Update next's unique to include blocks which
			 * were previously shared by only this snapshot
			 * and it.  Those blocks will be born after the
			 * prev snap and before this snap, and will have
			 * died after the next snap and before the one
			 * after that (ie. be on the snap after next's
			 * deadlist).
			 */
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds_next->ds_phys->ds_next_snap_obj,
			    FTAG, &ds_nextnext));
			dsl_deadlist_space_range(&ds_nextnext->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg,
			    ds->ds_phys->ds_creation_txg,
			    &used, &comp, &uncomp);
			ds_next->ds_phys->ds_unique_bytes += used;
			dsl_dataset_rele(ds_nextnext, FTAG);
			ASSERT3P(ds_next->ds_prev, ==, NULL);

			/* Collapse range in this head. */
			VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
			    ds->ds_dir->dd_phys->dd_head_dataset_obj,
			    FTAG, &hds));
			dsl_deadlist_remove_key(&hds->ds_deadlist,
			    ds->ds_phys->ds_creation_txg, tx);
			dsl_dataset_rele(hds, FTAG);

		} else {
			/* Next is the head; repoint its prev snapshot. */
			ASSERT3P(ds_next->ds_prev, ==, ds);
			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
			ds_next->ds_prev = NULL;
			if (ds_prev) {
				VERIFY(0 == dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds_next, &ds_next->ds_prev));
			}

			dsl_dataset_recalc_head_uniq(ds_next);

			/*
			 * Reduce the amount of our unconsmed refreservation
			 * being charged to our parent by the amount of
			 * new unique data we have gained.
			 */
			if (old_unique < ds_next->ds_reserved) {
				int64_t mrsdelta;
				uint64_t new_unique =
				    ds_next->ds_phys->ds_unique_bytes;

				ASSERT(old_unique <= new_unique);
				mrsdelta = MIN(new_unique - old_unique,
				    ds_next->ds_reserved - old_unique);
				dsl_dir_diduse_space(ds->ds_dir,
				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
			}
		}
		dsl_dataset_rele(ds_next, FTAG);
	} else {
		/*
		 * There's no next snapshot, so this is a head dataset.
		 * Destroy the deadlist.  Unless it's a clone, the
		 * deadlist should be empty.  (If it's a clone, it's
		 * safe to ignore the deadlist contents.)
		 */
		struct killarg ka;

		dsl_deadlist_close(&ds->ds_deadlist);
		dsl_deadlist_free(mos, ds->ds_phys->ds_deadlist_obj, tx);
		ds->ds_phys->ds_deadlist_obj = 0;

		/*
		 * Free everything that we point to (that's born after
		 * the previous snapshot, if we are a clone)
		 *
		 * NB: this should be very quick, because we already
		 * freed all the objects in open context.
		 */
		ka.ds = ds;
		ka.tx = tx;
		err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
		    TRAVERSE_POST, kill_blkptr, &ka);
		ASSERT3U(err, ==, 0);
		ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
		    ds->ds_phys->ds_unique_bytes == 0);

		if (ds->ds_prev != NULL) {
			if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
				VERIFY3U(0, ==, zap_remove_int(mos,
				    ds->ds_prev->ds_dir->dd_phys->dd_clones,
				    ds->ds_object, tx));
			}
			dsl_dataset_rele(ds->ds_prev, ds);
			ds->ds_prev = ds_prev = NULL;
		}
	}

	/*
	 * This must be done after the dsl_traverse(), because it will
	 * re-open the objset.
	 */
	if (ds->ds_objset) {
		dmu_objset_evict(ds->ds_objset);
		ds->ds_objset = NULL;
	}

	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
		/* Erase the link in the dir */
		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
		ASSERT(err == 0);
	} else {
		/* remove from snapshot namespace */
		dsl_dataset_t *ds_head;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
		VERIFY(0 == dsl_dataset_get_snapname(ds));
#ifdef ZFS_DEBUG
		{
			uint64_t val;

			err = dsl_dataset_snap_lookup(ds_head,
			    ds->ds_snapname, &val);
			ASSERT3U(err, ==, 0);
			ASSERT3U(val, ==, obj);
		}
#endif
		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
		ASSERT(err == 0);
		dsl_dataset_rele(ds_head, FTAG);
	}

	if (ds_prev && ds->ds_prev != ds_prev)
		dsl_dataset_rele(ds_prev, FTAG);

	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
	spa_history_log_internal(LOG_DS_DESTROY, dp->dp_spa, tx,
	    "dataset = %llu", ds->ds_object);

	/* Free the auxiliary ZAP objects hanging off the dataset. */
	if (ds->ds_phys->ds_next_clones_obj != 0) {
		ASSERTV(uint64_t count);
		ASSERT(0 == zap_count(mos,
		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
		VERIFY(0 == dmu_object_free(mos,
		    ds->ds_phys->ds_next_clones_obj, tx));
	}
	if (ds->ds_phys->ds_props_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
	if (ds->ds_phys->ds_userrefs_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
	dsl_dir_close(ds->ds_dir, ds);
	ds->ds_dir = NULL;
	dsl_dataset_drain_refs(ds, tag);
	VERIFY(0 == dmu_object_free(mos, obj, tx));

	if (dsda->rm_origin) {
		/*
		 * Remove the origin of the clone we just destroyed.
		 */
		struct dsl_ds_destroyarg ndsda = {0};

		ndsda.ds = dsda->rm_origin;
		dsl_dataset_destroy_sync(&ndsda, tag, tx);
	}
}
1948
1949 static int
1950 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1951 {
1952         uint64_t asize;
1953
1954         if (!dmu_tx_is_syncing(tx))
1955                 return (0);
1956
1957         /*
1958          * If there's an fs-only reservation, any blocks that might become
1959          * owned by the snapshot dataset must be accommodated by space
1960          * outside of the reservation.
1961          */
1962         ASSERT(ds->ds_reserved == 0 || DS_UNIQUE_IS_ACCURATE(ds));
1963         asize = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
1964         if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
1965                 return (ENOSPC);
1966
1967         /*
1968          * Propogate any reserved space for this snapshot to other
1969          * snapshot checks in this sync group.
1970          */
1971         if (asize > 0)
1972                 dsl_dir_willuse_space(ds->ds_dir, asize, tx);
1973
1974         return (0);
1975 }
1976
1977 int
1978 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
1979 {
1980         dsl_dataset_t *ds = arg1;
1981         const char *snapname = arg2;
1982         int err;
1983         uint64_t value;
1984
1985         /*
1986          * We don't allow multiple snapshots of the same txg.  If there
1987          * is already one, try again.
1988          */
1989         if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
1990                 return (EAGAIN);
1991
1992         /*
1993          * Check for conflicting name snapshot name.
1994          */
1995         err = dsl_dataset_snap_lookup(ds, snapname, &value);
1996         if (err == 0)
1997                 return (EEXIST);
1998         if (err != ENOENT)
1999                 return (err);
2000
2001         /*
2002          * Check that the dataset's name is not too long.  Name consists
2003          * of the dataset's length + 1 for the @-sign + snapshot name's length
2004          */
2005         if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
2006                 return (ENAMETOOLONG);
2007
2008         err = dsl_dataset_snapshot_reserve_space(ds, tx);
2009         if (err)
2010                 return (err);
2011
2012         ds->ds_trysnap_txg = tx->tx_txg;
2013         return (0);
2014 }
2015
/*
 * Syncing-context half of snapshot creation.  Allocates a new dataset
 * object capturing ds's current state, links it in as ds's most recent
 * snapshot, gives ds a fresh deadlist, and records the snapshot name
 * in the snapnames ZAP.  arg1 is the dataset, arg2 the snapshot name.
 */
void
dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	const char *snapname = arg2;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj, crtxg;
	objset_t *mos = dp->dp_meta_objset;
	int err;

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	/*
	 * The origin's ds_creation_txg has to be < TXG_INITIAL
	 */
	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
		crtxg = 1;
	else
		crtxg = tx->tx_txg;

	/* Allocate and populate the snapshot's dataset object. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
	dsphys->ds_next_snap_obj = ds->ds_object;
	dsphys->ds_num_children = 1;
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = crtxg;
	/* The snapshot inherits ds's current deadlist and space stats. */
	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
	dsphys->ds_flags = ds->ds_phys->ds_flags;
	dsphys->ds_bp = ds->ds_phys->ds_bp;
	dmu_buf_rele(dbuf, FTAG);

	/* Link the new snapshot in after ds's old previous snapshot. */
	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
	if (ds->ds_prev) {
		uint64_t next_clones_obj =
		    ds->ds_prev->ds_phys->ds_next_clones_obj;
		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object ||
		    ds->ds_prev->ds_phys->ds_num_children > 1);
		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
			    ds->ds_prev->ds_phys->ds_creation_txg);
			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
		} else if (next_clones_obj != 0) {
			remove_from_next_clones(ds->ds_prev,
			    dsphys->ds_next_snap_obj, tx);
			VERIFY3U(0, ==, zap_add_int(mos,
			    next_clones_obj, dsobj, tx));
		}
	}

	/*
	 * If we have a reference-reservation on this dataset, we will
	 * need to increase the amount of refreservation being charged
	 * since our unique space is going to zero.
	 */
	if (ds->ds_reserved) {
		int64_t delta;
		ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
		delta = MIN(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
		    delta, 0, 0, tx);
	}

	/* The snapshot keeps the old deadlist; ds gets a fresh clone. */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	zfs_dbgmsg("taking snapshot %s@%s/%llu; newkey=%llu",
	    ds->ds_dir->dd_myname, snapname, dsobj,
	    ds->ds_phys->ds_prev_snap_txg);
	ds->ds_phys->ds_deadlist_obj = dsl_deadlist_clone(&ds->ds_deadlist,
	    UINT64_MAX, ds->ds_phys->ds_prev_snap_obj, tx);
	dsl_deadlist_close(&ds->ds_deadlist);
	dsl_deadlist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_add_key(&ds->ds_deadlist,
	    ds->ds_phys->ds_prev_snap_txg, tx);

	/* The new snapshot becomes ds's previous snapshot. */
	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
	ds->ds_phys->ds_prev_snap_obj = dsobj;
	ds->ds_phys->ds_prev_snap_txg = crtxg;
	ds->ds_phys->ds_unique_bytes = 0;
	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	/* Enter the snapshot into the dataset's snapnames directory. */
	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
	    snapname, 8, 1, &dsobj, tx);
	ASSERT(err == 0);

	if (ds->ds_prev)
		dsl_dataset_drop_ref(ds->ds_prev, ds);
	VERIFY(0 == dsl_dataset_get_ref(dp,
	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));

	dsl_scan_ds_snapshotted(ds, tx);

	dsl_dir_snap_cmtime_update(ds->ds_dir);

	spa_history_log_internal(LOG_DS_SNAPSHOT, dp->dp_spa, tx,
	    "dataset = %llu", dsobj);
}
2129
/*
 * Write out this dataset's dirty state for the current txg: push the
 * (possibly updated) fsid guid to disk, dirty the parent dsl_dir, and
 * sync the underlying objset.  Must be called in syncing context on a
 * head (non-snapshot) dataset with an open objset.
 */
void
dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(ds->ds_objset != NULL);
	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);

	/*
	 * in case we had to change ds_fsid_guid when we opened it,
	 * sync it out now.
	 */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;

	dsl_dir_dirty(ds->ds_dir, tx);
	dmu_objset_sync(ds->ds_objset, zio, tx);
}
2147
/*
 * Add this dataset's properties (space, creation info, guid, holds,
 * etc.) to nv, on top of the enclosing dsl_dir's stats.  For a
 * snapshot, "used" and "compressratio" are overridden with values
 * specific to the snapshot.
 */
void
dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
{
	uint64_t refd, avail, uobjs, aobjs;

	dsl_dir_stats(ds->ds_dir, nv);

	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);

	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
	    ds->ds_phys->ds_creation_time);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
	    ds->ds_phys->ds_creation_txg);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
	    ds->ds_quota);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
	    ds->ds_reserved);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
	    ds->ds_phys->ds_guid);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
	    ds->ds_phys->ds_unique_bytes);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
	    ds->ds_object);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
	    ds->ds_userrefs);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);

	if (ds->ds_phys->ds_next_snap_obj) {
		/*
		 * This is a snapshot; override the dd's space used with
		 * our unique space and compression ratio.
		 */
		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
		    ds->ds_phys->ds_unique_bytes);
		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO,
		    ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
		    (ds->ds_phys->ds_uncompressed_bytes * 100 /
		    ds->ds_phys->ds_compressed_bytes));
	}
}
2191
/*
 * Fill in the cheap-to-compute fields of *stat: creation txg,
 * inconsistency flag, guid, snapshot-ness/clone count, and (for
 * clones) the origin dataset's name.
 */
void
dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
{
	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
	stat->dds_guid = ds->ds_phys->ds_guid;
	/* A nonzero next-snapshot object means ds is itself a snapshot. */
	if (ds->ds_phys->ds_next_snap_obj) {
		stat->dds_is_snapshot = B_TRUE;
		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
	} else {
		stat->dds_is_snapshot = B_FALSE;
		stat->dds_num_clones = 0;
	}

	/* clone origin is really a dsl_dir thing... */
	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
	if (dsl_dir_is_clone(ds->ds_dir)) {
		dsl_dataset_t *ods;

		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
		dsl_dataset_name(ods, stat->dds_origin);
		dsl_dataset_drop_ref(ods, FTAG);
	} else {
		stat->dds_origin[0] = '\0';
	}
	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
}
2220
/* Return the in-core fsid guid for this dataset. */
uint64_t
dsl_dataset_fsid_guid(dsl_dataset_t *ds)
{
	return (ds->ds_fsid_guid);
}
2226
2227 void
2228 dsl_dataset_space(dsl_dataset_t *ds,
2229     uint64_t *refdbytesp, uint64_t *availbytesp,
2230     uint64_t *usedobjsp, uint64_t *availobjsp)
2231 {
2232         *refdbytesp = ds->ds_phys->ds_used_bytes;
2233         *availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2234         if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2235                 *availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2236         if (ds->ds_quota != 0) {
2237                 /*
2238                  * Adjust available bytes according to refquota
2239                  */
2240                 if (*refdbytesp < ds->ds_quota)
2241                         *availbytesp = MIN(*availbytesp,
2242                             ds->ds_quota - *refdbytesp);
2243                 else
2244                         *availbytesp = 0;
2245         }
2246         *usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2247         *availobjsp = DN_MAX_OBJECT - *usedobjsp;
2248 }
2249
/*
 * Return B_TRUE if ds has been modified since its most recent
 * snapshot.  A difference confined to the ZIL (e.g. from a log reset
 * in the head) does not count as a modification.
 */
boolean_t
dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
{
	ASSERTV(dsl_pool_t *dp = ds->ds_dir->dd_pool);

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));
	if (ds->ds_prev == NULL)
		return (B_FALSE);
	if (ds->ds_phys->ds_bp.blk_birth >
	    ds->ds_prev->ds_phys->ds_creation_txg) {
		objset_t *os, *os_prev;
		/*
		 * It may be that only the ZIL differs, because it was
		 * reset in the head.  Don't count that as being
		 * modified.
		 */
		if (dmu_objset_from_ds(ds, &os) != 0)
			return (B_TRUE);
		if (dmu_objset_from_ds(ds->ds_prev, &os_prev) != 0)
			return (B_TRUE);
		return (bcmp(&os->os_phys->os_meta_dnode,
		    &os_prev->os_phys->os_meta_dnode,
		    sizeof (os->os_phys->os_meta_dnode)) != 0);
	}
	return (B_FALSE);
}
2277
2278 /* ARGSUSED */
2279 static int
2280 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2281 {
2282         dsl_dataset_t *ds = arg1;
2283         char *newsnapname = arg2;
2284         dsl_dir_t *dd = ds->ds_dir;
2285         dsl_dataset_t *hds;
2286         uint64_t val;
2287         int err;
2288
2289         err = dsl_dataset_hold_obj(dd->dd_pool,
2290             dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2291         if (err)
2292                 return (err);
2293
2294         /* new name better not be in use */
2295         err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2296         dsl_dataset_rele(hds, FTAG);
2297
2298         if (err == 0)
2299                 err = EEXIST;
2300         else if (err == ENOENT)
2301                 err = 0;
2302
2303         /* dataset name + 1 for the "@" + the new snapshot name must fit */
2304         if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2305                 err = ENAMETOOLONG;
2306
2307         return (err);
2308 }
2309
/*
 * Syncing-context half of snapshot rename: remove the old name from
 * the head dataset's snapnames ZAP, update the in-core name under
 * ds_lock, and add the new ZAP entry.  arg1 is the snapshot, arg2 the
 * new snapshot name.
 */
static void
dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	const char *newsnapname = arg2;
	dsl_dir_t *dd = ds->ds_dir;
	objset_t *mos = dd->dd_pool->dp_meta_objset;
	dsl_dataset_t *hds;
	int err;

	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);

	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));

	/* Refresh ds_snapname before removing the old ZAP entry. */
	VERIFY(0 == dsl_dataset_get_snapname(ds));
	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
	ASSERT3U(err, ==, 0);
	mutex_enter(&ds->ds_lock);
	(void) strcpy(ds->ds_snapname, newsnapname);
	mutex_exit(&ds->ds_lock);
	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
	ASSERT3U(err, ==, 0);

	spa_history_log_internal(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
	    "dataset = %llu", ds->ds_object);
	dsl_dataset_rele(hds, FTAG);
}
2339
/*
 * State shared across dsl_snapshot_rename_one() callbacks during a
 * recursive snapshot rename.
 */
struct renamesnaparg {
	dsl_sync_task_group_t *dstg;	/* sync task group for the renames */
	char failed[MAXPATHLEN];	/* name of dataset that failed */
	char *oldsnap;			/* current snapshot name */
	char *newsnap;			/* new snapshot name */
};
2346
/*
 * dmu_objset_find() callback: queue a sync task to rename the
 * snapshot "name@oldsnap" to the new name in ra.  A missing snapshot
 * (ENOENT) is not treated as an error in the recursive case.
 */
static int
dsl_snapshot_rename_one(const char *name, void *arg)
{
	struct renamesnaparg *ra = arg;
	dsl_dataset_t *ds = NULL;
	char *snapname;
	int err;

	/* Record this name so the caller can report it on failure. */
	snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
	(void) strlcpy(ra->failed, snapname, sizeof (ra->failed));

	/*
	 * For recursive snapshot renames the parent won't be changing
	 * so we just pass name for both the to/from argument.
	 */
	err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
	if (err != 0) {
		strfree(snapname);
		return (err == ENOENT ? 0 : err);
	}

#ifdef _KERNEL
	/*
	 * For all filesystems undergoing rename, we'll need to unmount it.
	 */
	(void) zfs_unmount_snap(snapname, NULL);
#endif
	err = dsl_dataset_hold(snapname, ra->dstg, &ds);
	strfree(snapname);
	if (err != 0)
		return (err == ENOENT ? 0 : err);

	/* The hold on ds is released after the task group completes. */
	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);

	return (0);
}
2384
/*
 * Rename the snapshot component of oldname in this filesystem and all
 * of its descendants, performing all renames as one sync task group.
 * On failure, oldname is overwritten with the name of the snapshot
 * that could not be renamed.
 */
static int
dsl_recursive_rename(char *oldname, const char *newname)
{
	int err;
	struct renamesnaparg *ra;
	dsl_sync_task_t *dst;
	spa_t *spa;
	char *cp, *fsname = spa_strdup(oldname);
	int len = strlen(oldname) + 1;

	/* truncate the snapshot name to get the fsname */
	cp = strchr(fsname, '@');
	*cp = '\0';

	err = spa_open(fsname, &spa, FTAG);
	if (err) {
		kmem_free(fsname, len);
		return (err);
	}
	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));

	/* Both names must contain '@'; point past it for the snap parts. */
	ra->oldsnap = strchr(oldname, '@') + 1;
	ra->newsnap = strchr(newname, '@') + 1;
	*ra->failed = '\0';

	/* Queue one rename task per descendant filesystem. */
	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
	    DS_FIND_CHILDREN);
	kmem_free(fsname, len);

	if (err == 0) {
		err = dsl_sync_task_group_wait(ra->dstg);
	}

	/* Release the holds and record which rename (if any) failed. */
	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
		dsl_dataset_t *ds = dst->dst_arg1;
		if (dst->dst_err) {
			dsl_dir_name(ds->ds_dir, ra->failed);
			(void) strlcat(ra->failed, "@", sizeof (ra->failed));
			(void) strlcat(ra->failed, ra->newsnap,
			    sizeof (ra->failed));
		}
		dsl_dataset_rele(ds, ra->dstg);
	}

	/*
	 * NOTE(review): the size passed here is sizeof (ra->failed)
	 * (MAXPATHLEN), not the size of the caller's oldname buffer --
	 * verify that all callers pass a buffer of at least MAXPATHLEN.
	 */
	if (err)
		(void) strlcpy(oldname, ra->failed, sizeof (ra->failed));

	dsl_sync_task_group_destroy(ra->dstg);
	kmem_free(ra, sizeof (struct renamesnaparg));
	spa_close(spa, FTAG);
	return (err);
}
2439
/*
 * dmu_objset_find() callback used before a rename that lengthens the
 * dataset name: verify that this child's name, adjusted by the length
 * change (*arg, in bytes), still fits within MAXNAMELEN.
 */
static int
dsl_valid_rename(const char *oldname, void *arg)
{
	int len_change = *(int *)arg;

	return (strlen(oldname) + len_change >= MAXNAMELEN ?
	    ENAMETOOLONG : 0);
}
2450
#pragma weak dmu_objset_rename = dsl_dataset_rename
/*
 * Rename dataset oldname to newname.  A filesystem/volume rename is
 * delegated to dsl_dir_rename(); a snapshot rename runs as a sync task
 * (over all descendents if recursive is set).  A snapshot may only be
 * renamed within its own filesystem.
 */
int
dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
{
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	const char *tail;
	int err;

	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
	if (err)
		return (err);

	/*
	 * tail == NULL means oldname fully named a dsl_dir, so this is a
	 * filesystem/volume rename rather than a snapshot rename.
	 */
	if (tail == NULL) {
		int delta = strlen(newname) - strlen(oldname);

		/* if we're growing, validate child name lengths */
		if (delta > 0)
			err = dmu_objset_find(oldname, dsl_valid_rename,
			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);

		if (err == 0)
			err = dsl_dir_rename(dd, newname);
		dsl_dir_close(dd, FTAG);
		return (err);
	}

	if (tail[0] != '@') {
		/* the name ended in a nonexistent component */
		dsl_dir_close(dd, FTAG);
		return (ENOENT);
	}

	dsl_dir_close(dd, FTAG);

	/* new name must be snapshot in same filesystem */
	tail = strchr(newname, '@');
	if (tail == NULL)
		return (EINVAL);
	tail++;
	if (strncmp(oldname, newname, tail - newname) != 0)
		return (EXDEV);

	if (recursive) {
		err = dsl_recursive_rename(oldname, newname);
	} else {
		err = dsl_dataset_hold(oldname, FTAG, &ds);
		if (err)
			return (err);

		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_snapshot_rename_check,
		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);

		dsl_dataset_rele(ds, FTAG);
	}

	return (err);
}
2510
/* List node linking one snapshot dataset into a promotearg list. */
struct promotenode {
	list_node_t link;
	dsl_dataset_t *ds;
};

/*
 * Shared state for the promote sync task; built by dsl_dataset_promote()
 * and consumed by dsl_dataset_promote_check()/_sync().
 */
struct promotearg {
	/* snaps moving to the clone; origin head's snaps; clone's snaps */
	list_t shared_snaps, origin_snaps, clone_snaps;
	dsl_dataset_t *origin_origin;	/* origin of the origin, or NULL */
	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
	char *err_ds;			/* snapshot name that conflicted */
};

static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2524
/*
 * Sync-task check function for "zfs promote": verify that hds (the
 * clone being promoted) may take over its origin's snapshots, and
 * precompute the space-accounting deltas that
 * dsl_dataset_promote_sync() will apply.  On a snapshot-name conflict,
 * returns EEXIST with pa->err_ds set to the conflicting name.
 */
static int
dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *hds = arg1;
	struct promotearg *pa = arg2;
	struct promotenode *snap = list_head(&pa->shared_snaps);
	dsl_dataset_t *origin_ds = snap->ds;
	int err;
	uint64_t unused;

	/* Check that it is a real clone */
	if (!dsl_dir_is_clone(hds->ds_dir))
		return (EINVAL);

	/* Since this is so expensive, don't do the preliminary check */
	if (!dmu_tx_is_syncing(tx))
		return (0);

	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
		return (EXDEV);

	/* compute origin's new unique space */
	snap = list_tail(&pa->clone_snaps);
	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
	dsl_deadlist_space_range(&snap->ds->ds_deadlist,
	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX,
	    &pa->unique, &unused, &unused);

	/*
	 * Walk the snapshots that we are moving
	 *
	 * Compute space to transfer.  Consider the incremental changes
	 * to used for each snapshot:
	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
	 * So each snapshot gave birth to:
	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
	 * So a sequence would look like:
	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
	 * Which simplifies to:
	 * uN + kN + kN-1 + ... + k1 + k0
	 * Note however, if we stop before we reach the ORIGIN we get:
	 * uN + kN + kN-1 + ... + kM - uM-1
	 */
	pa->used = origin_ds->ds_phys->ds_used_bytes;
	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
	for (snap = list_head(&pa->shared_snaps); snap;
	    snap = list_next(&pa->shared_snaps, snap)) {
		uint64_t val, dlused, dlcomp, dluncomp;
		dsl_dataset_t *ds = snap->ds;

		/* Check that the snapshot name does not conflict */
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
		if (err == 0) {
			/* a snapshot of this name already exists on hds */
			err = EEXIST;
			goto out;
		}
		if (err != ENOENT)
			goto out;

		/* The very first snapshot does not have a deadlist */
		if (ds->ds_phys->ds_prev_snap_obj == 0)
			continue;

		dsl_deadlist_space(&ds->ds_deadlist,
		    &dlused, &dlcomp, &dluncomp);
		pa->used += dlused;
		pa->comp += dlcomp;
		pa->uncomp += dluncomp;
	}

	/*
	 * If we are a clone of a clone then we never reached ORIGIN,
	 * so we need to subtract out the clone origin's used space.
	 */
	if (pa->origin_origin) {
		pa->used -= pa->origin_origin->ds_phys->ds_used_bytes;
		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
	}

	/* Check that there is enough space here */
	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
	    pa->used);
	if (err)
		return (err);

	/*
	 * Compute the amounts of space that will be used by snapshots
	 * after the promotion (for both origin and clone).  For each,
	 * it is the amount of space that will be on all of their
	 * deadlists (that was not born before their new origin).
	 */
	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
		uint64_t space;

		/*
		 * Note, typically this will not be a clone of a clone,
		 * so dd_origin_txg will be < TXG_INITIAL, so
		 * these snaplist_space() -> dsl_deadlist_space_range()
		 * calls will be fast because they do not have to
		 * iterate over all bps.
		 */
		snap = list_head(&pa->origin_snaps);
		err = snaplist_space(&pa->shared_snaps,
		    snap->ds->ds_dir->dd_origin_txg, &pa->cloneusedsnap);
		if (err)
			return (err);

		err = snaplist_space(&pa->clone_snaps,
		    snap->ds->ds_dir->dd_origin_txg, &space);
		if (err)
			return (err);
		pa->cloneusedsnap += space;
	}
	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
		err = snaplist_space(&pa->origin_snaps,
		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
		if (err)
			return (err);
	}

	return (0);
out:
	pa->err_ds =  snap->ds->ds_snapname;
	return (err);
}
2653
/*
 * Sync-task apply function for "zfs promote": re-parent the shared
 * snapshots under hds's dsl_dir, swap the origin linkage between the
 * clone and the origin's head dataset, and apply the space-accounting
 * deltas computed by dsl_dataset_promote_check().
 */
static void
dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *hds = arg1;
	struct promotearg *pa = arg2;
	struct promotenode *snap = list_head(&pa->shared_snaps);
	dsl_dataset_t *origin_ds = snap->ds;
	dsl_dataset_t *origin_head;
	dsl_dir_t *dd = hds->ds_dir;
	dsl_pool_t *dp = hds->ds_dir->dd_pool;
	dsl_dir_t *odd = NULL;
	uint64_t oldnext_obj;
	int64_t delta;

	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));

	snap = list_head(&pa->origin_snaps);
	origin_head = snap->ds;

	/*
	 * We need to explicitly open odd, since origin_ds's dd will be
	 * changing.
	 */
	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
	    NULL, FTAG, &odd));

	/* change origin's next snap */
	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
	snap = list_tail(&pa->clone_snaps);
	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;

	/* change the origin's next clone */
	if (origin_ds->ds_phys->ds_next_clones_obj) {
		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
		    origin_ds->ds_phys->ds_next_clones_obj,
		    oldnext_obj, tx));
	}

	/* change origin */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
	dd->dd_origin_txg = origin_head->ds_dir->dd_origin_txg;
	dmu_buf_will_dirty(odd->dd_dbuf, tx);
	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
	origin_head->ds_dir->dd_origin_txg =
	    origin_ds->ds_phys->ds_creation_txg;

	/* change dd_clone entries */
	if (spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
		    odd->dd_phys->dd_clones, hds->ds_object, tx));
		/*
		 * NOTE(review): pa->origin_origin is dereferenced here
		 * without a NULL check -- presumably guaranteed non-NULL
		 * at this pool version (e.g. via the $ORIGIN snapshot);
		 * confirm.
		 */
		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
		    hds->ds_object, tx));

		VERIFY3U(0, ==, zap_remove_int(dp->dp_meta_objset,
		    pa->origin_origin->ds_dir->dd_phys->dd_clones,
		    origin_head->ds_object, tx));
		if (dd->dd_phys->dd_clones == 0) {
			dd->dd_phys->dd_clones = zap_create(dp->dp_meta_objset,
			    DMU_OT_DSL_CLONES, DMU_OT_NONE, 0, tx);
		}
		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
		    dd->dd_phys->dd_clones, origin_head->ds_object, tx));

	}

	/* move snapshots to this dir */
	for (snap = list_head(&pa->shared_snaps); snap;
	    snap = list_next(&pa->shared_snaps, snap)) {
		dsl_dataset_t *ds = snap->ds;

		/* unregister props as dsl_dir is changing */
		if (ds->ds_objset) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		/* move snap name entry */
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
		    ds->ds_snapname, tx));
		VERIFY(0 == zap_add(dp->dp_meta_objset,
		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
		    8, 1, &ds->ds_object, tx));

		/* change containing dsl_dir */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
		ds->ds_phys->ds_dir_obj = dd->dd_object;
		ASSERT3P(ds->ds_dir, ==, odd);
		dsl_dir_close(ds->ds_dir, ds);
		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
		    NULL, ds, &ds->ds_dir));

		/* move any clone references */
		if (ds->ds_phys->ds_next_clones_obj &&
		    spa_version(dp->dp_spa) >= SPA_VERSION_DIR_CLONES) {
			zap_cursor_t zc;
			zap_attribute_t za;

			for (zap_cursor_init(&zc, dp->dp_meta_objset,
			    ds->ds_phys->ds_next_clones_obj);
			    zap_cursor_retrieve(&zc, &za) == 0;
			    zap_cursor_advance(&zc)) {
				dsl_dataset_t *cnds;
				uint64_t o;

				if (za.za_first_integer == oldnext_obj) {
					/*
					 * We've already moved the
					 * origin's reference.
					 */
					continue;
				}

				VERIFY3U(0, ==, dsl_dataset_hold_obj(dp,
				    za.za_first_integer, FTAG, &cnds));
				o = cnds->ds_dir->dd_phys->dd_head_dataset_obj;

				VERIFY3U(zap_remove_int(dp->dp_meta_objset,
				    odd->dd_phys->dd_clones, o, tx), ==, 0);
				VERIFY3U(zap_add_int(dp->dp_meta_objset,
				    dd->dd_phys->dd_clones, o, tx), ==, 0);
				dsl_dataset_rele(cnds, FTAG);
			}
			zap_cursor_fini(&zc);
		}

		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
	}

	/*
	 * Change space accounting.
	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
	 * both be valid, or both be 0 (resulting in delta == 0).  This
	 * is true for each of {clone,origin} independently.
	 */

	delta = pa->cloneusedsnap -
	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
	ASSERT3S(delta, >=, 0);
	ASSERT3U(pa->used, >=, delta);
	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
	dsl_dir_diduse_space(dd, DD_USED_HEAD,
	    pa->used - delta, pa->comp, pa->uncomp, tx);

	delta = pa->originusedsnap -
	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
	ASSERT3S(delta, <=, 0);
	ASSERT3U(pa->used, >=, -delta);
	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
	dsl_dir_diduse_space(odd, DD_USED_HEAD,
	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);

	origin_ds->ds_phys->ds_unique_bytes = pa->unique;

	/* log history record */
	spa_history_log_internal(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
	    "dataset = %llu", hds->ds_object);

	dsl_dir_close(odd, FTAG);
}
2820
2821 static char *snaplist_tag = "snaplist";
2822 /*
2823  * Make a list of dsl_dataset_t's for the snapshots between first_obj
2824  * (exclusive) and last_obj (inclusive).  The list will be in reverse
2825  * order (last_obj will be the list_head()).  If first_obj == 0, do all
2826  * snapshots back to this dataset's origin.
2827  */
static int
snaplist_make(dsl_pool_t *dp, boolean_t own,
    uint64_t first_obj, uint64_t last_obj, list_t *l)
{
	uint64_t obj = last_obj;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));

	list_create(l, sizeof (struct promotenode),
	    offsetof(struct promotenode, link));

	/* walk backwards in time, following each snapshot's prev pointer */
	while (obj != first_obj) {
		dsl_dataset_t *ds;
		struct promotenode *snap;
		int err;

		if (own) {
			err = dsl_dataset_own_obj(dp, obj,
			    0, snaplist_tag, &ds);
			if (err == 0)
				dsl_dataset_make_exclusive(ds, snaplist_tag);
		} else {
			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
		}
		if (err == ENOENT) {
			/* lost race with snapshot destroy */
			struct promotenode *last = list_tail(l);
			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
			obj = last->ds->ds_phys->ds_prev_snap_obj;
			continue;
		} else if (err) {
			/* partially built list is cleaned up by the caller */
			return (err);
		}

		/* if no explicit starting point, stop at this ds's origin */
		if (first_obj == 0)
			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;

		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
		snap->ds = ds;
		list_insert_tail(l, snap);
		obj = ds->ds_phys->ds_prev_snap_obj;
	}

	return (0);
}
2873
2874 static int
2875 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
2876 {
2877         struct promotenode *snap;
2878
2879         *spacep = 0;
2880         for (snap = list_head(l); snap; snap = list_next(l, snap)) {
2881                 uint64_t used, comp, uncomp;
2882                 dsl_deadlist_space_range(&snap->ds->ds_deadlist,
2883                     mintxg, UINT64_MAX, &used, &comp, &uncomp);
2884                 *spacep += used;
2885         }
2886         return (0);
2887 }
2888
2889 static void
2890 snaplist_destroy(list_t *l, boolean_t own)
2891 {
2892         struct promotenode *snap;
2893
2894         if (!l || !list_link_active(&l->list_head))
2895                 return;
2896
2897         while ((snap = list_tail(l)) != NULL) {
2898                 list_remove(l, snap);
2899                 if (own)
2900                         dsl_dataset_disown(snap->ds, snaplist_tag);
2901                 else
2902                         dsl_dataset_rele(snap->ds, snaplist_tag);
2903                 kmem_free(snap, sizeof (struct promotenode));
2904         }
2905         list_destroy(l);
2906 }
2907
2908 /*
2909  * Promote a clone.  Nomenclature note:
2910  * "clone" or "cds": the original clone which is being promoted
2911  * "origin" or "ods": the snapshot which is originally clone's origin
2912  * "origin head" or "ohds": the dataset which is the head
2913  * (filesystem/volume) for the origin
2914  * "origin origin": the origin of the origin's filesystem (typically
2915  * NULL, indicating that the clone is not a clone of a clone).
2916  */
int
dsl_dataset_promote(const char *name, char *conflsnap)
{
	dsl_dataset_t *ds;
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	dmu_object_info_t doi;
	struct promotearg pa;
	struct promotenode *snap;
	int err;

	/*
	 * Zero pa up front so the cleanup code at the bottom is safe to
	 * run even if we error out before all of its lists are built
	 * (this also silences gcc's uninitialized-variable warning).
	 */
	bzero(&pa, sizeof(struct promotearg));
	err = dsl_dataset_hold(name, FTAG, &ds);
	if (err)
		return (err);
	dd = ds->ds_dir;
	dp = dd->dd_pool;

	/* snapnames zapobj size is used below to size the sync task */
	err = dmu_object_info(dp->dp_meta_objset,
	    ds->ds_phys->ds_snapnames_zapobj, &doi);
	if (err) {
		dsl_dataset_rele(ds, FTAG);
		return (err);
	}

	/* only a clone (a non-snapshot with an origin) can be promoted */
	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
		dsl_dataset_rele(ds, FTAG);
		return (EINVAL);
	}

	/*
	 * We are going to inherit all the snapshots taken before our
	 * origin (i.e., our new origin will be our parent's origin).
	 * Take ownership of them so that we can rename them into our
	 * namespace.
	 */
	rw_enter(&dp->dp_config_rwlock, RW_READER);

	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
	    &pa.shared_snaps);
	if (err != 0)
		goto out;

	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
	if (err != 0)
		goto out;

	snap = list_head(&pa.shared_snaps);
	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
	if (err != 0)
		goto out;

	/* if the origin's dir itself has an origin, hold that too */
	if (snap->ds->ds_dir->dd_phys->dd_origin_obj != 0) {
		err = dsl_dataset_hold_obj(dp,
		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
		    FTAG, &pa.origin_origin);
		if (err != 0)
			goto out;
	}

out:
	rw_exit(&dp->dp_config_rwlock);

	/*
	 * Add in 128x the snapnames zapobj size, since we will be moving
	 * a bunch of snapnames to the promoted ds, and dirtying their
	 * bonus buffers.
	 */
	if (err == 0) {
		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
		    dsl_dataset_promote_sync, ds, &pa,
		    2 + 2 * doi.doi_physical_blocks_512);
		/* report the conflicting snapshot name, if any */
		if (err && pa.err_ds && conflsnap)
			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
	}

	snaplist_destroy(&pa.shared_snaps, B_TRUE);
	snaplist_destroy(&pa.clone_snaps, B_FALSE);
	snaplist_destroy(&pa.origin_snaps, B_FALSE);
	if (pa.origin_origin)
		dsl_dataset_rele(pa.origin_origin, FTAG);
	dsl_dataset_rele(ds, FTAG);
	return (err);
}
3003
/* Arguments for the clone <-> origin-head swap sync task below. */
struct cloneswaparg {
	dsl_dataset_t *cds; /* clone dataset */
	dsl_dataset_t *ohds; /* origin's head dataset */
	boolean_t force; /* allow swap even if ohds was modified */
	int64_t unused_refres_delta; /* change in unconsumed refreservation */
};
3010
/*
 * Sync-task check for swapping the contents of a clone with its
 * origin's head dataset (see dsl_dataset_clone_swap()).  Validates the
 * relationship between the two datasets and computes
 * csa->unused_refres_delta, the change in unconsumed refreservation
 * that the swap will cause.
 */
/* ARGSUSED */
static int
dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct cloneswaparg *csa = arg1;

	/* they should both be heads */
	if (dsl_dataset_is_snapshot(csa->cds) ||
	    dsl_dataset_is_snapshot(csa->ohds))
		return (EINVAL);

	/* the branch point should be just before them */
	if (csa->cds->ds_prev != csa->ohds->ds_prev)
		return (EINVAL);

	/* cds should be the clone (unless they are unrelated) */
	if (csa->cds->ds_prev != NULL &&
	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
	    csa->ohds->ds_object !=
	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
		return (EINVAL);

	/* the clone should be a child of the origin */
	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
		return (EINVAL);

	/* ohds shouldn't be modified unless 'force' */
	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
		return (ETXTBSY);

	/* adjust amount of any unconsumed refreservation */
	csa->unused_refres_delta =
	    (int64_t)MIN(csa->ohds->ds_reserved,
	    csa->ohds->ds_phys->ds_unique_bytes) -
	    (int64_t)MIN(csa->ohds->ds_reserved,
	    csa->cds->ds_phys->ds_unique_bytes);

	/* the extra refreservation must fit in the available space */
	if (csa->unused_refres_delta > 0 &&
	    csa->unused_refres_delta >
	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
		return (ENOSPC);

	/* the swapped-in unique bytes must not exceed ohds's quota */
	if (csa->ohds->ds_quota != 0 &&
	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
		return (EDQUOT);

	return (0);
}
3059
/*
 * Sync-task apply for dsl_dataset_clone_swap(): exchange the active
 * blkptrs, space accounting, and deadlists of the clone (cds) and the
 * origin's head (ohds).
 */
/* ARGSUSED */
static void
dsl_dataset_clone_swap_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct cloneswaparg *csa = arg1;
	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;

	ASSERT(csa->cds->ds_reserved == 0);
	ASSERT(csa->ohds->ds_quota == 0 ||
	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);

	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);

	/* evict cached objsets so they are re-opened post-swap */
	if (csa->cds->ds_objset != NULL) {
		dmu_objset_evict(csa->cds->ds_objset);
		csa->cds->ds_objset = NULL;
	}

	if (csa->ohds->ds_objset != NULL) {
		dmu_objset_evict(csa->ohds->ds_objset);
		csa->ohds->ds_objset = NULL;
	}

	/*
	 * Reset origin's unique bytes, if it exists.
	 */
	if (csa->cds->ds_prev) {
		dsl_dataset_t *origin = csa->cds->ds_prev;
		uint64_t comp, uncomp;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
		    &origin->ds_phys->ds_unique_bytes, &comp, &uncomp);
	}

	/* swap blkptrs */
	{
		blkptr_t tmp;
		tmp = csa->ohds->ds_phys->ds_bp;
		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
		csa->cds->ds_phys->ds_bp = tmp;
	}

	/* set dd_*_bytes */
	{
		int64_t dused, dcomp, duncomp;
		uint64_t cdl_used, cdl_comp, cdl_uncomp;
		uint64_t odl_used, odl_comp, odl_uncomp;

		ASSERT3U(csa->cds->ds_dir->dd_phys->
		    dd_used_breakdown[DD_USED_SNAP], ==, 0);

		dsl_deadlist_space(&csa->cds->ds_deadlist,
		    &cdl_used, &cdl_comp, &cdl_uncomp);
		dsl_deadlist_space(&csa->ohds->ds_deadlist,
		    &odl_used, &odl_comp, &odl_uncomp);

		/* deltas = (clone's used + deadlist) - (head's equivalent) */
		dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
		    (csa->ohds->ds_phys->ds_used_bytes + odl_used);
		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
		    cdl_uncomp -
		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);

		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
		    dused, dcomp, duncomp, tx);
		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
		    -dused, -dcomp, -duncomp, tx);

		/*
		 * The difference in the space used by snapshots is the
		 * difference in snapshot space due to the head's
		 * deadlist (since that's the only thing that's
		 * changing that affects the snapused).
		 */
		dsl_deadlist_space_range(&csa->cds->ds_deadlist,
		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
		    &cdl_used, &cdl_comp, &cdl_uncomp);
		dsl_deadlist_space_range(&csa->ohds->ds_deadlist,
		    csa->ohds->ds_dir->dd_origin_txg, UINT64_MAX,
		    &odl_used, &odl_comp, &odl_uncomp);
		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
		    DD_USED_HEAD, DD_USED_SNAP, tx);
	}

	/* swap ds_*_bytes */
	SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
	    csa->cds->ds_phys->ds_used_bytes);
	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
	    csa->cds->ds_phys->ds_compressed_bytes);
	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
	    csa->cds->ds_phys->ds_uncompressed_bytes);
	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
	    csa->cds->ds_phys->ds_unique_bytes);

	/* apply any parent delta for change in unconsumed refreservation */
	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
	    csa->unused_refres_delta, 0, 0, tx);

	/*
	 * Swap deadlists.
	 */
	dsl_deadlist_close(&csa->cds->ds_deadlist);
	dsl_deadlist_close(&csa->ohds->ds_deadlist);
	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
	    csa->cds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
	    csa->cds->ds_phys->ds_deadlist_obj);
	dsl_deadlist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
	    csa->ohds->ds_phys->ds_deadlist_obj);

	/* let the scrub/resilver code know the datasets were swapped */
	dsl_scan_ds_clone_swapped(csa->ohds, csa->cds, tx);
}
3176
3177 /*
3178  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
3179  * recv" into an existing fs to swizzle the file system to the new
3180  * version, and by "zfs rollback".  Can also be used to swap two
3181  * independent head datasets if neither has any snapshots.
3182  */
int
dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
    boolean_t force)
{
	struct cloneswaparg csa;
	int error;

	/* Both datasets must already be owned by the caller. */
	ASSERT(clone->ds_owner);
	ASSERT(origin_head->ds_owner);
retry:
	/*
	 * Need exclusive access for the swap. If we're swapping these
	 * datasets back after an error, we already hold the locks.
	 */
	if (!RW_WRITE_HELD(&clone->ds_rwlock))
		rw_enter(&clone->ds_rwlock, RW_WRITER);
	if (!RW_WRITE_HELD(&origin_head->ds_rwlock) &&
	    !rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
		/*
		 * Deadlock avoidance: drop the clone's lock, block on
		 * the origin head's lock, then try to re-take the
		 * clone's lock.  If that fails, start over.
		 */
		rw_exit(&clone->ds_rwlock);
		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
			rw_exit(&origin_head->ds_rwlock);
			goto retry;
		}
	}
	csa.cds = clone;
	csa.ohds = origin_head;
	csa.force = force;
	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
	    dsl_dataset_clone_swap_check,
	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
	return (error);
}
3216
3217 /*
3218  * Given a pool name and a dataset object number in that pool,
3219  * return the name of that dataset.
3220  */
3221 int
3222 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3223 {
3224         spa_t *spa;
3225         dsl_pool_t *dp;
3226         dsl_dataset_t *ds;
3227         int error;
3228
3229         if ((error = spa_open(pname, &spa, FTAG)) != 0)
3230                 return (error);
3231         dp = spa_get_dsl(spa);
3232         rw_enter(&dp->dp_config_rwlock, RW_READER);
3233         if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3234                 dsl_dataset_name(ds, buf);
3235                 dsl_dataset_rele(ds, FTAG);
3236         }
3237         rw_exit(&dp->dp_config_rwlock);
3238         spa_close(spa, FTAG);
3239
3240         return (error);
3241 }
3242
/*
 * Check whether a write of 'asize' bytes (with 'inflight' bytes of
 * not-yet-synced changes outstanding) is allowed by this dataset's
 * refquota.  Returns 0 if the write may proceed, ERESTART if the
 * caller should back off and retry (pending changes may free space),
 * or EDQUOT if the quota is definitely exceeded.  *used is reduced by
 * the amount of the write covered by unconsumed refreservation, which
 * is also reported in *ref_rsrv.
 */
int
dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
{
	int error = 0;

	ASSERT3S(asize, >, 0);

	/*
	 * *ref_rsrv is the portion of asize that will come from any
	 * unconsumed refreservation space.
	 */
	*ref_rsrv = 0;

	mutex_enter(&ds->ds_lock);
	/*
	 * Make a space adjustment for reserved bytes.
	 */
	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
		ASSERT3U(*used, >=,
		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
		*ref_rsrv =
		    asize - MIN(asize, parent_delta(ds, asize + inflight));
	}

	if (!check_quota || ds->ds_quota == 0) {
		mutex_exit(&ds->ds_lock);
		return (0);
	}
	/*
	 * If they are requesting more space, and our current estimate
	 * is over quota, they get to try again unless the actual
	 * on-disk is over quota and there are no pending changes (which
	 * may free up space for us).
	 */
	if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
		if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
			error = ERESTART;
		else
			error = EDQUOT;
	}
	mutex_exit(&ds->ds_lock);

	return (error);
}
3289
3290 /* ARGSUSED */
3291 static int
3292 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3293 {
3294         dsl_dataset_t *ds = arg1;
3295         dsl_prop_setarg_t *psa = arg2;
3296         int err;
3297
3298         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3299                 return (ENOTSUP);
3300
3301         if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
3302                 return (err);
3303
3304         if (psa->psa_effective_value == 0)
3305                 return (0);
3306
3307         if (psa->psa_effective_value < ds->ds_phys->ds_used_bytes ||
3308             psa->psa_effective_value < ds->ds_reserved)
3309                 return (ENOSPC);
3310
3311         return (0);
3312 }
3313
3314 extern void dsl_prop_set_sync(void *, void *, dmu_tx_t *);
3315
3316 void
3317 dsl_dataset_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
3318 {
3319         dsl_dataset_t *ds = arg1;
3320         dsl_prop_setarg_t *psa = arg2;
3321         uint64_t effective_value = psa->psa_effective_value;
3322
3323         dsl_prop_set_sync(ds, psa, tx);
3324         DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
3325
3326         if (ds->ds_quota != effective_value) {
3327                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
3328                 ds->ds_quota = effective_value;
3329
3330                 spa_history_log_internal(LOG_DS_REFQUOTA,
3331                     ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu ",
3332                     (longlong_t)ds->ds_quota, ds->ds_object);
3333         }
3334 }
3335
3336 int
3337 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
3338 {
3339         dsl_dataset_t *ds;
3340         dsl_prop_setarg_t psa;
3341         int err;
3342
3343         dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
3344
3345         err = dsl_dataset_hold(dsname, FTAG, &ds);
3346         if (err)
3347                 return (err);
3348
3349         /*
3350          * If someone removes a file, then tries to set the quota, we
3351          * want to make sure the file freeing takes effect.
3352          */
3353         txg_wait_open(ds->ds_dir->dd_pool, 0);
3354
3355         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3356             dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3357             ds, &psa, 0);
3358
3359         dsl_dataset_rele(ds, FTAG);
3360         return (err);
3361 }
3362
/*
 * Validate a proposed refreservation for 'ds'.  Fails with ENOTSUP on
 * pool versions without refreservation support, EINVAL for snapshots,
 * and ENOSPC when the increase in charged reservation would exceed the
 * space available in the dsl_dir or overflow the dataset's refquota.
 */
static int
dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_prop_setarg_t *psa = arg2;
	uint64_t effective_value;
	uint64_t unique;
	int err;

	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
	    SPA_VERSION_REFRESERVATION)
		return (ENOTSUP);

	if (dsl_dataset_is_snapshot(ds))
		return (EINVAL);

	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
		return (err);

	effective_value = psa->psa_effective_value;

	/*
	 * If we are doing the preliminary check in open context, the
	 * space estimates may be inaccurate.
	 */
	if (!dmu_tx_is_syncing(tx))
		return (0);

	mutex_enter(&ds->ds_lock);
	if (!DS_UNIQUE_IS_ACCURATE(ds))
		dsl_dataset_recalc_head_uniq(ds);
	unique = ds->ds_phys->ds_unique_bytes;
	mutex_exit(&ds->ds_lock);

	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
		/*
		 * The charged reservation (the part not already covered
		 * by unique data) is growing; make sure there is room.
		 */
		uint64_t delta = MAX(unique, effective_value) -
		    MAX(unique, ds->ds_reserved);

		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
			return (ENOSPC);
		if (ds->ds_quota > 0 &&
		    effective_value > ds->ds_quota)
			return (ENOSPC);
	}

	return (0);
}
3410
/*
 * Apply a new refreservation to 'ds' and adjust the dsl_dir's
 * DD_USED_REFRSRV accounting by the change in the charged (not yet
 * consumed by unique data) portion of the reservation.
 */
static void
dsl_dataset_set_reservation_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_prop_setarg_t *psa = arg2;
	uint64_t effective_value = psa->psa_effective_value;
	uint64_t unique;
	int64_t delta;

	dsl_prop_set_sync(ds, psa, tx);
	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);

	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	/*
	 * dd_lock is held across both the ds_reserved update and the
	 * dsl_dir_diduse_space() call so the accounting stays atomic.
	 */
	mutex_enter(&ds->ds_dir->dd_lock);
	mutex_enter(&ds->ds_lock);
	ASSERT(DS_UNIQUE_IS_ACCURATE(ds));
	unique = ds->ds_phys->ds_unique_bytes;
	/* Only the reservation in excess of unique bytes is charged. */
	delta = MAX(0, (int64_t)(effective_value - unique)) -
	    MAX(0, (int64_t)(ds->ds_reserved - unique));
	ds->ds_reserved = effective_value;
	mutex_exit(&ds->ds_lock);

	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
	mutex_exit(&ds->ds_dir->dd_lock);

	spa_history_log_internal(LOG_DS_REFRESERV,
	    ds->ds_dir->dd_pool->dp_spa, tx, "%lld dataset = %llu",
	    (longlong_t)effective_value, ds->ds_object);
}
3441
3442 int
3443 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
3444     uint64_t reservation)
3445 {
3446         dsl_dataset_t *ds;
3447         dsl_prop_setarg_t psa;
3448         int err;
3449
3450         dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
3451             &reservation);
3452
3453         err = dsl_dataset_hold(dsname, FTAG, &ds);
3454         if (err)
3455                 return (err);
3456
3457         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3458             dsl_dataset_set_reservation_check,
3459             dsl_dataset_set_reservation_sync, ds, &psa, 0);
3460
3461         dsl_dataset_rele(ds, FTAG);
3462         return (err);
3463 }
3464
/*
 * Context for dsl_dataset_user_release_onexit(): identifies the pool,
 * dataset, and tag of a temporary user hold to release when the
 * process holding the cleanup fd exits.
 */
typedef struct zfs_hold_cleanup_arg {
	dsl_pool_t *dp;		/* pool containing the held dataset */
	uint64_t dsobj;		/* object number of the held dataset */
	char htag[MAXNAMELEN];	/* tag the hold was taken with */
} zfs_hold_cleanup_arg_t;
3470
3471 static void
3472 dsl_dataset_user_release_onexit(void *arg)
3473 {
3474         zfs_hold_cleanup_arg_t *ca = arg;
3475
3476         (void) dsl_dataset_user_release_tmp(ca->dp, ca->dsobj, ca->htag,
3477             B_TRUE);
3478         kmem_free(ca, sizeof (zfs_hold_cleanup_arg_t));
3479 }
3480
3481 void
3482 dsl_register_onexit_hold_cleanup(dsl_dataset_t *ds, const char *htag,
3483     minor_t minor)
3484 {
3485         zfs_hold_cleanup_arg_t *ca;
3486
3487         ca = kmem_alloc(sizeof (zfs_hold_cleanup_arg_t), KM_SLEEP);
3488         ca->dp = ds->ds_dir->dd_pool;
3489         ca->dsobj = ds->ds_object;
3490         (void) strlcpy(ca->htag, htag, sizeof (ca->htag));
3491         VERIFY3U(0, ==, zfs_onexit_add_cb(minor,
3492             dsl_dataset_user_release_onexit, ca, NULL));
3493 }
3494
3495 /*
3496  * If you add new checks here, you may need to add
3497  * additional checks to the "temporary" case in
3498  * snapshot_check() in dmu_objset.c.
3499  */
3500 static int
3501 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3502 {
3503         dsl_dataset_t *ds = arg1;
3504         struct dsl_ds_holdarg *ha = arg2;
3505         char *htag = ha->htag;
3506         objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3507         int error = 0;
3508
3509         if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3510                 return (ENOTSUP);
3511
3512         if (!dsl_dataset_is_snapshot(ds))
3513                 return (EINVAL);
3514
3515         /* tags must be unique */
3516         mutex_enter(&ds->ds_lock);
3517         if (ds->ds_phys->ds_userrefs_obj) {
3518                 error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3519                     8, 1, tx);
3520                 if (error == 0)
3521                         error = EEXIST;
3522                 else if (error == ENOENT)
3523                         error = 0;
3524         }
3525         mutex_exit(&ds->ds_lock);
3526
3527         if (error == 0 && ha->temphold &&
3528             strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
3529                 error = E2BIG;
3530
3531         return (error);
3532 }
3533
/*
 * Record user hold 'ha->htag' on snapshot 'ds': bump the in-core
 * refcount, add the tag (valued with the current time in seconds) to
 * the dataset's userrefs zap, and for temporary holds also register it
 * pool-wide via dsl_pool_user_hold() so stale holds can be released at
 * spa_load time.
 */
void
dsl_dataset_user_hold_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	struct dsl_ds_holdarg *ha = arg2;
	char *htag = ha->htag;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	uint64_t now = gethrestime_sec();
	uint64_t zapobj;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_userrefs_obj == 0) {
		/*
		 * This is the first user hold for this dataset.  Create
		 * the userrefs zap object.
		 */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		zapobj = ds->ds_phys->ds_userrefs_obj =
		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
	} else {
		zapobj = ds->ds_phys->ds_userrefs_obj;
	}
	ds->ds_userrefs++;
	mutex_exit(&ds->ds_lock);

	/* The check function already verified the tag doesn't exist. */
	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));

	if (ha->temphold) {
		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
		    htag, &now, tx));
	}

	spa_history_log_internal(LOG_DS_USER_HOLD,
	    dp->dp_spa, tx, "<%s> temp = %d dataset = %llu", htag,
	    (int)ha->temphold, ds->ds_object);
}
3571
3572 static int
3573 dsl_dataset_user_hold_one(const char *dsname, void *arg)
3574 {
3575         struct dsl_ds_holdarg *ha = arg;
3576         dsl_dataset_t *ds;
3577         int error;
3578         char *name;
3579
3580         /* alloc a buffer to hold dsname@snapname plus terminating NULL */
3581         name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3582         error = dsl_dataset_hold(name, ha->dstg, &ds);
3583         strfree(name);
3584         if (error == 0) {
3585                 ha->gotone = B_TRUE;
3586                 dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3587                     dsl_dataset_user_hold_sync, ds, ha, 0);
3588         } else if (error == ENOENT && ha->recursive) {
3589                 error = 0;
3590         } else {
3591                 (void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3592         }
3593         return (error);
3594 }
3595
3596 int
3597 dsl_dataset_user_hold_for_send(dsl_dataset_t *ds, char *htag,
3598     boolean_t temphold)
3599 {
3600         struct dsl_ds_holdarg *ha;
3601         int error;
3602
3603         ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3604         ha->htag = htag;
3605         ha->temphold = temphold;
3606         error = dsl_sync_task_do(ds->ds_dir->dd_pool,
3607             dsl_dataset_user_hold_check, dsl_dataset_user_hold_sync,
3608             ds, ha, 0);
3609         kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3610
3611         return (error);
3612 }
3613
/*
 * Take a user hold 'htag' on snapshot dsname@snapname (and recursively
 * on child filesystems' snapshots of that name when 'recursive').  If
 * cleanup_fd is not -1, the hold must be temporary and is registered
 * for release when that fd's onexit minor goes away.  On failure the
 * name of the offending dataset is copied back into 'dsname'.
 */
int
dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
    boolean_t recursive, boolean_t temphold, int cleanup_fd)
{
	struct dsl_ds_holdarg *ha;
	dsl_sync_task_t *dst;
	spa_t *spa;
	int error;
	minor_t minor = 0;

	if (cleanup_fd != -1) {
		/* Currently we only support cleanup-on-exit of tempholds. */
		if (!temphold)
			return (EINVAL);
		error = zfs_onexit_fd_hold(cleanup_fd, &minor);
		if (error)
			return (error);
	}

	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);

	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));

	error = spa_open(dsname, &spa, FTAG);
	if (error) {
		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
		if (cleanup_fd != -1)
			zfs_onexit_fd_rele(cleanup_fd);
		return (error);
	}

	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	ha->htag = htag;
	ha->snapname = snapname;
	ha->recursive = recursive;
	ha->temphold = temphold;

	/* Queue one check/sync task per snapshot, then run the group. */
	if (recursive) {
		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
		    ha, DS_FIND_CHILDREN);
	} else {
		error = dsl_dataset_user_hold_one(dsname, ha);
	}
	if (error == 0)
		error = dsl_sync_task_group_wait(ha->dstg);

	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
		dsl_dataset_t *ds = dst->dst_arg1;

		if (dst->dst_err) {
			/* Report the failing dataset, minus "@snapname". */
			dsl_dataset_name(ds, ha->failed);
			*strchr(ha->failed, '@') = '\0';
		} else if (error == 0 && minor != 0 && temphold) {
			/*
			 * If this hold is to be released upon process exit,
			 * register that action now.
			 */
			dsl_register_onexit_hold_cleanup(ds, htag, minor);
		}
		dsl_dataset_rele(ds, ha->dstg);
	}

	if (error == 0 && recursive && !ha->gotone)
		error = ENOENT;

	if (error)
		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));

	dsl_sync_task_group_destroy(ha->dstg);

	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
	spa_close(spa, FTAG);
	if (cleanup_fd != -1)
		zfs_onexit_fd_rele(cleanup_fd);
	return (error);
}
3691
/* Per-snapshot argument for the user-release check/sync task pair. */
struct dsl_ds_releasearg {
	dsl_dataset_t *ds;	/* snapshot the hold is released from */
	const char *htag;	/* tag being released */
	boolean_t own;		/* do we own or just hold ds? */
};
3697
/*
 * Determine whether releasing hold 'htag' from 'ds' might destroy the
 * snapshot: it is the last user hold, the snapshot has no clones
 * (ds_num_children == 1), and destruction was deferred.  Returns ESRCH
 * if the tag does not exist on the dataset.
 */
static int
dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
    boolean_t *might_destroy)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t zapobj;
	uint64_t tmp;
	int error;

	*might_destroy = B_FALSE;

	mutex_enter(&ds->ds_lock);
	zapobj = ds->ds_phys->ds_userrefs_obj;
	if (zapobj == 0) {
		/* The tag can't possibly exist */
		mutex_exit(&ds->ds_lock);
		return (ESRCH);
	}

	/* Make sure the tag exists */
	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
	if (error) {
		mutex_exit(&ds->ds_lock);
		if (error == ENOENT)
			error = ESRCH;
		return (error);
	}

	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
	    DS_IS_DEFER_DESTROY(ds))
		*might_destroy = B_TRUE;

	mutex_exit(&ds->ds_lock);
	return (0);
}
3733
/*
 * Sync-task check for releasing a user hold.  If dropping the hold
 * might destroy the snapshot, the destroy check must pass as well,
 * and in syncing context the caller must already own the dataset.
 */
static int
dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
{
	struct dsl_ds_releasearg *ra = arg1;
	dsl_dataset_t *ds = ra->ds;
	boolean_t might_destroy;
	int error;

	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
		return (ENOTSUP);

	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
	if (error)
		return (error);

	if (might_destroy) {
		struct dsl_ds_destroyarg dsda = {0};

		if (dmu_tx_is_syncing(tx)) {
			/*
			 * If we're not prepared to remove the snapshot,
			 * we can't allow the release to happen right now.
			 */
			if (!ra->own)
				return (EBUSY);
		}
		dsda.ds = ds;
		dsda.releasing = B_TRUE;
		return (dsl_dataset_destroy_check(&dsda, tag, tx));
	}

	return (0);
}
3767
/*
 * Release user hold 'ra->htag' from 'ra->ds': drop the in-core
 * refcount, remove the tag from the dataset's userrefs zap and the
 * pool's temporary-hold registry, and destroy the snapshot if this was
 * the last hold on a clone-less, defer-destroyed snapshot.
 */
static void
dsl_dataset_user_release_sync(void *arg1, void *tag, dmu_tx_t *tx)
{
	struct dsl_ds_releasearg *ra = arg1;
	dsl_dataset_t *ds = ra->ds;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	uint64_t zapobj;
	uint64_t dsobj = ds->ds_object;
	uint64_t refs;
	int error;

	mutex_enter(&ds->ds_lock);
	ds->ds_userrefs--;
	refs = ds->ds_userrefs;
	mutex_exit(&ds->ds_lock);
	/* ENOENT is expected when the hold was not a temporary one. */
	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
	VERIFY(error == 0 || error == ENOENT);
	zapobj = ds->ds_phys->ds_userrefs_obj;
	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
	    DS_IS_DEFER_DESTROY(ds)) {
		struct dsl_ds_destroyarg dsda = {0};

		ASSERT(ra->own);
		dsda.ds = ds;
		dsda.releasing = B_TRUE;
		/* We already did the destroy_check */
		dsl_dataset_destroy_sync(&dsda, tag, tx);
	}

	spa_history_log_internal(LOG_DS_USER_RELEASE,
	    dp->dp_spa, tx, "<%s> %lld dataset = %llu",
	    ra->htag, (longlong_t)refs, dsobj);
}
3803
/*
 * dmu_objset_find() callback: queue a sync task to release hold
 * 'ha->htag' from dsname@ha->snapname.  If the release might destroy
 * the snapshot, unmount it and take ownership first so the sync task
 * is permitted to destroy it.
 */
static int
dsl_dataset_user_release_one(const char *dsname, void *arg)
{
	struct dsl_ds_holdarg *ha = arg;
	struct dsl_ds_releasearg *ra;
	dsl_dataset_t *ds;
	int error;
	void *dtag = ha->dstg;
	char *name;
	boolean_t own = B_FALSE;
	boolean_t might_destroy;

	/* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
	error = dsl_dataset_hold(name, dtag, &ds);
	strfree(name);
	if (error == ENOENT && ha->recursive)
		return (0);
	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
	if (error)
		return (error);

	ha->gotone = B_TRUE;

	ASSERT(dsl_dataset_is_snapshot(ds));

	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
	if (error) {
		dsl_dataset_rele(ds, dtag);
		return (error);
	}

	if (might_destroy) {
#ifdef _KERNEL
		/* The snapshot may be mounted; unmount it before destroy. */
		name = kmem_asprintf("%s@%s", dsname, ha->snapname);
		error = zfs_unmount_snap(name, NULL);
		strfree(name);
		if (error) {
			dsl_dataset_rele(ds, dtag);
			return (error);
		}
#endif
		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
			dsl_dataset_rele(ds, dtag);
			return (EBUSY);
		} else {
			own = B_TRUE;
			dsl_dataset_make_exclusive(ds, dtag);
		}
	}

	/* Freed by dsl_dataset_user_release() after the group completes. */
	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
	ra->ds = ds;
	ra->htag = ha->htag;
	ra->own = own;
	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
	    dsl_dataset_user_release_sync, ra, dtag, 0);

	return (0);
}
3864
/*
 * Release user hold 'htag' from snapshot dsname@snapname (recursively
 * on child filesystems' snapshots when requested).  Restarts from
 * scratch on a transient EBUSY (see comment at the bottom).  On other
 * failures the offending dataset name is copied back into 'dsname'.
 */
int
dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
    boolean_t recursive)
{
	struct dsl_ds_holdarg *ha;
	dsl_sync_task_t *dst;
	spa_t *spa;
	int error;

top:
	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);

	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));

	error = spa_open(dsname, &spa, FTAG);
	if (error) {
		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
		return (error);
	}

	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	ha->htag = htag;
	ha->snapname = snapname;
	ha->recursive = recursive;
	if (recursive) {
		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
		    ha, DS_FIND_CHILDREN);
	} else {
		error = dsl_dataset_user_release_one(dsname, ha);
	}
	if (error == 0)
		error = dsl_sync_task_group_wait(ha->dstg);

	/* Drop the holds/ownership taken by the per-snapshot callback. */
	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
		struct dsl_ds_releasearg *ra = dst->dst_arg1;
		dsl_dataset_t *ds = ra->ds;

		if (dst->dst_err)
			dsl_dataset_name(ds, ha->failed);

		if (ra->own)
			dsl_dataset_disown(ds, ha->dstg);
		else
			dsl_dataset_rele(ds, ha->dstg);

		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
	}

	if (error == 0 && recursive && !ha->gotone)
		error = ENOENT;

	if (error && error != EBUSY)
		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));

	dsl_sync_task_group_destroy(ha->dstg);
	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
	spa_close(spa, FTAG);

	/*
	 * We can get EBUSY if we were racing with deferred destroy and
	 * dsl_dataset_user_release_check() hadn't done the necessary
	 * open context setup.  We can also get EBUSY if we're racing
	 * with destroy and that thread is the ds_owner.  Either way
	 * the busy condition should be transient, and we should retry
	 * the release operation.
	 */
	if (error == EBUSY)
		goto top;

	return (error);
}
3937
3938 /*
3939  * Called at spa_load time (with retry == B_FALSE) to release a stale
3940  * temporary user hold. Also called by the onexit code (with retry == B_TRUE).
3941  */
int
dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag,
    boolean_t retry)
{
	dsl_dataset_t *ds;
	char *snap;
	char *name;
	int namelen;
	int error;

	do {
		/* Translate the object number into an fs@snap name. */
		rw_enter(&dp->dp_config_rwlock, RW_READER);
		error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
		rw_exit(&dp->dp_config_rwlock);
		if (error)
			return (error);
		namelen = dsl_dataset_namelen(ds)+1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(ds, name);
		dsl_dataset_rele(ds, FTAG);

		/* Split "fs@snap" at the '@' for the release call. */
		snap = strchr(name, '@');
		*snap = '\0';
		++snap;
		error = dsl_dataset_user_release(name, snap, htag, B_FALSE);
		kmem_free(name, namelen);

		/*
		 * The object can't have been destroyed because we have a hold,
		 * but it might have been renamed, resulting in ENOENT.  Retry
		 * if we've been requested to do so.
		 *
		 * It would be nice if we could use the dsobj all the way
		 * through and avoid ENOENT entirely.  But we might need to
		 * unmount the snapshot, and there's currently no way to lookup
		 * a vfsp using a ZFS object id.
		 */
	} while ((error == ENOENT) && retry);

	return (error);
}
3983
3984 int
3985 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
3986 {
3987         dsl_dataset_t *ds;
3988         int err;
3989
3990         err = dsl_dataset_hold(dsname, FTAG, &ds);
3991         if (err)
3992                 return (err);
3993
3994         VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
3995         if (ds->ds_phys->ds_userrefs_obj != 0) {
3996                 zap_attribute_t *za;
3997                 zap_cursor_t zc;
3998
3999                 za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
4000                 for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
4001                     ds->ds_phys->ds_userrefs_obj);
4002                     zap_cursor_retrieve(&zc, za) == 0;
4003                     zap_cursor_advance(&zc)) {
4004                         VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
4005                             za->za_first_integer));
4006                 }
4007                 zap_cursor_fini(&zc);
4008                 kmem_free(za, sizeof (zap_attribute_t));
4009         }
4010         dsl_dataset_rele(ds, FTAG);
4011         return (0);
4012 }
4013
4014 /*
4015  * Note, this fuction is used as the callback for dmu_objset_find().  We
4016  * always return 0 so that we will continue to find and process
4017  * inconsistent datasets, even if we encounter an error trying to
4018  * process one of them.
4019  */
4020 /* ARGSUSED */
4021 int
4022 dsl_destroy_inconsistent(const char *dsname, void *arg)
4023 {
4024         dsl_dataset_t *ds;
4025
4026         if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
4027                 if (DS_IS_INCONSISTENT(ds))
4028                         (void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
4029                 else
4030                         dsl_dataset_disown(ds, FTAG);
4031         }
4032         return (0);
4033 }