Initial Linux ZFS GIT Repo
[zfs.git] / zfs / lib / libzpool / dmu.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #pragma ident   "@(#)dmu.c      1.30    07/11/09 SMI"
27
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48
49 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
50         {       byteswap_uint8_array,   TRUE,   "unallocated"           },
51         {       zap_byteswap,           TRUE,   "object directory"      },
52         {       byteswap_uint64_array,  TRUE,   "object array"          },
53         {       byteswap_uint8_array,   TRUE,   "packed nvlist"         },
54         {       byteswap_uint64_array,  TRUE,   "packed nvlist size"    },
55         {       byteswap_uint64_array,  TRUE,   "bplist"                },
56         {       byteswap_uint64_array,  TRUE,   "bplist header"         },
57         {       byteswap_uint64_array,  TRUE,   "SPA space map header"  },
58         {       byteswap_uint64_array,  TRUE,   "SPA space map"         },
59         {       byteswap_uint64_array,  TRUE,   "ZIL intent log"        },
60         {       dnode_buf_byteswap,     TRUE,   "DMU dnode"             },
61         {       dmu_objset_byteswap,    TRUE,   "DMU objset"            },
62         {       byteswap_uint64_array,  TRUE,   "DSL directory"         },
63         {       zap_byteswap,           TRUE,   "DSL directory child map"},
64         {       zap_byteswap,           TRUE,   "DSL dataset snap map"  },
65         {       zap_byteswap,           TRUE,   "DSL props"             },
66         {       byteswap_uint64_array,  TRUE,   "DSL dataset"           },
67         {       zfs_znode_byteswap,     TRUE,   "ZFS znode"             },
68         {       zfs_oldacl_byteswap,    TRUE,   "ZFS V0 ACL"            },
69         {       byteswap_uint8_array,   FALSE,  "ZFS plain file"        },
70         {       zap_byteswap,           TRUE,   "ZFS directory"         },
71         {       zap_byteswap,           TRUE,   "ZFS master node"       },
72         {       zap_byteswap,           TRUE,   "ZFS delete queue"      },
73         {       byteswap_uint8_array,   FALSE,  "zvol object"           },
74         {       zap_byteswap,           TRUE,   "zvol prop"             },
75         {       byteswap_uint8_array,   FALSE,  "other uint8[]"         },
76         {       byteswap_uint64_array,  FALSE,  "other uint64[]"        },
77         {       zap_byteswap,           TRUE,   "other ZAP"             },
78         {       zap_byteswap,           TRUE,   "persistent error log"  },
79         {       byteswap_uint8_array,   TRUE,   "SPA history"           },
80         {       byteswap_uint64_array,  TRUE,   "SPA history offsets"   },
81         {       zap_byteswap,           TRUE,   "Pool properties"       },
82         {       zap_byteswap,           TRUE,   "DSL permissions"       },
83         {       zfs_acl_byteswap,       TRUE,   "ZFS ACL"               },
84         {       byteswap_uint8_array,   TRUE,   "ZFS SYSACL"            },
85         {       byteswap_uint8_array,   TRUE,   "FUID table"            },
86         {       byteswap_uint64_array,  TRUE,   "FUID table size"       },
87 };
88
89 int
90 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
91     void *tag, dmu_buf_t **dbp)
92 {
93         dnode_t *dn;
94         uint64_t blkid;
95         dmu_buf_impl_t *db;
96         int err;
97
98         err = dnode_hold(os->os, object, FTAG, &dn);
99         if (err)
100                 return (err);
101         blkid = dbuf_whichblock(dn, offset);
102         rw_enter(&dn->dn_struct_rwlock, RW_READER);
103         db = dbuf_hold(dn, blkid, tag);
104         rw_exit(&dn->dn_struct_rwlock);
105         if (db == NULL) {
106                 err = EIO;
107         } else {
108                 err = dbuf_read(db, NULL, DB_RF_CANFAIL);
109                 if (err) {
110                         dbuf_rele(db, tag);
111                         db = NULL;
112                 }
113         }
114
115         dnode_rele(dn, FTAG);
116         *dbp = &db->db;
117         return (err);
118 }
119
120 int
121 dmu_bonus_max(void)
122 {
123         return (DN_MAX_BONUSLEN);
124 }
125
126 int
127 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
128 {
129         dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
130
131         if (dn->dn_bonus != (dmu_buf_impl_t *)db)
132                 return (EINVAL);
133         if (newsize < 0 || newsize > db->db_size)
134                 return (EINVAL);
135         dnode_setbonuslen(dn, newsize, tx);
136         return (0);
137 }
138
139 /*
140  * returns ENOENT, EIO, or 0.
141  */
142 int
143 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
144 {
145         dnode_t *dn;
146         dmu_buf_impl_t *db;
147         int error;
148
149         error = dnode_hold(os->os, object, FTAG, &dn);
150         if (error)
151                 return (error);
152
153         rw_enter(&dn->dn_struct_rwlock, RW_READER);
154         if (dn->dn_bonus == NULL) {
155                 rw_exit(&dn->dn_struct_rwlock);
156                 rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
157                 if (dn->dn_bonus == NULL)
158                         dbuf_create_bonus(dn);
159         }
160         db = dn->dn_bonus;
161         rw_exit(&dn->dn_struct_rwlock);
162
163         /* as long as the bonus buf is held, the dnode will be held */
164         if (refcount_add(&db->db_holds, tag) == 1)
165                 VERIFY(dnode_add_ref(dn, db));
166
167         dnode_rele(dn, FTAG);
168
169         VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
170
171         *dbp = &db->db;
172         return (0);
173 }
174
175 /*
176  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
177  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
178  * and can induce severe lock contention when writing to several files
179  * whose dnodes are in the same block.
180  */
181 static int
182 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
183     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
184 {
185         dmu_buf_t **dbp;
186         uint64_t blkid, nblks, i;
187         uint32_t flags;
188         int err;
189         zio_t *zio;
190
191         ASSERT(length <= DMU_MAX_ACCESS);
192
193         flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
194         if (length > zfetch_array_rd_sz)
195                 flags |= DB_RF_NOPREFETCH;
196
197         rw_enter(&dn->dn_struct_rwlock, RW_READER);
198         if (dn->dn_datablkshift) {
199                 int blkshift = dn->dn_datablkshift;
200                 nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
201                     P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
202         } else {
203                 if (offset + length > dn->dn_datablksz) {
204                         zfs_panic_recover("zfs: accessing past end of object "
205                             "%llx/%llx (size=%u access=%llu+%llu)",
206                             (longlong_t)dn->dn_objset->
207                             os_dsl_dataset->ds_object,
208                             (longlong_t)dn->dn_object, dn->dn_datablksz,
209                             (longlong_t)offset, (longlong_t)length);
210                         return (EIO);
211                 }
212                 nblks = 1;
213         }
214         dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
215
216         zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
217         blkid = dbuf_whichblock(dn, offset);
218         for (i = 0; i < nblks; i++) {
219                 dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
220                 if (db == NULL) {
221                         rw_exit(&dn->dn_struct_rwlock);
222                         dmu_buf_rele_array(dbp, nblks, tag);
223                         zio_nowait(zio);
224                         return (EIO);
225                 }
226                 /* initiate async i/o */
227                 if (read) {
228                         rw_exit(&dn->dn_struct_rwlock);
229                         (void) dbuf_read(db, zio, flags);
230                         rw_enter(&dn->dn_struct_rwlock, RW_READER);
231                 }
232                 dbp[i] = &db->db;
233         }
234         rw_exit(&dn->dn_struct_rwlock);
235
236         /* wait for async i/o */
237         err = zio_wait(zio);
238         if (err) {
239                 dmu_buf_rele_array(dbp, nblks, tag);
240                 return (err);
241         }
242
243         /* wait for other io to complete */
244         if (read) {
245                 for (i = 0; i < nblks; i++) {
246                         dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
247                         mutex_enter(&db->db_mtx);
248                         while (db->db_state == DB_READ ||
249                             db->db_state == DB_FILL)
250                                 cv_wait(&db->db_changed, &db->db_mtx);
251                         if (db->db_state == DB_UNCACHED)
252                                 err = EIO;
253                         mutex_exit(&db->db_mtx);
254                         if (err) {
255                                 dmu_buf_rele_array(dbp, nblks, tag);
256                                 return (err);
257                         }
258                 }
259         }
260
261         *numbufsp = nblks;
262         *dbpp = dbp;
263         return (0);
264 }
265
266 static int
267 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
268     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
269 {
270         dnode_t *dn;
271         int err;
272
273         err = dnode_hold(os->os, object, FTAG, &dn);
274         if (err)
275                 return (err);
276
277         err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
278             numbufsp, dbpp);
279
280         dnode_rele(dn, FTAG);
281
282         return (err);
283 }
284
285 int
286 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
287     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
288 {
289         dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
290         int err;
291
292         err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
293             numbufsp, dbpp);
294
295         return (err);
296 }
297
298 void
299 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
300 {
301         int i;
302         dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
303
304         if (numbufs == 0)
305                 return;
306
307         for (i = 0; i < numbufs; i++) {
308                 if (dbp[i])
309                         dbuf_rele(dbp[i], tag);
310         }
311
312         kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
313 }
314
315 void
316 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
317 {
318         dnode_t *dn;
319         uint64_t blkid;
320         int nblks, i, err;
321
322         if (zfs_prefetch_disable)
323                 return;
324
325         if (len == 0) {  /* they're interested in the bonus buffer */
326                 dn = os->os->os_meta_dnode;
327
328                 if (object == 0 || object >= DN_MAX_OBJECT)
329                         return;
330
331                 rw_enter(&dn->dn_struct_rwlock, RW_READER);
332                 blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
333                 dbuf_prefetch(dn, blkid);
334                 rw_exit(&dn->dn_struct_rwlock);
335                 return;
336         }
337
338         /*
339          * XXX - Note, if the dnode for the requested object is not
340          * already cached, we will do a *synchronous* read in the
341          * dnode_hold() call.  The same is true for any indirects.
342          */
343         err = dnode_hold(os->os, object, FTAG, &dn);
344         if (err != 0)
345                 return;
346
347         rw_enter(&dn->dn_struct_rwlock, RW_READER);
348         if (dn->dn_datablkshift) {
349                 int blkshift = dn->dn_datablkshift;
350                 nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
351                     P2ALIGN(offset, 1<<blkshift)) >> blkshift;
352         } else {
353                 nblks = (offset < dn->dn_datablksz);
354         }
355
356         if (nblks != 0) {
357                 blkid = dbuf_whichblock(dn, offset);
358                 for (i = 0; i < nblks; i++)
359                         dbuf_prefetch(dn, blkid+i);
360         }
361
362         rw_exit(&dn->dn_struct_rwlock);
363
364         dnode_rele(dn, FTAG);
365 }
366
367 int
368 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
369     uint64_t size, dmu_tx_t *tx)
370 {
371         dnode_t *dn;
372         int err = dnode_hold(os->os, object, FTAG, &dn);
373         if (err)
374                 return (err);
375         ASSERT(offset < UINT64_MAX);
376         ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
377         dnode_free_range(dn, offset, size, tx);
378         dnode_rele(dn, FTAG);
379         return (0);
380 }
381
382 int
383 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
384     void *buf)
385 {
386         dnode_t *dn;
387         dmu_buf_t **dbp;
388         int numbufs, i, err;
389
390         err = dnode_hold(os->os, object, FTAG, &dn);
391         if (err)
392                 return (err);
393
394         /*
395          * Deal with odd block sizes, where there can't be data past the first
396          * block.  If we ever do the tail block optimization, we will need to
397          * handle that here as well.
398          */
399         if (dn->dn_datablkshift == 0) {
400                 int newsz = offset > dn->dn_datablksz ? 0 :
401                     MIN(size, dn->dn_datablksz - offset);
402                 bzero((char *)buf + newsz, size - newsz);
403                 size = newsz;
404         }
405
406         while (size > 0) {
407                 uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
408
409                 /*
410                  * NB: we could do this block-at-a-time, but it's nice
411                  * to be reading in parallel.
412                  */
413                 err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
414                     TRUE, FTAG, &numbufs, &dbp);
415                 if (err)
416                         break;
417
418                 for (i = 0; i < numbufs; i++) {
419                         int tocpy;
420                         int bufoff;
421                         dmu_buf_t *db = dbp[i];
422
423                         ASSERT(size > 0);
424
425                         bufoff = offset - db->db_offset;
426                         tocpy = (int)MIN(db->db_size - bufoff, size);
427
428                         bcopy((char *)db->db_data + bufoff, buf, tocpy);
429
430                         offset += tocpy;
431                         size -= tocpy;
432                         buf = (char *)buf + tocpy;
433                 }
434                 dmu_buf_rele_array(dbp, numbufs, FTAG);
435         }
436         dnode_rele(dn, FTAG);
437         return (err);
438 }
439
440 void
441 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
442     const void *buf, dmu_tx_t *tx)
443 {
444         dmu_buf_t **dbp;
445         int numbufs, i;
446
447         if (size == 0)
448                 return;
449
450         VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
451             FALSE, FTAG, &numbufs, &dbp));
452
453         for (i = 0; i < numbufs; i++) {
454                 int tocpy;
455                 int bufoff;
456                 dmu_buf_t *db = dbp[i];
457
458                 ASSERT(size > 0);
459
460                 bufoff = offset - db->db_offset;
461                 tocpy = (int)MIN(db->db_size - bufoff, size);
462
463                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
464
465                 if (tocpy == db->db_size)
466                         dmu_buf_will_fill(db, tx);
467                 else
468                         dmu_buf_will_dirty(db, tx);
469
470                 bcopy(buf, (char *)db->db_data + bufoff, tocpy);
471
472                 if (tocpy == db->db_size)
473                         dmu_buf_fill_done(db, tx);
474
475                 offset += tocpy;
476                 size -= tocpy;
477                 buf = (char *)buf + tocpy;
478         }
479         dmu_buf_rele_array(dbp, numbufs, FTAG);
480 }
481
482 #ifdef _KERNEL
483 int
484 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
485 {
486         dmu_buf_t **dbp;
487         int numbufs, i, err;
488
489         /*
490          * NB: we could do this block-at-a-time, but it's nice
491          * to be reading in parallel.
492          */
493         err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
494             &numbufs, &dbp);
495         if (err)
496                 return (err);
497
498         for (i = 0; i < numbufs; i++) {
499                 int tocpy;
500                 int bufoff;
501                 dmu_buf_t *db = dbp[i];
502
503                 ASSERT(size > 0);
504
505                 bufoff = uio->uio_loffset - db->db_offset;
506                 tocpy = (int)MIN(db->db_size - bufoff, size);
507
508                 err = uiomove((char *)db->db_data + bufoff, tocpy,
509                     UIO_READ, uio);
510                 if (err)
511                         break;
512
513                 size -= tocpy;
514         }
515         dmu_buf_rele_array(dbp, numbufs, FTAG);
516
517         return (err);
518 }
519
520 int
521 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
522     dmu_tx_t *tx)
523 {
524         dmu_buf_t **dbp;
525         int numbufs, i;
526         int err = 0;
527
528         if (size == 0)
529                 return (0);
530
531         err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
532             FALSE, FTAG, &numbufs, &dbp);
533         if (err)
534                 return (err);
535
536         for (i = 0; i < numbufs; i++) {
537                 int tocpy;
538                 int bufoff;
539                 dmu_buf_t *db = dbp[i];
540
541                 ASSERT(size > 0);
542
543                 bufoff = uio->uio_loffset - db->db_offset;
544                 tocpy = (int)MIN(db->db_size - bufoff, size);
545
546                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
547
548                 if (tocpy == db->db_size)
549                         dmu_buf_will_fill(db, tx);
550                 else
551                         dmu_buf_will_dirty(db, tx);
552
553                 /*
554                  * XXX uiomove could block forever (eg. nfs-backed
555                  * pages).  There needs to be a uiolockdown() function
556                  * to lock the pages in memory, so that uiomove won't
557                  * block.
558                  */
559                 err = uiomove((char *)db->db_data + bufoff, tocpy,
560                     UIO_WRITE, uio);
561
562                 if (tocpy == db->db_size)
563                         dmu_buf_fill_done(db, tx);
564
565                 if (err)
566                         break;
567
568                 size -= tocpy;
569         }
570         dmu_buf_rele_array(dbp, numbufs, FTAG);
571         return (err);
572 }
573
574 int
575 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
576     page_t *pp, dmu_tx_t *tx)
577 {
578         dmu_buf_t **dbp;
579         int numbufs, i;
580         int err;
581
582         if (size == 0)
583                 return (0);
584
585         err = dmu_buf_hold_array(os, object, offset, size,
586             FALSE, FTAG, &numbufs, &dbp);
587         if (err)
588                 return (err);
589
590         for (i = 0; i < numbufs; i++) {
591                 int tocpy, copied, thiscpy;
592                 int bufoff;
593                 dmu_buf_t *db = dbp[i];
594                 caddr_t va;
595
596                 ASSERT(size > 0);
597                 ASSERT3U(db->db_size, >=, PAGESIZE);
598
599                 bufoff = offset - db->db_offset;
600                 tocpy = (int)MIN(db->db_size - bufoff, size);
601
602                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
603
604                 if (tocpy == db->db_size)
605                         dmu_buf_will_fill(db, tx);
606                 else
607                         dmu_buf_will_dirty(db, tx);
608
609                 for (copied = 0; copied < tocpy; copied += PAGESIZE) {
610                         ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
611                         thiscpy = MIN(PAGESIZE, tocpy - copied);
612                         va = ppmapin(pp, PROT_READ, (caddr_t)-1);
613                         bcopy(va, (char *)db->db_data + bufoff, thiscpy);
614                         ppmapout(va);
615                         pp = pp->p_next;
616                         bufoff += PAGESIZE;
617                 }
618
619                 if (tocpy == db->db_size)
620                         dmu_buf_fill_done(db, tx);
621
622                 if (err)
623                         break;
624
625                 offset += tocpy;
626                 size -= tocpy;
627         }
628         dmu_buf_rele_array(dbp, numbufs, FTAG);
629         return (err);
630 }
631 #endif
632
633 typedef struct {
634         dbuf_dirty_record_t     *dr;
635         dmu_sync_cb_t           *done;
636         void                    *arg;
637 } dmu_sync_arg_t;
638
639 /* ARGSUSED */
640 static void
641 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
642 {
643         dmu_sync_arg_t *in = varg;
644         dbuf_dirty_record_t *dr = in->dr;
645         dmu_buf_impl_t *db = dr->dr_dbuf;
646         dmu_sync_cb_t *done = in->done;
647
648         if (!BP_IS_HOLE(zio->io_bp)) {
649                 zio->io_bp->blk_fill = 1;
650                 BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
651                 BP_SET_LEVEL(zio->io_bp, 0);
652         }
653
654         mutex_enter(&db->db_mtx);
655         ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
656         dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
657         dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
658         cv_broadcast(&db->db_changed);
659         mutex_exit(&db->db_mtx);
660
661         if (done)
662                 done(&(db->db), in->arg);
663
664         kmem_free(in, sizeof (dmu_sync_arg_t));
665 }
666
667 /*
668  * Intent log support: sync the block associated with db to disk.
669  * N.B. and XXX: the caller is responsible for making sure that the
670  * data isn't changing while dmu_sync() is writing it.
671  *
672  * Return values:
673  *
674  *      EEXIST: this txg has already been synced, so there's nothing to to.
675  *              The caller should not log the write.
676  *
677  *      ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
678  *              The caller should not log the write.
679  *
680  *      EALREADY: this block is already in the process of being synced.
681  *              The caller should track its progress (somehow).
682  *
683  *      EINPROGRESS: the IO has been initiated.
684  *              The caller should log this blkptr in the callback.
685  *
686  *      0: completed.  Sets *bp to the blkptr just written.
687  *              The caller should log this blkptr immediately.
688  */
689 int
690 dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
691     blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
692 {
693         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
694         objset_impl_t *os = db->db_objset;
695         dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
696         tx_state_t *tx = &dp->dp_tx;
697         dbuf_dirty_record_t *dr;
698         dmu_sync_arg_t *in;
699         zbookmark_t zb;
700         zio_t *zio;
701         int zio_flags;
702         int err;
703
704         ASSERT(BP_IS_HOLE(bp));
705         ASSERT(txg != 0);
706
707
708         dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
709             txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
710
711         /*
712          * XXX - would be nice if we could do this without suspending...
713          */
714         txg_suspend(dp);
715
716         /*
717          * If this txg already synced, there's nothing to do.
718          */
719         if (txg <= tx->tx_synced_txg) {
720                 txg_resume(dp);
721                 /*
722                  * If we're running ziltest, we need the blkptr regardless.
723                  */
724                 if (txg > spa_freeze_txg(dp->dp_spa)) {
725                         /* if db_blkptr == NULL, this was an empty write */
726                         if (db->db_blkptr)
727                                 *bp = *db->db_blkptr; /* structure assignment */
728                         return (0);
729                 }
730                 return (EEXIST);
731         }
732
733         mutex_enter(&db->db_mtx);
734
735         if (txg == tx->tx_syncing_txg) {
736                 while (db->db_data_pending) {
737                         /*
738                          * IO is in-progress.  Wait for it to finish.
739                          * XXX - would be nice to be able to somehow "attach"
740                          * this zio to the parent zio passed in.
741                          */
742                         cv_wait(&db->db_changed, &db->db_mtx);
743                         if (!db->db_data_pending &&
744                             db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
745                                 /*
746                                  * IO was compressed away
747                                  */
748                                 *bp = *db->db_blkptr; /* structure assignment */
749                                 mutex_exit(&db->db_mtx);
750                                 txg_resume(dp);
751                                 return (0);
752                         }
753                         ASSERT(db->db_data_pending ||
754                             (db->db_blkptr && db->db_blkptr->blk_birth == txg));
755                 }
756
757                 if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
758                         /*
759                          * IO is already completed.
760                          */
761                         *bp = *db->db_blkptr; /* structure assignment */
762                         mutex_exit(&db->db_mtx);
763                         txg_resume(dp);
764                         return (0);
765                 }
766         }
767
768         dr = db->db_last_dirty;
769         while (dr && dr->dr_txg > txg)
770                 dr = dr->dr_next;
771         if (dr == NULL || dr->dr_txg < txg) {
772                 /*
773                  * This dbuf isn't dirty, must have been free_range'd.
774                  * There's no need to log writes to freed blocks, so we're done.
775                  */
776                 mutex_exit(&db->db_mtx);
777                 txg_resume(dp);
778                 return (ENOENT);
779         }
780
781         ASSERT(dr->dr_txg == txg);
782         if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
783                 /*
784                  * We have already issued a sync write for this buffer.
785                  */
786                 mutex_exit(&db->db_mtx);
787                 txg_resume(dp);
788                 return (EALREADY);
789         } else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
790                 /*
791                  * This buffer has already been synced.  It could not
792                  * have been dirtied since, or we would have cleared the state.
793                  */
794                 *bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
795                 mutex_exit(&db->db_mtx);
796                 txg_resume(dp);
797                 return (0);
798         }
799
800         dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
801         in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
802         in->dr = dr;
803         in->done = done;
804         in->arg = arg;
805         mutex_exit(&db->db_mtx);
806         txg_resume(dp);
807
808         zb.zb_objset = os->os_dsl_dataset->ds_object;
809         zb.zb_object = db->db.db_object;
810         zb.zb_level = db->db_level;
811         zb.zb_blkid = db->db_blkid;
812         zio_flags = ZIO_FLAG_MUSTSUCCEED;
813         if (dmu_ot[db->db_dnode->dn_type].ot_metadata || zb.zb_level != 0)
814                 zio_flags |= ZIO_FLAG_METADATA;
815         zio = arc_write(pio, os->os_spa,
816             zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
817             zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
818             dmu_get_replication_level(os, &zb, db->db_dnode->dn_type),
819             txg, bp, dr->dt.dl.dr_data, NULL, dmu_sync_done, in,
820             ZIO_PRIORITY_SYNC_WRITE, zio_flags, &zb);
821
822         if (pio) {
823                 zio_nowait(zio);
824                 err = EINPROGRESS;
825         } else {
826                 err = zio_wait(zio);
827                 ASSERT(err == 0);
828         }
829         return (err);
830 }
831
832 int
833 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
834         dmu_tx_t *tx)
835 {
836         dnode_t *dn;
837         int err;
838
839         err = dnode_hold(os->os, object, FTAG, &dn);
840         if (err)
841                 return (err);
842         err = dnode_set_blksz(dn, size, ibs, tx);
843         dnode_rele(dn, FTAG);
844         return (err);
845 }
846
847 void
848 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
849         dmu_tx_t *tx)
850 {
851         dnode_t *dn;
852
853         /* XXX assumes dnode_hold will not get an i/o error */
854         (void) dnode_hold(os->os, object, FTAG, &dn);
855         ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
856         dn->dn_checksum = checksum;
857         dnode_setdirty(dn, tx);
858         dnode_rele(dn, FTAG);
859 }
860
861 void
862 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
863         dmu_tx_t *tx)
864 {
865         dnode_t *dn;
866
867         /* XXX assumes dnode_hold will not get an i/o error */
868         (void) dnode_hold(os->os, object, FTAG, &dn);
869         ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
870         dn->dn_compress = compress;
871         dnode_setdirty(dn, tx);
872         dnode_rele(dn, FTAG);
873 }
874
875 int
876 dmu_get_replication_level(objset_impl_t *os,
877     zbookmark_t *zb, dmu_object_type_t ot)
878 {
879         int ncopies = os->os_copies;
880
881         /* If it's the mos, it should have max copies set. */
882         ASSERT(zb->zb_objset != 0 ||
883             ncopies == spa_max_replication(os->os_spa));
884
885         if (dmu_ot[ot].ot_metadata || zb->zb_level != 0)
886                 ncopies++;
887         return (MIN(ncopies, spa_max_replication(os->os_spa)));
888 }
889
890 int
891 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
892 {
893         dnode_t *dn;
894         int i, err;
895
896         err = dnode_hold(os->os, object, FTAG, &dn);
897         if (err)
898                 return (err);
899         /*
900          * Sync any current changes before
901          * we go trundling through the block pointers.
902          */
903         for (i = 0; i < TXG_SIZE; i++) {
904                 if (list_link_active(&dn->dn_dirty_link[i]))
905                         break;
906         }
907         if (i != TXG_SIZE) {
908                 dnode_rele(dn, FTAG);
909                 txg_wait_synced(dmu_objset_pool(os), 0);
910                 err = dnode_hold(os->os, object, FTAG, &dn);
911                 if (err)
912                         return (err);
913         }
914
915         err = dnode_next_offset(dn, hole, off, 1, 1, 0);
916         dnode_rele(dn, FTAG);
917
918         return (err);
919 }
920
921 void
922 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
923 {
924         rw_enter(&dn->dn_struct_rwlock, RW_READER);
925         mutex_enter(&dn->dn_mtx);
926
927         doi->doi_data_block_size = dn->dn_datablksz;
928         doi->doi_metadata_block_size = dn->dn_indblkshift ?
929             1ULL << dn->dn_indblkshift : 0;
930         doi->doi_indirection = dn->dn_nlevels;
931         doi->doi_checksum = dn->dn_checksum;
932         doi->doi_compress = dn->dn_compress;
933         doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
934             SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
935         doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
936         doi->doi_type = dn->dn_type;
937         doi->doi_bonus_size = dn->dn_bonuslen;
938         doi->doi_bonus_type = dn->dn_bonustype;
939
940         mutex_exit(&dn->dn_mtx);
941         rw_exit(&dn->dn_struct_rwlock);
942 }
943
944 /*
945  * Get information on a DMU object.
946  * If doi is NULL, just indicates whether the object exists.
947  */
948 int
949 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
950 {
951         dnode_t *dn;
952         int err = dnode_hold(os->os, object, FTAG, &dn);
953
954         if (err)
955                 return (err);
956
957         if (doi != NULL)
958                 dmu_object_info_from_dnode(dn, doi);
959
960         dnode_rele(dn, FTAG);
961         return (0);
962 }
963
964 /*
965  * As above, but faster; can be used when you have a held dbuf in hand.
966  */
967 void
968 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
969 {
970         dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
971 }
972
973 /*
974  * Faster still when you only care about the size.
975  * This is specifically optimized for zfs_getattr().
976  */
977 void
978 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
979 {
980         dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
981
982         *blksize = dn->dn_datablksz;
983         /* add 1 for dnode space */
984         *nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
985             SPA_MINBLOCKSHIFT) + 1;
986 }
987
988 void
989 byteswap_uint64_array(void *vbuf, size_t size)
990 {
991         uint64_t *buf = vbuf;
992         size_t count = size >> 3;
993         int i;
994
995         ASSERT((size & 7) == 0);
996
997         for (i = 0; i < count; i++)
998                 buf[i] = BSWAP_64(buf[i]);
999 }
1000
1001 void
1002 byteswap_uint32_array(void *vbuf, size_t size)
1003 {
1004         uint32_t *buf = vbuf;
1005         size_t count = size >> 2;
1006         int i;
1007
1008         ASSERT((size & 3) == 0);
1009
1010         for (i = 0; i < count; i++)
1011                 buf[i] = BSWAP_32(buf[i]);
1012 }
1013
1014 void
1015 byteswap_uint16_array(void *vbuf, size_t size)
1016 {
1017         uint16_t *buf = vbuf;
1018         size_t count = size >> 1;
1019         int i;
1020
1021         ASSERT((size & 1) == 0);
1022
1023         for (i = 0; i < count; i++)
1024                 buf[i] = BSWAP_16(buf[i]);
1025 }
1026
1027 /* ARGSUSED */
1028 void
1029 byteswap_uint8_array(void *vbuf, size_t size)
1030 {
1031 }
1032
1033 void
1034 dmu_init(void)
1035 {
1036         dbuf_init();
1037         dnode_init();
1038         arc_init();
1039         l2arc_init();
1040 }
1041
1042 void
1043 dmu_fini(void)
1044 {
1045         arc_fini();
1046         dnode_fini();
1047         dbuf_fini();
1048         l2arc_fini();
1049 }