1use crate::{
4 log::EthEventLog,
5 ports::RelayerDb,
6 service::state::EthLocal,
7 Config,
8};
9use async_trait::async_trait;
10use core::time::Duration;
11use ethers_core::types::{
12 Filter,
13 Log,
14 SyncingStatus,
15 ValueOrArray,
16};
17use ethers_providers::{
18 Http,
19 Middleware,
20 Provider,
21 ProviderError,
22 Quorum,
23 QuorumProvider,
24 WeightedProvider,
25};
26use fuel_core_services::{
27 RunnableService,
28 RunnableTask,
29 ServiceRunner,
30 StateWatcher,
31 TaskNextAction,
32};
33use fuel_core_types::{
34 blockchain::primitives::DaBlockHeight,
35 entities::Message,
36};
37use futures::StreamExt;
38use std::{
39 convert::TryInto,
40 ops::Deref,
41};
42use tokio::sync::watch;
43
44use self::{
45 get_logs::*,
46 run::RelayerData,
47};
48
49mod get_logs;
50mod run;
51mod state;
52mod syncing;
53
54#[cfg(test)]
55mod test;
56
57type Synced = watch::Receiver<Option<DaBlockHeight>>;
58type NotifySynced = watch::Sender<Option<DaBlockHeight>>;
59
60pub type Service<D> = CustomizableService<Provider<QuorumProvider<Http>>, D>;
62type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;
63
64#[derive(Clone)]
66pub struct SharedState<D> {
67 synced: Synced,
69 start_da_block_height: DaBlockHeight,
70 database: D,
71}
72
73pub struct NotInitializedTask<P, D> {
75 synced: NotifySynced,
77 eth_node: P,
79 database: D,
81 config: Config,
83 retry_on_error: bool,
85}
86
87pub struct Task<P, D> {
89 synced: NotifySynced,
91 eth_node: P,
93 database: D,
95 config: Config,
97 shutdown: StateWatcher,
100 retry_on_error: bool,
102}
103
104impl<P, D> NotInitializedTask<P, D> {
105 fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
107 let (synced, _) = watch::channel(None);
108 Self {
109 synced,
110 eth_node,
111 database,
112 config,
113 retry_on_error,
114 }
115 }
116}
117
118#[async_trait]
119impl<P, D> RelayerData for Task<P, D>
120where
121 P: Middleware<Error = ProviderError> + 'static,
122 D: RelayerDb + 'static,
123{
124 async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
125 let mut shutdown = self.shutdown.clone();
126 tokio::select! {
127 biased;
128 _ = shutdown.while_started() => {
129 Err(anyhow::anyhow!("The relayer got a stop signal"))
130 },
131 result = syncing::wait_if_eth_syncing(
132 &self.eth_node,
133 self.config.syncing_call_frequency,
134 self.config.syncing_log_frequency,
135 ) => {
136 result
137 }
138 }
139 }
140
141 async fn download_logs(
142 &mut self,
143 eth_sync_gap: &state::EthSyncGap,
144 ) -> anyhow::Result<()> {
145 let logs = download_logs(
146 eth_sync_gap,
147 self.config.eth_v2_listening_contracts.clone(),
148 &self.eth_node,
149 self.config.log_page_size,
150 );
151 let logs = logs.take_until(self.shutdown.while_started());
152
153 write_logs(&mut self.database, logs).await
154 }
155
156 fn update_synced(&self, state: &state::EthState) {
157 self.synced.send_if_modified(|last_state| {
158 if let Some(val) = state.is_synced_at() {
159 *last_state = Some(DaBlockHeight::from(val));
160 true
161 } else {
162 false
163 }
164 });
165 }
166}
167
168#[async_trait]
169impl<P, D> RunnableService for NotInitializedTask<P, D>
170where
171 P: Middleware<Error = ProviderError> + 'static,
172 D: RelayerDb + Clone + 'static,
173{
174 const NAME: &'static str = "Relayer";
175
176 type SharedData = SharedState<D>;
177 type Task = Task<P, D>;
178 type TaskParams = ();
179
180 fn shared_data(&self) -> Self::SharedData {
181 let synced = self.synced.subscribe();
182
183 SharedState {
184 synced,
185 start_da_block_height: self.config.da_deploy_height,
186 database: self.database.clone(),
187 }
188 }
189
190 async fn into_task(
191 mut self,
192 watcher: &StateWatcher,
193 _: Self::TaskParams,
194 ) -> anyhow::Result<Self::Task> {
195 let shutdown = watcher.clone();
196 let NotInitializedTask {
197 synced,
198 eth_node,
199 database,
200 config,
201 retry_on_error,
202 } = self;
203 let task = Task {
204 synced,
205 eth_node,
206 database,
207 config,
208 shutdown,
209 retry_on_error,
210 };
211
212 Ok(task)
213 }
214}
215
216impl<P, D> RunnableTask for Task<P, D>
217where
218 P: Middleware<Error = ProviderError> + 'static,
219 D: RelayerDb + 'static,
220{
221 async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
222 let now = tokio::time::Instant::now();
223
224 let result = run::run(self).await;
225
226 if self.shutdown.borrow_and_update().started()
227 && (result.is_err() | self.synced.borrow().is_some())
228 {
229 tokio::time::sleep(
231 self.config
232 .sync_minimum_duration
233 .saturating_sub(now.elapsed()),
234 )
235 .await;
236 }
237
238 if let Err(err) = result {
239 if !self.retry_on_error {
240 tracing::error!("Exiting due to Error in relayer task: {:?}", err);
241 TaskNextAction::Stop
242 } else {
243 TaskNextAction::ErrorContinue(err)
244 }
245 } else {
246 TaskNextAction::Continue
247 }
248 }
249
250 async fn shutdown(self) -> anyhow::Result<()> {
251 Ok(())
254 }
255}
256
257impl<D> SharedState<D> {
258 pub async fn await_synced(&self) -> anyhow::Result<()> {
270 let mut rx = self.synced.clone();
271 if rx.borrow_and_update().deref().is_none() {
272 rx.changed().await?;
273 }
274 Ok(())
275 }
276
277 pub async fn await_at_least_synced(
279 &self,
280 height: &DaBlockHeight,
281 ) -> anyhow::Result<()> {
282 let mut rx = self.synced.clone();
283 while rx.borrow_and_update().deref().map_or(true, |h| h < *height) {
284 rx.changed().await?;
285 }
286 Ok(())
287 }
288
289 pub fn get_finalized_da_height(&self) -> DaBlockHeight
292 where
293 D: RelayerDb + 'static,
294 {
295 self.database
296 .get_finalized_da_height()
297 .unwrap_or(self.start_da_block_height)
298 }
299
300 pub fn database(&self) -> &D {
302 &self.database
303 }
304}
305
306#[async_trait]
307impl<P, D> state::EthRemote for Task<P, D>
308where
309 P: Middleware<Error = ProviderError>,
310 D: RelayerDb + 'static,
311{
312 async fn finalized(&self) -> anyhow::Result<u64> {
313 let mut shutdown = self.shutdown.clone();
314 tokio::select! {
315 biased;
316 _ = shutdown.while_started() => {
317 Err(anyhow::anyhow!("The relayer got a stop signal"))
318 },
319 block = self.eth_node.get_block(ethers_core::types::BlockNumber::Finalized) => {
320 let block_number = block.map_err(anyhow::Error::msg)?
321 .and_then(|block| block.number)
322 .ok_or(anyhow::anyhow!("Block pending"))?
323 .as_u64();
324 Ok(block_number)
325 }
326 }
327 }
328}
329
330#[async_trait]
331impl<P, D> EthLocal for Task<P, D>
332where
333 P: Middleware<Error = ProviderError>,
334 D: RelayerDb + 'static,
335{
336 fn observed(&self) -> Option<u64> {
337 Some(
338 self.database
339 .get_finalized_da_height()
340 .map(|h| h.into())
341 .unwrap_or(self.config.da_deploy_height.0),
342 )
343 }
344}
345
346pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
348where
349 D: RelayerDb + Clone + 'static,
350{
351 let urls = config
352 .relayer
353 .clone()
354 .ok_or_else(|| {
355 anyhow::anyhow!(
356 "Tried to start Relayer without setting an eth_client in the config"
357 )
358 })?
359 .into_iter()
360 .map(|url| WeightedProvider::new(Http::new(url)));
361
362 let eth_node = Provider::new(QuorumProvider::new(Quorum::Majority, urls));
363 let retry_on_error = true;
364 Ok(new_service_internal(
365 eth_node,
366 database,
367 config,
368 retry_on_error,
369 ))
370}
371
/// Creates a relayer service around a caller-supplied Ethereum client.
///
/// `retry_on_error` is disabled, so the first failing run stops the task.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_service_test<P, D>(
    eth_node: P,
    database: D,
    config: Config,
) -> CustomizableService<P, D>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + Clone + 'static,
{
    // Last argument: retry_on_error.
    new_service_internal(eth_node, database, config, false)
}
386
387fn new_service_internal<P, D>(
388 eth_node: P,
389 database: D,
390 config: Config,
391 retry_on_error: bool,
392) -> CustomizableService<P, D>
393where
394 P: Middleware<Error = ProviderError> + 'static,
395 D: RelayerDb + Clone + 'static,
396{
397 let task = NotInitializedTask::new(eth_node, database, config, retry_on_error);
398
399 CustomizableService::new(task)
400}