gevulot_rs/models/
task.rs

1//! Task model and related types for managing task execution.
2//!
3//! This module provides the core task model used throughout the system, including:
4//! - Task specification and status
5//! - Resource requirements (CPU, GPU, memory, time)
6//! - Input/output context handling
7//! - Environment variables
8//! - Metadata like tags and labels
9
10use crate::proto::gevulot::gevulot;
11use serde::{Deserialize, Serialize};
12
13use super::serialization_helpers::DefaultFactorOneMegabyte;
14
15/// Represents a complete task definition with metadata, specification and status
16///
17/// # Examples
18///
19/// Creating a basic task:
20/// ```
21/// use crate::models::Task;
22///
23/// let task = serde_json::from_str::<Task>(r#"{
24///     "kind": "Task",
25///     "version": "v0",
26///     "spec": {
27///         "image": "ubuntu:latest",
28///         "command": ["echo", "hello"],
29///         "resources": {
30///             "cpus": "1cpu",
31///             "gpus": "0gpu",
32///             "memory": "512mb",
33///             "time": "1h"
34///         }
35///     }
36/// }"#).unwrap();
37/// ```
38///
39/// Task with input/output contexts:
40/// ```
41/// let task = serde_json::from_str::<Task>(r#"{
42///     "kind": "Task",
43///     "version": "v0",
44///     "spec": {
45///         "image": "processor:v1",
46///         "inputContexts": [{
47///             "source": "input-data",
48///             "target": "/data"
49///         }],
50///         "outputContexts": [{
51///             "source": "/results",
52///             "retentionPeriod": 86400
53///         }],
54///         "resources": {
55///             "cpus": "2cpu",
56///             "gpus": "1gpu",
57///             "memory": "4gb",
58///             "time": "2h"
59///         }
60///     }
61/// }"#).unwrap();
62/// ```
63#[derive(Serialize, Deserialize, Debug)]
64pub struct Task {
65    // The kind is always "Task" - used for type identification in serialized form
66    pub kind: String,
67    // API version, currently "v0"
68    pub version: String,
69    // Task metadata like name, description, tags etc
70    #[serde(default)]
71    pub metadata: crate::models::Metadata,
72    // Core task specification
73    pub spec: TaskSpec,
74    // Optional runtime status
75    pub status: Option<TaskStatus>,
76}
77
78// Conversion from protobuf Task message
79impl From<gevulot::Task> for Task {
80    fn from(proto: gevulot::Task) -> Self {
81        // Extract workflow reference if present in spec
82        let workflow_ref = match proto.spec.as_ref() {
83            Some(spec) if !spec.workflow_ref.is_empty() => Some(spec.workflow_ref.clone()),
84            _ => None,
85        };
86
87        Task {
88            kind: "Task".to_string(),
89            version: "v0".to_string(),
90            metadata: crate::models::Metadata {
91                id: proto.metadata.as_ref().map(|m| m.id.clone()),
92                name: proto
93                    .metadata
94                    .as_ref()
95                    .map(|m| m.name.clone())
96                    .unwrap_or_default(),
97                creator: proto.metadata.as_ref().map(|m| m.creator.clone()),
98                description: proto
99                    .metadata
100                    .as_ref()
101                    .map(|m| m.desc.clone())
102                    .unwrap_or_default(),
103                tags: proto
104                    .metadata
105                    .as_ref()
106                    .map(|m| m.tags.clone())
107                    .unwrap_or_default(),
108                labels: proto
109                    .metadata
110                    .as_ref()
111                    .map(|m| m.labels.clone())
112                    .unwrap_or_default()
113                    .into_iter()
114                    .map(|l| crate::models::Label {
115                        key: l.key,
116                        value: l.value,
117                    })
118                    .collect(),
119                workflow_ref,
120            },
121            spec: proto.spec.unwrap().into(),
122            status: proto.status.map(|s| s.into()),
123        }
124    }
125}
126
127/// Task specification containing all execution parameters
128///
129/// # Examples
130///
131/// Basic spec with just image and resources:
132/// ```
133/// use crate::models::TaskSpec;
134///
135/// let spec = serde_json::from_str::<TaskSpec>(r#"{
136///     "image": "ubuntu:latest",
137///     "resources": {
138///         "cpus": "1cpu",
139///         "gpus": "0gpu",
140///         "memory": "512mb",
141///         "time": "1h"
142///     }
143/// }"#).unwrap();
144/// ```
145#[derive(Serialize, Deserialize, Debug)]
146pub struct TaskSpec {
147    // Container image to run
148    pub image: String,
149    // Optional command to override image entrypoint
150    #[serde(default)]
151    pub command: Vec<String>,
152    // Optional arguments for the command
153    #[serde(default)]
154    pub args: Vec<String>,
155    // Environment variables to set in container
156    #[serde(default)]
157    pub env: Vec<TaskEnv>,
158    // Input data contexts to mount
159    #[serde(rename = "inputContexts", default)]
160    pub input_contexts: Vec<InputContext>,
161    // Output data contexts to capture
162    #[serde(rename = "outputContexts", default)]
163    pub output_contexts: Vec<OutputContext>,
164    // Resource requirements
165    pub resources: TaskResources,
166    // Whether to store stdout stream
167    #[serde(rename = "storeStdout", default)]
168    pub store_stdout: bool,
169    // Whether to store stderr stream
170    #[serde(rename = "storeStderr", default)]
171    pub store_stderr: bool,
172}
173
174// Conversion from protobuf TaskSpec message
175impl From<gevulot::TaskSpec> for TaskSpec {
176    fn from(proto: gevulot::TaskSpec) -> Self {
177        TaskSpec {
178            image: proto.image,
179            command: proto.command,
180            args: proto.args,
181            env: proto
182                .env
183                .into_iter()
184                .map(|e| TaskEnv {
185                    name: e.name,
186                    value: e.value,
187                })
188                .collect(),
189            input_contexts: proto
190                .input_contexts
191                .into_iter()
192                .map(|ic| InputContext {
193                    source: ic.source,
194                    target: ic.target,
195                })
196                .collect(),
197            output_contexts: proto
198                .output_contexts
199                .into_iter()
200                .map(|oc| OutputContext {
201                    source: oc.source,
202                    retention_period: oc.retention_period as i64,
203                })
204                .collect(),
205            resources: TaskResources {
206                cpus: (proto.cpus as i64).into(),
207                gpus: (proto.gpus as i64).into(),
208                memory: (proto.memory as i64).into(),
209                time: (proto.time as i64).into(),
210            },
211            store_stdout: proto.store_stdout,
212            store_stderr: proto.store_stderr,
213        }
214    }
215}
216
217/// Environment variable definition for task container
218#[derive(Serialize, Deserialize, Debug)]
219pub struct TaskEnv {
220    pub name: String,
221    pub value: String,
222}
223
224/// Input context for mounting data into task container
225#[derive(Serialize, Deserialize, Debug)]
226pub struct InputContext {
227    // Source data identifier
228    pub source: String,
229    // Target mount path in container
230    pub target: String,
231}
232
233/// Output context for capturing data from task container
234#[derive(Serialize, Deserialize, Debug)]
235pub struct OutputContext {
236    // Source path in container to capture
237    pub source: String,
238    // How long to retain the output data
239    #[serde(rename = "retentionPeriod")]
240    pub retention_period: i64,
241}
242
243/// Resource requirements for task execution
244#[derive(Serialize, Deserialize, Debug)]
245pub struct TaskResources {
246    // CPU cores required (supports units like "2cpu", "500mcpu")
247    pub cpus: crate::models::CoreUnit,
248    // GPU cores required (supports units like "1gpu", "500mgpu")
249    pub gpus: crate::models::CoreUnit,
250    // Memory required (supports units like "1gb", "512mb")
251    pub memory: crate::models::ByteUnit<DefaultFactorOneMegabyte>, // when no unit is specified, we assume MiB
252    // Time limit (supports units like "1h", "30m")
253    pub time: crate::models::TimeUnit,
254}
255
256/// Runtime status of a task
257#[derive(Serialize, Deserialize, Debug)]
258pub struct TaskStatus {
259    // Current state (Pending, Running, Done, Failed etc)
260    pub state: String,
261    // Timestamps for task lifecycle
262    #[serde(rename = "createdAt")]
263    pub created_at: i64,
264    #[serde(rename = "startedAt")]
265    pub started_at: i64,
266    #[serde(rename = "completedAt")]
267    pub completed_at: i64,
268    // Workers assigned/active for this task
269    #[serde(rename = "assignedWorkers")]
270    pub assigned_workers: Vec<String>,
271    #[serde(rename = "activeWorker")]
272    pub active_worker: String,
273    // Exit code if task completed
274    #[serde(rename = "exitCode")]
275    pub exit_code: Option<i64>,
276    // Output context identifiers
277    #[serde(rename = "outputContexts")]
278    pub output_contexts: Vec<String>,
279    // Captured output streams if enabled
280    pub stdout: Option<String>,
281    pub stderr: Option<String>,
282    // Error message if failed
283    pub error: Option<String>,
284}
285
286// Conversion from protobuf TaskStatus message
287impl From<gevulot::TaskStatus> for TaskStatus {
288    fn from(proto: gevulot::TaskStatus) -> Self {
289        // Map numeric state to string representation
290        let mut exit_code = None;
291        let state = match proto.state {
292            0 => "Pending".to_string(),
293            1 => "Running".to_string(),
294            2 => "Declined".to_string(),
295            3 => {
296                exit_code = Some(proto.exit_code);
297                "Done".to_string()
298            }
299            4 => {
300                exit_code = Some(proto.exit_code);
301                "Failed".to_string()
302            }
303            _ => "Unknown".to_string(),
304        };
305
306        // Convert empty strings to None
307        let error = if proto.error.is_empty() {
308            None
309        } else {
310            Some(proto.error)
311        };
312        let stdout = if proto.stdout.is_empty() {
313            None
314        } else {
315            Some(proto.stdout)
316        };
317        let stderr = if proto.stderr.is_empty() {
318            None
319        } else {
320            Some(proto.stderr)
321        };
322
323        TaskStatus {
324            state,
325            created_at: proto.created_at as i64,
326            started_at: proto.started_at as i64,
327            completed_at: proto.completed_at as i64,
328            assigned_workers: proto.assigned_workers,
329            active_worker: proto.active_worker,
330            exit_code,
331            output_contexts: proto.output_contexts,
332            error,
333            stdout,
334            stderr,
335        }
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Deserializes a JSON value into a `Task`; error handling is left to the caller.
    fn task_from_json(document: serde_json::Value) -> serde_json::Result<Task> {
        serde_json::from_value(document)
    }

    /// Resource section written with spaced, mixed-case units, shared by several tests.
    fn spaced_unit_resources() -> serde_json::Value {
        json!({
            "cpus": "1000 MCpu",
            "gpus": "1000 MGpu",
            "memory": "1024 MiB",
            "time": "1 hr"
        })
    }

    /// Looks up the value of the environment variable `name`, panicking if it is absent.
    fn env_value<'a>(task: &'a Task, name: &str) -> &'a str {
        task.spec
            .env
            .iter()
            .find(|e| e.name == name)
            .unwrap()
            .value
            .as_str()
    }

    /// Unit strings are kept verbatim in the `String` variants after parsing.
    #[test]
    fn test_parse_task_with_units() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "resources": {
                    "cpus": "1cpu",
                    "gpus": "1gpu",
                    "memory": "1024mb",
                    "time": "1hr"
                }
            }
        });
        let task = task_from_json(document).unwrap();
        let resources = &task.spec.resources;

        assert_eq!(
            resources.cpus,
            crate::models::CoreUnit::String("1cpu".to_string())
        );
        assert_eq!(
            resources.gpus,
            crate::models::CoreUnit::String("1gpu".to_string())
        );
        assert_eq!(
            resources.memory,
            crate::models::ByteUnit::String("1024mb".to_string())
        );
        assert_eq!(
            resources.time,
            crate::models::TimeUnit::String("1hr".to_string())
        );
    }

    /// Plain numbers are accepted for every resource field.
    #[test]
    fn test_parse_task_without_units() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "metadata": {
                "name": "Test Task",
                "creator": "test",
                "description": "Test Task",
                "tags": [],
                "labels": []
            },
            "spec": {
                "image": "test",
                "command": ["test"],
                "args": ["test"],
                "env": [],
                "inputContexts": [],
                "outputContexts": [],
                "resources": {
                    "cpus": 1,
                    "gpus": 1,
                    "memory": 1024,
                    "time": 1
                }
            }
        });
        let task = task_from_json(document).unwrap();
        let resources = &task.spec.resources;

        assert_eq!(resources.cpus.millicores(), Ok(1000));
        assert_eq!(resources.gpus.millicores(), Ok(1000));
        assert_eq!(resources.memory.bytes(), Ok(1024));
        assert_eq!(resources.time.seconds(), Ok(1));
    }

    /// A task with only the required fields parses, and spaced units are understood.
    #[test]
    fn test_parse_task_without_much() {
        let task = task_from_json(json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "resources": spaced_unit_resources()
            }
        }))
        .expect("Failed to parse task");
        let resources = &task.spec.resources;

        assert_eq!(resources.cpus.millicores(), Ok(1000));
        assert_eq!(resources.gpus.millicores(), Ok(1000));
        assert_eq!(resources.memory.bytes(), Ok(1024 * 1024 * 1024));
        assert_eq!(resources.time.seconds(), Ok(60 * 60));
    }

    /// The same minimal task parses from YAML.
    #[test]
    fn test_parse_task_yaml() {
        let document = r#"
            kind: Task
            version: v0
            spec:
                image: test
                resources:
                    cpus: 1000 MCpu
                    gpus: 1000 MGpu
                    memory: 1024 MiB
                    time: 1 hr
        "#;
        let task: Task = serde_yaml::from_str(document).expect("Failed to parse task");
        let resources = &task.spec.resources;

        assert_eq!(resources.cpus.millicores(), Ok(1000));
        assert_eq!(resources.gpus.millicores(), Ok(1000));
        assert_eq!(resources.memory.bytes(), Ok(1024 * 1024 * 1024));
        assert_eq!(resources.time.seconds(), Ok(60 * 60));
    }

    /// Environment variables are parsed into name/value pairs.
    #[test]
    fn test_parse_task_with_env() {
        let task = task_from_json(json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "env": [
                    {
                        "name": "FOO",
                        "value": "bar"
                    },
                    {
                        "name": "DEBUG",
                        "value": "1"
                    }
                ],
                "resources": spaced_unit_resources()
            }
        }))
        .expect("Failed to parse task");

        assert_eq!(env_value(&task, "FOO"), "bar");
        assert_eq!(env_value(&task, "DEBUG"), "1");
    }

    /// Input contexts keep their order and source/target values.
    #[test]
    fn test_parse_task_with_input_context() {
        let task = task_from_json(json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "inputContexts": [
                    {
                        "source": "pin1",
                        "target": "/input/data1"
                    },
                    {
                        "source": "pin2",
                        "target": "/input/data2"
                    }
                ],
                "resources": spaced_unit_resources()
            }
        }))
        .expect("Failed to parse task");

        let expected = [("pin1", "/input/data1"), ("pin2", "/input/data2")];
        for (idx, &(source, target)) in expected.iter().enumerate() {
            let input = &task.spec.input_contexts[idx];
            assert_eq!(input.source, source);
            assert_eq!(input.target, target);
        }
    }

    /// Output contexts keep their order, source and retention period.
    #[test]
    fn test_parse_task_with_output_context() {
        let task = task_from_json(json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "outputContexts": [
                    {
                        "source": "/output/result1",
                        "retentionPeriod": 1000
                    },
                    {
                        "source": "/output/result2",
                        "retentionPeriod": 1000
                    }
                ],
                "resources": spaced_unit_resources()
            }
        }))
        .expect("Failed to parse task");

        let expected: [(&str, i64); 2] = [("/output/result1", 1000), ("/output/result2", 1000)];
        for (idx, &(source, retention)) in expected.iter().enumerate() {
            let output = &task.spec.output_contexts[idx];
            assert_eq!(output.source, source);
            assert_eq!(output.retention_period, retention);
        }
    }

    /// Name, description, tags and labels are read from the metadata section.
    #[test]
    fn test_parse_task_with_metadata() {
        let task = task_from_json(json!({
            "kind": "Task",
            "version": "v0",
            "metadata": {
                "name": "test-task",
                "description": "A test task",
                "tags": ["test", "example"],
                "labels": [
                    {"key": "env", "value": "test"},
                    {"key": "priority", "value": "high"}
                ]
            },
            "spec": {
                "image": "test",
                "resources": spaced_unit_resources()
            }
        }))
        .expect("Failed to parse task");
        let metadata = &task.metadata;

        assert_eq!(metadata.name, "test-task");
        assert_eq!(metadata.description, "A test task");
        assert_eq!(metadata.tags, vec!["test", "example"]);

        let labels = &metadata.labels;
        assert_eq!(labels[0].key, "env");
        assert_eq!(labels[0].value, "test");
        assert_eq!(labels[1].key, "priority");
        assert_eq!(labels[1].value, "high");
    }
}