use crate::{
utils::{interval, PinBoxFut},
JsonRpcClient, Middleware, Provider, ProviderError,
};
use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64};
use futures_core::stream::Stream;
use futures_timer::Delay;
use futures_util::stream::StreamExt;
use instant::Duration;
use pin_project::pin_project;
use std::{
fmt,
future::Future,
ops::Deref,
pin::Pin,
task::{Context, Poll},
};
/// A future that follows a submitted transaction until it has been mined and
/// has gathered the requested number of confirmations.
///
/// Awaiting it yields `Ok(Some(receipt))` once the transaction is confirmed,
/// `Ok(None)` if the provider keeps reporting that the transaction does not
/// exist after all retries are used up, or `Err(_)` if the block-number request
/// made while waiting for confirmations fails.
#[pin_project]
pub struct PendingTransaction<'a, P> {
/// Hash of the transaction being tracked.
tx_hash: TxHash,
/// Number of confirmations to wait for; `new` initialises this to 1.
confirmations: usize,
/// Provider used for the transaction, receipt and block-number requests.
provider: &'a Provider<P>,
/// Current position in the polling state machine.
state: PendingTxState<'a>,
/// Stream whose ticks pace the re-polling done from the `Paused*` states.
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
/// How many more times a transaction the provider cannot find is re-queried
/// before the future resolves to `Ok(None)`.
retries_remaining: usize,
}
/// Number of retries a freshly constructed `PendingTransaction` starts with:
/// how often a transaction lookup that returns nothing is repeated before the
/// transaction is reported as dropped.
const DEFAULT_RETRIES: usize = 3;
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
    /// Creates a pending transaction for `tx_hash` that polls through `provider`.
    ///
    /// The new value waits for 1 confirmation, starts with `DEFAULT_RETRIES`
    /// retries, and uses the provider's interval both for the initial delay and
    /// for the pause between subsequent polls.
    pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
        // The initial delay is created before the ticker, as it is the first
        // timer the future waits on.
        let initial_delay = Delay::new(provider.get_interval());
        let state = PendingTxState::InitialDelay(Box::pin(initial_delay));
        let ticker = interval(provider.get_interval());

        Self {
            tx_hash,
            confirmations: 1,
            provider,
            state,
            interval: Box::new(ticker),
            retries_remaining: DEFAULT_RETRIES,
        }
    }

    /// Returns a clone of the provider this pending transaction polls through.
    pub fn provider(&self) -> Provider<P>
    where
        P: Clone,
    {
        let shared: &Provider<P> = self.provider;
        shared.clone()
    }

    /// Returns the hash of the tracked transaction.
    pub fn tx_hash(&self) -> TxHash {
        self.tx_hash
    }

    /// Sets the number of confirmations to wait for and returns the updated value.
    #[must_use]
    pub fn confirmations(mut self, confs: usize) -> Self {
        self.confirmations = confs;
        self
    }

    /// Sets the polling interval and returns the updated value.
    ///
    /// The tick stream is always replaced. The initial delay is only replaced
    /// while the future is still in its `InitialDelay` state.
    #[must_use]
    pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
        let period: Duration = duration.into();
        self.interval = Box::new(interval(period));

        if let PendingTxState::InitialDelay(delay) = &mut self.state {
            *delay = Box::pin(Delay::new(period));
        }

        self
    }

    /// Sets how many times a missing transaction is re-queried before the
    /// future gives up, and returns the updated value.
    #[must_use]
    pub fn retries(mut self, retries: usize) -> Self {
        self.retries_remaining = retries;
        self
    }
}
impl<'a, P> PendingTransaction<'a, P> {
pub fn inspect<F>(self, mut f: F) -> Self
where
F: FnMut(&Self),
{
f(&self);
self
}
pub fn log_msg<S: std::fmt::Display>(self, msg: S) -> Self {
self.inspect(|s| println!("{msg}: {:?}", **s))
}
pub fn log(self) -> Self {
self.inspect(|s| println!("Pending hash: {:?}", **s))
}
}
/// Moves the state machine to `$next`, asks the executor to poll again right
/// away, and returns `Poll::Pending` from the enclosing function.
///
/// `$proj` must name a value whose `state` field can be dereferenced and
/// assigned to; `$cx` must name the task context.
macro_rules! rewake_with_new_state {
    ($cx:ident, $proj:ident, $next:expr) => {
        *$proj.state = $next;
        $cx.waker().wake_by_ref();
        return Poll::Pending
    };
}
/// Conditional form of `rewake_with_new_state!`: when `$should_rewake` is true
/// it switches to `$next`, wakes the task and returns `Poll::Pending`;
/// otherwise execution continues with the statement after the invocation.
macro_rules! rewake_with_new_state_if {
    ($should_rewake:expr, $cx:ident, $proj:ident, $next:expr) => {
        if $should_rewake {
            rewake_with_new_state!($cx, $proj, $next);
        }
    };
}
impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
    type Output = Result<Option<TransactionReceipt>, ProviderError>;

    /// Advances the polling state machine by one step.
    ///
    /// The future resolves to:
    /// * `Ok(Some(receipt))` once the transaction is mined and has the
    ///   requested number of confirmations (the inclusion block counts as the
    ///   first one);
    /// * `Ok(None)` when the provider reports no such transaction and no
    ///   retries remain;
    /// * `Err(_)` when the block-number request fails while waiting for
    ///   confirmations.
    ///
    /// Failed transaction and receipt requests are not surfaced to the caller;
    /// they are repeated after the next interval tick.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has resolved through the `Completed`
    /// state, or if a receipt returned by the provider has no block number.
    // NOTE(review): the wasm32-only `allow` presumably silences a must-use
    // result from polling the initial delay on that target - confirm.
    #[cfg_attr(target_arch = "wasm32", allow(unused_must_use))]
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();

        match this.state {
            PendingTxState::InitialDelay(fut) => {
                // Wait out the initial delay before issuing the first request.
                futures_util::ready!(fut.as_mut().poll(ctx));
                tracing::debug!("Starting to poll pending tx {:?}", *this.tx_hash);
                let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
                rewake_with_new_state!(ctx, this, PendingTxState::GettingTx(fut));
            }
            PendingTxState::PausedGettingTx => {
                // Wait for the next interval tick, then ask for the transaction again.
                let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
                let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
                *this.state = PendingTxState::GettingTx(fut);
                ctx.waker().wake_by_ref();
            }
            PendingTxState::GettingTx(fut) => {
                let tx_res = futures_util::ready!(fut.as_mut().poll(ctx));

                // A failed request is retried after the next tick and does not
                // consume one of the retries.
                rewake_with_new_state_if!(
                    tx_res.is_err(),
                    ctx,
                    this,
                    PendingTxState::PausedGettingTx
                );

                let tx_opt = tx_res.unwrap();

                // The provider does not know the transaction: retry while
                // retries remain, otherwise report it as dropped.
                if tx_opt.is_none() {
                    if *this.retries_remaining == 0 {
                        tracing::debug!("Dropped from mempool, pending tx {:?}", *this.tx_hash);
                        *this.state = PendingTxState::Completed;
                        return Poll::Ready(Ok(None))
                    }

                    *this.retries_remaining -= 1;
                    rewake_with_new_state!(ctx, this, PendingTxState::PausedGettingTx);
                }

                let tx = tx_opt.unwrap();

                // Known but without a block number yet: keep waiting.
                rewake_with_new_state_if!(
                    tx.block_number.is_none(),
                    ctx,
                    this,
                    PendingTxState::PausedGettingTx
                );

                tracing::debug!("Getting receipt for pending tx {:?}", *this.tx_hash);
                let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
                rewake_with_new_state!(ctx, this, PendingTxState::GettingReceipt(fut));
            }
            PendingTxState::PausedGettingReceipt => {
                // Wait for the next interval tick, then ask for the receipt again.
                let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
                let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
                *this.state = PendingTxState::GettingReceipt(fut);
                ctx.waker().wake_by_ref();
            }
            PendingTxState::GettingReceipt(fut) => {
                if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
                    tracing::debug!("Checking receipt for pending tx {:?}", *this.tx_hash);
                    *this.state = PendingTxState::CheckingReceipt(receipt)
                } else {
                    // A failed receipt request is retried after the next tick.
                    *this.state = PendingTxState::PausedGettingReceipt
                }
                ctx.waker().wake_by_ref();
            }
            PendingTxState::CheckingReceipt(receipt) => {
                // No receipt available yet: try again after the next tick.
                rewake_with_new_state_if!(
                    receipt.is_none(),
                    ctx,
                    this,
                    PendingTxState::PausedGettingReceipt
                );

                if *this.confirmations > 1 {
                    tracing::debug!("Waiting on confirmations for pending tx {:?}", *this.tx_hash);
                    let fut = Box::pin(this.provider.get_block_number());
                    *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
                    ctx.waker().wake_by_ref();
                } else {
                    // Zero or one confirmation requested: having the receipt is enough.
                    let receipt = receipt.take();
                    *this.state = PendingTxState::Completed;
                    return Poll::Ready(Ok(receipt))
                }
            }
            PendingTxState::PausedGettingBlockNumber(receipt) => {
                // Wait for the next interval tick, then ask for the chain head
                // again, carrying the receipt along.
                let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
                let fut = Box::pin(this.provider.get_block_number());
                *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
                ctx.waker().wake_by_ref();
            }
            PendingTxState::GettingBlockNumber(fut, receipt) => {
                // Unlike the transaction and receipt requests, a failure here is
                // returned to the caller instead of being retried.
                let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;

                let receipt = receipt.take().expect("GettingBlockNumber without receipt");

                let inclusion_block = receipt
                    .block_number
                    .expect("Receipt did not have a block number. This should never happen");

                // Confirmations so far, counting the inclusion block as the
                // first. `checked_sub` is used because `U64` subtraction panics
                // on underflow, and a node that lags behind the one that served
                // the receipt can report a head below the inclusion block; that
                // case counts as zero confirmations.
                let confirmations_so_far = current_block
                    .checked_sub(inclusion_block)
                    .map_or_else(U64::zero, |depth| depth + U64::one());

                // Complete as soon as the requested count is reached. The same
                // figure is used for the decision and for the trace output, so
                // the log can no longer show "N/N" while the future keeps waiting.
                if confirmations_so_far >= U64::from(*this.confirmations) {
                    let receipt = Some(receipt);
                    *this.state = PendingTxState::Completed;
                    return Poll::Ready(Ok(receipt))
                } else {
                    tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", confirmations_so_far, this.confirmations);
                    *this.state = PendingTxState::PausedGettingBlockNumber(Some(receipt));
                    ctx.waker().wake_by_ref();
                }
            }
            PendingTxState::Completed => {
                panic!("polled pending transaction future after completion")
            }
        };

        // Every arm that falls through has either registered a wake-up with an
        // inner future/stream or has woken the task itself.
        Poll::Pending
    }
}
/// Debug output lists the hash, the requested confirmations and the current
/// state; the provider, the tick stream and the retry counter are left out.
impl<'a, P> fmt::Debug for PendingTransaction<'a, P> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { tx_hash, confirmations, state, .. } = self;

        let mut out = f.debug_struct("PendingTransaction");
        out.field("tx_hash", tx_hash);
        out.field("confirmations", confirmations);
        out.field("state", state);
        out.finish()
    }
}
/// Two pending transactions are equal when they track the same hash; no other
/// field takes part in the comparison.
impl<'a, P> PartialEq for PendingTransaction<'a, P> {
    fn eq(&self, other: &Self) -> bool {
        let (ours, theirs) = (&self.tx_hash, &other.tx_hash);
        ours == theirs
    }
}
/// Allows comparing a pending transaction directly against a transaction hash.
impl<'a, P> PartialEq<TxHash> for PendingTransaction<'a, P> {
    fn eq(&self, other: &TxHash) -> bool {
        self.tx_hash == *other
    }
}
// `Eq` is a marker with no methods; equality itself is the hash comparison
// done by the `PartialEq` impl.
impl<'a, P> Eq for PendingTransaction<'a, P> {}
/// Dereferencing a pending transaction yields the hash it tracks.
impl<'a, P> Deref for PendingTransaction<'a, P> {
    type Target = TxHash;

    fn deref(&self) -> &Self::Target {
        let Self { tx_hash, .. } = self;
        tx_hash
    }
}
/// States of the `PendingTransaction` polling state machine.
///
/// The receipt is carried as an `Option` in the later states so that it can be
/// moved out with `take()` when switching to the next state.
enum PendingTxState<'a> {
/// Waiting for the initial delay to elapse before the first request.
InitialDelay(Pin<Box<Delay>>),
/// Waiting for the next interval tick before requesting the transaction again.
PausedGettingTx,
/// A transaction request is in flight.
GettingTx(PinBoxFut<'a, Option<Transaction>>),
/// Waiting for the next interval tick before requesting the receipt again.
PausedGettingReceipt,
/// A receipt request is in flight.
GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),
/// A receipt request succeeded; `None` means no receipt was returned.
CheckingReceipt(Option<TransactionReceipt>),
/// Holding the receipt while waiting for the next interval tick before
/// requesting the block number again.
PausedGettingBlockNumber(Option<TransactionReceipt>),
/// A block-number request is in flight; the receipt is held alongside it.
GettingBlockNumber(PinBoxFut<'a, U64>, Option<TransactionReceipt>),
/// The future has resolved; polling it in this state panics.
Completed,
}
impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::InitialDelay(_) => "InitialDelay",
PendingTxState::PausedGettingTx => "PausedGettingTx",
PendingTxState::GettingTx(_) => "GettingTx",
PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
PendingTxState::PausedGettingBlockNumber(_) => "PausedGettingBlockNumber",
PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
PendingTxState::Completed => "Completed",
};
f.debug_struct("PendingTxState").field("state", &state).finish()
}
}