intuicio_frontend_simpleton/library/
jobs.rs

1use super::closure::Closure;
2use crate::{Array, Function, Integer, Real, Reference, Transferable};
3use intuicio_core::{
4    context::Context, function::FunctionQuery, host::HostProducer, registry::Registry,
5    IntuicioStruct,
6};
7use intuicio_derive::{intuicio_method, intuicio_methods, IntuicioStruct};
8use std::{
9    collections::VecDeque,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc, RwLock,
13    },
14    thread::{available_parallelism, spawn, JoinHandle},
15    time::Duration,
16};
17
18type WorkerQueue = Arc<RwLock<VecDeque<JobRequest>>>;
19type JobResult = Arc<RwLock<JobState>>;
20
21#[derive(IntuicioStruct, Default)]
22#[intuicio(name = "Jobs", module_name = "jobs", override_send = false)]
23pub struct Jobs {
24    #[intuicio(ignore)]
25    workers: Vec<Worker>,
26}
27
28#[intuicio_methods(module_name = "jobs")]
29impl Jobs {
30    pub const HOST_PRODUCER_CUSTOM: &'static str = "Jobs::host_producer";
31
32    #[allow(clippy::new_ret_no_self)]
33    #[intuicio_method(use_context, use_registry)]
34    pub fn new(context: &Context, registry: &Registry, workers_count: Reference) -> Reference {
35        let host_producer = match context.custom::<HostProducer>(Self::HOST_PRODUCER_CUSTOM) {
36            Some(host_producer) => host_producer.clone(),
37            None => return Reference::null(),
38        };
39        let workers_count = workers_count
40            .read::<Integer>()
41            .map(|count| *count as usize)
42            .unwrap_or_else(|| {
43                available_parallelism()
44                    .map(|count| count.get())
45                    .unwrap_or_default()
46            });
47        Reference::new(
48            Self {
49                workers: (0..workers_count)
50                    .map(|_| Worker::new(host_producer.clone()))
51                    .collect(),
52            },
53            registry,
54        )
55    }
56
57    #[intuicio_method()]
58    pub fn sleep(seconds: Reference) -> Reference {
59        std::thread::sleep(Duration::from_secs_f64(*seconds.read::<Real>().unwrap()));
60        Reference::null()
61    }
62
63    #[intuicio_method(use_registry)]
64    pub fn workers(registry: &Registry, jobs: Reference) -> Reference {
65        let jobs = jobs.read::<Jobs>().unwrap();
66        Reference::new_integer(jobs.workers.len() as Integer, registry)
67    }
68
69    #[intuicio_method(use_registry)]
70    pub fn workers_alive(registry: &Registry, jobs: Reference) -> Reference {
71        let jobs = jobs.read::<Jobs>().unwrap();
72        Reference::new_array(
73            jobs.workers
74                .iter()
75                .map(|worker| {
76                    Reference::new_boolean(worker.is_running.load(Ordering::SeqCst), registry)
77                })
78                .collect(),
79            registry,
80        )
81    }
82
83    #[intuicio_method(use_registry)]
84    pub fn schedule(
85        registry: &Registry,
86        jobs: Reference,
87        executor: Reference,
88        arguments: Reference,
89    ) -> Reference {
90        let jobs = jobs.read::<Jobs>().unwrap();
91        let arguments = arguments.read::<Array>().unwrap();
92        let (function_name, function_module_name, captured) =
93            if let Some(function) = executor.read::<Function>() {
94                let signature = function.handle().unwrap().signature();
95                (
96                    signature.name.to_owned(),
97                    signature.module_name.to_owned(),
98                    vec![],
99                )
100            } else if let Some(closure) = executor.read::<Closure>() {
101                let signature = closure.function.handle().unwrap().signature();
102                (
103                    signature.name.to_owned(),
104                    signature.module_name.to_owned(),
105                    closure.captured.to_owned(),
106                )
107            } else {
108                return Reference::null();
109            };
110        let worker = jobs
111            .workers
112            .iter()
113            .filter(|worker| {
114                worker
115                    .handle
116                    .as_ref()
117                    .map(|handle| !handle.is_finished())
118                    .unwrap_or_default()
119            })
120            .min_by(|a, b| {
121                let a = a
122                    .queue
123                    .try_read()
124                    .map(|queue| queue.len())
125                    .unwrap_or_default();
126                let b = b
127                    .queue
128                    .try_read()
129                    .map(|queue| queue.len())
130                    .unwrap_or_default();
131                a.cmp(&b)
132            });
133        if let Some(worker) = worker {
134            return Reference::new(
135                worker.schedule(function_name, function_module_name, &captured, &arguments),
136                registry,
137            );
138        }
139        Reference::null()
140    }
141}
142
143struct Worker {
144    handle: Option<JoinHandle<()>>,
145    is_running: Arc<AtomicBool>,
146    queue: WorkerQueue,
147    _running_job_result: Arc<RwLock<Option<JobResult>>>,
148}
149
150impl Drop for Worker {
151    fn drop(&mut self) {
152        self.is_running.store(false, Ordering::SeqCst);
153        if let Some(handle) = self.handle.take() {
154            let _ = handle.join();
155        }
156    }
157}
158
159impl Worker {
160    pub fn new(host_producer: HostProducer) -> Self {
161        let queue = WorkerQueue::default();
162        let queue_ = queue.clone();
163        let is_running = Arc::new(AtomicBool::new(false));
164        let is_running_ = is_running.clone();
165        let _running_job_result: Arc<RwLock<Option<JobResult>>> = Default::default();
166        let running_job_result = _running_job_result.clone();
167        Self {
168            handle: Some(spawn(move || {
169                Self::worker_thread(host_producer, is_running_, queue_, running_job_result);
170            })),
171            is_running,
172            queue,
173            _running_job_result,
174        }
175    }
176
177    pub fn schedule(
178        &self,
179        function_name: String,
180        function_module_name: Option<String>,
181        captured: &[Reference],
182        arguments: &[Reference],
183    ) -> Job {
184        let result = Job::default();
185        if let Ok(mut queue) = self.queue.write() {
186            queue.push_back(JobRequest {
187                function_name,
188                function_module_name,
189                arguments: captured
190                    .iter()
191                    .chain(arguments.iter())
192                    .map(|argument| Transferable::from(argument.clone()))
193                    .collect(),
194                result: result.result.clone(),
195            });
196        }
197        result
198    }
199
200    fn consume_requests(
201        is_running: &Arc<AtomicBool>,
202        queue: &Arc<RwLock<VecDeque<JobRequest>>>,
203        running_job_result: &Arc<RwLock<Option<JobResult>>>,
204    ) {
205        is_running.store(false, Ordering::SeqCst);
206        if let Ok(mut result) = running_job_result.write() {
207            if let Some(result) = result.as_mut() {
208                if let Ok(mut result) = result.write() {
209                    *result = JobState::Consumed;
210                }
211            }
212        }
213        if let Ok(mut queue) = queue.write() {
214            while let Some(request) = queue.pop_front() {
215                if let Ok(mut result) = request.result.write() {
216                    *result = JobState::Consumed;
217                }
218            }
219        }
220    }
221
222    fn worker_thread(
223        host_producer: HostProducer,
224        is_running: Arc<AtomicBool>,
225        queue: Arc<RwLock<VecDeque<JobRequest>>>,
226        running_job_result: Arc<RwLock<Option<JobResult>>>,
227    ) {
228        let panic_hook = std::panic::take_hook();
229        let is_running_ = is_running.clone();
230        let queue_ = queue.clone();
231        let running_job_result_ = running_job_result.clone();
232        std::panic::set_hook(Box::new(move |info| {
233            Self::consume_requests(&is_running_, &queue_, &running_job_result_);
234            panic_hook(info);
235        }));
236
237        let mut host = host_producer.produce();
238        host.context()
239            .set_custom(Jobs::HOST_PRODUCER_CUSTOM, host_producer);
240        is_running.store(true, Ordering::SeqCst);
241        while is_running.load(Ordering::SeqCst) {
242            let request = queue
243                .try_write()
244                .ok()
245                .and_then(|mut queue| queue.pop_front());
246            if let Some(request) = request {
247                let (context, registry) = host.context_and_registry();
248                if let Some(function) = registry.find_function(FunctionQuery {
249                    name: Some(request.function_name.into()),
250                    module_name: request.function_module_name.map(|name| name.into()),
251                    ..Default::default()
252                }) {
253                    if let Ok(mut result) = running_job_result.write() {
254                        *result = Some(request.result.clone());
255                    }
256                    if let Ok(mut result) = request.result.write() {
257                        *result = JobState::Running;
258                    }
259                    for argument in request.arguments.into_iter().rev() {
260                        context.stack().push(Reference::from(argument));
261                    }
262                    function.invoke(context, registry);
263                    let output = Transferable::from(context.stack().pop::<Reference>().unwrap());
264                    if let Ok(mut result) = request.result.write() {
265                        *result = JobState::Done(output);
266                    }
267                    if let Ok(mut result) = running_job_result.write() {
268                        *result = None;
269                    }
270                }
271            }
272        }
273        is_running.store(false, Ordering::SeqCst);
274        Self::consume_requests(&is_running, &queue, &running_job_result);
275    }
276}
277
278struct JobRequest {
279    function_name: String,
280    function_module_name: Option<String>,
281    arguments: Vec<Transferable>,
282    result: JobResult,
283}
284
285#[derive(Default)]
286enum JobState {
287    #[default]
288    Pending,
289    Running,
290    Done(Transferable),
291    Consumed,
292}
293
294impl JobState {
295    fn consume(&mut self) -> Reference {
296        let state = std::mem::replace(self, JobState::Consumed);
297        if let Self::Done(transferable) = state {
298            Reference::from(transferable)
299        } else {
300            *self = state;
301            Reference::null()
302        }
303    }
304}
305
306#[derive(IntuicioStruct, Default, Clone)]
307#[intuicio(name = "Job", module_name = "job", override_send = false)]
308pub struct Job {
309    #[intuicio(ignore)]
310    result: JobResult,
311}
312
313#[intuicio_methods(module_name = "job")]
314impl Job {
315    #[intuicio_method(use_registry)]
316    pub fn is_pending(registry: &Registry, job: Reference) -> Reference {
317        let job = job.read::<Job>().unwrap();
318        Reference::new_boolean(
319            job.result
320                .try_read()
321                .map(|state| matches!(*state, JobState::Pending))
322                .unwrap_or_default(),
323            registry,
324        )
325    }
326
327    #[intuicio_method(use_registry)]
328    pub fn is_running(registry: &Registry, job: Reference) -> Reference {
329        let job = job.read::<Job>().unwrap();
330        Reference::new_boolean(
331            job.result
332                .try_read()
333                .map(|state| matches!(*state, JobState::Running))
334                .unwrap_or_default(),
335            registry,
336        )
337    }
338
339    #[intuicio_method(use_registry)]
340    pub fn is_done(registry: &Registry, job: Reference) -> Reference {
341        let job = job.read::<Job>().unwrap();
342        Reference::new_boolean(
343            job.result
344                .try_read()
345                .map(|state| matches!(*state, JobState::Done(_)))
346                .unwrap_or_default(),
347            registry,
348        )
349    }
350
351    #[intuicio_method(use_registry)]
352    pub fn is_consumed(registry: &Registry, job: Reference) -> Reference {
353        let job = job.read::<Job>().unwrap();
354        Reference::new_boolean(
355            job.result
356                .try_read()
357                .map(|state| matches!(*state, JobState::Consumed))
358                .unwrap_or_default(),
359            registry,
360        )
361    }
362
363    #[intuicio_method()]
364    pub fn consume(mut job: Reference) -> Reference {
365        let job = job.write::<Job>().unwrap();
366        if let Ok(mut state) = job.result.try_write() {
367            return state.consume();
368        }
369        Reference::null()
370    }
371
372    #[intuicio_method()]
373    pub fn wait_then_consume(mut job: Reference) -> Reference {
374        let job = job.write::<Job>().unwrap();
375        loop {
376            if let Ok(mut state) = job.result.try_write() {
377                if matches!(*state, JobState::Done(_)) {
378                    return state.consume();
379                } else if matches!(*state, JobState::Consumed) {
380                    return Reference::null();
381                }
382            }
383        }
384    }
385}
386
387pub fn install(registry: &mut Registry) {
388    registry.add_type(Jobs::define_struct(registry));
389    registry.add_type(Job::define_struct(registry));
390    registry.add_function(Jobs::new__define_function(registry));
391    registry.add_function(Jobs::sleep__define_function(registry));
392    registry.add_function(Jobs::workers__define_function(registry));
393    registry.add_function(Jobs::workers_alive__define_function(registry));
394    registry.add_function(Jobs::schedule__define_function(registry));
395    registry.add_function(Job::is_pending__define_function(registry));
396    registry.add_function(Job::is_running__define_function(registry));
397    registry.add_function(Job::is_done__define_function(registry));
398    registry.add_function(Job::is_consumed__define_function(registry));
399    registry.add_function(Job::consume__define_function(registry));
400    registry.add_function(Job::wait_then_consume__define_function(registry));
401}