intuicio_frontend_simpleton/library/
jobs.rs1use super::closure::Closure;
2use crate::{Array, Function, Integer, Real, Reference, Transferable};
3use intuicio_core::{
4 context::Context, function::FunctionQuery, host::HostProducer, registry::Registry,
5 IntuicioStruct,
6};
7use intuicio_derive::{intuicio_method, intuicio_methods, IntuicioStruct};
8use std::{
9 collections::VecDeque,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, RwLock,
13 },
14 thread::{available_parallelism, spawn, JoinHandle},
15 time::Duration,
16};
17
18type WorkerQueue = Arc<RwLock<VecDeque<JobRequest>>>;
19type JobResult = Arc<RwLock<JobState>>;
20
21#[derive(IntuicioStruct, Default)]
22#[intuicio(name = "Jobs", module_name = "jobs", override_send = false)]
23pub struct Jobs {
24 #[intuicio(ignore)]
25 workers: Vec<Worker>,
26}
27
28#[intuicio_methods(module_name = "jobs")]
29impl Jobs {
30 pub const HOST_PRODUCER_CUSTOM: &'static str = "Jobs::host_producer";
31
32 #[allow(clippy::new_ret_no_self)]
33 #[intuicio_method(use_context, use_registry)]
34 pub fn new(context: &Context, registry: &Registry, workers_count: Reference) -> Reference {
35 let host_producer = match context.custom::<HostProducer>(Self::HOST_PRODUCER_CUSTOM) {
36 Some(host_producer) => host_producer.clone(),
37 None => return Reference::null(),
38 };
39 let workers_count = workers_count
40 .read::<Integer>()
41 .map(|count| *count as usize)
42 .unwrap_or_else(|| {
43 available_parallelism()
44 .map(|count| count.get())
45 .unwrap_or_default()
46 });
47 Reference::new(
48 Self {
49 workers: (0..workers_count)
50 .map(|_| Worker::new(host_producer.clone()))
51 .collect(),
52 },
53 registry,
54 )
55 }
56
57 #[intuicio_method()]
58 pub fn sleep(seconds: Reference) -> Reference {
59 std::thread::sleep(Duration::from_secs_f64(*seconds.read::<Real>().unwrap()));
60 Reference::null()
61 }
62
63 #[intuicio_method(use_registry)]
64 pub fn workers(registry: &Registry, jobs: Reference) -> Reference {
65 let jobs = jobs.read::<Jobs>().unwrap();
66 Reference::new_integer(jobs.workers.len() as Integer, registry)
67 }
68
69 #[intuicio_method(use_registry)]
70 pub fn workers_alive(registry: &Registry, jobs: Reference) -> Reference {
71 let jobs = jobs.read::<Jobs>().unwrap();
72 Reference::new_array(
73 jobs.workers
74 .iter()
75 .map(|worker| {
76 Reference::new_boolean(worker.is_running.load(Ordering::SeqCst), registry)
77 })
78 .collect(),
79 registry,
80 )
81 }
82
83 #[intuicio_method(use_registry)]
84 pub fn schedule(
85 registry: &Registry,
86 jobs: Reference,
87 executor: Reference,
88 arguments: Reference,
89 ) -> Reference {
90 let jobs = jobs.read::<Jobs>().unwrap();
91 let arguments = arguments.read::<Array>().unwrap();
92 let (function_name, function_module_name, captured) =
93 if let Some(function) = executor.read::<Function>() {
94 let signature = function.handle().unwrap().signature();
95 (
96 signature.name.to_owned(),
97 signature.module_name.to_owned(),
98 vec![],
99 )
100 } else if let Some(closure) = executor.read::<Closure>() {
101 let signature = closure.function.handle().unwrap().signature();
102 (
103 signature.name.to_owned(),
104 signature.module_name.to_owned(),
105 closure.captured.to_owned(),
106 )
107 } else {
108 return Reference::null();
109 };
110 let worker = jobs
111 .workers
112 .iter()
113 .filter(|worker| {
114 worker
115 .handle
116 .as_ref()
117 .map(|handle| !handle.is_finished())
118 .unwrap_or_default()
119 })
120 .min_by(|a, b| {
121 let a = a
122 .queue
123 .try_read()
124 .map(|queue| queue.len())
125 .unwrap_or_default();
126 let b = b
127 .queue
128 .try_read()
129 .map(|queue| queue.len())
130 .unwrap_or_default();
131 a.cmp(&b)
132 });
133 if let Some(worker) = worker {
134 return Reference::new(
135 worker.schedule(function_name, function_module_name, &captured, &arguments),
136 registry,
137 );
138 }
139 Reference::null()
140 }
141}
142
143struct Worker {
144 handle: Option<JoinHandle<()>>,
145 is_running: Arc<AtomicBool>,
146 queue: WorkerQueue,
147 _running_job_result: Arc<RwLock<Option<JobResult>>>,
148}
149
150impl Drop for Worker {
151 fn drop(&mut self) {
152 self.is_running.store(false, Ordering::SeqCst);
153 if let Some(handle) = self.handle.take() {
154 let _ = handle.join();
155 }
156 }
157}
158
159impl Worker {
160 pub fn new(host_producer: HostProducer) -> Self {
161 let queue = WorkerQueue::default();
162 let queue_ = queue.clone();
163 let is_running = Arc::new(AtomicBool::new(false));
164 let is_running_ = is_running.clone();
165 let _running_job_result: Arc<RwLock<Option<JobResult>>> = Default::default();
166 let running_job_result = _running_job_result.clone();
167 Self {
168 handle: Some(spawn(move || {
169 Self::worker_thread(host_producer, is_running_, queue_, running_job_result);
170 })),
171 is_running,
172 queue,
173 _running_job_result,
174 }
175 }
176
177 pub fn schedule(
178 &self,
179 function_name: String,
180 function_module_name: Option<String>,
181 captured: &[Reference],
182 arguments: &[Reference],
183 ) -> Job {
184 let result = Job::default();
185 if let Ok(mut queue) = self.queue.write() {
186 queue.push_back(JobRequest {
187 function_name,
188 function_module_name,
189 arguments: captured
190 .iter()
191 .chain(arguments.iter())
192 .map(|argument| Transferable::from(argument.clone()))
193 .collect(),
194 result: result.result.clone(),
195 });
196 }
197 result
198 }
199
200 fn consume_requests(
201 is_running: &Arc<AtomicBool>,
202 queue: &Arc<RwLock<VecDeque<JobRequest>>>,
203 running_job_result: &Arc<RwLock<Option<JobResult>>>,
204 ) {
205 is_running.store(false, Ordering::SeqCst);
206 if let Ok(mut result) = running_job_result.write() {
207 if let Some(result) = result.as_mut() {
208 if let Ok(mut result) = result.write() {
209 *result = JobState::Consumed;
210 }
211 }
212 }
213 if let Ok(mut queue) = queue.write() {
214 while let Some(request) = queue.pop_front() {
215 if let Ok(mut result) = request.result.write() {
216 *result = JobState::Consumed;
217 }
218 }
219 }
220 }
221
222 fn worker_thread(
223 host_producer: HostProducer,
224 is_running: Arc<AtomicBool>,
225 queue: Arc<RwLock<VecDeque<JobRequest>>>,
226 running_job_result: Arc<RwLock<Option<JobResult>>>,
227 ) {
228 let panic_hook = std::panic::take_hook();
229 let is_running_ = is_running.clone();
230 let queue_ = queue.clone();
231 let running_job_result_ = running_job_result.clone();
232 std::panic::set_hook(Box::new(move |info| {
233 Self::consume_requests(&is_running_, &queue_, &running_job_result_);
234 panic_hook(info);
235 }));
236
237 let mut host = host_producer.produce();
238 host.context()
239 .set_custom(Jobs::HOST_PRODUCER_CUSTOM, host_producer);
240 is_running.store(true, Ordering::SeqCst);
241 while is_running.load(Ordering::SeqCst) {
242 let request = queue
243 .try_write()
244 .ok()
245 .and_then(|mut queue| queue.pop_front());
246 if let Some(request) = request {
247 let (context, registry) = host.context_and_registry();
248 if let Some(function) = registry.find_function(FunctionQuery {
249 name: Some(request.function_name.into()),
250 module_name: request.function_module_name.map(|name| name.into()),
251 ..Default::default()
252 }) {
253 if let Ok(mut result) = running_job_result.write() {
254 *result = Some(request.result.clone());
255 }
256 if let Ok(mut result) = request.result.write() {
257 *result = JobState::Running;
258 }
259 for argument in request.arguments.into_iter().rev() {
260 context.stack().push(Reference::from(argument));
261 }
262 function.invoke(context, registry);
263 let output = Transferable::from(context.stack().pop::<Reference>().unwrap());
264 if let Ok(mut result) = request.result.write() {
265 *result = JobState::Done(output);
266 }
267 if let Ok(mut result) = running_job_result.write() {
268 *result = None;
269 }
270 }
271 }
272 }
273 is_running.store(false, Ordering::SeqCst);
274 Self::consume_requests(&is_running, &queue, &running_job_result);
275 }
276}
277
278struct JobRequest {
279 function_name: String,
280 function_module_name: Option<String>,
281 arguments: Vec<Transferable>,
282 result: JobResult,
283}
284
285#[derive(Default)]
286enum JobState {
287 #[default]
288 Pending,
289 Running,
290 Done(Transferable),
291 Consumed,
292}
293
294impl JobState {
295 fn consume(&mut self) -> Reference {
296 let state = std::mem::replace(self, JobState::Consumed);
297 if let Self::Done(transferable) = state {
298 Reference::from(transferable)
299 } else {
300 *self = state;
301 Reference::null()
302 }
303 }
304}
305
306#[derive(IntuicioStruct, Default, Clone)]
307#[intuicio(name = "Job", module_name = "job", override_send = false)]
308pub struct Job {
309 #[intuicio(ignore)]
310 result: JobResult,
311}
312
313#[intuicio_methods(module_name = "job")]
314impl Job {
315 #[intuicio_method(use_registry)]
316 pub fn is_pending(registry: &Registry, job: Reference) -> Reference {
317 let job = job.read::<Job>().unwrap();
318 Reference::new_boolean(
319 job.result
320 .try_read()
321 .map(|state| matches!(*state, JobState::Pending))
322 .unwrap_or_default(),
323 registry,
324 )
325 }
326
327 #[intuicio_method(use_registry)]
328 pub fn is_running(registry: &Registry, job: Reference) -> Reference {
329 let job = job.read::<Job>().unwrap();
330 Reference::new_boolean(
331 job.result
332 .try_read()
333 .map(|state| matches!(*state, JobState::Running))
334 .unwrap_or_default(),
335 registry,
336 )
337 }
338
339 #[intuicio_method(use_registry)]
340 pub fn is_done(registry: &Registry, job: Reference) -> Reference {
341 let job = job.read::<Job>().unwrap();
342 Reference::new_boolean(
343 job.result
344 .try_read()
345 .map(|state| matches!(*state, JobState::Done(_)))
346 .unwrap_or_default(),
347 registry,
348 )
349 }
350
351 #[intuicio_method(use_registry)]
352 pub fn is_consumed(registry: &Registry, job: Reference) -> Reference {
353 let job = job.read::<Job>().unwrap();
354 Reference::new_boolean(
355 job.result
356 .try_read()
357 .map(|state| matches!(*state, JobState::Consumed))
358 .unwrap_or_default(),
359 registry,
360 )
361 }
362
363 #[intuicio_method()]
364 pub fn consume(mut job: Reference) -> Reference {
365 let job = job.write::<Job>().unwrap();
366 if let Ok(mut state) = job.result.try_write() {
367 return state.consume();
368 }
369 Reference::null()
370 }
371
372 #[intuicio_method()]
373 pub fn wait_then_consume(mut job: Reference) -> Reference {
374 let job = job.write::<Job>().unwrap();
375 loop {
376 if let Ok(mut state) = job.result.try_write() {
377 if matches!(*state, JobState::Done(_)) {
378 return state.consume();
379 } else if matches!(*state, JobState::Consumed) {
380 return Reference::null();
381 }
382 }
383 }
384 }
385}
386
387pub fn install(registry: &mut Registry) {
388 registry.add_type(Jobs::define_struct(registry));
389 registry.add_type(Job::define_struct(registry));
390 registry.add_function(Jobs::new__define_function(registry));
391 registry.add_function(Jobs::sleep__define_function(registry));
392 registry.add_function(Jobs::workers__define_function(registry));
393 registry.add_function(Jobs::workers_alive__define_function(registry));
394 registry.add_function(Jobs::schedule__define_function(registry));
395 registry.add_function(Job::is_pending__define_function(registry));
396 registry.add_function(Job::is_running__define_function(registry));
397 registry.add_function(Job::is_done__define_function(registry));
398 registry.add_function(Job::is_consumed__define_function(registry));
399 registry.add_function(Job::consume__define_function(registry));
400 registry.add_function(Job::wait_then_consume__define_function(registry));
401}