fuel_core_sync/
service.rs

1//! Service utilities for running fuel sync.
2use std::sync::Arc;
3
4use crate::{
5    import::{
6        Config,
7        Import,
8    },
9    ports::{
10        self,
11        BlockImporterPort,
12        ConsensusPort,
13        PeerToPeerPort,
14    },
15    state::State,
16    sync::SyncHeights,
17};
18
19use fuel_core_services::{
20    stream::{
21        BoxStream,
22        IntoBoxStream,
23    },
24    RunnableService,
25    RunnableTask,
26    Service,
27    ServiceRunner,
28    SharedMutex,
29    StateWatcher,
30    TaskNextAction,
31};
32use fuel_core_types::fuel_types::BlockHeight;
33use futures::StreamExt;
34use tokio::sync::Notify;
35
36#[cfg(test)]
37mod tests;
38
39/// Creates an instance of runnable sync service.
40pub fn new_service<P, E, C>(
41    current_fuel_block_height: BlockHeight,
42    p2p: P,
43    executor: E,
44    consensus: C,
45    params: Config,
46) -> anyhow::Result<ServiceRunner<SyncTask<P, E, C>>>
47where
48    P: ports::PeerToPeerPort + Send + Sync + 'static,
49    E: ports::BlockImporterPort + Send + Sync + 'static,
50    C: ports::ConsensusPort + Send + Sync + 'static,
51{
52    let height_stream = p2p.height_stream();
53    let committed_height_stream = executor.committed_height_stream();
54    let state = State::new(Some(current_fuel_block_height.into()), None);
55    Ok(ServiceRunner::new(SyncTask::new(
56        height_stream,
57        committed_height_stream,
58        state,
59        params,
60        p2p,
61        executor,
62        consensus,
63    )?))
64}
65
66/// Task for syncing heights.
67/// Contains import task as a child task.
68pub struct SyncTask<P, E, C>
69where
70    P: PeerToPeerPort + Send + Sync + 'static,
71    E: BlockImporterPort + Send + Sync + 'static,
72    C: ConsensusPort + Send + Sync + 'static,
73{
74    sync_heights: SyncHeights,
75    import_task_handle: ServiceRunner<ImportTask<P, E, C>>,
76}
77
78struct ImportTask<P, E, C>(Import<P, E, C>);
79
80impl<P, E, C> SyncTask<P, E, C>
81where
82    P: PeerToPeerPort + Send + Sync + 'static,
83    E: BlockImporterPort + Send + Sync + 'static,
84    C: ConsensusPort + Send + Sync + 'static,
85{
86    fn new(
87        height_stream: BoxStream<BlockHeight>,
88        committed_height_stream: BoxStream<BlockHeight>,
89        state: State,
90        params: Config,
91        p2p: P,
92        executor: E,
93        consensus: C,
94    ) -> anyhow::Result<Self> {
95        let notify = Arc::new(Notify::new());
96        let state = SharedMutex::new(state);
97        let p2p = Arc::new(p2p);
98        let executor = Arc::new(executor);
99        let consensus = Arc::new(consensus);
100        let sync_heights = SyncHeights::new(
101            height_stream,
102            committed_height_stream,
103            state.clone(),
104            notify.clone(),
105        );
106        let import = Import::new(state, notify, params, p2p, executor, consensus);
107        let import_task_handle = ServiceRunner::new(ImportTask(import));
108        Ok(Self {
109            sync_heights,
110            import_task_handle,
111        })
112    }
113}
114
115impl<P, E, C> RunnableTask for SyncTask<P, E, C>
116where
117    P: PeerToPeerPort + Send + Sync + 'static,
118    E: BlockImporterPort + Send + Sync + 'static,
119    C: ConsensusPort + Send + Sync + 'static,
120{
121    async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
122        match self.sync_heights.sync().await {
123            None => TaskNextAction::Stop,
124            Some(_) => TaskNextAction::Continue,
125        }
126    }
127
128    async fn shutdown(self) -> anyhow::Result<()> {
129        self.import_task_handle.stop_and_await().await?;
130        Ok(())
131    }
132}
133
134#[async_trait::async_trait]
135impl<P, E, C> RunnableService for SyncTask<P, E, C>
136where
137    P: PeerToPeerPort + Send + Sync + 'static,
138    E: BlockImporterPort + Send + Sync + 'static,
139    C: ConsensusPort + Send + Sync + 'static,
140{
141    const NAME: &'static str = "SyncTask";
142
143    type SharedData = ();
144
145    type Task = SyncTask<P, E, C>;
146    type TaskParams = ();
147
148    fn shared_data(&self) -> Self::SharedData {}
149
150    async fn into_task(
151        mut self,
152        watcher: &StateWatcher,
153        _: Self::TaskParams,
154    ) -> anyhow::Result<Self::Task> {
155        let mut sync_watcher = watcher.clone();
156        self.import_task_handle.start_and_await().await?;
157        let mut import_watcher = self.import_task_handle.state_watcher();
158        self.sync_heights.map_stream(|height_stream| {
159            height_stream
160                .take_until(async move {
161                    tokio::select! {
162                        _ = sync_watcher.while_started() => {},
163                        _ = import_watcher.while_started() => {},
164                    }
165                })
166                .into_boxed()
167        });
168
169        Ok(self)
170    }
171}
172
173impl<P, E, C> RunnableTask for ImportTask<P, E, C>
174where
175    P: PeerToPeerPort + Send + Sync + 'static,
176    E: BlockImporterPort + Send + Sync + 'static,
177    C: ConsensusPort + Send + Sync + 'static,
178{
179    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
180        self.0.import(watcher).await.into()
181    }
182
183    async fn shutdown(self) -> anyhow::Result<()> {
184        // Nothing to shut down because we don't have any temporary state that should be dumped,
185        // and we don't spawn any sub-tasks that we need to finish or await.
186        Ok(())
187    }
188}
189
190#[async_trait::async_trait]
191impl<P, E, C> RunnableService for ImportTask<P, E, C>
192where
193    P: PeerToPeerPort + Send + Sync + 'static,
194    E: BlockImporterPort + Send + Sync + 'static,
195    C: ConsensusPort + Send + Sync + 'static,
196{
197    const NAME: &'static str = "ImportTask";
198
199    type SharedData = ();
200    type TaskParams = ();
201
202    type Task = ImportTask<P, E, C>;
203
204    fn shared_data(&self) -> Self::SharedData {}
205
206    async fn into_task(
207        self,
208        _: &StateWatcher,
209        _: Self::TaskParams,
210    ) -> anyhow::Result<Self::Task> {
211        Ok(self)
212    }
213}