quickwit_actors/
universe.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::time::Duration;
21
22use crate::channel_with_priority::Priority;
23use crate::mailbox::{Command, CommandOrMessage};
24use crate::scheduler::{SimulateAdvanceTime, TimeShift};
25use crate::spawn_builder::SpawnBuilder;
26use crate::{Actor, KillSwitch, Mailbox, QueueCapacity, Scheduler};
27
28/// Universe serves as the top-level context in which Actor can be spawned.
29/// It is *not* a singleton. A typical application will usually have only one universe hosting all
30/// of the actors but it is not a requirement.
31///
32/// In particular, unit test all have their own universe and hence can be executed in parallel.
33pub struct Universe {
34    scheduler_mailbox: Mailbox<Scheduler>,
35    // This killswitch is used for the scheduler, and will be used by default for all spawned
36    // actors.
37    kill_switch: KillSwitch,
38}
39
40impl Universe {
41    /// Creates a new universe.
42    #[allow(clippy::new_without_default)]
43    pub fn new() -> Universe {
44        let scheduler = Scheduler::default();
45        let kill_switch = KillSwitch::default();
46        let (mailbox, _inbox) =
47            crate::create_mailbox("fake-mailbox".to_string(), QueueCapacity::Unbounded);
48        let (scheduler_mailbox, _scheduler_inbox) =
49            SpawnBuilder::new(scheduler, mailbox, kill_switch.clone()).spawn();
50        Universe {
51            scheduler_mailbox,
52            kill_switch,
53        }
54    }
55
56    pub fn kill(&self) {
57        self.kill_switch.kill();
58    }
59
60    /// Simulate advancing the time for unit tests.
61    ///
62    /// It is not just about jumping the clock and triggering one round of messages:
63    /// These message might have generated more messages for instance.
64    ///
65    /// This simulation triggers progress step by step, and after each step, leaves 100ms for actors
66    /// to schedule extra messages.
67    pub async fn simulate_time_shift(&self, duration: Duration) {
68        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
69        let _ = self
70            .scheduler_mailbox
71            .send_message(SimulateAdvanceTime {
72                time_shift: TimeShift::ByDuration(duration),
73                tx,
74            })
75            .await;
76        let _ = rx.await;
77    }
78
79    pub fn spawn_actor<A: Actor>(&self, actor: A) -> SpawnBuilder<A> {
80        SpawnBuilder::new(
81            actor,
82            self.scheduler_mailbox.clone(),
83            self.kill_switch.clone(),
84        )
85    }
86
87    /// Inform an actor to process pending message and then stop processing new messages
88    /// and exit successfully.
89    pub async fn send_exit_with_success<A: Actor>(
90        &self,
91        mailbox: &Mailbox<A>,
92    ) -> Result<(), crate::SendError> {
93        mailbox
94            .send_with_priority(
95                CommandOrMessage::Command(Command::ExitWithSuccess),
96                Priority::Low,
97            )
98            .await
99    }
100}
101
102impl Drop for Universe {
103    fn drop(&mut self) {
104        self.kill_switch.kill();
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use std::time::Duration;
111
112    use async_trait::async_trait;
113
114    use crate::{Actor, ActorContext, ActorExitStatus, Handler, Universe};
115
116    #[derive(Default)]
117    pub struct ActorWithSchedule {
118        count: usize,
119    }
120
121    #[async_trait]
122    impl Actor for ActorWithSchedule {
123        type ObservableState = usize;
124
125        fn observable_state(&self) -> usize {
126            self.count
127        }
128
129        async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
130            self.handle(Loop, ctx).await
131        }
132    }
133
134    #[derive(Debug)]
135    struct Loop;
136
137    #[async_trait]
138    impl Handler<Loop> for ActorWithSchedule {
139        type Reply = ();
140        async fn handle(
141            &mut self,
142            _msg: Loop,
143            ctx: &ActorContext<Self>,
144        ) -> Result<(), ActorExitStatus> {
145            self.count += 1;
146            ctx.schedule_self_msg(Duration::from_secs(60), Loop).await;
147            Ok(())
148        }
149    }
150
151    #[tokio::test]
152    async fn test_schedule_for_actor() {
153        let universe = Universe::new();
154        let actor_with_schedule = ActorWithSchedule::default();
155        let (_maibox, handler) = universe.spawn_actor(actor_with_schedule).spawn();
156        let count_after_initialization = handler.process_pending_and_observe().await.state;
157        assert_eq!(count_after_initialization, 1);
158        universe.simulate_time_shift(Duration::from_secs(200)).await;
159        let count_after_advance_time = handler.process_pending_and_observe().await.state;
160        // Note the count is 2 here and not 1 + 3  = 4.
161        // See comment on `universe.simulate_advance_time`.
162        assert_eq!(count_after_advance_time, 4);
163    }
164}