quickwit_actors/
universe.rs1use std::time::Duration;
21
22use crate::channel_with_priority::Priority;
23use crate::mailbox::{Command, CommandOrMessage};
24use crate::scheduler::{SimulateAdvanceTime, TimeShift};
25use crate::spawn_builder::SpawnBuilder;
26use crate::{Actor, KillSwitch, Mailbox, QueueCapacity, Scheduler};
27
28pub struct Universe {
34 scheduler_mailbox: Mailbox<Scheduler>,
35 kill_switch: KillSwitch,
38}
39
40impl Universe {
41 #[allow(clippy::new_without_default)]
43 pub fn new() -> Universe {
44 let scheduler = Scheduler::default();
45 let kill_switch = KillSwitch::default();
46 let (mailbox, _inbox) =
47 crate::create_mailbox("fake-mailbox".to_string(), QueueCapacity::Unbounded);
48 let (scheduler_mailbox, _scheduler_inbox) =
49 SpawnBuilder::new(scheduler, mailbox, kill_switch.clone()).spawn();
50 Universe {
51 scheduler_mailbox,
52 kill_switch,
53 }
54 }
55
56 pub fn kill(&self) {
57 self.kill_switch.kill();
58 }
59
60 pub async fn simulate_time_shift(&self, duration: Duration) {
68 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
69 let _ = self
70 .scheduler_mailbox
71 .send_message(SimulateAdvanceTime {
72 time_shift: TimeShift::ByDuration(duration),
73 tx,
74 })
75 .await;
76 let _ = rx.await;
77 }
78
79 pub fn spawn_actor<A: Actor>(&self, actor: A) -> SpawnBuilder<A> {
80 SpawnBuilder::new(
81 actor,
82 self.scheduler_mailbox.clone(),
83 self.kill_switch.clone(),
84 )
85 }
86
87 pub async fn send_exit_with_success<A: Actor>(
90 &self,
91 mailbox: &Mailbox<A>,
92 ) -> Result<(), crate::SendError> {
93 mailbox
94 .send_with_priority(
95 CommandOrMessage::Command(Command::ExitWithSuccess),
96 Priority::Low,
97 )
98 .await
99 }
100}
101
102impl Drop for Universe {
103 fn drop(&mut self) {
104 self.kill_switch.kill();
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use std::time::Duration;
111
112 use async_trait::async_trait;
113
114 use crate::{Actor, ActorContext, ActorExitStatus, Handler, Universe};
115
116 #[derive(Default)]
117 pub struct ActorWithSchedule {
118 count: usize,
119 }
120
121 #[async_trait]
122 impl Actor for ActorWithSchedule {
123 type ObservableState = usize;
124
125 fn observable_state(&self) -> usize {
126 self.count
127 }
128
129 async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
130 self.handle(Loop, ctx).await
131 }
132 }
133
134 #[derive(Debug)]
135 struct Loop;
136
137 #[async_trait]
138 impl Handler<Loop> for ActorWithSchedule {
139 type Reply = ();
140 async fn handle(
141 &mut self,
142 _msg: Loop,
143 ctx: &ActorContext<Self>,
144 ) -> Result<(), ActorExitStatus> {
145 self.count += 1;
146 ctx.schedule_self_msg(Duration::from_secs(60), Loop).await;
147 Ok(())
148 }
149 }
150
151 #[tokio::test]
152 async fn test_schedule_for_actor() {
153 let universe = Universe::new();
154 let actor_with_schedule = ActorWithSchedule::default();
155 let (_maibox, handler) = universe.spawn_actor(actor_with_schedule).spawn();
156 let count_after_initialization = handler.process_pending_and_observe().await.state;
157 assert_eq!(count_after_initialization, 1);
158 universe.simulate_time_shift(Duration::from_secs(200)).await;
159 let count_after_advance_time = handler.process_pending_and_observe().await.state;
160 assert_eq!(count_after_advance_time, 4);
163 }
164}