fuel_core_sync/
state.rs

1//! State of the sync service.
2
3use std::{
4    cmp::Ordering,
5    ops::RangeInclusive,
6};
7
8#[cfg(test)]
9mod test;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12/// State of the sync service.
13///
14/// The state takes evidence and produces a status.
15pub struct State {
16    status: Status,
17}
18
/// The possible statuses of the sync service.
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq, Hash)]
pub enum Status {
    /// Nothing has been committed or observed yet, so there is nothing to sync.
    Uninitialized,
    /// The inclusive range of heights that is currently being processed.
    Processing(RangeInclusive<u32>),
    /// The given height is committed and no further heights are pending.
    Committed(u32),
}
29
30impl State {
31    #[tracing::instrument(skip_all)]
32    /// Create a new state from the current committed and observed heights.
33    pub fn new(
34        committed: impl Into<Option<u32>>,
35        observed: impl Into<Option<u32>>,
36    ) -> Self {
37        let status = match (committed.into(), observed.into()) {
38            // Both the committed and observed heights are known.
39            (Some(committed), Some(observed)) => {
40                // If there is a gap between the committed and observed heights,
41                // the service is processing the gap otherwise the service is
42                // has nothing to sync.
43                committed
44                    .checked_add(1)
45                    .map_or(Status::Committed(committed), |next| {
46                        let range = next..=observed;
47                        if range.is_empty() {
48                            Status::Committed(committed)
49                        } else {
50                            Status::Processing(range)
51                        }
52                    })
53            }
54            // Only the committed height is known, so the service has nothing to sync.
55            (Some(committed), None) => Status::Committed(committed),
56            // Only the observed height is known, so the service is processing
57            // up to that height.
58            (None, Some(observed)) => Status::Processing(0..=observed),
59            // No heights are known, so the service is uninitialized.
60            (None, None) => Status::Uninitialized,
61        };
62        tracing::debug!("Initial status: {:?}", status);
63        Self { status }
64    }
65
66    #[tracing::instrument]
67    /// Get the current range to process.
68    pub fn process_range(&self) -> Option<RangeInclusive<u32>> {
69        match &self.status {
70            Status::Processing(range) => {
71                tracing::debug!("Processing range: {:?}", range);
72                Some(range.clone())
73            }
74            _ => {
75                tracing::debug!("Nothing to process");
76                None
77            }
78        }
79    }
80
81    #[tracing::instrument]
82    /// Record that a block has been committed.
83    pub fn commit(&mut self, height: u32) {
84        let new_status = match &self.status {
85            // Currently processing a range and recording a commit.
86            Status::Processing(range) => {
87                if height < *range.start() {
88                    // The committed height is less than the start of the processing range,
89                    // it is a lag between committing and processing.
90                    None
91                } else {
92                    match height.cmp(range.end()) {
93                        // The commit is less than the end of the range, so the range
94                        // is still being processed.
95                        Ordering::Less => Some(Status::Processing(
96                            height.saturating_add(1)..=*range.end(),
97                        )),
98                        // The commit is equal or greater than the end of the range,
99                        // so the range is fully committed.
100                        Ordering::Equal | Ordering::Greater => {
101                            Some(Status::Committed(height))
102                        }
103                    }
104                }
105            }
106            // Currently uninitialized so now are committed.
107            Status::Uninitialized => Some(Status::Committed(height)),
108            // Currently committed and recording a commit.
109            Status::Committed(existing) => {
110                // Take the max of the existing and new commits.
111                match height.cmp(existing) {
112                    Ordering::Less | Ordering::Equal => None,
113                    Ordering::Greater => Some(Status::Committed(height)),
114                }
115            }
116        };
117        self.apply_status(new_status);
118    }
119
120    #[tracing::instrument]
121    /// Record that a block has been observed.
122    pub fn observe(&mut self, height: u32) -> bool {
123        let new_status = match &self.status {
124            // Currently uninitialized so process from the start to the observed height.
125            Status::Uninitialized => Some(Status::Processing(0..=height)),
126            // Currently processing a range and recording an observation.
127            Status::Processing(range) => match range.end().cmp(&height) {
128                // The range end is less than the observed height, so
129                // extend the range to the observed height.
130                Ordering::Less => Some(Status::Processing(*range.start()..=height)),
131                // The range end is equal or greater than the observed height,
132                // so ignore it.
133                Ordering::Equal | Ordering::Greater => None,
134            },
135            // Currently committed and recording an observation.
136            // If there is a gap between the committed and observed heights,
137            // the service is processing.
138            Status::Committed(committed) => committed.checked_add(1).and_then(|next| {
139                let r = next..=height;
140                (!r.is_empty()).then_some(Status::Processing(r))
141            }),
142        };
143        let status_change = new_status.is_some();
144        self.apply_status(new_status);
145        status_change
146    }
147
148    #[tracing::instrument]
149    /// Record that a range of blocks have failed to process.
150    pub fn failed_to_process(&mut self, range: RangeInclusive<u32>) {
151        // Ignore empty ranges.
152        let status = (!range.is_empty())
153            .then_some(())
154            .and_then(|_| match &self.status {
155                // Currently uninitialized or committed.
156                // Failures do not override these status.
157                Status::Uninitialized | Status::Committed(_) => None,
158                // Currently processing a range and recording a failure.
159                Status::Processing(processing) => range
160                    // If the failure range contains the start of the processing range,
161                    // then there is no reason to continue trying to process this range.
162                    // The processing range is reverted back to just before it's start.
163                    // The revert is either to the last committed height, or to uninitialized.
164                    .contains(processing.start())
165                    .then(|| {
166                        processing
167                            .start()
168                            .checked_sub(1)
169                            .map_or(Status::Uninitialized, Status::Committed)
170                    })
171                    .or_else(|| {
172                        // If the failure range contains the end of the processing range,
173                        // or the processing range contains the start of the failure range,
174                        // then the processing range is shortened to just before the failure range.
175                        (range.contains(processing.end())
176                            || processing.contains(range.start()))
177                        .then(|| {
178                            range
179                                .start()
180                                .checked_sub(1)
181                                .map_or(Status::Uninitialized, |prev| {
182                                    Status::Processing(*processing.start()..=prev)
183                                })
184                        })
185                    })
186                    .or_else(|| {
187                        // If the processing range contains the end of the failure range,
188                        // then the entire processing range is failed and reverted back to
189                        // the last committed height, or to uninitialized.
190                        processing.contains(range.end()).then(|| {
191                            processing
192                                .start()
193                                .checked_sub(1)
194                                .map_or(Status::Uninitialized, Status::Committed)
195                        })
196                    }),
197            });
198        self.apply_status(status);
199    }
200
201    fn apply_status(&mut self, status: Option<Status>) {
202        match status {
203            Some(s) => {
204                tracing::info!("Status change from: {:?}, to: {:?}", self.status, s);
205                self.status = s;
206            }
207            _ => {
208                tracing::debug!("No status change: {:?}", self.status);
209            }
210        }
211    }
212
213    #[cfg(test)]
214    /// Get the current observed height.
215    pub fn proposed_height(&self) -> Option<&u32> {
216        match &self.status {
217            Status::Processing(range) => Some(range.end()),
218            _ => None,
219        }
220    }
221
222    #[cfg(test)]
223    /// Get the current status.
224    pub fn status(&self) -> &Status {
225        &self.status
226    }
227}