fuel_core_relayer/
service.rs

1//! This module handles bridge communications between the fuel node and the data availability layer.
2
3use crate::{
4    log::EthEventLog,
5    ports::RelayerDb,
6    service::state::EthLocal,
7    Config,
8};
9use async_trait::async_trait;
10use core::time::Duration;
11use ethers_core::types::{
12    Filter,
13    Log,
14    SyncingStatus,
15    ValueOrArray,
16};
17use ethers_providers::{
18    Http,
19    Middleware,
20    Provider,
21    ProviderError,
22    Quorum,
23    QuorumProvider,
24    WeightedProvider,
25};
26use fuel_core_services::{
27    RunnableService,
28    RunnableTask,
29    ServiceRunner,
30    StateWatcher,
31    TaskNextAction,
32};
33use fuel_core_types::{
34    blockchain::primitives::DaBlockHeight,
35    entities::Message,
36};
37use futures::StreamExt;
38use std::{
39    convert::TryInto,
40    ops::Deref,
41};
42use tokio::sync::watch;
43
44use self::{
45    get_logs::*,
46    run::RelayerData,
47};
48
49mod get_logs;
50mod run;
51mod state;
52mod syncing;
53
54#[cfg(test)]
55mod test;
56
57type Synced = watch::Receiver<Option<DaBlockHeight>>;
58type NotifySynced = watch::Sender<Option<DaBlockHeight>>;
59
60/// The alias of runnable relayer service.
61pub type Service<D> = CustomizableService<Provider<QuorumProvider<Http>>, D>;
62type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;
63
64/// The shared state of the relayer task.
65#[derive(Clone)]
66pub struct SharedState<D> {
67    /// Receives signals when the relayer reaches consistency with the DA layer.
68    synced: Synced,
69    start_da_block_height: DaBlockHeight,
70    database: D,
71}
72
73/// Not initialized version of the [`Task`].
74pub struct NotInitializedTask<P, D> {
75    /// Sends signals when the relayer reaches consistency with the DA layer.
76    synced: NotifySynced,
77    /// The node that communicates with Ethereum.
78    eth_node: P,
79    /// The fuel database.
80    database: D,
81    /// Configuration settings.
82    config: Config,
83    /// Retry on error
84    retry_on_error: bool,
85}
86
87/// The actual relayer background task that syncs with the DA layer.
88pub struct Task<P, D> {
89    /// Sends signals when the relayer reaches consistency with the DA layer.
90    synced: NotifySynced,
91    /// The node that communicates with Ethereum.
92    eth_node: P,
93    /// The fuel database.
94    database: D,
95    /// Configuration settings.
96    config: Config,
97    /// The watcher used to track the state of the service. If the service stops,
98    /// the task will stop synchronization.
99    shutdown: StateWatcher,
100    /// Retry on error
101    retry_on_error: bool,
102}
103
104impl<P, D> NotInitializedTask<P, D> {
105    /// Create a new relayer task.
106    fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
107        let (synced, _) = watch::channel(None);
108        Self {
109            synced,
110            eth_node,
111            database,
112            config,
113            retry_on_error,
114        }
115    }
116}
117
118#[async_trait]
119impl<P, D> RelayerData for Task<P, D>
120where
121    P: Middleware<Error = ProviderError> + 'static,
122    D: RelayerDb + 'static,
123{
124    async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
125        let mut shutdown = self.shutdown.clone();
126        tokio::select! {
127            biased;
128            _ = shutdown.while_started() => {
129                Err(anyhow::anyhow!("The relayer got a stop signal"))
130            },
131            result = syncing::wait_if_eth_syncing(
132                &self.eth_node,
133                self.config.syncing_call_frequency,
134                self.config.syncing_log_frequency,
135            ) => {
136                result
137            }
138        }
139    }
140
141    async fn download_logs(
142        &mut self,
143        eth_sync_gap: &state::EthSyncGap,
144    ) -> anyhow::Result<()> {
145        let logs = download_logs(
146            eth_sync_gap,
147            self.config.eth_v2_listening_contracts.clone(),
148            &self.eth_node,
149            self.config.log_page_size,
150        );
151        let logs = logs.take_until(self.shutdown.while_started());
152
153        write_logs(&mut self.database, logs).await
154    }
155
156    fn update_synced(&self, state: &state::EthState) {
157        self.synced.send_if_modified(|last_state| {
158            if let Some(val) = state.is_synced_at() {
159                *last_state = Some(DaBlockHeight::from(val));
160                true
161            } else {
162                false
163            }
164        });
165    }
166}
167
168#[async_trait]
169impl<P, D> RunnableService for NotInitializedTask<P, D>
170where
171    P: Middleware<Error = ProviderError> + 'static,
172    D: RelayerDb + Clone + 'static,
173{
174    const NAME: &'static str = "Relayer";
175
176    type SharedData = SharedState<D>;
177    type Task = Task<P, D>;
178    type TaskParams = ();
179
180    fn shared_data(&self) -> Self::SharedData {
181        let synced = self.synced.subscribe();
182
183        SharedState {
184            synced,
185            start_da_block_height: self.config.da_deploy_height,
186            database: self.database.clone(),
187        }
188    }
189
190    async fn into_task(
191        mut self,
192        watcher: &StateWatcher,
193        _: Self::TaskParams,
194    ) -> anyhow::Result<Self::Task> {
195        let shutdown = watcher.clone();
196        let NotInitializedTask {
197            synced,
198            eth_node,
199            database,
200            config,
201            retry_on_error,
202        } = self;
203        let task = Task {
204            synced,
205            eth_node,
206            database,
207            config,
208            shutdown,
209            retry_on_error,
210        };
211
212        Ok(task)
213    }
214}
215
216impl<P, D> RunnableTask for Task<P, D>
217where
218    P: Middleware<Error = ProviderError> + 'static,
219    D: RelayerDb + 'static,
220{
221    async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
222        let now = tokio::time::Instant::now();
223
224        let result = run::run(self).await;
225
226        if self.shutdown.borrow_and_update().started()
227            && (result.is_err() | self.synced.borrow().is_some())
228        {
229            // Sleep the loop so the da node is not spammed.
230            tokio::time::sleep(
231                self.config
232                    .sync_minimum_duration
233                    .saturating_sub(now.elapsed()),
234            )
235            .await;
236        }
237
238        if let Err(err) = result {
239            if !self.retry_on_error {
240                tracing::error!("Exiting due to Error in relayer task: {:?}", err);
241                TaskNextAction::Stop
242            } else {
243                TaskNextAction::ErrorContinue(err)
244            }
245        } else {
246            TaskNextAction::Continue
247        }
248    }
249
250    async fn shutdown(self) -> anyhow::Result<()> {
251        // Nothing to shut down because we don't have any temporary state that should be dumped,
252        // and we don't spawn any sub-tasks that we need to finish or await.
253        Ok(())
254    }
255}
256
257impl<D> SharedState<D> {
258    /// Wait for the `Task` to be in sync with
259    /// the data availability layer.
260    ///
261    /// Yields until the relayer reaches a point where it
262    /// considered up to date. Note that there's no guarantee
263    /// the relayer will ever catch up to the da layer and
264    /// may fall behind immediately after this future completes.
265    ///
266    /// The only guarantee is that if this future completes then
267    /// the relayer did reach consistency with the da layer for
268    /// some period of time.
269    pub async fn await_synced(&self) -> anyhow::Result<()> {
270        let mut rx = self.synced.clone();
271        if rx.borrow_and_update().deref().is_none() {
272            rx.changed().await?;
273        }
274        Ok(())
275    }
276
277    /// Wait until at least the given height is synced.
278    pub async fn await_at_least_synced(
279        &self,
280        height: &DaBlockHeight,
281    ) -> anyhow::Result<()> {
282        let mut rx = self.synced.clone();
283        while rx.borrow_and_update().deref().map_or(true, |h| h < *height) {
284            rx.changed().await?;
285        }
286        Ok(())
287    }
288
289    /// Get finalized da height that represents last block from da layer that got finalized.
290    /// Panics if height is not set as of initialization of the relayer.
291    pub fn get_finalized_da_height(&self) -> DaBlockHeight
292    where
293        D: RelayerDb + 'static,
294    {
295        self.database
296            .get_finalized_da_height()
297            .unwrap_or(self.start_da_block_height)
298    }
299
300    /// Getter for database field
301    pub fn database(&self) -> &D {
302        &self.database
303    }
304}
305
306#[async_trait]
307impl<P, D> state::EthRemote for Task<P, D>
308where
309    P: Middleware<Error = ProviderError>,
310    D: RelayerDb + 'static,
311{
312    async fn finalized(&self) -> anyhow::Result<u64> {
313        let mut shutdown = self.shutdown.clone();
314        tokio::select! {
315            biased;
316            _ = shutdown.while_started() => {
317                Err(anyhow::anyhow!("The relayer got a stop signal"))
318            },
319            block = self.eth_node.get_block(ethers_core::types::BlockNumber::Finalized) => {
320                let block_number = block.map_err(anyhow::Error::msg)?
321                    .and_then(|block| block.number)
322                    .ok_or(anyhow::anyhow!("Block pending"))?
323                    .as_u64();
324                Ok(block_number)
325            }
326        }
327    }
328}
329
330#[async_trait]
331impl<P, D> EthLocal for Task<P, D>
332where
333    P: Middleware<Error = ProviderError>,
334    D: RelayerDb + 'static,
335{
336    fn observed(&self) -> Option<u64> {
337        Some(
338            self.database
339                .get_finalized_da_height()
340                .map(|h| h.into())
341                .unwrap_or(self.config.da_deploy_height.0),
342        )
343    }
344}
345
346/// Creates an instance of runnable relayer service.
347pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
348where
349    D: RelayerDb + Clone + 'static,
350{
351    let urls = config
352        .relayer
353        .clone()
354        .ok_or_else(|| {
355            anyhow::anyhow!(
356                "Tried to start Relayer without setting an eth_client in the config"
357            )
358        })?
359        .into_iter()
360        .map(|url| WeightedProvider::new(Http::new(url)));
361
362    let eth_node = Provider::new(QuorumProvider::new(Quorum::Majority, urls));
363    let retry_on_error = true;
364    Ok(new_service_internal(
365        eth_node,
366        database,
367        config,
368        retry_on_error,
369    ))
370}
371
#[cfg(any(test, feature = "test-helpers"))]
/// Builds a relayer service around a caller-supplied Ethereum provider.
///
/// Unlike [`new_service`], the resulting task stops on the first error instead
/// of retrying.
pub fn new_service_test<P, D>(
    eth_node: P,
    database: D,
    config: Config,
) -> CustomizableService<P, D>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + Clone + 'static,
{
    // Retrying is disabled: the task stops on the first error.
    const RETRY_ON_ERROR: bool = false;
    new_service_internal(eth_node, database, config, RETRY_ON_ERROR)
}
386
387fn new_service_internal<P, D>(
388    eth_node: P,
389    database: D,
390    config: Config,
391    retry_on_error: bool,
392) -> CustomizableService<P, D>
393where
394    P: Middleware<Error = ProviderError> + 'static,
395    D: RelayerDb + Clone + 'static,
396{
397    let task = NotInitializedTask::new(eth_node, database, config, retry_on_error);
398
399    CustomizableService::new(task)
400}