gevulot_rs/models/
workflow.rs

1//! Workflow model and related types for managing workflow execution.
2//!
3//! This module provides the core workflow model used throughout the system, including:
4//! - Workflow specification with stages and tasks
5//! - Status tracking for workflow execution
6//! - Metadata like tags and labels
7//! - Protobuf serialization/deserialization
8
9use super::{Label, Metadata, TaskSpec};
10use crate::proto::gevulot::gevulot;
11use serde::{Deserialize, Serialize};
12
13/// Represents a complete workflow definition with metadata, specification and status
14///
15/// A workflow consists of one or more stages that are executed sequentially. Each stage
16/// contains one or more tasks that can be executed in parallel.
17///
18/// # Examples
19///
20/// Creating a basic workflow:
21/// ```
22/// use crate::models::Workflow;
23///
24/// let workflow = serde_json::from_str::<Workflow>(r#"{
25///     "kind": "Workflow",
26///     "version": "v0",
27///     "metadata": {
28///         "name": "my-workflow",
29///         "tags": ["compute"]
30///     },
31///     "spec": {
32///         "stages": [{
33///             "tasks": [{
34///                 "image": "alpine",
35///                 "resources": {
36///                     "cpus": "1cpu",
37///                     "memory": "1GiB"
38///                 }
39///             }]
40///         }]
41///     }
42/// }"#).unwrap();
43/// ```
44#[derive(Serialize, Deserialize, Debug)]
45pub struct Workflow {
46    pub kind: String,
47    pub version: String,
48    #[serde(default)]
49    pub metadata: Metadata,
50    pub spec: WorkflowSpec,
51    pub status: Option<WorkflowStatus>,
52}
53
54// Converts a protobuf workflow message into our internal Workflow model
55impl From<gevulot::Workflow> for Workflow {
56    fn from(proto: gevulot::Workflow) -> Self {
57        // Create a new workflow, carefully mapping all protobuf fields to our model
58        Workflow {
59            kind: "Workflow".to_string(),
60            version: "v0".to_string(),
61            metadata: Metadata {
62                id: proto.metadata.as_ref().map(|m| m.id.clone()),
63                name: proto
64                    .metadata
65                    .as_ref()
66                    .map(|m| m.name.clone())
67                    .unwrap_or_default(),
68                creator: proto.metadata.as_ref().map(|m| m.creator.clone()),
69                description: proto
70                    .metadata
71                    .as_ref()
72                    .map(|m| m.desc.clone())
73                    .unwrap_or_default(),
74                tags: proto
75                    .metadata
76                    .as_ref()
77                    .map(|m| m.tags.clone())
78                    .unwrap_or_default(),
79                labels: proto
80                    .metadata
81                    .as_ref()
82                    .map(|m| m.labels.clone())
83                    .unwrap_or_default()
84                    .into_iter()
85                    .map(|l| Label {
86                        key: l.key,
87                        value: l.value,
88                    })
89                    .collect(),
90                workflow_ref: None,
91            },
92            spec: proto.spec.map(|s| s.into()).unwrap(),
93            status: proto.status.map(|s| s.into()),
94        }
95    }
96}
97
98/// Represents a single stage in a workflow containing one or more tasks
99///
100/// Tasks within a stage can be executed in parallel. The workflow will only
101/// proceed to the next stage once all tasks in the current stage are complete.
102#[derive(Serialize, Deserialize, Debug)]
103pub struct WorkflowStage {
104    pub tasks: Vec<TaskSpec>,
105}
106
107/// Specification for a workflow defining its stages and tasks
108///
109/// The stages are executed sequentially, with tasks in each stage potentially
110/// running in parallel depending on available resources.
111#[derive(Serialize, Deserialize, Debug)]
112pub struct WorkflowSpec {
113    pub stages: Vec<WorkflowStage>,
114}
115
116// Converts a protobuf workflow spec into our internal WorkflowSpec model
117impl From<gevulot::WorkflowSpec> for WorkflowSpec {
118    fn from(proto: gevulot::WorkflowSpec) -> Self {
119        // Map each protobuf stage to our stage model, converting tasks as well
120        WorkflowSpec {
121            stages: proto
122                .stages
123                .into_iter()
124                .map(|stage| WorkflowStage {
125                    tasks: stage.tasks.into_iter().map(|t| t.into()).collect(),
126                })
127                .collect(),
128        }
129    }
130}
131
132/// Status information for a single stage in a workflow
133///
134/// Tracks which tasks have been created and how many have completed.
135#[derive(Serialize, Deserialize, Debug)]
136pub struct WorkflowStageStatus {
137    #[serde(rename = "taskIds")]
138    pub task_ids: Vec<String>,
139    #[serde(rename = "finishedTasks")]
140    pub finished_tasks: u64,
141}
142
143/// Current status of a workflow's execution
144///
145/// Tracks:
146/// - Overall workflow state (Pending, Running, Done, Failed)
147/// - Which stage is currently executing
148/// - Status of each stage including task completion
149#[derive(Serialize, Deserialize, Debug)]
150pub struct WorkflowStatus {
151    pub state: String,
152    #[serde(rename = "currentStage")]
153    pub current_stage: u64,
154    pub stages: Vec<WorkflowStageStatus>,
155}
156
157// Converts a protobuf workflow status into our internal WorkflowStatus model
158impl From<gevulot::WorkflowStatus> for WorkflowStatus {
159    fn from(proto: gevulot::WorkflowStatus) -> Self {
160        WorkflowStatus {
161            // Map numeric states to human readable strings
162            state: match proto.state {
163                0 => "Pending".to_string(),
164                1 => "Running".to_string(),
165                2 => "Done".to_string(),
166                3 => "Failed".to_string(),
167                _ => "Unknown".to_string(),
168            },
169            current_stage: proto.current_stage,
170            stages: proto
171                .stages
172                .into_iter()
173                .map(|s| WorkflowStageStatus {
174                    task_ids: s.task_ids,
175                    finished_tasks: s.finished_tasks,
176                })
177                .collect(),
178        }
179    }
180}
181
// Unit tests covering JSON deserialization of workflows and the resulting field values
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_parse_workflow() {
        // A fully populated workflow document: metadata, spec and status
        let document = json!({
            "kind": "Workflow",
            "version": "v0",
            "metadata": {
                "id": "test-id",
                "name": "Test Workflow",
                "creator": "test-creator",
                "description": "Test Workflow Description",
                "tags": ["tag1", "tag2"],
                "labels": [
                    {"key": "label1", "value": "value1"},
                    {"key": "label2", "value": "value2"}
                ]
            },
            "spec": {
                "stages": [
                    {
                        "tasks": [
                            {
                                "image": "test-image-1",
                                "resources": {
                                    "cpus": "1cpu",
                                    "gpus": "1gpu",
                                    "memory": "1GiB",
                                    "time": "1hr"
                                }
                            },
                            {
                                "image": "test-image-2",
                                "resources": {
                                    "cpus": "2cpu",
                                    "gpus": "2gpu",
                                    "memory": "2GiB",
                                    "time": "2hr"
                                }
                            }
                        ]
                    }
                ]
            },
            "status": {
                "state": "Running",
                "currentStage": 0,
                "stages": [
                    {
                        "taskIds": ["task-1", "task-2"],
                        "finishedTasks": 1
                    }
                ]
            }
        });
        let workflow: Workflow = serde_json::from_value(document).unwrap();

        // Metadata fields
        let metadata = &workflow.metadata;
        assert_eq!(metadata.id, Some("test-id".to_string()));
        assert_eq!(metadata.name, "Test Workflow");
        assert_eq!(metadata.creator, Some("test-creator".to_string()));
        assert_eq!(metadata.description, "Test Workflow Description");
        assert_eq!(metadata.tags, vec!["tag1", "tag2"]);
        assert_eq!(metadata.labels.len(), 2);
        let first_label = &metadata.labels[0];
        assert_eq!(first_label.key, "label1");
        assert_eq!(first_label.value, "value1");

        // Spec: a single stage holding two tasks
        let stages = &workflow.spec.stages;
        assert_eq!(stages.len(), 1);
        let tasks = &stages[0].tasks;
        assert_eq!(tasks.len(), 2);
        assert_eq!(tasks[0].image, "test-image-1");
        assert_eq!(tasks[1].image, "test-image-2");

        // Status: present, running, one stage with one finished task
        let status = workflow.status.unwrap();
        assert_eq!(status.state, "Running");
        assert_eq!(status.current_stage, 0);
        assert_eq!(status.stages.len(), 1);
        let stage_status = &status.stages[0];
        assert_eq!(stage_status.task_ids, vec!["task-1", "task-2"]);
        assert_eq!(stage_status.finished_tasks, 1);
    }

    #[test]
    fn test_parse_workflow_with_minimum() {
        // Only the required fields: no metadata, no status, no stages
        let document = json!({
            "kind": "Workflow",
            "version": "v0",
            "spec": {
                "stages": []
            }
        });
        let workflow: Workflow = serde_json::from_value(document).unwrap();

        assert_eq!(workflow.kind, "Workflow");
        assert_eq!(workflow.version, "v0");
        assert_eq!(workflow.spec.stages.len(), 0);
        assert!(workflow.status.is_none());
    }
}