fuel_core_services/
state.rs

1//! The module related to state of the service.
2
3use tokio::sync::watch;
4
5/// The lifecycle state of the service
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum State {
8    /// Service is initialized but not started
9    NotStarted,
10    /// Service is starting up
11    Starting,
12    /// Service is running as normal
13    Started,
14    /// Service is shutting down
15    Stopping,
16    /// Service is stopped
17    Stopped,
18    /// Service shutdown due to an error (panic)
19    StoppedWithError(String),
20}
21
22impl State {
23    /// is not started
24    pub fn not_started(&self) -> bool {
25        self == &State::NotStarted
26    }
27
28    /// is starting
29    pub fn starting(&self) -> bool {
30        self == &State::Starting
31    }
32
33    /// is started
34    pub fn started(&self) -> bool {
35        self == &State::Started
36    }
37
38    /// is stopped
39    pub fn stopped(&self) -> bool {
40        matches!(self, State::Stopped | State::StoppedWithError(_))
41    }
42
43    /// is stopping
44    pub fn stopping(&self) -> bool {
45        self == &State::Stopping
46    }
47}
48
49/// The wrapper around the `watch::Receiver<State>`. It repeats the `Receiver` functionality +
50/// a new one.
51#[derive(Clone)]
52pub struct StateWatcher(watch::Receiver<State>);
53
54#[cfg(feature = "test-helpers")]
55impl Default for StateWatcher {
56    fn default() -> Self {
57        let (_, receiver) = watch::channel(State::NotStarted);
58        Self(receiver)
59    }
60}
61
62#[cfg(feature = "test-helpers")]
63impl StateWatcher {
64    /// Create a new `StateWatcher` with the `State::Started` state.
65    pub fn started() -> Self {
66        let (sender, receiver) = watch::channel(State::Started);
67        // This function is used only in tests, so for simplicity of the tests, we want to leak sender.
68        core::mem::forget(sender);
69        Self(receiver)
70    }
71}
72
73impl StateWatcher {
74    /// See [`watch::Receiver::borrow`].
75    pub fn borrow(&self) -> watch::Ref<'_, State> {
76        self.0.borrow()
77    }
78
79    /// See [`watch::Receiver::borrow_and_update`].
80    pub fn borrow_and_update(&mut self) -> watch::Ref<'_, State> {
81        self.0.borrow_and_update()
82    }
83
84    /// See [`watch::Receiver::has_changed`].
85    pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
86        self.0.has_changed()
87    }
88
89    /// See [`watch::Receiver::changed`].
90    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
91        self.0.changed().await
92    }
93
94    /// See [`watch::Receiver::same_channel`].
95    pub fn same_channel(&self, other: &Self) -> bool {
96        self.0.same_channel(&other.0)
97    }
98}
99
100impl StateWatcher {
101    #[tracing::instrument(level = "debug", skip(self), err, ret)]
102    /// Infinity loop while the state is `State::Started`. Returns the next received state.
103    pub async fn while_started(&mut self) -> anyhow::Result<State> {
104        loop {
105            let state = self.borrow().clone();
106            if !state.started() {
107                return Ok(state);
108            }
109
110            self.changed().await?;
111        }
112    }
113
114    /// Future that resolves once the state is `State::Stopped`.
115    pub async fn wait_stopping_or_stopped(&mut self) -> anyhow::Result<()> {
116        let state = self.borrow().clone();
117        while !(state.stopped() || state.stopping()) {
118            self.changed().await?;
119        }
120        Ok(())
121    }
122}
123
124impl From<watch::Receiver<State>> for StateWatcher {
125    fn from(receiver: watch::Receiver<State>) -> Self {
126        Self(receiver)
127    }
128}