alloy_rpc_types_eth/
pubsub.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
//! Ethereum types for pub-sub

use crate::{Filter, Header, Log, Transaction};
use alloc::{boxed::Box, format};
use alloy_primitives::B256;
use alloy_serde::WithOtherFields;

/// Subscription result.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(untagged))]
pub enum SubscriptionResult<T = Transaction> {
    /// New block header.
    Header(Box<WithOtherFields<Header>>),
    /// Log
    Log(Box<Log>),
    /// Transaction hash
    TransactionHash(B256),
    /// Full Transaction
    FullTransaction(Box<T>),
    /// SyncStatus
    SyncState(PubSubSyncStatus),
}

/// Response type for a SyncStatus subscription.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(untagged))]
pub enum PubSubSyncStatus {
    /// If not currently syncing, this should always be `false`.
    Simple(bool),
    /// Syncing metadata.
    Detailed(SyncStatusMetadata),
}

/// Sync status metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct SyncStatusMetadata {
    /// Whether the node is currently syncing.
    pub syncing: bool,
    /// The starting block.
    pub starting_block: u64,
    /// The current block.
    pub current_block: u64,
    /// The highest block.
    #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
    pub highest_block: Option<u64>,
}

#[cfg(feature = "serde")]
impl<T> serde::Serialize for SubscriptionResult<T>
where
    T: serde::Serialize,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match *self {
            Self::Header(ref header) => header.serialize(serializer),
            Self::Log(ref log) => log.serialize(serializer),
            Self::TransactionHash(ref hash) => hash.serialize(serializer),
            Self::FullTransaction(ref tx) => tx.serialize(serializer),
            Self::SyncState(ref sync) => sync.serialize(serializer),
        }
    }
}

/// Subscription kind.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
#[cfg_attr(feature = "serde", serde(deny_unknown_fields))]
pub enum SubscriptionKind {
    /// New block headers subscription.
    ///
    /// Fires a notification each time a new header is appended to the chain, including chain
    /// reorganizations. In case of a chain reorganization the subscription will emit all new
    /// headers for the new chain. Therefore the subscription can emit multiple headers on the same
    /// height.
    NewHeads,
    /// Logs subscription.
    ///
    /// Returns logs that are included in new imported blocks and match the given filter criteria.
    /// In case of a chain reorganization previous sent logs that are on the old chain will be
    /// resent with the removed property set to true. Logs from transactions that ended up in the
    /// new chain are emitted. Therefore, a subscription can emit logs for the same transaction
    /// multiple times.
    Logs,
    /// New Pending Transactions subscription.
    ///
    /// Returns the hash or full tx for all transactions that are added to the pending state and
    /// are signed with a key that is available in the node. When a transaction that was
    /// previously part of the canonical chain isn't part of the new canonical chain after a
    /// reorganization its again emitted.
    NewPendingTransactions,
    /// Node syncing status subscription.
    ///
    /// Indicates when the node starts or stops synchronizing. The result can either be a boolean
    /// indicating that the synchronization has started (true), finished (false) or an object with
    /// various progress indicators.
    Syncing,
}

/// Any additional parameters for a subscription.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Params {
    /// No parameters passed.
    #[default]
    None,
    /// Log parameters.
    Logs(Box<Filter>),
    /// Boolean parameter for new pending transactions.
    Bool(bool),
}

impl Params {
    /// Returns true if it's a bool parameter.
    #[inline]
    pub const fn is_bool(&self) -> bool {
        matches!(self, Self::Bool(_))
    }

    /// Returns true if it's a log parameter.
    #[inline]
    pub const fn is_logs(&self) -> bool {
        matches!(self, Self::Logs(_))
    }
}

#[cfg(feature = "serde")]
impl serde::Serialize for Params {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            Self::None => (&[] as &[serde_json::Value]).serialize(serializer),
            Self::Logs(logs) => logs.serialize(serializer),
            Self::Bool(full) => full.serialize(serializer),
        }
    }
}

#[cfg(feature = "serde")]
impl<'a> serde::Deserialize<'a> for Params {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'a>,
    {
        use serde::de::Error;

        let v = serde_json::Value::deserialize(deserializer)?;

        if v.is_null() {
            return Ok(Self::None);
        }

        if let Some(val) = v.as_bool() {
            return Ok(Self::Bool(val));
        }

        serde_json::from_value(v)
            .map(|f| Self::Logs(Box::new(f)))
            .map_err(|e| D::Error::custom(format!("Invalid Pub-Sub parameters: {e}")))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;

    #[test]
    #[cfg(feature = "serde")]
    fn params_serde() {
        let s: Params = serde_json::from_str("true").unwrap();
        assert_eq!(s, Params::Bool(true));
        let s: Params = serde_json::from_str("null").unwrap();
        assert_eq!(s, Params::None);
    }
}