fuel_core_sync/
sync.rs

1//! # Sync task
2//! Updates the state from the height stream.
3
4use std::sync::Arc;
5
6use fuel_core_services::{
7    stream::{
8        BoxStream,
9        IntoBoxStream,
10    },
11    SharedMutex,
12};
13use fuel_core_types::fuel_types::BlockHeight;
14use futures::stream::StreamExt;
15use tokio::sync::Notify;
16
17use crate::state::State;
18
19#[cfg(test)]
20mod tests;
21
/// A block height received by the sync task, tagged with the source it came from.
pub(crate) enum IncomingHeight {
    /// A height produced by the observed-height stream; `SyncHeights::sync`
    /// forwards it to `State::observe`.
    Observed(BlockHeight),
    /// A height produced by the committed-height stream; `SyncHeights::sync`
    /// forwards it to `State::commit`.
    Committed(BlockHeight),
}
26
/// Task state for applying incoming heights to the shared sync [`State`].
pub(crate) struct SyncHeights {
    /// Single stream carrying both observed and committed heights.
    height_stream: BoxStream<IncomingHeight>,
    /// Shared state that `sync` updates for every received height.
    state: SharedMutex<State>,
    /// Notified by `sync` when observing a height reports a state change.
    notify: Arc<Notify>,
}
32
33impl SyncHeights {
34    pub(crate) fn new(
35        height_stream: BoxStream<BlockHeight>,
36        committed_height_stream: BoxStream<BlockHeight>,
37        state: SharedMutex<State>,
38        notify: Arc<Notify>,
39    ) -> Self {
40        let height_stream = futures::stream::select(
41            height_stream.map(IncomingHeight::Observed),
42            committed_height_stream.map(IncomingHeight::Committed),
43        )
44        .into_boxed();
45        Self {
46            height_stream,
47            state,
48            notify,
49        }
50    }
51
52    #[tracing::instrument(skip(self))]
53    /// Sync the state from the height stream.
54    /// This stream never blocks or errors.
55    pub(crate) async fn sync(&mut self) -> Option<()> {
56        let height = self.height_stream.next().await?;
57        let state_change = match height {
58            IncomingHeight::Committed(height) => {
59                self.state.apply(|s| s.commit(*height));
60                // A new committed height doesn't represent new work for the import stream.
61                false
62            }
63            IncomingHeight::Observed(height) => self.state.apply(|s| s.observe(*height)),
64        };
65        if state_change {
66            self.notify.notify_one();
67        }
68        Some(())
69    }
70
71    pub(crate) fn map_stream(
72        &mut self,
73        f: impl FnOnce(BoxStream<IncomingHeight>) -> BoxStream<IncomingHeight>,
74    ) {
75        let height_stream = core::mem::replace(
76            &mut self.height_stream,
77            futures::stream::pending().into_boxed(),
78        );
79        self.height_stream = f(height_stream);
80    }
81}