1//! State of the sync service.
3use std::{
4 cmp::Ordering,
5 ops::RangeInclusive,
9mod test;
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12/// State of the sync service.
14/// The state takes evidence and produces a status.
15pub struct State {
16 status: Status,
/// Status of the sync service.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Status {
    /// Nothing has been committed or observed yet, so there is nothing to sync.
    Uninitialized,
    /// The inclusive range of heights that is currently being processed.
    Processing(RangeInclusive<u32>),
    /// The height that has been committed.
    Committed(u32),
}
30impl State {
31 #[tracing::instrument(skip_all)]
32 /// Create a new state from the current committed and observed heights.
33 pub fn new(
34 committed: impl Into<Option<u32>>,
35 observed: impl Into<Option<u32>>,
36 ) -> Self {
37 let status = match (committed.into(), observed.into()) {
38 // Both the committed and observed heights are known.
39 (Some(committed), Some(observed)) => {
40 // If there is a gap between the committed and observed heights,
41 // the service is processing the gap otherwise the service is
42 // has nothing to sync.
43 committed
44 .checked_add(1)
45 .map_or(Status::Committed(committed), |next| {
46 let range = next..=observed;
47 if range.is_empty() {
48 Status::Committed(committed)
49 } else {
50 Status::Processing(range)
51 }
52 })
53 }
54 // Only the committed height is known, so the service has nothing to sync.
55 (Some(committed), None) => Status::Committed(committed),
56 // Only the observed height is known, so the service is processing
57 // up to that height.
58 (None, Some(observed)) => Status::Processing(0..=observed),
59 // No heights are known, so the service is uninitialized.
60 (None, None) => Status::Uninitialized,
61 };
62 tracing::debug!("Initial status: {:?}", status);
63 Self { status }
64 }
66 #[tracing::instrument]
67 /// Get the current range to process.
68 pub fn process_range(&self) -> Option<RangeInclusive<u32>> {
69 match &self.status {
70 Status::Processing(range) => {
71 tracing::debug!("Processing range: {:?}", range);
72 Some(range.clone())
73 }
74 _ => {
75 tracing::debug!("Nothing to process");
76 None
77 }
78 }
79 }
81 #[tracing::instrument]
82 /// Record that a block has been committed.
83 pub fn commit(&mut self, height: u32) {
84 let new_status = match &self.status {
85 // Currently processing a range and recording a commit.
86 Status::Processing(range) => {
87 if height < *range.start() {
88 // The committed height is less than the start of the processing range,
89 // it is a lag between committing and processing.
90 None
91 } else {
92 match height.cmp(range.end()) {
93 // The commit is less than the end of the range, so the range
94 // is still being processed.
95 Ordering::Less => Some(Status::Processing(
96 height.saturating_add(1)..=*range.end(),
97 )),
98 // The commit is equal or greater than the end of the range,
99 // so the range is fully committed.
100 Ordering::Equal | Ordering::Greater => {
101 Some(Status::Committed(height))
102 }
103 }
104 }
105 }
106 // Currently uninitialized so now are committed.
107 Status::Uninitialized => Some(Status::Committed(height)),
108 // Currently committed and recording a commit.
109 Status::Committed(existing) => {
110 // Take the max of the existing and new commits.
111 match height.cmp(existing) {
112 Ordering::Less | Ordering::Equal => None,
113 Ordering::Greater => Some(Status::Committed(height)),
114 }
115 }
116 };
117 self.apply_status(new_status);
118 }
120 #[tracing::instrument]
121 /// Record that a block has been observed.
122 pub fn observe(&mut self, height: u32) -> bool {
123 let new_status = match &self.status {
124 // Currently uninitialized so process from the start to the observed height.
125 Status::Uninitialized => Some(Status::Processing(0..=height)),
126 // Currently processing a range and recording an observation.
127 Status::Processing(range) => match range.end().cmp(&height) {
128 // The range end is less than the observed height, so
129 // extend the range to the observed height.
130 Ordering::Less => Some(Status::Processing(*range.start()..=height)),
131 // The range end is equal or greater than the observed height,
132 // so ignore it.
133 Ordering::Equal | Ordering::Greater => None,
134 },
135 // Currently committed and recording an observation.
136 // If there is a gap between the committed and observed heights,
137 // the service is processing.
138 Status::Committed(committed) => committed.checked_add(1).and_then(|next| {
139 let r = next..=height;
140 (!r.is_empty()).then_some(Status::Processing(r))
141 }),
142 };
143 let status_change = new_status.is_some();
144 self.apply_status(new_status);
145 status_change
146 }
148 #[tracing::instrument]
149 /// Record that a range of blocks have failed to process.
150 pub fn failed_to_process(&mut self, range: RangeInclusive<u32>) {
151 // Ignore empty ranges.
152 let status = (!range.is_empty())
153 .then_some(())
154 .and_then(|_| match &self.status {
155 // Currently uninitialized or committed.
156 // Failures do not override these status.
157 Status::Uninitialized | Status::Committed(_) => None,
158 // Currently processing a range and recording a failure.
159 Status::Processing(processing) => range
160 // If the failure range contains the start of the processing range,
161 // then there is no reason to continue trying to process this range.
162 // The processing range is reverted back to just before it's start.
163 // The revert is either to the last committed height, or to uninitialized.
164 .contains(processing.start())
165 .then(|| {
166 processing
167 .start()
168 .checked_sub(1)
169 .map_or(Status::Uninitialized, Status::Committed)
170 })
171 .or_else(|| {
172 // If the failure range contains the end of the processing range,
173 // or the processing range contains the start of the failure range,
174 // then the processing range is shortened to just before the failure range.
175 (range.contains(processing.end())
176 || processing.contains(range.start()))
177 .then(|| {
178 range
179 .start()
180 .checked_sub(1)
181 .map_or(Status::Uninitialized, |prev| {
182 Status::Processing(*processing.start()..=prev)
183 })
184 })
185 })
186 .or_else(|| {
187 // If the processing range contains the end of the failure range,
188 // then the entire processing range is failed and reverted back to
189 // the last committed height, or to uninitialized.
190 processing.contains(range.end()).then(|| {
191 processing
192 .start()
193 .checked_sub(1)
194 .map_or(Status::Uninitialized, Status::Committed)
195 })
196 }),
197 });
198 self.apply_status(status);
199 }
201 fn apply_status(&mut self, status: Option<Status>) {
202 match status {
203 Some(s) => {
204 tracing::info!("Status change from: {:?}, to: {:?}", self.status, s);
205 self.status = s;
206 }
207 _ => {
208 tracing::debug!("No status change: {:?}", self.status);
209 }
210 }
211 }
/// Get the end of the range currently being processed.
///
/// Returns `None` unless the status is [`Status::Processing`]. Only
/// compiled for tests.
#[cfg(test)]
pub fn proposed_height(&self) -> Option<&u32> {
    if let Status::Processing(pending) = &self.status {
        Some(pending.end())
    } else {
        None
    }
}
/// Borrow the current status. Only compiled for tests.
#[cfg(test)]
pub fn status(&self) -> &Status {
    let Self { status } = self;
    status
}