Rebase master to b117
[zfs.git] / module / zfs / dsl_pool.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #include <sys/dsl_pool.h>
27 #include <sys/dsl_dataset.h>
28 #include <sys/dsl_dir.h>
29 #include <sys/dsl_synctask.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dmu_objset.h>
32 #include <sys/arc.h>
33 #include <sys/zap.h>
34 #include <sys/zio.h>
35 #include <sys/zfs_context.h>
36 #include <sys/fs/zfs.h>
37 #include <sys/zfs_znode.h>
38 #include <sys/spa_impl.h>
39
40 int zfs_no_write_throttle = 0;
41 int zfs_write_limit_shift = 3;                  /* 1/8th of physical memory */
42 int zfs_txg_synctime = 5;                       /* target secs to sync a txg */
43
44 uint64_t zfs_write_limit_min = 32 << 20;        /* min write limit is 32MB */
45 uint64_t zfs_write_limit_max = 0;               /* max data payload per txg */
46 uint64_t zfs_write_limit_inflated = 0;
47 uint64_t zfs_write_limit_override = 0;
48
49 kmutex_t zfs_write_limit_lock;
50
51 static pgcnt_t old_physmem = 0;
52
53 static int
54 dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
55 {
56         uint64_t obj;
57         int err;
58
59         err = zap_lookup(dp->dp_meta_objset,
60             dp->dp_root_dir->dd_phys->dd_child_dir_zapobj,
61             name, sizeof (obj), 1, &obj);
62         if (err)
63                 return (err);
64
65         return (dsl_dir_open_obj(dp, obj, name, dp, ddp));
66 }
67
68 static dsl_pool_t *
69 dsl_pool_open_impl(spa_t *spa, uint64_t txg)
70 {
71         dsl_pool_t *dp;
72         blkptr_t *bp = spa_get_rootblkptr(spa);
73
74         dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
75         dp->dp_spa = spa;
76         dp->dp_meta_rootbp = *bp;
77         rw_init(&dp->dp_config_rwlock, NULL, RW_DEFAULT, NULL);
78         dp->dp_write_limit = zfs_write_limit_min;
79         txg_init(dp, txg);
80
81         txg_list_create(&dp->dp_dirty_datasets,
82             offsetof(dsl_dataset_t, ds_dirty_link));
83         txg_list_create(&dp->dp_dirty_dirs,
84             offsetof(dsl_dir_t, dd_dirty_link));
85         txg_list_create(&dp->dp_sync_tasks,
86             offsetof(dsl_sync_task_group_t, dstg_node));
87         list_create(&dp->dp_synced_datasets, sizeof (dsl_dataset_t),
88             offsetof(dsl_dataset_t, ds_synced_link));
89
90         mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
91         mutex_init(&dp->dp_scrub_cancel_lock, NULL, MUTEX_DEFAULT, NULL);
92
93         dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
94             1, 4, 0);
95
96         return (dp);
97 }
98
99 int
100 dsl_pool_open(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
101 {
102         int err;
103         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
104         dsl_dir_t *dd;
105         dsl_dataset_t *ds;
106         objset_impl_t *osi;
107
108         rw_enter(&dp->dp_config_rwlock, RW_WRITER);
109         err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp, &osi);
110         if (err)
111                 goto out;
112         dp->dp_meta_objset = &osi->os;
113
114         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
115             DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
116             &dp->dp_root_dir_obj);
117         if (err)
118                 goto out;
119
120         err = dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
121             NULL, dp, &dp->dp_root_dir);
122         if (err)
123                 goto out;
124
125         err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
126         if (err)
127                 goto out;
128
129         if (spa_version(spa) >= SPA_VERSION_ORIGIN) {
130                 err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
131                 if (err)
132                         goto out;
133                 err = dsl_dataset_hold_obj(dp, dd->dd_phys->dd_head_dataset_obj,
134                     FTAG, &ds);
135                 if (err == 0) {
136                         err = dsl_dataset_hold_obj(dp,
137                             ds->ds_phys->ds_prev_snap_obj, dp,
138                             &dp->dp_origin_snap);
139                         dsl_dataset_rele(ds, FTAG);
140                 }
141                 dsl_dir_close(dd, dp);
142                 if (err)
143                         goto out;
144         }
145
146         /* get scrub status */
147         err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
148             DMU_POOL_SCRUB_FUNC, sizeof (uint32_t), 1,
149             &dp->dp_scrub_func);
150         if (err == 0) {
151                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
152                     DMU_POOL_SCRUB_QUEUE, sizeof (uint64_t), 1,
153                     &dp->dp_scrub_queue_obj);
154                 if (err)
155                         goto out;
156                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
157                     DMU_POOL_SCRUB_MIN_TXG, sizeof (uint64_t), 1,
158                     &dp->dp_scrub_min_txg);
159                 if (err)
160                         goto out;
161                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
162                     DMU_POOL_SCRUB_MAX_TXG, sizeof (uint64_t), 1,
163                     &dp->dp_scrub_max_txg);
164                 if (err)
165                         goto out;
166                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
167                     DMU_POOL_SCRUB_BOOKMARK, sizeof (uint64_t), 4,
168                     &dp->dp_scrub_bookmark);
169                 if (err)
170                         goto out;
171                 err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
172                     DMU_POOL_SCRUB_ERRORS, sizeof (uint64_t), 1,
173                     &spa->spa_scrub_errors);
174                 if (err)
175                         goto out;
176                 if (spa_version(spa) < SPA_VERSION_DSL_SCRUB) {
177                         /*
178                          * A new-type scrub was in progress on an old
179                          * pool.  Restart from the beginning, since the
180                          * old software may have changed the pool in the
181                          * meantime.
182                          */
183                         dsl_pool_scrub_restart(dp);
184                 }
185         } else {
186                 /*
187                  * It's OK if there is no scrub in progress (and if
188                  * there was an I/O error, ignore it).
189                  */
190                 err = 0;
191         }
192
193 out:
194         rw_exit(&dp->dp_config_rwlock);
195         if (err)
196                 dsl_pool_close(dp);
197         else
198                 *dpp = dp;
199
200         return (err);
201 }
202
203 void
204 dsl_pool_close(dsl_pool_t *dp)
205 {
206         /* drop our references from dsl_pool_open() */
207
208         /*
209          * Since we held the origin_snap from "syncing" context (which
210          * includes pool-opening context), it actually only got a "ref"
211          * and not a hold, so just drop that here.
212          */
213         if (dp->dp_origin_snap)
214                 dsl_dataset_drop_ref(dp->dp_origin_snap, dp);
215         if (dp->dp_mos_dir)
216                 dsl_dir_close(dp->dp_mos_dir, dp);
217         if (dp->dp_root_dir)
218                 dsl_dir_close(dp->dp_root_dir, dp);
219
220         /* undo the dmu_objset_open_impl(mos) from dsl_pool_open() */
221         if (dp->dp_meta_objset)
222                 dmu_objset_evict(NULL, dp->dp_meta_objset->os);
223
224         txg_list_destroy(&dp->dp_dirty_datasets);
225         txg_list_destroy(&dp->dp_dirty_dirs);
226         list_destroy(&dp->dp_synced_datasets);
227
228         arc_flush(dp->dp_spa);
229         txg_fini(dp);
230         rw_destroy(&dp->dp_config_rwlock);
231         mutex_destroy(&dp->dp_lock);
232         mutex_destroy(&dp->dp_scrub_cancel_lock);
233         taskq_destroy(dp->dp_vnrele_taskq);
234         if (dp->dp_blkstats)
235                 kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
236         kmem_free(dp, sizeof (dsl_pool_t));
237 }
238
239 dsl_pool_t *
240 dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
241 {
242         int err;
243         dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
244         dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
245         objset_impl_t *osip;
246         dsl_dataset_t *ds;
247         uint64_t dsobj;
248
249         /* create and open the MOS (meta-objset) */
250         dp->dp_meta_objset = &dmu_objset_create_impl(spa,
251             NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx)->os;
252
253         /* create the pool directory */
254         err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
255             DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
256         ASSERT3U(err, ==, 0);
257
258         /* create and open the root dir */
259         dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
260         VERIFY(0 == dsl_dir_open_obj(dp, dp->dp_root_dir_obj,
261             NULL, dp, &dp->dp_root_dir));
262
263         /* create and open the meta-objset dir */
264         (void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
265         VERIFY(0 == dsl_pool_open_special_dir(dp,
266             MOS_DIR_NAME, &dp->dp_mos_dir));
267
268         if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
269                 dsl_pool_create_origin(dp, tx);
270
271         /* create the root dataset */
272         dsobj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);
273
274         /* create the root objset */
275         VERIFY(0 == dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
276         osip = dmu_objset_create_impl(dp->dp_spa, ds,
277             dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
278 #ifdef _KERNEL
279         zfs_create_fs(&osip->os, kcred, zplprops, tx);
280 #endif
281         dsl_dataset_rele(ds, FTAG);
282
283         dmu_tx_commit(tx);
284
285         return (dp);
286 }
287
288 void
289 dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
290 {
291         zio_t *zio;
292         dmu_tx_t *tx;
293         dsl_dir_t *dd;
294         dsl_dataset_t *ds;
295         dsl_sync_task_group_t *dstg;
296         objset_impl_t *mosi = dp->dp_meta_objset->os;
297         hrtime_t start, write_time;
298         uint64_t data_written;
299         int err;
300
301         tx = dmu_tx_create_assigned(dp, txg);
302
303         dp->dp_read_overhead = 0;
304         start = gethrtime();
305
306         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
307         while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
308                 /*
309                  * We must not sync any non-MOS datasets twice, because
310                  * we may have taken a snapshot of them.  However, we
311                  * may sync newly-created datasets on pass 2.
312                  */
313                 ASSERT(!list_link_active(&ds->ds_synced_link));
314                 list_insert_tail(&dp->dp_synced_datasets, ds);
315                 dsl_dataset_sync(ds, zio, tx);
316         }
317         DTRACE_PROBE(pool_sync__1setup);
318         err = zio_wait(zio);
319
320         write_time = gethrtime() - start;
321         ASSERT(err == 0);
322         DTRACE_PROBE(pool_sync__2rootzio);
323
324         for (ds = list_head(&dp->dp_synced_datasets); ds;
325             ds = list_next(&dp->dp_synced_datasets, ds))
326                 dmu_objset_do_userquota_callbacks(ds->ds_user_ptr, tx);
327
328         /*
329          * Sync the datasets again to push out the changes due to
330          * userquota updates.  This must be done before we process the
331          * sync tasks, because that could cause a snapshot of a dataset
332          * whose ds_bp will be rewritten when we do this 2nd sync.
333          */
334         zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
335         while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
336                 ASSERT(list_link_active(&ds->ds_synced_link));
337                 dmu_buf_rele(ds->ds_dbuf, ds);
338                 dsl_dataset_sync(ds, zio, tx);
339         }
340         err = zio_wait(zio);
341
342         while (dstg = txg_list_remove(&dp->dp_sync_tasks, txg)) {
343                 /*
344                  * No more sync tasks should have been added while we
345                  * were syncing.
346                  */
347                 ASSERT(spa_sync_pass(dp->dp_spa) == 1);
348                 dsl_sync_task_group_sync(dstg, tx);
349         }
350         DTRACE_PROBE(pool_sync__3task);
351
352         start = gethrtime();
353         while (dd = txg_list_remove(&dp->dp_dirty_dirs, txg))
354                 dsl_dir_sync(dd, tx);
355         write_time += gethrtime() - start;
356
357         if (spa_sync_pass(dp->dp_spa) == 1)
358                 dsl_pool_scrub_sync(dp, tx);
359
360         start = gethrtime();
361         if (list_head(&mosi->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
362             list_head(&mosi->os_free_dnodes[txg & TXG_MASK]) != NULL) {
363                 zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
364                 dmu_objset_sync(mosi, zio, tx);
365                 err = zio_wait(zio);
366                 ASSERT(err == 0);
367                 dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
368                 spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
369         }
370         write_time += gethrtime() - start;
371         DTRACE_PROBE2(pool_sync__4io, hrtime_t, write_time,
372             hrtime_t, dp->dp_read_overhead);
373         write_time -= dp->dp_read_overhead;
374
375         dmu_tx_commit(tx);
376
377         data_written = dp->dp_space_towrite[txg & TXG_MASK];
378         dp->dp_space_towrite[txg & TXG_MASK] = 0;
379         ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
380
381         /*
382          * If the write limit max has not been explicitly set, set it
383          * to a fraction of available physical memory (default 1/8th).
384          * Note that we must inflate the limit because the spa
385          * inflates write sizes to account for data replication.
386          * Check this each sync phase to catch changing memory size.
387          */
388         if (physmem != old_physmem && zfs_write_limit_shift) {
389                 mutex_enter(&zfs_write_limit_lock);
390                 old_physmem = physmem;
391                 zfs_write_limit_max = ptob(physmem) >> zfs_write_limit_shift;
392                 zfs_write_limit_inflated = MAX(zfs_write_limit_min,
393                     spa_get_asize(dp->dp_spa, zfs_write_limit_max));
394                 mutex_exit(&zfs_write_limit_lock);
395         }
396
397         /*
398          * Attempt to keep the sync time consistent by adjusting the
399          * amount of write traffic allowed into each transaction group.
400          * Weight the throughput calculation towards the current value:
401          *      thru = 3/4 old_thru + 1/4 new_thru
402          */
403         ASSERT(zfs_write_limit_min > 0);
404         if (data_written > zfs_write_limit_min / 8 && write_time > 0) {
405                 uint64_t throughput = (data_written * NANOSEC) / write_time;
406                 if (dp->dp_throughput)
407                         dp->dp_throughput = throughput / 4 +
408                             3 * dp->dp_throughput / 4;
409                 else
410                         dp->dp_throughput = throughput;
411                 dp->dp_write_limit = MIN(zfs_write_limit_inflated,
412                     MAX(zfs_write_limit_min,
413                     dp->dp_throughput * zfs_txg_synctime));
414         }
415 }
416
417 void
418 dsl_pool_zil_clean(dsl_pool_t *dp)
419 {
420         dsl_dataset_t *ds;
421
422         while (ds = list_head(&dp->dp_synced_datasets)) {
423                 list_remove(&dp->dp_synced_datasets, ds);
424                 ASSERT(ds->ds_user_ptr != NULL);
425                 zil_clean(((objset_impl_t *)ds->ds_user_ptr)->os_zil);
426                 dmu_buf_rele(ds->ds_dbuf, ds);
427         }
428 }
429
430 /*
431  * TRUE if the current thread is the tx_sync_thread or if we
432  * are being called from SPA context during pool initialization.
433  */
434 int
435 dsl_pool_sync_context(dsl_pool_t *dp)
436 {
437         return (curthread == dp->dp_tx.tx_sync_thread ||
438             spa_get_dsl(dp->dp_spa) == NULL);
439 }
440
441 uint64_t
442 dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
443 {
444         uint64_t space, resv;
445
446         /*
447          * Reserve about 1.6% (1/64), or at least 32MB, for allocation
448          * efficiency.
449          * XXX The intent log is not accounted for, so it must fit
450          * within this slop.
451          *
452          * If we're trying to assess whether it's OK to do a free,
453          * cut the reservation in half to allow forward progress
454          * (e.g. make it possible to rm(1) files from a full pool).
455          */
456         space = spa_get_dspace(dp->dp_spa);
457         resv = MAX(space >> 6, SPA_MINDEVSIZE >> 1);
458         if (netfree)
459                 resv >>= 1;
460
461         return (space - resv);
462 }
463
464 int
465 dsl_pool_tempreserve_space(dsl_pool_t *dp, uint64_t space, dmu_tx_t *tx)
466 {
467         uint64_t reserved = 0;
468         uint64_t write_limit = (zfs_write_limit_override ?
469             zfs_write_limit_override : dp->dp_write_limit);
470
471         if (zfs_no_write_throttle) {
472                 atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK],
473                     space);
474                 return (0);
475         }
476
477         /*
478          * Check to see if we have exceeded the maximum allowed IO for
479          * this transaction group.  We can do this without locks since
480          * a little slop here is ok.  Note that we do the reserved check
481          * with only half the requested reserve: this is because the
482          * reserve requests are worst-case, and we really don't want to
483          * throttle based off of worst-case estimates.
484          */
485         if (write_limit > 0) {
486                 reserved = dp->dp_space_towrite[tx->tx_txg & TXG_MASK]
487                     + dp->dp_tempreserved[tx->tx_txg & TXG_MASK] / 2;
488
489                 if (reserved && reserved > write_limit)
490                         return (ERESTART);
491         }
492
493         atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], space);
494
495         /*
496          * If this transaction group is over 7/8ths capacity, delay
497          * the caller 1 clock tick.  This will slow down the "fill"
498          * rate until the sync process can catch up with us.
499          */
500         if (reserved && reserved > (write_limit - (write_limit >> 3)))
501                 txg_delay(dp, tx->tx_txg, 1);
502
503         return (0);
504 }
505
506 void
507 dsl_pool_tempreserve_clear(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
508 {
509         ASSERT(dp->dp_tempreserved[tx->tx_txg & TXG_MASK] >= space);
510         atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], -space);
511 }
512
513 void
514 dsl_pool_memory_pressure(dsl_pool_t *dp)
515 {
516         uint64_t space_inuse = 0;
517         int i;
518
519         if (dp->dp_write_limit == zfs_write_limit_min)
520                 return;
521
522         for (i = 0; i < TXG_SIZE; i++) {
523                 space_inuse += dp->dp_space_towrite[i];
524                 space_inuse += dp->dp_tempreserved[i];
525         }
526         dp->dp_write_limit = MAX(zfs_write_limit_min,
527             MIN(dp->dp_write_limit, space_inuse / 4));
528 }
529
530 void
531 dsl_pool_willuse_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
532 {
533         if (space > 0) {
534                 mutex_enter(&dp->dp_lock);
535                 dp->dp_space_towrite[tx->tx_txg & TXG_MASK] += space;
536                 mutex_exit(&dp->dp_lock);
537         }
538 }
539
540 /* ARGSUSED */
541 static int
542 upgrade_clones_cb(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
543 {
544         dmu_tx_t *tx = arg;
545         dsl_dataset_t *ds, *prev = NULL;
546         int err;
547         dsl_pool_t *dp = spa_get_dsl(spa);
548
549         err = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
550         if (err)
551                 return (err);
552
553         while (ds->ds_phys->ds_prev_snap_obj != 0) {
554                 err = dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
555                     FTAG, &prev);
556                 if (err) {
557                         dsl_dataset_rele(ds, FTAG);
558                         return (err);
559                 }
560
561                 if (prev->ds_phys->ds_next_snap_obj != ds->ds_object)
562                         break;
563                 dsl_dataset_rele(ds, FTAG);
564                 ds = prev;
565                 prev = NULL;
566         }
567
568         if (prev == NULL) {
569                 prev = dp->dp_origin_snap;
570
571                 /*
572                  * The $ORIGIN can't have any data, or the accounting
573                  * will be wrong.
574                  */
575                 ASSERT(prev->ds_phys->ds_bp.blk_birth == 0);
576
577                 /* The origin doesn't get attached to itself */
578                 if (ds->ds_object == prev->ds_object) {
579                         dsl_dataset_rele(ds, FTAG);
580                         return (0);
581                 }
582
583                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
584                 ds->ds_phys->ds_prev_snap_obj = prev->ds_object;
585                 ds->ds_phys->ds_prev_snap_txg = prev->ds_phys->ds_creation_txg;
586
587                 dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
588                 ds->ds_dir->dd_phys->dd_origin_obj = prev->ds_object;
589
590                 dmu_buf_will_dirty(prev->ds_dbuf, tx);
591                 prev->ds_phys->ds_num_children++;
592
593                 if (ds->ds_phys->ds_next_snap_obj == 0) {
594                         ASSERT(ds->ds_prev == NULL);
595                         VERIFY(0 == dsl_dataset_hold_obj(dp,
596                             ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
597                 }
598         }
599
600         ASSERT(ds->ds_dir->dd_phys->dd_origin_obj == prev->ds_object);
601         ASSERT(ds->ds_phys->ds_prev_snap_obj == prev->ds_object);
602
603         if (prev->ds_phys->ds_next_clones_obj == 0) {
604                 prev->ds_phys->ds_next_clones_obj =
605                     zap_create(dp->dp_meta_objset,
606                     DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
607         }
608         VERIFY(0 == zap_add_int(dp->dp_meta_objset,
609             prev->ds_phys->ds_next_clones_obj, ds->ds_object, tx));
610
611         dsl_dataset_rele(ds, FTAG);
612         if (prev != dp->dp_origin_snap)
613                 dsl_dataset_rele(prev, FTAG);
614         return (0);
615 }
616
617 void
618 dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
619 {
620         ASSERT(dmu_tx_is_syncing(tx));
621         ASSERT(dp->dp_origin_snap != NULL);
622
623         (void) dmu_objset_find_spa(dp->dp_spa, NULL, upgrade_clones_cb,
624             tx, DS_FIND_CHILDREN);
625 }
626
627 void
628 dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
629 {
630         uint64_t dsobj;
631         dsl_dataset_t *ds;
632
633         ASSERT(dmu_tx_is_syncing(tx));
634         ASSERT(dp->dp_origin_snap == NULL);
635
636         /* create the origin dir, ds, & snap-ds */
637         rw_enter(&dp->dp_config_rwlock, RW_WRITER);
638         dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
639             NULL, 0, kcred, tx);
640         VERIFY(0 == dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
641         dsl_dataset_snapshot_sync(ds, ORIGIN_DIR_NAME, kcred, tx);
642         VERIFY(0 == dsl_dataset_hold_obj(dp, ds->ds_phys->ds_prev_snap_obj,
643             dp, &dp->dp_origin_snap));
644         dsl_dataset_rele(ds, FTAG);
645         rw_exit(&dp->dp_config_rwlock);
646 }
647
648 taskq_t *
649 dsl_pool_vnrele_taskq(dsl_pool_t *dp)
650 {
651         return (dp->dp_vnrele_taskq);
652 }