1use std::sync::Arc;
5
6use fuel_core_services::{
7 stream::{
8 BoxStream,
9 IntoBoxStream,
10 },
11 SharedMutex,
12};
13use fuel_core_types::fuel_types::BlockHeight;
14use futures::stream::StreamExt;
15use tokio::sync::Notify;
16
17use crate::state::State;
18
19#[cfg(test)]
20mod tests;
21
22pub(crate) enum IncomingHeight {
23 Observed(BlockHeight),
24 Committed(BlockHeight),
25}
26
27pub(crate) struct SyncHeights {
28 height_stream: BoxStream<IncomingHeight>,
29 state: SharedMutex<State>,
30 notify: Arc<Notify>,
31}
32
33impl SyncHeights {
34 pub(crate) fn new(
35 height_stream: BoxStream<BlockHeight>,
36 committed_height_stream: BoxStream<BlockHeight>,
37 state: SharedMutex<State>,
38 notify: Arc<Notify>,
39 ) -> Self {
40 let height_stream = futures::stream::select(
41 height_stream.map(IncomingHeight::Observed),
42 committed_height_stream.map(IncomingHeight::Committed),
43 )
44 .into_boxed();
45 Self {
46 height_stream,
47 state,
48 notify,
49 }
50 }
51
52 #[tracing::instrument(skip(self))]
53 pub(crate) async fn sync(&mut self) -> Option<()> {
56 let height = self.height_stream.next().await?;
57 let state_change = match height {
58 IncomingHeight::Committed(height) => {
59 self.state.apply(|s| s.commit(*height));
60 false
62 }
63 IncomingHeight::Observed(height) => self.state.apply(|s| s.observe(*height)),
64 };
65 if state_change {
66 self.notify.notify_one();
67 }
68 Some(())
69 }
70
71 pub(crate) fn map_stream(
72 &mut self,
73 f: impl FnOnce(BoxStream<IncomingHeight>) -> BoxStream<IncomingHeight>,
74 ) {
75 let height_stream = core::mem::replace(
76 &mut self.height_stream,
77 futures::stream::pending().into_boxed(),
78 );
79 self.height_stream = f(height_stream);
80 }
81}