datafusion_ethers/
config.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use alloy::rpc::types::eth::{BlockNumberOrTag, Filter};
use datafusion::{
    config::{ConfigEntry, ConfigExtension, ExtensionOptions},
    error::DataFusionError,
};

use crate::stream::StreamOptions;

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Settings that control how the Ethereum provider queries a node.
///
/// The block range fields seed the default log [`Filter`] (see
/// `default_filter`), and `block_stride` is forwarded to [`StreamOptions`]
/// (see `stream_options`).
#[derive(Debug, Clone)]
pub struct EthProviderConfig {
    /// Lower boundary (inclusive) of the block range applied to the default filter.
    pub block_range_from: BlockNumberOrTag,
    /// Upper boundary (inclusive) of the block range applied to the default filter.
    pub block_range_to: BlockNumberOrTag,
    /// Stride value handed to [`StreamOptions`].
    ///
    /// NOTE(review): presumably the number of blocks covered per request to
    /// the node — confirm against `crate::stream`.
    pub block_stride: u64,
}

///////////////////////////////////////////////////////////////////////////////////////////////////

impl Default for EthProviderConfig {
    fn default() -> Self {
        Self {
            block_range_from: BlockNumberOrTag::Earliest,
            block_range_to: BlockNumberOrTag::Latest,
            block_stride: 100_000,
        }
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////

impl EthProviderConfig {
    /// Creates a fresh [`Filter`] whose block range is bounded by the
    /// configured `block_range_from` and `block_range_to`.
    pub fn default_filter(&self) -> Filter {
        let (lower, upper) = (self.block_range_from, self.block_range_to);
        let bounded_below = Filter::new().from_block(lower);
        bounded_below.to_block(upper)
    }

    /// Produces the [`StreamOptions`] that carry the configured block stride.
    pub fn stream_options(&self) -> StreamOptions {
        let block_stride = self.block_stride;
        StreamOptions { block_stride }
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Registers [`EthProviderConfig`] as a DataFusion configuration extension.
impl ConfigExtension for EthProviderConfig {
    /// Prefix identifying this extension's options.
    ///
    /// NOTE(review): per DataFusion's `ConfigExtension` contract options are
    /// presumably addressed as `ethereum.<key>` — confirm against the
    /// DataFusion version in use.
    const PREFIX: &'static str = "ethereum";
}

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Exposes [`EthProviderConfig`] fields as string-keyed DataFusion options.
///
/// Supported keys: `block_range_from`, `block_range_to` and `block_stride`.
impl ExtensionOptions for EthProviderConfig {
    /// Returns `self` as [`std::any::Any`] to allow downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns `self` as mutable [`std::any::Any`] to allow downcasting.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Returns a boxed clone of this configuration.
    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    /// Updates a single option from its string representation.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Configuration`] when `key` is not a
    /// supported option or when `value` cannot be parsed for that option.
    /// The configuration is left unchanged on error.
    fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
        match key {
            "block_range_from" => {
                // The parser already yields a `String` message, so it is
                // wrapped directly without re-allocating.
                self.block_range_from =
                    parse_block_number(value).map_err(DataFusionError::Configuration)?;
            }
            "block_range_to" => {
                self.block_range_to =
                    parse_block_number(value).map_err(DataFusionError::Configuration)?;
            }
            "block_stride" => {
                self.block_stride = value.trim().parse::<u64>().map_err(|err| {
                    DataFusionError::Configuration(format!(
                        "Invalid block stride: {value} ({err})"
                    ))
                })?;
            }
            _ => {
                return Err(DataFusionError::Configuration(format!(
                    "Unsupported option: {key}"
                )));
            }
        }
        Ok(())
    }

    /// Lists every supported option with its current value and description.
    fn entries(&self) -> Vec<ConfigEntry> {
        vec![
            ConfigEntry {
                key: "block_range_from".to_string(),
                value: Some(self.block_range_from.to_string()),
                description:
                    "Lower boundary (inclusive) restriction on block range when pushing down predicate to the node",
            },
            ConfigEntry {
                key: "block_range_to".to_string(),
                value: Some(self.block_range_to.to_string()),
                description:
                    "Upper boundary (inclusive) restriction on block range when pushing down predicate to the node",
            },
            ConfigEntry {
                key: "block_stride".to_string(),
                value: Some(self.block_stride.to_string()),
                description: "Block stride passed to the stream options",
            },
        ]
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Parses a block tag or block number from its textual form.
///
/// Accepted inputs (case-insensitive, surrounding whitespace ignored):
/// - the tags `latest`, `finalized`, `safe`, `earliest` and `pending`;
/// - a decimal block number, e.g. `12345`;
/// - a `0x`-prefixed hexadecimal block number, e.g. `0x3039`.
///
/// # Errors
///
/// Returns a human-readable message when the input is neither a known tag
/// nor a number that fits into `u64`.
fn parse_block_number(s: &str) -> Result<BlockNumberOrTag, String> {
    let normalized = s.trim().to_lowercase();
    let block = match normalized.as_str() {
        "latest" => BlockNumberOrTag::Latest,
        "finalized" => BlockNumberOrTag::Finalized,
        "safe" => BlockNumberOrTag::Safe,
        "earliest" => BlockNumberOrTag::Earliest,
        "pending" => BlockNumberOrTag::Pending,
        number => {
            // NOTE(review): hex is accepted because `BlockNumberOrTag`'s
            // `Display` is believed to render numbers as `0x…`; accepting it
            // lets a displayed value be parsed back — confirm against alloy.
            let parsed = match number.strip_prefix("0x") {
                Some(hex_digits) => u64::from_str_radix(hex_digits, 16),
                None => number.parse::<u64>(),
            };
            BlockNumberOrTag::Number(
                parsed.map_err(|_| format!("Invalid block number: {number}"))?,
            )
        }
    };
    Ok(block)
}