ic_web3_rs/api/
eth_subscribe.rsuse crate::{
api::Namespace,
error, helpers,
transports::ic_http_client::CallOptions,
types::{BlockHeader, Filter, Log, SyncState, H256},
DuplexTransport,
};
use futures::{
task::{Context, Poll},
Stream,
};
use pin_project::{pin_project, pinned_drop};
use std::{marker::PhantomData, pin::Pin};
#[derive(Debug, Clone)]
pub struct EthSubscribe<T> {
transport: T,
}
impl<T: DuplexTransport> Namespace<T> for EthSubscribe<T> {
fn new(transport: T) -> Self
where
Self: Sized,
{
EthSubscribe { transport }
}
fn transport(&self) -> &T {
&self.transport
}
}
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct SubscriptionId(String);
impl From<String> for SubscriptionId {
fn from(s: String) -> Self {
SubscriptionId(s)
}
}
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct SubscriptionStream<T: DuplexTransport, I> {
transport: T,
id: SubscriptionId,
#[pin]
rx: T::NotificationStream,
_marker: PhantomData<I>,
}
impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
fn new(transport: T, id: SubscriptionId) -> error::Result<Self> {
let rx = transport.subscribe(id.clone())?;
Ok(SubscriptionStream {
transport,
id,
rx,
_marker: PhantomData,
})
}
pub fn id(&self) -> &SubscriptionId {
&self.id
}
pub async fn unsubscribe(self, options: CallOptions) -> error::Result<bool> {
let &SubscriptionId(ref id) = &self.id;
let id = helpers::serialize(&id);
let response = self.transport.execute("eth_unsubscribe", vec![id], options).await?;
helpers::decode(response)
}
}
impl<T, I> Stream for SubscriptionStream<T, I>
where
T: DuplexTransport,
I: serde::de::DeserializeOwned,
{
type Item = error::Result<I>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let x = ready!(this.rx.poll_next(ctx));
Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
}
}
#[pinned_drop]
impl<T, I> PinnedDrop for SubscriptionStream<T, I>
where
T: DuplexTransport,
{
fn drop(self: Pin<&mut Self>) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}
impl<T: DuplexTransport> EthSubscribe<T> {
pub async fn subscribe_new_heads(&self, options: CallOptions) -> error::Result<SubscriptionStream<T, BlockHeader>> {
let subscription = helpers::serialize(&&"newHeads");
let response = self
.transport
.execute("eth_subscribe", vec![subscription], options)
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
pub async fn subscribe_logs(
&self,
filter: Filter,
options: CallOptions,
) -> error::Result<SubscriptionStream<T, Log>> {
let subscription = helpers::serialize(&&"logs");
let filter = helpers::serialize(&filter);
let response = self
.transport
.execute("eth_subscribe", vec![subscription, filter], options)
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
pub async fn subscribe_new_pending_transactions(
&self,
options: CallOptions,
) -> error::Result<SubscriptionStream<T, H256>> {
let subscription = helpers::serialize(&&"newPendingTransactions");
let response = self
.transport
.execute("eth_subscribe", vec![subscription], options)
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
pub async fn subscribe_syncing(&self, options: CallOptions) -> error::Result<SubscriptionStream<T, SyncState>> {
let subscription = helpers::serialize(&&"syncing");
let response = self
.transport
.execute("eth_subscribe", vec![subscription], options)
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
}