1use crate::proto::gevulot::gevulot;
11use serde::{Deserialize, Serialize};
12
13use super::serialization_helpers::DefaultFactorOneMegabyte;
14
15#[derive(Serialize, Deserialize, Debug)]
64pub struct Task {
65 pub kind: String,
67 pub version: String,
69 #[serde(default)]
71 pub metadata: crate::models::Metadata,
72 pub spec: TaskSpec,
74 pub status: Option<TaskStatus>,
76}
77
78impl From<gevulot::Task> for Task {
80 fn from(proto: gevulot::Task) -> Self {
81 let workflow_ref = match proto.spec.as_ref() {
83 Some(spec) if !spec.workflow_ref.is_empty() => Some(spec.workflow_ref.clone()),
84 _ => None,
85 };
86
87 Task {
88 kind: "Task".to_string(),
89 version: "v0".to_string(),
90 metadata: crate::models::Metadata {
91 id: proto.metadata.as_ref().map(|m| m.id.clone()),
92 name: proto
93 .metadata
94 .as_ref()
95 .map(|m| m.name.clone())
96 .unwrap_or_default(),
97 creator: proto.metadata.as_ref().map(|m| m.creator.clone()),
98 description: proto
99 .metadata
100 .as_ref()
101 .map(|m| m.desc.clone())
102 .unwrap_or_default(),
103 tags: proto
104 .metadata
105 .as_ref()
106 .map(|m| m.tags.clone())
107 .unwrap_or_default(),
108 labels: proto
109 .metadata
110 .as_ref()
111 .map(|m| m.labels.clone())
112 .unwrap_or_default()
113 .into_iter()
114 .map(|l| crate::models::Label {
115 key: l.key,
116 value: l.value,
117 })
118 .collect(),
119 workflow_ref,
120 },
121 spec: proto.spec.unwrap().into(),
122 status: proto.status.map(|s| s.into()),
123 }
124 }
125}
126
127#[derive(Serialize, Deserialize, Debug)]
146pub struct TaskSpec {
147 pub image: String,
149 #[serde(default)]
151 pub command: Vec<String>,
152 #[serde(default)]
154 pub args: Vec<String>,
155 #[serde(default)]
157 pub env: Vec<TaskEnv>,
158 #[serde(rename = "inputContexts", default)]
160 pub input_contexts: Vec<InputContext>,
161 #[serde(rename = "outputContexts", default)]
163 pub output_contexts: Vec<OutputContext>,
164 pub resources: TaskResources,
166 #[serde(rename = "storeStdout", default)]
168 pub store_stdout: bool,
169 #[serde(rename = "storeStderr", default)]
171 pub store_stderr: bool,
172}
173
174impl From<gevulot::TaskSpec> for TaskSpec {
176 fn from(proto: gevulot::TaskSpec) -> Self {
177 TaskSpec {
178 image: proto.image,
179 command: proto.command,
180 args: proto.args,
181 env: proto
182 .env
183 .into_iter()
184 .map(|e| TaskEnv {
185 name: e.name,
186 value: e.value,
187 })
188 .collect(),
189 input_contexts: proto
190 .input_contexts
191 .into_iter()
192 .map(|ic| InputContext {
193 source: ic.source,
194 target: ic.target,
195 })
196 .collect(),
197 output_contexts: proto
198 .output_contexts
199 .into_iter()
200 .map(|oc| OutputContext {
201 source: oc.source,
202 retention_period: oc.retention_period as i64,
203 })
204 .collect(),
205 resources: TaskResources {
206 cpus: (proto.cpus as i64).into(),
207 gpus: (proto.gpus as i64).into(),
208 memory: (proto.memory as i64).into(),
209 time: (proto.time as i64).into(),
210 },
211 store_stdout: proto.store_stdout,
212 store_stderr: proto.store_stderr,
213 }
214 }
215}
216
217#[derive(Serialize, Deserialize, Debug)]
219pub struct TaskEnv {
220 pub name: String,
221 pub value: String,
222}
223
224#[derive(Serialize, Deserialize, Debug)]
226pub struct InputContext {
227 pub source: String,
229 pub target: String,
231}
232
233#[derive(Serialize, Deserialize, Debug)]
235pub struct OutputContext {
236 pub source: String,
238 #[serde(rename = "retentionPeriod")]
240 pub retention_period: i64,
241}
242
243#[derive(Serialize, Deserialize, Debug)]
245pub struct TaskResources {
246 pub cpus: crate::models::CoreUnit,
248 pub gpus: crate::models::CoreUnit,
250 pub memory: crate::models::ByteUnit<DefaultFactorOneMegabyte>, pub time: crate::models::TimeUnit,
254}
255
256#[derive(Serialize, Deserialize, Debug)]
258pub struct TaskStatus {
259 pub state: String,
261 #[serde(rename = "createdAt")]
263 pub created_at: i64,
264 #[serde(rename = "startedAt")]
265 pub started_at: i64,
266 #[serde(rename = "completedAt")]
267 pub completed_at: i64,
268 #[serde(rename = "assignedWorkers")]
270 pub assigned_workers: Vec<String>,
271 #[serde(rename = "activeWorker")]
272 pub active_worker: String,
273 #[serde(rename = "exitCode")]
275 pub exit_code: Option<i64>,
276 #[serde(rename = "outputContexts")]
278 pub output_contexts: Vec<String>,
279 pub stdout: Option<String>,
281 pub stderr: Option<String>,
282 pub error: Option<String>,
284}
285
286impl From<gevulot::TaskStatus> for TaskStatus {
288 fn from(proto: gevulot::TaskStatus) -> Self {
289 let mut exit_code = None;
291 let state = match proto.state {
292 0 => "Pending".to_string(),
293 1 => "Running".to_string(),
294 2 => "Declined".to_string(),
295 3 => {
296 exit_code = Some(proto.exit_code);
297 "Done".to_string()
298 }
299 4 => {
300 exit_code = Some(proto.exit_code);
301 "Failed".to_string()
302 }
303 _ => "Unknown".to_string(),
304 };
305
306 let error = if proto.error.is_empty() {
308 None
309 } else {
310 Some(proto.error)
311 };
312 let stdout = if proto.stdout.is_empty() {
313 None
314 } else {
315 Some(proto.stdout)
316 };
317 let stderr = if proto.stderr.is_empty() {
318 None
319 } else {
320 Some(proto.stderr)
321 };
322
323 TaskStatus {
324 state,
325 created_at: proto.created_at as i64,
326 started_at: proto.started_at as i64,
327 completed_at: proto.completed_at as i64,
328 assigned_workers: proto.assigned_workers,
329 active_worker: proto.active_worker,
330 exit_code,
331 output_contexts: proto.output_contexts,
332 error,
333 stdout,
334 stderr,
335 }
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Resource section shared by the tests that are not about resource parsing.
    fn spaced_unit_resources() -> serde_json::Value {
        json!({
            "cpus": "1000 MCpu",
            "gpus": "1000 MGpu",
            "memory": "1024 MiB",
            "time": "1 hr"
        })
    }

    /// Checks the values that `spaced_unit_resources` (and its YAML twin)
    /// must resolve to.
    fn assert_spaced_unit_values(resources: &TaskResources) {
        assert_eq!(resources.cpus.millicores(), Ok(1000));
        assert_eq!(resources.gpus.millicores(), Ok(1000));
        assert_eq!(resources.memory.bytes(), Ok(1024 * 1024 * 1024));
        assert_eq!(resources.time.seconds(), Ok(60 * 60));
    }

    /// Unit strings are kept verbatim in the `String` variant of each unit type.
    #[test]
    fn test_parse_task_with_units() {
        use crate::models::{ByteUnit, CoreUnit, TimeUnit};

        let document = json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "resources": {
                    "cpus": "1cpu",
                    "gpus": "1gpu",
                    "memory": "1024mb",
                    "time": "1hr"
                }
            }
        });
        let task = serde_json::from_value::<Task>(document).unwrap();
        let resources = &task.spec.resources;

        assert_eq!(resources.cpus, CoreUnit::String("1cpu".to_string()));
        assert_eq!(resources.gpus, CoreUnit::String("1gpu".to_string()));
        assert_eq!(resources.memory, ByteUnit::String("1024mb".to_string()));
        assert_eq!(resources.time, TimeUnit::String("1hr".to_string()));
    }

    /// Bare numbers are accepted for every resource field.
    #[test]
    fn test_parse_task_without_units() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "metadata": {
                "name": "Test Task",
                "creator": "test",
                "description": "Test Task",
                "tags": [],
                "labels": []
            },
            "spec": {
                "image": "test",
                "command": ["test"],
                "args": ["test"],
                "env": [],
                "inputContexts": [],
                "outputContexts": [],
                "resources": {
                    "cpus": 1,
                    "gpus": 1,
                    "memory": 1024,
                    "time": 1
                }
            }
        });
        let task = serde_json::from_value::<Task>(document).unwrap();
        let resources = &task.spec.resources;

        assert_eq!(resources.cpus.millicores(), Ok(1000));
        assert_eq!(resources.gpus.millicores(), Ok(1000));
        assert_eq!(resources.memory.bytes(), Ok(1024));
        assert_eq!(resources.time.seconds(), Ok(1));
    }

    /// A minimal document (no metadata, no optional spec fields) still parses.
    #[test]
    fn test_parse_task_without_much() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "resources": spaced_unit_resources()
            }
        });
        let task = serde_json::from_value::<Task>(document).expect("Failed to parse task");

        assert_spaced_unit_values(&task.spec.resources);
    }

    /// The same minimal document parses from YAML.
    #[test]
    fn test_parse_task_yaml() {
        let task = serde_yaml::from_str::<Task>(
            r#"
            kind: Task
            version: v0
            spec:
                image: test
                resources:
                    cpus: 1000 MCpu
                    gpus: 1000 MGpu
                    memory: 1024 MiB
                    time: 1 hr
        "#,
        )
        .expect("Failed to parse task");

        assert_spaced_unit_values(&task.spec.resources);
    }

    /// Environment entries are parsed into name/value pairs.
    #[test]
    fn test_parse_task_with_env() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "env": [
                    {
                        "name": "FOO",
                        "value": "bar"
                    },
                    {
                        "name": "DEBUG",
                        "value": "1"
                    }
                ],
                "resources": spaced_unit_resources()
            }
        });
        let task = serde_json::from_value::<Task>(document).expect("Failed to parse task");

        for &(name, expected) in &[("FOO", "bar"), ("DEBUG", "1")] {
            let entry = task.spec.env.iter().find(|e| e.name == name).unwrap();
            assert_eq!(entry.value, expected);
        }
    }

    /// Input contexts keep their order and their source/target values.
    #[test]
    fn test_parse_task_with_input_context() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "inputContexts": [
                    {
                        "source": "pin1",
                        "target": "/input/data1"
                    },
                    {
                        "source": "pin2",
                        "target": "/input/data2"
                    }
                ],
                "resources": spaced_unit_resources()
            }
        });
        let task = serde_json::from_value::<Task>(document).expect("Failed to parse task");

        let expected = [("pin1", "/input/data1"), ("pin2", "/input/data2")];
        for (index, &(source, target)) in expected.iter().enumerate() {
            let input = &task.spec.input_contexts[index];
            assert_eq!(input.source, source);
            assert_eq!(input.target, target);
        }
    }

    /// Output contexts keep their order, source and retention period.
    #[test]
    fn test_parse_task_with_output_context() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "spec": {
                "image": "test",
                "outputContexts": [
                    {
                        "source": "/output/result1",
                        "retentionPeriod": 1000
                    },
                    {
                        "source": "/output/result2",
                        "retentionPeriod": 1000
                    }
                ],
                "resources": spaced_unit_resources()
            }
        });
        let task = serde_json::from_value::<Task>(document).expect("Failed to parse task");

        let expected = [("/output/result1", 1000), ("/output/result2", 1000)];
        for (index, &(source, retention_period)) in expected.iter().enumerate() {
            let output = &task.spec.output_contexts[index];
            assert_eq!(output.source, source);
            assert_eq!(output.retention_period, retention_period);
        }
    }

    /// Name, description, tags and labels are read from the metadata section.
    #[test]
    fn test_parse_task_with_metadata() {
        let document = json!({
            "kind": "Task",
            "version": "v0",
            "metadata": {
                "name": "test-task",
                "description": "A test task",
                "tags": ["test", "example"],
                "labels": [
                    {"key": "env", "value": "test"},
                    {"key": "priority", "value": "high"}
                ]
            },
            "spec": {
                "image": "test",
                "resources": spaced_unit_resources()
            }
        });
        let task = serde_json::from_value::<Task>(document).expect("Failed to parse task");
        let metadata = &task.metadata;

        assert_eq!(metadata.name, "test-task");
        assert_eq!(metadata.description, "A test task");
        assert_eq!(metadata.tags, vec!["test", "example"]);

        let expected_labels = [("env", "test"), ("priority", "high")];
        for (index, &(key, value)) in expected_labels.iter().enumerate() {
            let label = &metadata.labels[index];
            assert_eq!(label.key, key);
            assert_eq!(label.value, value);
        }
    }
}