1use std::sync::Arc;
3
4use crate::{
5 import::{
6 Config,
7 Import,
8 },
9 ports::{
10 self,
11 BlockImporterPort,
12 ConsensusPort,
13 PeerToPeerPort,
14 },
15 state::State,
16 sync::SyncHeights,
17};
18
19use fuel_core_services::{
20 stream::{
21 BoxStream,
22 IntoBoxStream,
23 },
24 RunnableService,
25 RunnableTask,
26 Service,
27 ServiceRunner,
28 SharedMutex,
29 StateWatcher,
30 TaskNextAction,
31};
32use fuel_core_types::fuel_types::BlockHeight;
33use futures::StreamExt;
34use tokio::sync::Notify;
35
36#[cfg(test)]
37mod tests;
38
39pub fn new_service<P, E, C>(
41 current_fuel_block_height: BlockHeight,
42 p2p: P,
43 executor: E,
44 consensus: C,
45 params: Config,
46) -> anyhow::Result<ServiceRunner<SyncTask<P, E, C>>>
47where
48 P: ports::PeerToPeerPort + Send + Sync + 'static,
49 E: ports::BlockImporterPort + Send + Sync + 'static,
50 C: ports::ConsensusPort + Send + Sync + 'static,
51{
52 let height_stream = p2p.height_stream();
53 let committed_height_stream = executor.committed_height_stream();
54 let state = State::new(Some(current_fuel_block_height.into()), None);
55 Ok(ServiceRunner::new(SyncTask::new(
56 height_stream,
57 committed_height_stream,
58 state,
59 params,
60 p2p,
61 executor,
62 consensus,
63 )?))
64}
65
66pub struct SyncTask<P, E, C>
69where
70 P: PeerToPeerPort + Send + Sync + 'static,
71 E: BlockImporterPort + Send + Sync + 'static,
72 C: ConsensusPort + Send + Sync + 'static,
73{
74 sync_heights: SyncHeights,
75 import_task_handle: ServiceRunner<ImportTask<P, E, C>>,
76}
77
78struct ImportTask<P, E, C>(Import<P, E, C>);
79
80impl<P, E, C> SyncTask<P, E, C>
81where
82 P: PeerToPeerPort + Send + Sync + 'static,
83 E: BlockImporterPort + Send + Sync + 'static,
84 C: ConsensusPort + Send + Sync + 'static,
85{
86 fn new(
87 height_stream: BoxStream<BlockHeight>,
88 committed_height_stream: BoxStream<BlockHeight>,
89 state: State,
90 params: Config,
91 p2p: P,
92 executor: E,
93 consensus: C,
94 ) -> anyhow::Result<Self> {
95 let notify = Arc::new(Notify::new());
96 let state = SharedMutex::new(state);
97 let p2p = Arc::new(p2p);
98 let executor = Arc::new(executor);
99 let consensus = Arc::new(consensus);
100 let sync_heights = SyncHeights::new(
101 height_stream,
102 committed_height_stream,
103 state.clone(),
104 notify.clone(),
105 );
106 let import = Import::new(state, notify, params, p2p, executor, consensus);
107 let import_task_handle = ServiceRunner::new(ImportTask(import));
108 Ok(Self {
109 sync_heights,
110 import_task_handle,
111 })
112 }
113}
114
115impl<P, E, C> RunnableTask for SyncTask<P, E, C>
116where
117 P: PeerToPeerPort + Send + Sync + 'static,
118 E: BlockImporterPort + Send + Sync + 'static,
119 C: ConsensusPort + Send + Sync + 'static,
120{
121 async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
122 match self.sync_heights.sync().await {
123 None => TaskNextAction::Stop,
124 Some(_) => TaskNextAction::Continue,
125 }
126 }
127
128 async fn shutdown(self) -> anyhow::Result<()> {
129 self.import_task_handle.stop_and_await().await?;
130 Ok(())
131 }
132}
133
134#[async_trait::async_trait]
135impl<P, E, C> RunnableService for SyncTask<P, E, C>
136where
137 P: PeerToPeerPort + Send + Sync + 'static,
138 E: BlockImporterPort + Send + Sync + 'static,
139 C: ConsensusPort + Send + Sync + 'static,
140{
141 const NAME: &'static str = "SyncTask";
142
143 type SharedData = ();
144
145 type Task = SyncTask<P, E, C>;
146 type TaskParams = ();
147
148 fn shared_data(&self) -> Self::SharedData {}
149
150 async fn into_task(
151 mut self,
152 watcher: &StateWatcher,
153 _: Self::TaskParams,
154 ) -> anyhow::Result<Self::Task> {
155 let mut sync_watcher = watcher.clone();
156 self.import_task_handle.start_and_await().await?;
157 let mut import_watcher = self.import_task_handle.state_watcher();
158 self.sync_heights.map_stream(|height_stream| {
159 height_stream
160 .take_until(async move {
161 tokio::select! {
162 _ = sync_watcher.while_started() => {},
163 _ = import_watcher.while_started() => {},
164 }
165 })
166 .into_boxed()
167 });
168
169 Ok(self)
170 }
171}
172
173impl<P, E, C> RunnableTask for ImportTask<P, E, C>
174where
175 P: PeerToPeerPort + Send + Sync + 'static,
176 E: BlockImporterPort + Send + Sync + 'static,
177 C: ConsensusPort + Send + Sync + 'static,
178{
179 async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
180 self.0.import(watcher).await.into()
181 }
182
183 async fn shutdown(self) -> anyhow::Result<()> {
184 Ok(())
187 }
188}
189
190#[async_trait::async_trait]
191impl<P, E, C> RunnableService for ImportTask<P, E, C>
192where
193 P: PeerToPeerPort + Send + Sync + 'static,
194 E: BlockImporterPort + Send + Sync + 'static,
195 C: ConsensusPort + Send + Sync + 'static,
196{
197 const NAME: &'static str = "ImportTask";
198
199 type SharedData = ();
200 type TaskParams = ();
201
202 type Task = ImportTask<P, E, C>;
203
204 fn shared_data(&self) -> Self::SharedData {}
205
206 async fn into_task(
207 self,
208 _: &StateWatcher,
209 _: Self::TaskParams,
210 ) -> anyhow::Result<Self::Task> {
211 Ok(self)
212 }
213}