glib/
thread_pool.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{future::Future, panic, ptr};
4
5use futures_channel::oneshot;
6
7use crate::{ffi, translate::*};
8
9#[derive(Debug)]
10#[doc(alias = "GThreadPool")]
11pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
12
13unsafe impl Send for ThreadPool {}
14unsafe impl Sync for ThreadPool {}
15
// rustdoc-stripper-ignore-next
/// A handle to a thread running on a [`ThreadPool`].
///
/// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
/// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
/// allowing it to complete but discarding the return value.
#[derive(Debug)]
pub struct ThreadHandle<T> {
    // Receiving half of the channel on which the task delivers its outcome: `Ok` with the
    // return value, or `Err` with the panic payload.
    rx: std::sync::mpsc::Receiver<std::thread::Result<T>>,
}

impl<T> ThreadHandle<T> {
    // rustdoc-stripper-ignore-next
    /// Waits for the associated thread to finish.
    ///
    /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
    /// thread, or `Err` if the thread panicked. This function will return immediately if the
    /// associated thread has already finished.
    ///
    /// # Panics
    ///
    /// Panics if the sending half of the channel was dropped without a result ever being sent,
    /// i.e. the task was discarded instead of being run.
    #[inline]
    pub fn join(self) -> std::thread::Result<T> {
        self.rx
            .recv()
            .expect("ThreadHandle: task was dropped without sending its result")
    }
}
39
40impl ThreadPool {
41    #[doc(alias = "g_thread_pool_new")]
42    pub fn shared(max_threads: Option<u32>) -> Result<Self, crate::Error> {
43        unsafe {
44            let mut err = ptr::null_mut();
45            let pool = ffi::g_thread_pool_new(
46                Some(spawn_func),
47                ptr::null_mut(),
48                max_threads.map(|v| v as i32).unwrap_or(-1),
49                ffi::GFALSE,
50                &mut err,
51            );
52            if pool.is_null() {
53                Err(from_glib_full(err))
54            } else {
55                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
56            }
57        }
58    }
59
60    #[doc(alias = "g_thread_pool_new")]
61    pub fn exclusive(max_threads: u32) -> Result<Self, crate::Error> {
62        unsafe {
63            let mut err = ptr::null_mut();
64            let pool = ffi::g_thread_pool_new(
65                Some(spawn_func),
66                ptr::null_mut(),
67                max_threads as i32,
68                ffi::GTRUE,
69                &mut err,
70            );
71            if pool.is_null() {
72                Err(from_glib_full(err))
73            } else {
74                Ok(ThreadPool(ptr::NonNull::new_unchecked(pool)))
75            }
76        }
77    }
78
79    #[doc(alias = "g_thread_pool_push")]
80    pub fn push<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
81        &self,
82        func: F,
83    ) -> Result<ThreadHandle<T>, crate::Error> {
84        let (tx, rx) = std::sync::mpsc::sync_channel(1);
85        unsafe {
86            let func: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
87                let _ = tx.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
88            });
89            let func = Box::new(func);
90            let mut err = ptr::null_mut();
91
92            let func = Box::into_raw(func);
93            let ret: bool = from_glib(ffi::g_thread_pool_push(
94                self.0.as_ptr(),
95                func as *mut _,
96                &mut err,
97            ));
98            if ret {
99                Ok(ThreadHandle { rx })
100            } else {
101                let _ = Box::from_raw(func);
102                Err(from_glib_full(err))
103            }
104        }
105    }
106
107    pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
108        &self,
109        func: F,
110    ) -> Result<impl Future<Output = std::thread::Result<T>> + Send + Sync + 'static, crate::Error>
111    {
112        let (sender, receiver) = oneshot::channel();
113
114        self.push(move || {
115            let _ = sender.send(panic::catch_unwind(panic::AssertUnwindSafe(func)));
116        })?;
117
118        Ok(async move { receiver.await.expect("Dropped before executing") })
119    }
120
121    #[doc(alias = "g_thread_pool_set_max_threads")]
122    pub fn set_max_threads(&self, max_threads: Option<u32>) -> Result<(), crate::Error> {
123        unsafe {
124            let mut err = ptr::null_mut();
125            let ret: bool = from_glib(ffi::g_thread_pool_set_max_threads(
126                self.0.as_ptr(),
127                max_threads.map(|v| v as i32).unwrap_or(-1),
128                &mut err,
129            ));
130            if ret {
131                Ok(())
132            } else {
133                Err(from_glib_full(err))
134            }
135        }
136    }
137
138    #[doc(alias = "g_thread_pool_get_max_threads")]
139    #[doc(alias = "get_max_threads")]
140    pub fn max_threads(&self) -> Option<u32> {
141        unsafe {
142            let max_threads = ffi::g_thread_pool_get_max_threads(self.0.as_ptr());
143            if max_threads == -1 {
144                None
145            } else {
146                Some(max_threads as u32)
147            }
148        }
149    }
150
151    #[doc(alias = "g_thread_pool_get_num_threads")]
152    #[doc(alias = "get_num_threads")]
153    pub fn num_threads(&self) -> u32 {
154        unsafe { ffi::g_thread_pool_get_num_threads(self.0.as_ptr()) }
155    }
156
157    #[doc(alias = "g_thread_pool_unprocessed")]
158    #[doc(alias = "get_unprocessed")]
159    pub fn unprocessed(&self) -> u32 {
160        unsafe { ffi::g_thread_pool_unprocessed(self.0.as_ptr()) }
161    }
162
163    #[doc(alias = "g_thread_pool_set_max_unused_threads")]
164    pub fn set_max_unused_threads(max_threads: Option<u32>) {
165        unsafe {
166            ffi::g_thread_pool_set_max_unused_threads(max_threads.map(|v| v as i32).unwrap_or(-1))
167        }
168    }
169
170    #[doc(alias = "g_thread_pool_get_max_unused_threads")]
171    #[doc(alias = "get_max_unused_threads")]
172    pub fn max_unused_threads() -> Option<u32> {
173        unsafe {
174            let max_unused_threads = ffi::g_thread_pool_get_max_unused_threads();
175            if max_unused_threads == -1 {
176                None
177            } else {
178                Some(max_unused_threads as u32)
179            }
180        }
181    }
182
183    #[doc(alias = "g_thread_pool_get_num_unused_threads")]
184    #[doc(alias = "get_num_unused_threads")]
185    pub fn num_unused_threads() -> u32 {
186        unsafe { ffi::g_thread_pool_get_num_unused_threads() }
187    }
188
189    #[doc(alias = "g_thread_pool_stop_unused_threads")]
190    pub fn stop_unused_threads() {
191        unsafe {
192            ffi::g_thread_pool_stop_unused_threads();
193        }
194    }
195
196    #[doc(alias = "g_thread_pool_set_max_idle_time")]
197    pub fn set_max_idle_time(max_idle_time: u32) {
198        unsafe { ffi::g_thread_pool_set_max_idle_time(max_idle_time) }
199    }
200
201    #[doc(alias = "g_thread_pool_get_max_idle_time")]
202    #[doc(alias = "get_max_idle_time")]
203    pub fn max_idle_time() -> u32 {
204        unsafe { ffi::g_thread_pool_get_max_idle_time() }
205    }
206}
207
208impl Drop for ThreadPool {
209    #[inline]
210    fn drop(&mut self) {
211        unsafe {
212            ffi::g_thread_pool_free(self.0.as_ptr(), ffi::GFALSE, ffi::GTRUE);
213        }
214    }
215}
216
217unsafe extern "C" fn spawn_func(func: ffi::gpointer, _data: ffi::gpointer) {
218    let func: Box<Box<dyn FnOnce()>> = Box::from_raw(func as *mut _);
219    func()
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    // A pushed closure runs on the pool and `join` hands back its return value.
    #[test]
    fn test_push() {
        let pool = ThreadPool::exclusive(1).unwrap();
        let (tx, rx) = std::sync::mpsc::channel();

        let task = move || {
            tx.send(true).unwrap();
            123
        };
        let handle = pool.push(task).unwrap();

        assert_eq!(handle.join().unwrap(), 123);
        assert_eq!(rx.recv(), Ok(true));
    }

    // The future returned by `push_future` resolves to the closure's return value.
    #[test]
    fn test_push_future() {
        let context = crate::MainContext::new();
        let pool = ThreadPool::shared(None).unwrap();

        let pending = pool.push_future(|| true).unwrap();
        let outcome = context.block_on(pending);

        assert!(outcome.unwrap());
    }
}