gevulot_rs/
event_fetcher.rs1use std::time::Duration;
2
3use backon::{ExponentialBuilder, Retryable};
4use cosmrs::{
5 rpc::{self, endpoint::block_results::Response as BlockResults, Client},
6 tendermint::block::Height,
7};
8
9use crate::error::Result;
10
11pub trait EventHandler: Send + Sync {
13 fn handle_event(
15 &mut self,
16 event: &crate::Event,
17 block_height: crate::Height,
18 ) -> impl std::future::Future<Output = Result<()>> + Send;
19}
20
21pub struct EventFetcher<H: EventHandler> {
23 pub handler: H,
24 pub rpc_url: String,
25 pub start_height: Option<Height>,
26 pub sleep_time: Duration,
27 pub max_retries: usize,
28}
29
30impl<H> EventFetcher<H>
31where
32 H: EventHandler,
33{
34 pub fn new(
36 rpc_url: &str,
37 start_height: Option<Height>,
38 sleep_time: Duration,
39 handler: H,
40 ) -> Self {
41 Self {
42 handler,
43 rpc_url: rpc_url.to_string(),
44 start_height,
45 sleep_time,
46 max_retries: 3,
47 }
48 }
49
50 async fn fetch_latest_block_number_no_retry(
51 &self,
52 rpc_client: &rpc::HttpClient,
53 ) -> Result<Height> {
54 let status = rpc_client.status().await?;
55 Ok(status.sync_info.latest_block_height)
56 }
57
58 async fn fetch_latest_block_number(&self, rpc_client: &rpc::HttpClient) -> Result<Height> {
59 let backoff = ExponentialBuilder::default()
60 .with_max_times(self.max_retries)
61 .with_jitter();
62
63 (|| async { self.fetch_latest_block_number_no_retry(rpc_client).await })
64 .retry(backoff)
65 .await
66 .map_err(|e| {
67 log::error!(
68 "Error fetching latest block status after {} retries: {:?}",
69 self.max_retries,
70 e
71 );
72 e
73 })
74 }
75
76 async fn fetch_block_results_no_retry(
77 &self,
78 rpc_client: &rpc::HttpClient,
79 height: Height,
80 ) -> Result<BlockResults> {
81 rpc_client.block_results(height).await.map_err(Into::into)
82 }
83
84 async fn fetch_block_results(
85 &self,
86 rpc_client: &rpc::HttpClient,
87 height: Height,
88 ) -> Result<BlockResults> {
89 let backoff = ExponentialBuilder::default()
90 .with_max_times(self.max_retries)
91 .with_jitter();
92
93 (|| async { self.fetch_block_results_no_retry(rpc_client, height).await })
94 .retry(backoff)
95 .await
96 .map_err(|e| {
97 log::error!(
98 "Error fetching block results for height {} after {} retries: {:?}",
99 height,
100 self.max_retries,
101 e
102 );
103 e
104 })
105 }
106
107 async fn process_block_results(&mut self, block_results: &BlockResults) -> Result<()> {
108 if let Some(events) = &block_results.begin_block_events {
109 for event in events.iter() {
110 self.handler
111 .handle_event(event, block_results.height)
112 .await?;
113 }
114 }
115 if let Some(txs_results) = &block_results.txs_results {
116 for event in txs_results.iter().flat_map(|tx| tx.events.iter()) {
117 self.handler
118 .handle_event(event, block_results.height)
119 .await?;
120 }
121 }
122 if let Some(events) = &block_results.end_block_events {
123 for event in events.iter() {
124 self.handler
125 .handle_event(event, block_results.height)
126 .await?;
127 }
128 }
129 for event in block_results.finalize_block_events.iter() {
130 self.handler
131 .handle_event(event, block_results.height)
132 .await?;
133 }
134 Ok(())
135 }
136
137 pub async fn start_fetching(&mut self) -> Result<()> {
139 let rpc_client = rpc::HttpClient::new(self.rpc_url.as_str())?;
140 let mut last_indexed_block = if let Some(start_height) = self.start_height {
141 start_height
142 } else {
143 self.fetch_latest_block_number(&rpc_client).await?
144 };
145
146 loop {
147 let latest_block = self.fetch_latest_block_number(&rpc_client).await?;
148
149 if latest_block > last_indexed_block {
150 for height in (last_indexed_block.value() + 1)..=latest_block.value() {
151 let block_results = self
152 .fetch_block_results(&rpc_client, Height::from(height as u32))
153 .await?;
154 log::debug!("Processing block results for height {}", height);
155 self.process_block_results(&block_results).await?;
156 last_indexed_block = Height::from(height as u32);
157 }
158 }
159 tokio::time::sleep(self.sleep_time).await;
160 }
161 }
162}