Add CDDL license file
[zfs.git] / zfs / lib / libdmu-ctl / dctl_thrpool.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26
27 #include <stdlib.h>
28 #include <stddef.h>
29 #include <time.h>
30 #include <pthread.h>
31 #include <errno.h>
32 #include <sys/list.h>
33 #include <sys/debug.h>
34
35 #include <sys/dmu_ctl.h>
36 #include <sys/dmu_ctl_impl.h>
37
38 static dctl_thr_info_t thr_pool = {
39         .dti_mtx = PTHREAD_MUTEX_INITIALIZER
40 };
41
42 /*
43  * Create n threads.
44  * Callers must acquire thr_pool.dti_mtx first.
45  */
46 static int dctl_thr_create(int n)
47 {
48         dctl_thr_info_t *p = &thr_pool;
49         int error;
50
51         for (int i = 0; i < n; i++) {
52                 wthr_info_t *thr = malloc(sizeof(wthr_info_t));
53                 if (thr == NULL)
54                         return ENOMEM;
55
56                 thr->wthr_exit = B_FALSE;
57                 thr->wthr_free = B_TRUE;
58
59                 error = pthread_create(&thr->wthr_id, NULL, p->dti_thr_func,
60                     thr);
61                 if (error) {
62                         free(thr);
63                         return error;
64                 }
65
66                 p->dti_free++;
67
68                 list_insert_tail(&p->dti_list, thr);
69         }
70         return 0;
71 }
72
73 /*
74  * Mark the thread as dead.
75  * Must be called right before exiting the main thread function.
76  */
77 void dctl_thr_die(wthr_info_t *thr)
78 {
79         dctl_thr_info_t *p = &thr_pool;
80
81         thr->wthr_exit = B_TRUE;
82         dctl_thr_rebalance(thr, B_FALSE);
83
84         pthread_mutex_lock(&p->dti_mtx);
85
86         list_remove(&p->dti_list, thr);
87         list_insert_tail(&p->dti_join_list, thr);
88
89         pthread_mutex_unlock(&p->dti_mtx);
90 }
91
92 /*
93  * Clean-up dead threads.
94  */
95 void dctl_thr_join()
96 {
97         dctl_thr_info_t *p = &thr_pool;
98         wthr_info_t *thr;
99
100         pthread_mutex_lock(&p->dti_mtx);
101
102         while ((thr = list_head(&p->dti_join_list))) {
103                 list_remove(&p->dti_join_list, thr);
104
105                 ASSERT(!pthread_equal(thr->wthr_id, pthread_self()));
106
107                 /*
108                  * This should not block because all the threads
109                  * on this list should have died already.
110                  *
111                  * pthread_join() can only return an error if
112                  * we made a programming mistake.
113                  */
114                 VERIFY(pthread_join(thr->wthr_id, NULL) == 0);
115
116                 ASSERT(thr->wthr_exit);
117                 ASSERT(!thr->wthr_free);
118
119                 free(thr);
120         }
121
122         pthread_mutex_unlock(&p->dti_mtx);
123 }
124
125 /*
126  * Adjust the number of free threads in the pool and the thread status.
127  *
128  * Callers must acquire thr_pool.dti_mtx first.
129  */
130 static void dctl_thr_adjust_free(wthr_info_t *thr, boolean_t set_free)
131 {
132         dctl_thr_info_t *p = &thr_pool;
133
134         ASSERT(p->dti_free >= 0);
135
136         if (!thr->wthr_free && set_free)
137                 p->dti_free++;
138         else if (thr->wthr_free && !set_free)
139                 p->dti_free--;
140
141         ASSERT(p->dti_free >= 0);
142
143         thr->wthr_free = set_free;
144 }
145
146 /*
147  * Rebalance threads. Also adjusts the free status of the thread.
148  * Will set the thread exit flag if the number of free threads is above
149  * the limit.
150  */
151 void dctl_thr_rebalance(wthr_info_t *thr, boolean_t set_free)
152 {
153         dctl_thr_info_t *p = &thr_pool;
154
155         pthread_mutex_lock(&p->dti_mtx);
156
157         if (p->dti_exit || p->dti_free > p->dti_max_free)
158                 thr->wthr_exit = B_TRUE;
159
160         if (thr->wthr_exit)
161                 set_free = B_FALSE;
162
163         dctl_thr_adjust_free(thr, set_free);
164
165         if (!p->dti_exit && p->dti_free == 0)
166                 dctl_thr_create(1);
167
168         pthread_mutex_unlock(&p->dti_mtx);
169 }
170
171 /*
172  * Stop the thread pool.
173  *
174  * This can take a while since it actually waits for all threads to exit.
175  */
176 void dctl_thr_pool_stop()
177 {
178         dctl_thr_info_t *p = &thr_pool;
179         wthr_info_t *thr;
180         struct timespec ts;
181
182         pthread_mutex_lock(&p->dti_mtx);
183
184         ASSERT(!p->dti_exit);
185         p->dti_exit = B_TRUE;
186
187         /* Let's flag the threads first */
188         thr = list_head(&p->dti_list);
189         while (thr != NULL) {
190                 thr->wthr_exit = B_TRUE;
191                 dctl_thr_adjust_free(thr, B_FALSE);
192
193                 thr = list_next(&p->dti_list, thr);
194         }
195
196         pthread_mutex_unlock(&p->dti_mtx);
197
198         /* Now let's wait for them to exit */
199         ts.tv_sec = 0;
200         ts.tv_nsec = 50000000; /* 50ms */
201         do {
202                 nanosleep(&ts, NULL);
203
204                 pthread_mutex_lock(&p->dti_mtx);
205                 thr = list_head(&p->dti_list);
206                 pthread_mutex_unlock(&p->dti_mtx);
207
208                 dctl_thr_join();
209         } while(thr != NULL);
210
211         ASSERT(p->dti_free == 0);
212
213         ASSERT(list_is_empty(&p->dti_list));
214         ASSERT(list_is_empty(&p->dti_join_list));
215
216         list_destroy(&p->dti_list);
217         list_destroy(&p->dti_join_list);
218 }
219
220 /*
221  * Create thread pool.
222  *
223  * If at least one thread creation fails, it will stop all previous
224  * threads and return a non-zero value.
225  */
226 int dctl_thr_pool_create(int min_thr, int max_free_thr,
227     thr_func_t *thr_func)
228 {
229         int error;
230         dctl_thr_info_t *p = &thr_pool;
231
232         ASSERT(p->dti_free == 0);
233
234         /* Initialize global variables */
235         p->dti_min = min_thr;
236         p->dti_max_free = max_free_thr;
237         p->dti_exit = B_FALSE;
238         p->dti_thr_func = thr_func;
239
240         list_create(&p->dti_list, sizeof(wthr_info_t), offsetof(wthr_info_t,
241             wthr_node));
242         list_create(&p->dti_join_list, sizeof(wthr_info_t),
243             offsetof(wthr_info_t, wthr_node));
244
245         pthread_mutex_lock(&p->dti_mtx);
246         error = dctl_thr_create(min_thr);
247         pthread_mutex_unlock(&p->dti_mtx);
248
249         if (error)
250                 dctl_thr_pool_stop();
251
252         return error;
253 }