recoverable_thread_pool/thread_pool/
sync.rs

1use super::r#type::ThreadPool;
2use crate::{worker::r#type::Worker, SendResult, ThreadPoolJob};
3use recoverable_spawn::*;
4use std::sync::{
5    mpsc::{self, Receiver},
6    Arc, Mutex,
7};
8
9impl ThreadPool {
10    #[inline]
11    pub fn new(size: usize) -> ThreadPool {
12        let (sender, receiver) = mpsc::channel();
13        let receiver: Arc<Mutex<Receiver<ThreadPoolJob>>> = Arc::new(Mutex::new(receiver));
14        let mut workers: Vec<Worker> = Vec::with_capacity(size);
15        let mut id: usize = 0;
16        loop {
17            if id >= size {
18                break;
19            }
20            let worker: Option<Worker> = Worker::new(id, Arc::clone(&receiver));
21            if worker.is_some() {
22                workers.push(worker.unwrap_or_default());
23                id += 1;
24            }
25        }
26        ThreadPool { workers, sender }
27    }
28
29    #[inline]
30    pub fn execute<F>(&self, job: F) -> SendResult
31    where
32        F: RecoverableFunction,
33    {
34        let job_with_handler: ThreadPoolJob = Box::new(move || {
35            let _ = sync::run_function(job);
36        });
37        self.sender.send(job_with_handler)
38    }
39
40    #[inline]
41    pub fn execute_with_catch<F, E>(&self, job: F, handle_error: E) -> SendResult
42    where
43        F: RecoverableFunction,
44        E: ErrorHandlerFunction,
45    {
46        let job_with_handler: ThreadPoolJob = Box::new(move || {
47            if let Err(err) = sync::run_function(job) {
48                let err_string: String = sync::spawn_error_to_string(err);
49                let _ = sync::run_error_handle_function(handle_error, &err_string);
50            }
51        });
52        self.sender.send(job_with_handler)
53    }
54
55    #[inline]
56    pub fn execute_with_catch_finally<F, E, L>(
57        &self,
58        job: F,
59        handle_error: E,
60        finally: L,
61    ) -> SendResult
62    where
63        F: RecoverableFunction,
64        E: ErrorHandlerFunction,
65        L: RecoverableFunction,
66    {
67        let job_with_handler: ThreadPoolJob = Box::new(move || {
68            if let Err(err) = sync::run_function(job) {
69                let err_string: String = sync::spawn_error_to_string(err);
70                let _ = sync::run_error_handle_function(handle_error, &err_string);
71            }
72            let _ = sync::run_function(finally);
73        });
74        self.sender.send(job_with_handler)
75    }
76}