fuel_core_services/
state.rs1use tokio::sync::watch;
4
/// The states a service can be in over its lifetime.
///
/// The two "stopped" variants are both terminal-looking outcomes: a plain
/// stop, and a stop that carries a `String` payload (presumably a
/// description of the error that caused it).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum State {
    /// Not started yet.
    NotStarted,
    /// In the process of starting.
    Starting,
    /// Started.
    Started,
    /// In the process of stopping.
    Stopping,
    /// Stopped.
    Stopped,
    /// Stopped, with an attached `String` payload.
    StoppedWithError(String),
}
21
22impl State {
23 pub fn not_started(&self) -> bool {
25 self == &State::NotStarted
26 }
27
28 pub fn starting(&self) -> bool {
30 self == &State::Starting
31 }
32
33 pub fn started(&self) -> bool {
35 self == &State::Started
36 }
37
38 pub fn stopped(&self) -> bool {
40 matches!(self, State::Stopped | State::StoppedWithError(_))
41 }
42
43 pub fn stopping(&self) -> bool {
45 self == &State::Stopping
46 }
47}
48
/// Newtype around a `tokio::sync::watch::Receiver<State>`.
///
/// The receiver is private; the `impl` blocks in this file expose selected
/// receiver operations and state-waiting helpers on top of it.
#[derive(Clone)]
pub struct StateWatcher(watch::Receiver<State>);
53
#[cfg(feature = "test-helpers")]
impl Default for StateWatcher {
    /// Builds a watcher over a fresh channel whose initial value is
    /// [`State::NotStarted`].
    ///
    /// Only the receiving half is kept; the sending half is discarded
    /// immediately, so no further state can ever be published to it.
    fn default() -> Self {
        let (_, rx) = watch::channel(State::NotStarted);
        StateWatcher(rx)
    }
}
61
#[cfg(feature = "test-helpers")]
impl StateWatcher {
    /// Builds a watcher over a fresh channel whose initial value is
    /// [`State::Started`].
    ///
    /// The sending half is deliberately leaked with `mem::forget`, so it is
    /// never dropped (presumably to keep the channel from being reported as
    /// closed while the watcher is used in tests).
    pub fn started() -> Self {
        let (tx, rx) = watch::channel(State::Started);
        // Leak the sender on purpose: its destructor must never run.
        core::mem::forget(tx);
        StateWatcher(rx)
    }
}
72
73impl StateWatcher {
74 pub fn borrow(&self) -> watch::Ref<'_, State> {
76 self.0.borrow()
77 }
78
79 pub fn borrow_and_update(&mut self) -> watch::Ref<'_, State> {
81 self.0.borrow_and_update()
82 }
83
84 pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
86 self.0.has_changed()
87 }
88
89 pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
91 self.0.changed().await
92 }
93
94 pub fn same_channel(&self, other: &Self) -> bool {
96 self.0.same_channel(&other.0)
97 }
98}
99
100impl StateWatcher {
101 #[tracing::instrument(level = "debug", skip(self), err, ret)]
102 pub async fn while_started(&mut self) -> anyhow::Result<State> {
104 loop {
105 let state = self.borrow().clone();
106 if !state.started() {
107 return Ok(state);
108 }
109
110 self.changed().await?;
111 }
112 }
113
114 pub async fn wait_stopping_or_stopped(&mut self) -> anyhow::Result<()> {
116 let state = self.borrow().clone();
117 while !(state.stopped() || state.stopping()) {
118 self.changed().await?;
119 }
120 Ok(())
121 }
122}
123
124impl From<watch::Receiver<State>> for StateWatcher {
125 fn from(receiver: watch::Receiver<State>) -> Self {
126 Self(receiver)
127 }
128}