use crate::{
log::EthEventLog,
ports::RelayerDb,
service::state::EthLocal,
Config,
};
use async_trait::async_trait;
use core::time::Duration;
use ethers_core::types::{
Filter,
Log,
SyncingStatus,
ValueOrArray,
H160,
};
use ethers_providers::{
Http,
Middleware,
Provider,
ProviderError,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
ServiceRunner,
StateWatcher,
};
use fuel_core_storage::{
tables::Messages,
StorageAsRef,
StorageInspect,
};
use fuel_core_types::{
blockchain::primitives::DaBlockHeight,
entities::message::CompressedMessage,
fuel_types::MessageId,
};
use futures::StreamExt;
use std::{
borrow::Cow,
convert::TryInto,
ops::Deref,
};
use synced::update_synced;
use tokio::sync::watch;
use self::{
get_logs::*,
run::RelayerData,
};
mod get_logs;
mod run;
mod state;
mod synced;
mod syncing;
#[cfg(test)]
mod test;
type Synced = watch::Receiver<Option<DaBlockHeight>>;
type NotifySynced = watch::Sender<Option<DaBlockHeight>>;
pub type Service<D> = CustomizableService<Provider<Http>, D>;
type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;
#[derive(Clone)]
pub struct SharedState<D> {
synced: Synced,
database: D,
}
pub struct NotInitializedTask<P, D> {
synced: NotifySynced,
eth_node: P,
database: D,
config: Config,
}
pub struct Task<P, D> {
synced: NotifySynced,
eth_node: P,
database: D,
config: Config,
shutdown: StateWatcher,
}
impl<P, D> NotInitializedTask<P, D> {
fn new(eth_node: P, database: D, config: Config) -> Self {
let (synced, _) = watch::channel(None);
Self {
synced,
eth_node,
database,
config,
}
}
}
impl<P, D> Task<P, D>
where
D: RelayerDb + 'static,
{
fn set_deploy_height(&mut self) {
self.database
.set_finalized_da_height_to_at_least(&self.config.da_deploy_height)
.expect("Should be able to set the finalized da height");
}
}
#[async_trait]
impl<P, D> RelayerData for Task<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + 'static,
{
async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
let mut shutdown = self.shutdown.clone();
tokio::select! {
biased;
_ = shutdown.while_started() => {
Err(anyhow::anyhow!("The relayer got a stop signal"))
},
result = syncing::wait_if_eth_syncing(
&self.eth_node,
self.config.syncing_call_frequency,
self.config.syncing_log_frequency,
) => {
result
}
}
}
async fn download_logs(
&mut self,
eth_sync_gap: &state::EthSyncGap,
) -> anyhow::Result<()> {
let logs = download_logs(
eth_sync_gap,
self.config.eth_v2_listening_contracts.clone(),
&self.eth_node,
self.config.log_page_size,
);
let logs = logs.take_until(self.shutdown.while_started());
write_logs(&mut self.database, logs).await
}
fn update_synced(&self, state: &state::EthState) {
update_synced(&self.synced, state)
}
}
#[async_trait]
impl<P, D> RunnableService for NotInitializedTask<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
const NAME: &'static str = "Relayer";
type SharedData = SharedState<D>;
type Task = Task<P, D>;
fn shared_data(&self) -> Self::SharedData {
let synced = self.synced.subscribe();
SharedState {
synced,
database: self.database.clone(),
}
}
async fn into_task(mut self, watcher: &StateWatcher) -> anyhow::Result<Self::Task> {
let shutdown = watcher.clone();
let NotInitializedTask {
synced,
eth_node,
database,
config,
} = self;
let mut task = Task {
synced,
eth_node,
database,
config,
shutdown,
};
task.set_deploy_height();
Ok(task)
}
}
#[async_trait]
impl<P, D> RunnableTask for Task<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + 'static,
{
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
let now = tokio::time::Instant::now();
let should_continue = true;
let result = run::run(self).await;
if self.shutdown.borrow_and_update().started() && self.synced.borrow().is_some() {
tokio::time::sleep(
self.config
.sync_minimum_duration
.saturating_sub(now.elapsed()),
)
.await;
}
result.map(|_| should_continue)
}
async fn shutdown(self) -> anyhow::Result<()> {
Ok(())
}
}
impl<D> SharedState<D> {
pub async fn await_synced(&self) -> anyhow::Result<()> {
let mut rx = self.synced.clone();
if rx.borrow_and_update().deref().is_none() {
rx.changed().await?;
}
Ok(())
}
pub async fn await_at_least_synced(
&self,
height: &DaBlockHeight,
) -> anyhow::Result<()> {
let mut rx = self.synced.clone();
while rx.borrow_and_update().deref().map_or(true, |h| h < *height) {
rx.changed().await?;
}
Ok(())
}
pub fn get_message(
&self,
id: &MessageId,
da_height: &DaBlockHeight,
) -> anyhow::Result<Option<CompressedMessage>>
where
D: StorageInspect<Messages, Error = fuel_core_storage::Error>,
{
Ok(self
.database
.storage::<Messages>()
.get(id)?
.map(Cow::into_owned)
.filter(|message| message.da_height <= *da_height))
}
pub fn get_finalized_da_height(&self) -> anyhow::Result<DaBlockHeight>
where
D: RelayerDb + 'static,
{
Ok(self.database.get_finalized_da_height()?)
}
}
#[async_trait]
impl<P, D> state::EthRemote for Task<P, D>
where
P: Middleware<Error = ProviderError>,
D: RelayerDb + 'static,
{
async fn current(&self) -> anyhow::Result<u64> {
let mut shutdown = self.shutdown.clone();
tokio::select! {
biased;
_ = shutdown.while_started() => {
Err(anyhow::anyhow!("The relayer got a stop signal"))
},
block_number = self.eth_node.get_block_number() => {
Ok(block_number?.as_u64())
}
}
}
fn finalization_period(&self) -> u64 {
*self.config.da_finalization
}
}
#[async_trait]
impl<P, D> EthLocal for Task<P, D>
where
P: Middleware<Error = ProviderError>,
D: RelayerDb + 'static,
{
fn finalized(&self) -> Option<u64> {
self.database.get_finalized_da_height().map(|h| *h).ok()
}
}
pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
where
D: RelayerDb + Clone + 'static,
{
let url = config.eth_client.clone().ok_or_else(|| {
anyhow::anyhow!(
"Tried to start Relayer without setting an eth_client in the config"
)
})?;
let http = Http::new(url);
let eth_node = Provider::new(http);
Ok(new_service_internal(eth_node, database, config))
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_service_test<P, D>(
eth_node: P,
database: D,
config: Config,
) -> CustomizableService<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
new_service_internal(eth_node, database, config)
}
fn new_service_internal<P, D>(
eth_node: P,
database: D,
config: Config,
) -> CustomizableService<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
let task = NotInitializedTask::new(eth_node, database, config);
CustomizableService::new(task)
}