Move the world out of /zfs/ and seperate out module build tree
[zfs.git] / module / zfs / dmu.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25
26 #include <sys/dmu.h>
27 #include <sys/dmu_impl.h>
28 #include <sys/dmu_tx.h>
29 #include <sys/dbuf.h>
30 #include <sys/dnode.h>
31 #include <sys/zfs_context.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dmu_traverse.h>
34 #include <sys/dsl_dataset.h>
35 #include <sys/dsl_dir.h>
36 #include <sys/dsl_pool.h>
37 #include <sys/dsl_synctask.h>
38 #include <sys/dsl_prop.h>
39 #include <sys/dmu_zfetch.h>
40 #include <sys/zfs_ioctl.h>
41 #include <sys/zap.h>
42 #include <sys/zio_checksum.h>
43 #ifdef _KERNEL
44 #include <sys/vmsystm.h>
45 #include <sys/zfs_znode.h>
46 #endif
47
48 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
49         {       byteswap_uint8_array,   TRUE,   "unallocated"           },
50         {       zap_byteswap,           TRUE,   "object directory"      },
51         {       byteswap_uint64_array,  TRUE,   "object array"          },
52         {       byteswap_uint8_array,   TRUE,   "packed nvlist"         },
53         {       byteswap_uint64_array,  TRUE,   "packed nvlist size"    },
54         {       byteswap_uint64_array,  TRUE,   "bplist"                },
55         {       byteswap_uint64_array,  TRUE,   "bplist header"         },
56         {       byteswap_uint64_array,  TRUE,   "SPA space map header"  },
57         {       byteswap_uint64_array,  TRUE,   "SPA space map"         },
58         {       byteswap_uint64_array,  TRUE,   "ZIL intent log"        },
59         {       dnode_buf_byteswap,     TRUE,   "DMU dnode"             },
60         {       dmu_objset_byteswap,    TRUE,   "DMU objset"            },
61         {       byteswap_uint64_array,  TRUE,   "DSL directory"         },
62         {       zap_byteswap,           TRUE,   "DSL directory child map"},
63         {       zap_byteswap,           TRUE,   "DSL dataset snap map"  },
64         {       zap_byteswap,           TRUE,   "DSL props"             },
65         {       byteswap_uint64_array,  TRUE,   "DSL dataset"           },
66         {       zfs_znode_byteswap,     TRUE,   "ZFS znode"             },
67         {       zfs_oldacl_byteswap,    TRUE,   "ZFS V0 ACL"            },
68         {       byteswap_uint8_array,   FALSE,  "ZFS plain file"        },
69         {       zap_byteswap,           TRUE,   "ZFS directory"         },
70         {       zap_byteswap,           TRUE,   "ZFS master node"       },
71         {       zap_byteswap,           TRUE,   "ZFS delete queue"      },
72         {       byteswap_uint8_array,   FALSE,  "zvol object"           },
73         {       zap_byteswap,           TRUE,   "zvol prop"             },
74         {       byteswap_uint8_array,   FALSE,  "other uint8[]"         },
75         {       byteswap_uint64_array,  FALSE,  "other uint64[]"        },
76         {       zap_byteswap,           TRUE,   "other ZAP"             },
77         {       zap_byteswap,           TRUE,   "persistent error log"  },
78         {       byteswap_uint8_array,   TRUE,   "SPA history"           },
79         {       byteswap_uint64_array,  TRUE,   "SPA history offsets"   },
80         {       zap_byteswap,           TRUE,   "Pool properties"       },
81         {       zap_byteswap,           TRUE,   "DSL permissions"       },
82         {       zfs_acl_byteswap,       TRUE,   "ZFS ACL"               },
83         {       byteswap_uint8_array,   TRUE,   "ZFS SYSACL"            },
84         {       byteswap_uint8_array,   TRUE,   "FUID table"            },
85         {       byteswap_uint64_array,  TRUE,   "FUID table size"       },
86         {       zap_byteswap,           TRUE,   "DSL dataset next clones"},
87         {       zap_byteswap,           TRUE,   "scrub work queue"      },
88 };
89
90 int
91 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
92     void *tag, dmu_buf_t **dbp)
93 {
94         dnode_t *dn;
95         uint64_t blkid;
96         dmu_buf_impl_t *db;
97         int err;
98
99         err = dnode_hold(os->os, object, FTAG, &dn);
100         if (err)
101                 return (err);
102         blkid = dbuf_whichblock(dn, offset);
103         rw_enter(&dn->dn_struct_rwlock, RW_READER);
104         db = dbuf_hold(dn, blkid, tag);
105         rw_exit(&dn->dn_struct_rwlock);
106         if (db == NULL) {
107                 err = EIO;
108         } else {
109                 err = dbuf_read(db, NULL, DB_RF_CANFAIL);
110                 if (err) {
111                         dbuf_rele(db, tag);
112                         db = NULL;
113                 }
114         }
115
116         dnode_rele(dn, FTAG);
117         *dbp = &db->db;
118         return (err);
119 }
120
121 int
122 dmu_bonus_max(void)
123 {
124         return (DN_MAX_BONUSLEN);
125 }
126
127 int
128 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
129 {
130         dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
131
132         if (dn->dn_bonus != (dmu_buf_impl_t *)db)
133                 return (EINVAL);
134         if (newsize < 0 || newsize > db->db_size)
135                 return (EINVAL);
136         dnode_setbonuslen(dn, newsize, tx);
137         return (0);
138 }
139
140 /*
141  * returns ENOENT, EIO, or 0.
142  */
143 int
144 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
145 {
146         dnode_t *dn;
147         dmu_buf_impl_t *db;
148         int error;
149
150         error = dnode_hold(os->os, object, FTAG, &dn);
151         if (error)
152                 return (error);
153
154         rw_enter(&dn->dn_struct_rwlock, RW_READER);
155         if (dn->dn_bonus == NULL) {
156                 rw_exit(&dn->dn_struct_rwlock);
157                 rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
158                 if (dn->dn_bonus == NULL)
159                         dbuf_create_bonus(dn);
160         }
161         db = dn->dn_bonus;
162         rw_exit(&dn->dn_struct_rwlock);
163
164         /* as long as the bonus buf is held, the dnode will be held */
165         if (refcount_add(&db->db_holds, tag) == 1)
166                 VERIFY(dnode_add_ref(dn, db));
167
168         dnode_rele(dn, FTAG);
169
170         VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
171
172         *dbp = &db->db;
173         return (0);
174 }
175
176 /*
177  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
178  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
179  * and can induce severe lock contention when writing to several files
180  * whose dnodes are in the same block.
181  */
182 static int
183 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
184     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
185 {
186         dsl_pool_t *dp = NULL;
187         dmu_buf_t **dbp;
188         uint64_t blkid, nblks, i;
189         uint32_t flags;
190         int err;
191         zio_t *zio;
192         hrtime_t start;
193
194         ASSERT(length <= DMU_MAX_ACCESS);
195
196         flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
197         if (length > zfetch_array_rd_sz)
198                 flags |= DB_RF_NOPREFETCH;
199
200         rw_enter(&dn->dn_struct_rwlock, RW_READER);
201         if (dn->dn_datablkshift) {
202                 int blkshift = dn->dn_datablkshift;
203                 nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
204                     P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
205         } else {
206                 if (offset + length > dn->dn_datablksz) {
207                         zfs_panic_recover("zfs: accessing past end of object "
208                             "%llx/%llx (size=%u access=%llu+%llu)",
209                             (longlong_t)dn->dn_objset->
210                             os_dsl_dataset->ds_object,
211                             (longlong_t)dn->dn_object, dn->dn_datablksz,
212                             (longlong_t)offset, (longlong_t)length);
213                         return (EIO);
214                 }
215                 nblks = 1;
216         }
217         dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
218
219         if (dn->dn_objset->os_dsl_dataset)
220                 dp = dn->dn_objset->os_dsl_dataset->ds_dir->dd_pool;
221         if (dp && dsl_pool_sync_context(dp))
222                 start = gethrtime();
223         zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
224         blkid = dbuf_whichblock(dn, offset);
225         for (i = 0; i < nblks; i++) {
226                 dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
227                 if (db == NULL) {
228                         rw_exit(&dn->dn_struct_rwlock);
229                         dmu_buf_rele_array(dbp, nblks, tag);
230                         zio_nowait(zio);
231                         return (EIO);
232                 }
233                 /* initiate async i/o */
234                 if (read) {
235                         rw_exit(&dn->dn_struct_rwlock);
236                         (void) dbuf_read(db, zio, flags);
237                         rw_enter(&dn->dn_struct_rwlock, RW_READER);
238                 }
239                 dbp[i] = &db->db;
240         }
241         rw_exit(&dn->dn_struct_rwlock);
242
243         /* wait for async i/o */
244         err = zio_wait(zio);
245         /* track read overhead when we are in sync context */
246         if (dp && dsl_pool_sync_context(dp))
247                 dp->dp_read_overhead += gethrtime() - start;
248         if (err) {
249                 dmu_buf_rele_array(dbp, nblks, tag);
250                 return (err);
251         }
252
253         /* wait for other io to complete */
254         if (read) {
255                 for (i = 0; i < nblks; i++) {
256                         dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
257                         mutex_enter(&db->db_mtx);
258                         while (db->db_state == DB_READ ||
259                             db->db_state == DB_FILL)
260                                 cv_wait(&db->db_changed, &db->db_mtx);
261                         if (db->db_state == DB_UNCACHED)
262                                 err = EIO;
263                         mutex_exit(&db->db_mtx);
264                         if (err) {
265                                 dmu_buf_rele_array(dbp, nblks, tag);
266                                 return (err);
267                         }
268                 }
269         }
270
271         *numbufsp = nblks;
272         *dbpp = dbp;
273         return (0);
274 }
275
276 static int
277 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
278     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
279 {
280         dnode_t *dn;
281         int err;
282
283         err = dnode_hold(os->os, object, FTAG, &dn);
284         if (err)
285                 return (err);
286
287         err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
288             numbufsp, dbpp);
289
290         dnode_rele(dn, FTAG);
291
292         return (err);
293 }
294
295 int
296 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
297     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
298 {
299         dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
300         int err;
301
302         err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
303             numbufsp, dbpp);
304
305         return (err);
306 }
307
308 void
309 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
310 {
311         int i;
312         dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
313
314         if (numbufs == 0)
315                 return;
316
317         for (i = 0; i < numbufs; i++) {
318                 if (dbp[i])
319                         dbuf_rele(dbp[i], tag);
320         }
321
322         kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
323 }
324
325 void
326 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
327 {
328         dnode_t *dn;
329         uint64_t blkid;
330         int nblks, i, err;
331
332         if (zfs_prefetch_disable)
333                 return;
334
335         if (len == 0) {  /* they're interested in the bonus buffer */
336                 dn = os->os->os_meta_dnode;
337
338                 if (object == 0 || object >= DN_MAX_OBJECT)
339                         return;
340
341                 rw_enter(&dn->dn_struct_rwlock, RW_READER);
342                 blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
343                 dbuf_prefetch(dn, blkid);
344                 rw_exit(&dn->dn_struct_rwlock);
345                 return;
346         }
347
348         /*
349          * XXX - Note, if the dnode for the requested object is not
350          * already cached, we will do a *synchronous* read in the
351          * dnode_hold() call.  The same is true for any indirects.
352          */
353         err = dnode_hold(os->os, object, FTAG, &dn);
354         if (err != 0)
355                 return;
356
357         rw_enter(&dn->dn_struct_rwlock, RW_READER);
358         if (dn->dn_datablkshift) {
359                 int blkshift = dn->dn_datablkshift;
360                 nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
361                     P2ALIGN(offset, 1<<blkshift)) >> blkshift;
362         } else {
363                 nblks = (offset < dn->dn_datablksz);
364         }
365
366         if (nblks != 0) {
367                 blkid = dbuf_whichblock(dn, offset);
368                 for (i = 0; i < nblks; i++)
369                         dbuf_prefetch(dn, blkid+i);
370         }
371
372         rw_exit(&dn->dn_struct_rwlock);
373
374         dnode_rele(dn, FTAG);
375 }
376
377 static int
378 get_next_chunk(dnode_t *dn, uint64_t *offset, uint64_t limit)
379 {
380         uint64_t len = *offset - limit;
381         uint64_t chunk_len = dn->dn_datablksz * DMU_MAX_DELETEBLKCNT;
382         uint64_t subchunk =
383             dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
384
385         ASSERT(limit <= *offset);
386
387         if (len <= chunk_len) {
388                 *offset = limit;
389                 return (0);
390         }
391
392         ASSERT(ISP2(subchunk));
393
394         while (*offset > limit) {
395                 uint64_t initial_offset = P2ROUNDUP(*offset, subchunk);
396                 uint64_t delta;
397                 int err;
398
399                 /* skip over allocated data */
400                 err = dnode_next_offset(dn,
401                     DNODE_FIND_HOLE|DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
402                 if (err == ESRCH)
403                         *offset = limit;
404                 else if (err)
405                         return (err);
406
407                 ASSERT3U(*offset, <=, initial_offset);
408                 *offset = P2ALIGN(*offset, subchunk);
409                 delta = initial_offset - *offset;
410                 if (delta >= chunk_len) {
411                         *offset += delta - chunk_len;
412                         return (0);
413                 }
414                 chunk_len -= delta;
415
416                 /* skip over unallocated data */
417                 err = dnode_next_offset(dn,
418                     DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
419                 if (err == ESRCH)
420                         *offset = limit;
421                 else if (err)
422                         return (err);
423
424                 if (*offset < limit)
425                         *offset = limit;
426                 ASSERT3U(*offset, <, initial_offset);
427         }
428         return (0);
429 }
430
431 static int
432 dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
433     uint64_t length, boolean_t free_dnode)
434 {
435         dmu_tx_t *tx;
436         uint64_t object_size, start, end, len;
437         boolean_t trunc = (length == DMU_OBJECT_END);
438         int align, err;
439
440         align = 1 << dn->dn_datablkshift;
441         ASSERT(align > 0);
442         object_size = align == 1 ? dn->dn_datablksz :
443             (dn->dn_maxblkid + 1) << dn->dn_datablkshift;
444
445         if (trunc || (end = offset + length) > object_size)
446                 end = object_size;
447         if (end <= offset)
448                 return (0);
449         length = end - offset;
450
451         while (length) {
452                 start = end;
453                 err = get_next_chunk(dn, &start, offset);
454                 if (err)
455                         return (err);
456                 len = trunc ? DMU_OBJECT_END : end - start;
457
458                 tx = dmu_tx_create(os);
459                 dmu_tx_hold_free(tx, dn->dn_object, start, len);
460                 err = dmu_tx_assign(tx, TXG_WAIT);
461                 if (err) {
462                         dmu_tx_abort(tx);
463                         return (err);
464                 }
465
466                 dnode_free_range(dn, start, trunc ? -1 : len, tx);
467
468                 if (start == 0 && free_dnode) {
469                         ASSERT(trunc);
470                         dnode_free(dn, tx);
471                 }
472
473                 length -= end - start;
474
475                 dmu_tx_commit(tx);
476                 end = start;
477         }
478         return (0);
479 }
480
481 int
482 dmu_free_long_range(objset_t *os, uint64_t object,
483     uint64_t offset, uint64_t length)
484 {
485         dnode_t *dn;
486         int err;
487
488         err = dnode_hold(os->os, object, FTAG, &dn);
489         if (err != 0)
490                 return (err);
491         err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
492         dnode_rele(dn, FTAG);
493         return (err);
494 }
495
496 int
497 dmu_free_object(objset_t *os, uint64_t object)
498 {
499         dnode_t *dn;
500         dmu_tx_t *tx;
501         int err;
502
503         err = dnode_hold_impl(os->os, object, DNODE_MUST_BE_ALLOCATED,
504             FTAG, &dn);
505         if (err != 0)
506                 return (err);
507         if (dn->dn_nlevels == 1) {
508                 tx = dmu_tx_create(os);
509                 dmu_tx_hold_bonus(tx, object);
510                 dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
511                 err = dmu_tx_assign(tx, TXG_WAIT);
512                 if (err == 0) {
513                         dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
514                         dnode_free(dn, tx);
515                         dmu_tx_commit(tx);
516                 } else {
517                         dmu_tx_abort(tx);
518                 }
519         } else {
520                 err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
521         }
522         dnode_rele(dn, FTAG);
523         return (err);
524 }
525
526 int
527 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
528     uint64_t size, dmu_tx_t *tx)
529 {
530         dnode_t *dn;
531         int err = dnode_hold(os->os, object, FTAG, &dn);
532         if (err)
533                 return (err);
534         ASSERT(offset < UINT64_MAX);
535         ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
536         dnode_free_range(dn, offset, size, tx);
537         dnode_rele(dn, FTAG);
538         return (0);
539 }
540
541 int
542 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
543     void *buf)
544 {
545         dnode_t *dn;
546         dmu_buf_t **dbp;
547         int numbufs, i, err;
548
549         err = dnode_hold(os->os, object, FTAG, &dn);
550         if (err)
551                 return (err);
552
553         /*
554          * Deal with odd block sizes, where there can't be data past the first
555          * block.  If we ever do the tail block optimization, we will need to
556          * handle that here as well.
557          */
558         if (dn->dn_datablkshift == 0) {
559                 int newsz = offset > dn->dn_datablksz ? 0 :
560                     MIN(size, dn->dn_datablksz - offset);
561                 bzero((char *)buf + newsz, size - newsz);
562                 size = newsz;
563         }
564
565         while (size > 0) {
566                 uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
567
568                 /*
569                  * NB: we could do this block-at-a-time, but it's nice
570                  * to be reading in parallel.
571                  */
572                 err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
573                     TRUE, FTAG, &numbufs, &dbp);
574                 if (err)
575                         break;
576
577                 for (i = 0; i < numbufs; i++) {
578                         int tocpy;
579                         int bufoff;
580                         dmu_buf_t *db = dbp[i];
581
582                         ASSERT(size > 0);
583
584                         bufoff = offset - db->db_offset;
585                         tocpy = (int)MIN(db->db_size - bufoff, size);
586
587                         bcopy((char *)db->db_data + bufoff, buf, tocpy);
588
589                         offset += tocpy;
590                         size -= tocpy;
591                         buf = (char *)buf + tocpy;
592                 }
593                 dmu_buf_rele_array(dbp, numbufs, FTAG);
594         }
595         dnode_rele(dn, FTAG);
596         return (err);
597 }
598
599 void
600 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
601     const void *buf, dmu_tx_t *tx)
602 {
603         dmu_buf_t **dbp;
604         int numbufs, i;
605
606         if (size == 0)
607                 return;
608
609         VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
610             FALSE, FTAG, &numbufs, &dbp));
611
612         for (i = 0; i < numbufs; i++) {
613                 int tocpy;
614                 int bufoff;
615                 dmu_buf_t *db = dbp[i];
616
617                 ASSERT(size > 0);
618
619                 bufoff = offset - db->db_offset;
620                 tocpy = (int)MIN(db->db_size - bufoff, size);
621
622                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
623
624                 if (tocpy == db->db_size)
625                         dmu_buf_will_fill(db, tx);
626                 else
627                         dmu_buf_will_dirty(db, tx);
628
629                 bcopy(buf, (char *)db->db_data + bufoff, tocpy);
630
631                 if (tocpy == db->db_size)
632                         dmu_buf_fill_done(db, tx);
633
634                 offset += tocpy;
635                 size -= tocpy;
636                 buf = (char *)buf + tocpy;
637         }
638         dmu_buf_rele_array(dbp, numbufs, FTAG);
639 }
640
641 void
642 dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
643     dmu_tx_t *tx)
644 {
645         dmu_buf_t **dbp;
646         int numbufs, i;
647
648         if (size == 0)
649                 return;
650
651         VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
652             FALSE, FTAG, &numbufs, &dbp));
653
654         for (i = 0; i < numbufs; i++) {
655                 dmu_buf_t *db = dbp[i];
656
657                 dmu_buf_will_not_fill(db, tx);
658         }
659         dmu_buf_rele_array(dbp, numbufs, FTAG);
660 }
661
662 #ifdef _KERNEL
663 int
664 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
665 {
666         dmu_buf_t **dbp;
667         int numbufs, i, err;
668
669         /*
670          * NB: we could do this block-at-a-time, but it's nice
671          * to be reading in parallel.
672          */
673         err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
674             &numbufs, &dbp);
675         if (err)
676                 return (err);
677
678         for (i = 0; i < numbufs; i++) {
679                 int tocpy;
680                 int bufoff;
681                 dmu_buf_t *db = dbp[i];
682
683                 ASSERT(size > 0);
684
685                 bufoff = uio->uio_loffset - db->db_offset;
686                 tocpy = (int)MIN(db->db_size - bufoff, size);
687
688                 err = uiomove((char *)db->db_data + bufoff, tocpy,
689                     UIO_READ, uio);
690                 if (err)
691                         break;
692
693                 size -= tocpy;
694         }
695         dmu_buf_rele_array(dbp, numbufs, FTAG);
696
697         return (err);
698 }
699
700 int
701 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
702     dmu_tx_t *tx)
703 {
704         dmu_buf_t **dbp;
705         int numbufs, i;
706         int err = 0;
707
708         if (size == 0)
709                 return (0);
710
711         err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
712             FALSE, FTAG, &numbufs, &dbp);
713         if (err)
714                 return (err);
715
716         for (i = 0; i < numbufs; i++) {
717                 int tocpy;
718                 int bufoff;
719                 dmu_buf_t *db = dbp[i];
720
721                 ASSERT(size > 0);
722
723                 bufoff = uio->uio_loffset - db->db_offset;
724                 tocpy = (int)MIN(db->db_size - bufoff, size);
725
726                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
727
728                 if (tocpy == db->db_size)
729                         dmu_buf_will_fill(db, tx);
730                 else
731                         dmu_buf_will_dirty(db, tx);
732
733                 /*
734                  * XXX uiomove could block forever (eg. nfs-backed
735                  * pages).  There needs to be a uiolockdown() function
736                  * to lock the pages in memory, so that uiomove won't
737                  * block.
738                  */
739                 err = uiomove((char *)db->db_data + bufoff, tocpy,
740                     UIO_WRITE, uio);
741
742                 if (tocpy == db->db_size)
743                         dmu_buf_fill_done(db, tx);
744
745                 if (err)
746                         break;
747
748                 size -= tocpy;
749         }
750         dmu_buf_rele_array(dbp, numbufs, FTAG);
751         return (err);
752 }
753
754 int
755 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
756     page_t *pp, dmu_tx_t *tx)
757 {
758         dmu_buf_t **dbp;
759         int numbufs, i;
760         int err;
761
762         if (size == 0)
763                 return (0);
764
765         err = dmu_buf_hold_array(os, object, offset, size,
766             FALSE, FTAG, &numbufs, &dbp);
767         if (err)
768                 return (err);
769
770         for (i = 0; i < numbufs; i++) {
771                 int tocpy, copied, thiscpy;
772                 int bufoff;
773                 dmu_buf_t *db = dbp[i];
774                 caddr_t va;
775
776                 ASSERT(size > 0);
777                 ASSERT3U(db->db_size, >=, PAGESIZE);
778
779                 bufoff = offset - db->db_offset;
780                 tocpy = (int)MIN(db->db_size - bufoff, size);
781
782                 ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
783
784                 if (tocpy == db->db_size)
785                         dmu_buf_will_fill(db, tx);
786                 else
787                         dmu_buf_will_dirty(db, tx);
788
789                 for (copied = 0; copied < tocpy; copied += PAGESIZE) {
790                         ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
791                         thiscpy = MIN(PAGESIZE, tocpy - copied);
792                         va = zfs_map_page(pp, S_READ);
793                         bcopy(va, (char *)db->db_data + bufoff, thiscpy);
794                         zfs_unmap_page(pp, va);
795                         pp = pp->p_next;
796                         bufoff += PAGESIZE;
797                 }
798
799                 if (tocpy == db->db_size)
800                         dmu_buf_fill_done(db, tx);
801
802                 if (err)
803                         break;
804
805                 offset += tocpy;
806                 size -= tocpy;
807         }
808         dmu_buf_rele_array(dbp, numbufs, FTAG);
809         return (err);
810 }
811 #endif
812
813 typedef struct {
814         dbuf_dirty_record_t     *dr;
815         dmu_sync_cb_t           *done;
816         void                    *arg;
817 } dmu_sync_arg_t;
818
819 /* ARGSUSED */
820 static void
821 dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
822 {
823         blkptr_t *bp = zio->io_bp;
824
825         if (!BP_IS_HOLE(bp)) {
826                 dmu_sync_arg_t *in = varg;
827                 dbuf_dirty_record_t *dr = in->dr;
828                 dmu_buf_impl_t *db = dr->dr_dbuf;
829                 ASSERT(BP_GET_TYPE(bp) == db->db_dnode->dn_type);
830                 ASSERT(BP_GET_LEVEL(bp) == 0);
831                 bp->blk_fill = 1;
832         }
833 }
834
835 /* ARGSUSED */
836 static void
837 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
838 {
839         dmu_sync_arg_t *in = varg;
840         dbuf_dirty_record_t *dr = in->dr;
841         dmu_buf_impl_t *db = dr->dr_dbuf;
842         dmu_sync_cb_t *done = in->done;
843
844         mutex_enter(&db->db_mtx);
845         ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
846         dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
847         dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
848         cv_broadcast(&db->db_changed);
849         mutex_exit(&db->db_mtx);
850
851         if (done)
852                 done(&(db->db), in->arg);
853
854         kmem_free(in, sizeof (dmu_sync_arg_t));
855 }
856
857 /*
858  * Intent log support: sync the block associated with db to disk.
859  * N.B. and XXX: the caller is responsible for making sure that the
860  * data isn't changing while dmu_sync() is writing it.
861  *
862  * Return values:
863  *
864  *      EEXIST: this txg has already been synced, so there's nothing to to.
865  *              The caller should not log the write.
866  *
867  *      ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
868  *              The caller should not log the write.
869  *
870  *      EALREADY: this block is already in the process of being synced.
871  *              The caller should track its progress (somehow).
872  *
873  *      EINPROGRESS: the IO has been initiated.
874  *              The caller should log this blkptr in the callback.
875  *
876  *      0: completed.  Sets *bp to the blkptr just written.
877  *              The caller should log this blkptr immediately.
878  */
879 int
880 dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
881     blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
882 {
883         dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
884         objset_impl_t *os = db->db_objset;
885         dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
886         tx_state_t *tx = &dp->dp_tx;
887         dbuf_dirty_record_t *dr;
888         dmu_sync_arg_t *in;
889         zbookmark_t zb;
890         writeprops_t wp = { 0 };
891         zio_t *zio;
892         int err;
893
894         ASSERT(BP_IS_HOLE(bp));
895         ASSERT(txg != 0);
896
897         dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
898             txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
899
900         /*
901          * XXX - would be nice if we could do this without suspending...
902          */
903         txg_suspend(dp);
904
905         /*
906          * If this txg already synced, there's nothing to do.
907          */
908         if (txg <= tx->tx_synced_txg) {
909                 txg_resume(dp);
910                 /*
911                  * If we're running ziltest, we need the blkptr regardless.
912                  */
913                 if (txg > spa_freeze_txg(dp->dp_spa)) {
914                         /* if db_blkptr == NULL, this was an empty write */
915                         if (db->db_blkptr)
916                                 *bp = *db->db_blkptr; /* structure assignment */
917                         return (0);
918                 }
919                 return (EEXIST);
920         }
921
922         mutex_enter(&db->db_mtx);
923
924         if (txg == tx->tx_syncing_txg) {
925                 while (db->db_data_pending) {
926                         /*
927                          * IO is in-progress.  Wait for it to finish.
928                          * XXX - would be nice to be able to somehow "attach"
929                          * this zio to the parent zio passed in.
930                          */
931                         cv_wait(&db->db_changed, &db->db_mtx);
932                         if (!db->db_data_pending &&
933                             db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
934                                 /*
935                                  * IO was compressed away
936                                  */
937                                 *bp = *db->db_blkptr; /* structure assignment */
938                                 mutex_exit(&db->db_mtx);
939                                 txg_resume(dp);
940                                 return (0);
941                         }
942                         ASSERT(db->db_data_pending ||
943                             (db->db_blkptr && db->db_blkptr->blk_birth == txg));
944                 }
945
946                 if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
947                         /*
948                          * IO is already completed.
949                          */
950                         *bp = *db->db_blkptr; /* structure assignment */
951                         mutex_exit(&db->db_mtx);
952                         txg_resume(dp);
953                         return (0);
954                 }
955         }
956
957         dr = db->db_last_dirty;
958         while (dr && dr->dr_txg > txg)
959                 dr = dr->dr_next;
960         if (dr == NULL || dr->dr_txg < txg) {
961                 /*
962                  * This dbuf isn't dirty, must have been free_range'd.
963                  * There's no need to log writes to freed blocks, so we're done.
964                  */
965                 mutex_exit(&db->db_mtx);
966                 txg_resume(dp);
967                 return (ENOENT);
968         }
969
970         ASSERT(dr->dr_txg == txg);
971         if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
972                 /*
973                  * We have already issued a sync write for this buffer.
974                  */
975                 mutex_exit(&db->db_mtx);
976                 txg_resume(dp);
977                 return (EALREADY);
978         } else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
979                 /*
980                  * This buffer has already been synced.  It could not
981                  * have been dirtied since, or we would have cleared the state.
982                  */
983                 *bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
984                 mutex_exit(&db->db_mtx);
985                 txg_resume(dp);
986                 return (0);
987         }
988
989         dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
990         in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
991         in->dr = dr;
992         in->done = done;
993         in->arg = arg;
994         mutex_exit(&db->db_mtx);
995         txg_resume(dp);
996
997         zb.zb_objset = os->os_dsl_dataset->ds_object;
998         zb.zb_object = db->db.db_object;
999         zb.zb_level = db->db_level;
1000         zb.zb_blkid = db->db_blkid;
1001
1002         wp.wp_type = db->db_dnode->dn_type;
1003         wp.wp_level = db->db_level;
1004         wp.wp_copies = os->os_copies;
1005         wp.wp_dnchecksum = db->db_dnode->dn_checksum;
1006         wp.wp_oschecksum = os->os_checksum;
1007         wp.wp_dncompress = db->db_dnode->dn_compress;
1008         wp.wp_oscompress = os->os_compress;
1009
1010         ASSERT(BP_IS_HOLE(bp));
1011
1012         zio = arc_write(pio, os->os_spa, &wp, DBUF_IS_L2CACHEABLE(db),
1013             txg, bp, dr->dt.dl.dr_data, dmu_sync_ready, dmu_sync_done, in,
1014             ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
1015         if (pio) {
1016                 zio_nowait(zio);
1017                 err = EINPROGRESS;
1018         } else {
1019                 err = zio_wait(zio);
1020                 ASSERT(err == 0);
1021         }
1022         return (err);
1023 }
1024
1025 int
1026 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1027         dmu_tx_t *tx)
1028 {
1029         dnode_t *dn;
1030         int err;
1031
1032         err = dnode_hold(os->os, object, FTAG, &dn);
1033         if (err)
1034                 return (err);
1035         err = dnode_set_blksz(dn, size, ibs, tx);
1036         dnode_rele(dn, FTAG);
1037         return (err);
1038 }
1039
1040 void
1041 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1042         dmu_tx_t *tx)
1043 {
1044         dnode_t *dn;
1045
1046         /* XXX assumes dnode_hold will not get an i/o error */
1047         (void) dnode_hold(os->os, object, FTAG, &dn);
1048         ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1049         dn->dn_checksum = checksum;
1050         dnode_setdirty(dn, tx);
1051         dnode_rele(dn, FTAG);
1052 }
1053
1054 void
1055 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1056         dmu_tx_t *tx)
1057 {
1058         dnode_t *dn;
1059
1060         /* XXX assumes dnode_hold will not get an i/o error */
1061         (void) dnode_hold(os->os, object, FTAG, &dn);
1062         ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1063         dn->dn_compress = compress;
1064         dnode_setdirty(dn, tx);
1065         dnode_rele(dn, FTAG);
1066 }
1067
1068 int
1069 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1070 {
1071         dnode_t *dn;
1072         int i, err;
1073
1074         err = dnode_hold(os->os, object, FTAG, &dn);
1075         if (err)
1076                 return (err);
1077         /*
1078          * Sync any current changes before
1079          * we go trundling through the block pointers.
1080          */
1081         for (i = 0; i < TXG_SIZE; i++) {
1082                 if (list_link_active(&dn->dn_dirty_link[i]))
1083                         break;
1084         }
1085         if (i != TXG_SIZE) {
1086                 dnode_rele(dn, FTAG);
1087                 txg_wait_synced(dmu_objset_pool(os), 0);
1088                 err = dnode_hold(os->os, object, FTAG, &dn);
1089                 if (err)
1090                         return (err);
1091         }
1092
1093         err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
1094         dnode_rele(dn, FTAG);
1095
1096         return (err);
1097 }
1098
1099 void
1100 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1101 {
1102         rw_enter(&dn->dn_struct_rwlock, RW_READER);
1103         mutex_enter(&dn->dn_mtx);
1104
1105         doi->doi_data_block_size = dn->dn_datablksz;
1106         doi->doi_metadata_block_size = dn->dn_indblkshift ?
1107             1ULL << dn->dn_indblkshift : 0;
1108         doi->doi_indirection = dn->dn_nlevels;
1109         doi->doi_checksum = dn->dn_checksum;
1110         doi->doi_compress = dn->dn_compress;
1111         doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
1112             SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
1113         doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1114         doi->doi_type = dn->dn_type;
1115         doi->doi_bonus_size = dn->dn_bonuslen;
1116         doi->doi_bonus_type = dn->dn_bonustype;
1117
1118         mutex_exit(&dn->dn_mtx);
1119         rw_exit(&dn->dn_struct_rwlock);
1120 }
1121
1122 /*
1123  * Get information on a DMU object.
1124  * If doi is NULL, just indicates whether the object exists.
1125  */
1126 int
1127 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1128 {
1129         dnode_t *dn;
1130         int err = dnode_hold(os->os, object, FTAG, &dn);
1131
1132         if (err)
1133                 return (err);
1134
1135         if (doi != NULL)
1136                 dmu_object_info_from_dnode(dn, doi);
1137
1138         dnode_rele(dn, FTAG);
1139         return (0);
1140 }
1141
1142 /*
1143  * As above, but faster; can be used when you have a held dbuf in hand.
1144  */
1145 void
1146 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1147 {
1148         dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1149 }
1150
1151 /*
1152  * Faster still when you only care about the size.
1153  * This is specifically optimized for zfs_getattr().
1154  */
1155 void
1156 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1157 {
1158         dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1159
1160         *blksize = dn->dn_datablksz;
1161         /* add 1 for dnode space */
1162         *nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
1163             SPA_MINBLOCKSHIFT) + 1;
1164 }
1165
1166 void
1167 byteswap_uint64_array(void *vbuf, size_t size)
1168 {
1169         uint64_t *buf = vbuf;
1170         size_t count = size >> 3;
1171         int i;
1172
1173         ASSERT((size & 7) == 0);
1174
1175         for (i = 0; i < count; i++)
1176                 buf[i] = BSWAP_64(buf[i]);
1177 }
1178
1179 void
1180 byteswap_uint32_array(void *vbuf, size_t size)
1181 {
1182         uint32_t *buf = vbuf;
1183         size_t count = size >> 2;
1184         int i;
1185
1186         ASSERT((size & 3) == 0);
1187
1188         for (i = 0; i < count; i++)
1189                 buf[i] = BSWAP_32(buf[i]);
1190 }
1191
1192 void
1193 byteswap_uint16_array(void *vbuf, size_t size)
1194 {
1195         uint16_t *buf = vbuf;
1196         size_t count = size >> 1;
1197         int i;
1198
1199         ASSERT((size & 1) == 0);
1200
1201         for (i = 0; i < count; i++)
1202                 buf[i] = BSWAP_16(buf[i]);
1203 }
1204
1205 /* ARGSUSED */
1206 void
1207 byteswap_uint8_array(void *vbuf, size_t size)
1208 {
1209 }
1210
1211 void
1212 dmu_init(void)
1213 {
1214         dbuf_init();
1215         dnode_init();
1216         arc_init();
1217         l2arc_init();
1218 }
1219
1220 void
1221 dmu_fini(void)
1222 {
1223         arc_fini();
1224         dnode_fini();
1225         dbuf_fini();
1226         l2arc_fini();
1227 }