ic_web3_rs/api/
eth_subscribe.rs

1//! `Eth` namespace, subscriptions
2
3use crate::{
4    api::Namespace,
5    error, helpers,
6    transports::ic_http_client::CallOptions,
7    types::{BlockHeader, Filter, Log, SyncState, H256},
8    DuplexTransport,
9};
10use futures::{
11    task::{Context, Poll},
12    Stream,
13};
14use pin_project::{pin_project, pinned_drop};
15use std::{marker::PhantomData, pin::Pin};
16
17/// `Eth` namespace, subscriptions
18#[derive(Debug, Clone)]
19pub struct EthSubscribe<T> {
20    transport: T,
21}
22
23impl<T: DuplexTransport> Namespace<T> for EthSubscribe<T> {
24    fn new(transport: T) -> Self
25    where
26        Self: Sized,
27    {
28        EthSubscribe { transport }
29    }
30
31    fn transport(&self) -> &T {
32        &self.transport
33    }
34}
35
36/// ID of subscription returned from `eth_subscribe`
37#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
38pub struct SubscriptionId(String);
39
40impl From<String> for SubscriptionId {
41    fn from(s: String) -> Self {
42        SubscriptionId(s)
43    }
44}
45
46/// Stream of notifications from a subscription
47/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
48/// notifications are delivered.
49#[pin_project(PinnedDrop)]
50#[derive(Debug)]
51pub struct SubscriptionStream<T: DuplexTransport, I> {
52    transport: T,
53    id: SubscriptionId,
54    #[pin]
55    rx: T::NotificationStream,
56    _marker: PhantomData<I>,
57}
58
59impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
60    fn new(transport: T, id: SubscriptionId) -> error::Result<Self> {
61        let rx = transport.subscribe(id.clone())?;
62        Ok(SubscriptionStream {
63            transport,
64            id,
65            rx,
66            _marker: PhantomData,
67        })
68    }
69
70    /// Return the ID of this subscription
71    pub fn id(&self) -> &SubscriptionId {
72        &self.id
73    }
74
75    /// Unsubscribe from the event represented by this stream
76    pub async fn unsubscribe(self, options: CallOptions) -> error::Result<bool> {
77        let &SubscriptionId(ref id) = &self.id;
78        let id = helpers::serialize(&id);
79        let response = self.transport.execute("eth_unsubscribe", vec![id], options).await?;
80        helpers::decode(response)
81    }
82}
83
84impl<T, I> Stream for SubscriptionStream<T, I>
85where
86    T: DuplexTransport,
87    I: serde::de::DeserializeOwned,
88{
89    type Item = error::Result<I>;
90
91    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
92        let this = self.project();
93        let x = ready!(this.rx.poll_next(ctx));
94        Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
95    }
96}
97
98#[pinned_drop]
99impl<T, I> PinnedDrop for SubscriptionStream<T, I>
100where
101    T: DuplexTransport,
102{
103    fn drop(self: Pin<&mut Self>) {
104        let _ = self.transport.unsubscribe(self.id().clone());
105    }
106}
107
108impl<T: DuplexTransport> EthSubscribe<T> {
109    /// Create a new heads subscription
110    pub async fn subscribe_new_heads(&self, options: CallOptions) -> error::Result<SubscriptionStream<T, BlockHeader>> {
111        let subscription = helpers::serialize(&&"newHeads");
112        let response = self
113            .transport
114            .execute("eth_subscribe", vec![subscription], options)
115            .await?;
116        let id: String = helpers::decode(response)?;
117        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
118    }
119
120    /// Create a logs subscription
121    pub async fn subscribe_logs(
122        &self,
123        filter: Filter,
124        options: CallOptions,
125    ) -> error::Result<SubscriptionStream<T, Log>> {
126        let subscription = helpers::serialize(&&"logs");
127        let filter = helpers::serialize(&filter);
128        let response = self
129            .transport
130            .execute("eth_subscribe", vec![subscription, filter], options)
131            .await?;
132        let id: String = helpers::decode(response)?;
133        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
134    }
135
136    /// Create a pending transactions subscription
137    pub async fn subscribe_new_pending_transactions(
138        &self,
139        options: CallOptions,
140    ) -> error::Result<SubscriptionStream<T, H256>> {
141        let subscription = helpers::serialize(&&"newPendingTransactions");
142        let response = self
143            .transport
144            .execute("eth_subscribe", vec![subscription], options)
145            .await?;
146        let id: String = helpers::decode(response)?;
147        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
148    }
149
150    /// Create a sync status subscription
151    pub async fn subscribe_syncing(&self, options: CallOptions) -> error::Result<SubscriptionStream<T, SyncState>> {
152        let subscription = helpers::serialize(&&"syncing");
153        let response = self
154            .transport
155            .execute("eth_subscribe", vec![subscription], options)
156            .await?;
157        let id: String = helpers::decode(response)?;
158        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
159    }
160}