ic_web3_rs/api/
eth_subscribe.rs1use crate::{
4 api::Namespace,
5 error, helpers,
6 transports::ic_http_client::CallOptions,
7 types::{BlockHeader, Filter, Log, SyncState, H256},
8 DuplexTransport,
9};
10use futures::{
11 task::{Context, Poll},
12 Stream,
13};
14use pin_project::{pin_project, pinned_drop};
15use std::{marker::PhantomData, pin::Pin};
16
/// Handle for the `eth_subscribe` family of calls.
///
/// The struct only stores the transport it was created with; the calls
/// themselves live in the `impl` blocks for this type.
#[derive(Clone, Debug)]
pub struct EthSubscribe<T> {
    /// Transport used to send requests.
    transport: T,
}
22
23impl<T: DuplexTransport> Namespace<T> for EthSubscribe<T> {
24 fn new(transport: T) -> Self
25 where
26 Self: Sized,
27 {
28 EthSubscribe { transport }
29 }
30
31 fn transport(&self) -> &T {
32 &self.transport
33 }
34}
35
/// Identifier of a subscription, stored as the raw string id.
///
/// Besides equality and ordering, the id is hashable so it can be used as a
/// key in both ordered (`BTreeMap`) and hashed (`HashMap`/`HashSet`)
/// collections.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubscriptionId(String);

impl From<String> for SubscriptionId {
    /// Wraps an owned string as a subscription id without copying it.
    fn from(raw: String) -> Self {
        Self(raw)
    }
}
45
46#[pin_project(PinnedDrop)]
50#[derive(Debug)]
51pub struct SubscriptionStream<T: DuplexTransport, I> {
52 transport: T,
53 id: SubscriptionId,
54 #[pin]
55 rx: T::NotificationStream,
56 _marker: PhantomData<I>,
57}
58
59impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
60 fn new(transport: T, id: SubscriptionId) -> error::Result<Self> {
61 let rx = transport.subscribe(id.clone())?;
62 Ok(SubscriptionStream {
63 transport,
64 id,
65 rx,
66 _marker: PhantomData,
67 })
68 }
69
70 pub fn id(&self) -> &SubscriptionId {
72 &self.id
73 }
74
75 pub async fn unsubscribe(self, options: CallOptions) -> error::Result<bool> {
77 let &SubscriptionId(ref id) = &self.id;
78 let id = helpers::serialize(&id);
79 let response = self.transport.execute("eth_unsubscribe", vec![id], options).await?;
80 helpers::decode(response)
81 }
82}
83
84impl<T, I> Stream for SubscriptionStream<T, I>
85where
86 T: DuplexTransport,
87 I: serde::de::DeserializeOwned,
88{
89 type Item = error::Result<I>;
90
91 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
92 let this = self.project();
93 let x = ready!(this.rx.poll_next(ctx));
94 Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
95 }
96}
97
98#[pinned_drop]
99impl<T, I> PinnedDrop for SubscriptionStream<T, I>
100where
101 T: DuplexTransport,
102{
103 fn drop(self: Pin<&mut Self>) {
104 let _ = self.transport.unsubscribe(self.id().clone());
105 }
106}
107
108impl<T: DuplexTransport> EthSubscribe<T> {
109 pub async fn subscribe_new_heads(&self, options: CallOptions) -> error::Result<SubscriptionStream<T, BlockHeader>> {
111 let subscription = helpers::serialize(&&"newHeads");
112 let response = self
113 .transport
114 .execute("eth_subscribe", vec![subscription], options)
115 .await?;
116 let id: String = helpers::decode(response)?;
117 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
118 }
119
120 pub async fn subscribe_logs(
122 &self,
123 filter: Filter,
124 options: CallOptions,
125 ) -> error::Result<SubscriptionStream<T, Log>> {
126 let subscription = helpers::serialize(&&"logs");
127 let filter = helpers::serialize(&filter);
128 let response = self
129 .transport
130 .execute("eth_subscribe", vec![subscription, filter], options)
131 .await?;
132 let id: String = helpers::decode(response)?;
133 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
134 }
135
136 pub async fn subscribe_new_pending_transactions(
138 &self,
139 options: CallOptions,
140 ) -> error::Result<SubscriptionStream<T, H256>> {
141 let subscription = helpers::serialize(&&"newPendingTransactions");
142 let response = self
143 .transport
144 .execute("eth_subscribe", vec![subscription], options)
145 .await?;
146 let id: String = helpers::decode(response)?;
147 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
148 }
149
150 pub async fn subscribe_syncing(&self, options: CallOptions) -> error::Result<SubscriptionStream<T, SyncState>> {
152 let subscription = helpers::serialize(&&"syncing");
153 let response = self
154 .transport
155 .execute("eth_subscribe", vec![subscription], options)
156 .await?;
157 let id: String = helpers::decode(response)?;
158 SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
159 }
160}