gevulot_rs/
event_fetcher.rs

1use std::time::Duration;
2
3use backon::{ExponentialBuilder, Retryable};
4use cosmrs::{
5    rpc::{self, endpoint::block_results::Response as BlockResults, Client},
6    tendermint::block::Height,
7};
8
9use crate::error::Result;
10
11// Trait for handling events asynchronously
12pub trait EventHandler: Send + Sync {
13    // Asynchronously handles an event
14    fn handle_event(
15        &mut self,
16        event: &crate::Event,
17        block_height: crate::Height,
18    ) -> impl std::future::Future<Output = Result<()>> + Send;
19}
20
21// Fetches events from the blockchain and processes them using the provided handler
22pub struct EventFetcher<H: EventHandler> {
23    pub handler: H,
24    pub rpc_url: String,
25    pub start_height: Option<Height>,
26    pub sleep_time: Duration,
27    pub max_retries: usize,
28}
29
30impl<H> EventFetcher<H>
31where
32    H: EventHandler,
33{
34    // Creates a new EventFetcher
35    pub fn new(
36        rpc_url: &str,
37        start_height: Option<Height>,
38        sleep_time: Duration,
39        handler: H,
40    ) -> Self {
41        Self {
42            handler,
43            rpc_url: rpc_url.to_string(),
44            start_height,
45            sleep_time,
46            max_retries: 3,
47        }
48    }
49
50    async fn fetch_latest_block_number_no_retry(
51        &self,
52        rpc_client: &rpc::HttpClient,
53    ) -> Result<Height> {
54        let status = rpc_client.status().await?;
55        Ok(status.sync_info.latest_block_height)
56    }
57
58    async fn fetch_latest_block_number(&self, rpc_client: &rpc::HttpClient) -> Result<Height> {
59        let backoff = ExponentialBuilder::default()
60            .with_max_times(self.max_retries)
61            .with_jitter();
62
63        (|| async { self.fetch_latest_block_number_no_retry(rpc_client).await })
64            .retry(backoff)
65            .await
66            .map_err(|e| {
67                log::error!(
68                    "Error fetching latest block status after {} retries: {:?}",
69                    self.max_retries,
70                    e
71                );
72                e
73            })
74    }
75
76    async fn fetch_block_results_no_retry(
77        &self,
78        rpc_client: &rpc::HttpClient,
79        height: Height,
80    ) -> Result<BlockResults> {
81        rpc_client.block_results(height).await.map_err(Into::into)
82    }
83
84    async fn fetch_block_results(
85        &self,
86        rpc_client: &rpc::HttpClient,
87        height: Height,
88    ) -> Result<BlockResults> {
89        let backoff = ExponentialBuilder::default()
90            .with_max_times(self.max_retries)
91            .with_jitter();
92
93        (|| async { self.fetch_block_results_no_retry(rpc_client, height).await })
94            .retry(backoff)
95            .await
96            .map_err(|e| {
97                log::error!(
98                    "Error fetching block results for height {} after {} retries: {:?}",
99                    height,
100                    self.max_retries,
101                    e
102                );
103                e
104            })
105    }
106
107    async fn process_block_results(&mut self, block_results: &BlockResults) -> Result<()> {
108        if let Some(events) = &block_results.begin_block_events {
109            for event in events.iter() {
110                self.handler
111                    .handle_event(event, block_results.height)
112                    .await?;
113            }
114        }
115        if let Some(txs_results) = &block_results.txs_results {
116            for event in txs_results.iter().flat_map(|tx| tx.events.iter()) {
117                self.handler
118                    .handle_event(event, block_results.height)
119                    .await?;
120            }
121        }
122        if let Some(events) = &block_results.end_block_events {
123            for event in events.iter() {
124                self.handler
125                    .handle_event(event, block_results.height)
126                    .await?;
127            }
128        }
129        for event in block_results.finalize_block_events.iter() {
130            self.handler
131                .handle_event(event, block_results.height)
132                .await?;
133        }
134        Ok(())
135    }
136
137    // Starts fetching events from the blockchain
138    pub async fn start_fetching(&mut self) -> Result<()> {
139        let rpc_client = rpc::HttpClient::new(self.rpc_url.as_str())?;
140        let mut last_indexed_block = if let Some(start_height) = self.start_height {
141            start_height
142        } else {
143            self.fetch_latest_block_number(&rpc_client).await?
144        };
145
146        loop {
147            let latest_block = self.fetch_latest_block_number(&rpc_client).await?;
148
149            if latest_block > last_indexed_block {
150                for height in (last_indexed_block.value() + 1)..=latest_block.value() {
151                    let block_results = self
152                        .fetch_block_results(&rpc_client, Height::from(height as u32))
153                        .await?;
154                    log::debug!("Processing block results for height {}", height);
155                    self.process_block_results(&block_results).await?;
156                    last_indexed_block = Height::from(height as u32);
157                }
158            }
159            tokio::time::sleep(self.sleep_time).await;
160        }
161    }
162}