datafusion_ethers/convert/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
mod decoded;
mod hybrid;
mod raw;

use alloy::rpc::types::eth::{Filter, Log};
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
pub use decoded::*;
pub use hybrid::*;
pub use raw::*;

use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::ExecutionPlan;

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Common interface for types that accept Ethereum [`Log`]s and emit them as
/// Arrow [`RecordBatch`]es.
///
/// Implementors are fed logs through [`Transcoder::append`] and hand back a
/// batch from [`Transcoder::finish`].
pub trait Transcoder {
    /// Returns the Arrow schema associated with this transcoder
    /// (presumably the schema of the batches produced by `finish` — confirm
    /// against the implementors).
    fn schema(&self) -> SchemaRef;

    /// Feeds a slice of logs into the transcoder.
    ///
    /// # Errors
    ///
    /// Returns an [`AppendError`] when the implementor fails to process the logs.
    fn append(&mut self, logs: &[Log]) -> Result<(), AppendError>;

    /// Produces a [`RecordBatch`] from the transcoder.
    ///
    /// Takes `&mut self`, so implementors may reset internal state here
    /// (not visible from the trait itself — confirm against the implementors).
    fn finish(&mut self) -> RecordBatch;

    /// Returns the size reported by the implementor (presumably the number of
    /// entries appended and not yet finished — confirm against the implementors).
    fn len(&self) -> usize;

    /// Returns `true` exactly when [`Transcoder::len`] reports zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}

/// Error type returned by [`Transcoder::append`].
#[derive(Debug, thiserror::Error)]
pub enum AppendError {
    /// Wraps an [`alloy::dyn_abi::Error`].
    ///
    /// `#[error(transparent)]` forwards `Display` and `source()` to the wrapped
    /// error, and `#[from]` generates the `From` conversion so the `?` operator
    /// can lift an `alloy::dyn_abi::Error` into this variant.
    #[error(transparent)]
    EventDecodingError(#[from] alloy::dyn_abi::Error),
}

///////////////////////////////////////////////////////////////////////////////////////////////////

/// Analyzes the SQL and returns a pushed-down filter that will be used when querying logs from the ETH node
pub async fn sql_to_pushdown_filter(ctx: &SessionContext, sql: &str) -> DfResult<Option<Filter>> {
    let df = ctx.sql(sql).await?;
    let plan = df.create_physical_plan().await?;
    Ok(sql_to_pushdown_filter_rec(plan.as_ref()))
}

fn sql_to_pushdown_filter_rec(plan: &dyn ExecutionPlan) -> Option<Filter> {
    let mut found = plan
        .as_any()
        .downcast_ref::<super::provider::EthGetLogs>()
        .map(|scan| scan.filter().clone());

    // Traverse all the children too to make sure there is only one scan in this query (e.g. no UNIONs)
    for child in &plan.children() {
        let child = sql_to_pushdown_filter_rec(child.as_ref());
        if child.is_some() {
            if found.is_some() {
                unimplemented!("Multiple table scans in one query are not yet supported");
            }
            found = child;
        }
    }

    found
}