alloy_provider/provider/
root.rs

1use crate::{
2    blocks::NewBlocks,
3    heart::{Heartbeat, HeartbeatHandle},
4    Identity, ProviderBuilder,
5};
6use alloy_network::{Ethereum, Network};
7use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, ClientRef, RpcClient, WeakClient};
8use alloy_transport::{TransportConnect, TransportError};
9use std::{
10    fmt,
11    marker::PhantomData,
12    sync::{Arc, OnceLock},
13};
14
15#[cfg(feature = "pubsub")]
16use alloy_pubsub::{PubSubFrontend, Subscription};
17
18/// The root provider manages the RPC client and the heartbeat. It is at the
19/// base of every provider stack.
20pub struct RootProvider<N: Network = Ethereum> {
21    /// The inner state of the root provider.
22    pub(crate) inner: Arc<RootProviderInner<N>>,
23}
24
25impl<N: Network> Clone for RootProvider<N> {
26    fn clone(&self) -> Self {
27        Self { inner: self.inner.clone() }
28    }
29}
30
31impl<N: Network> fmt::Debug for RootProvider<N> {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        f.debug_struct("RootProvider").field("client", &self.inner.client).finish_non_exhaustive()
34    }
35}
36
37/// Helper function to directly access [`ProviderBuilder`] with minimal
38/// generics.
39pub fn builder<N: Network>() -> ProviderBuilder<Identity, Identity, N> {
40    ProviderBuilder::default()
41}
42
43impl<N: Network> RootProvider<N> {
44    /// Creates a new HTTP root provider from the given URL.
45    #[cfg(feature = "reqwest")]
46    pub fn new_http(url: url::Url) -> Self {
47        Self::new(RpcClient::new_http(url))
48    }
49
50    /// Creates a new root provider from the given RPC client.
51    pub fn new(client: RpcClient) -> Self {
52        Self { inner: Arc::new(RootProviderInner::new(client)) }
53    }
54
55    /// Creates a new root provider from the provided string.
56    ///
57    /// See [`BuiltInConnectionString`] for more information.
58    pub async fn connect(s: &str) -> Result<Self, TransportError> {
59        Self::connect_with(s.parse::<BuiltInConnectionString>()?).await
60    }
61
62    /// Creates a new root provider from the provided connection details.
63    #[deprecated(since = "0.9.0", note = "use `connect` instead")]
64    pub async fn connect_builtin(s: &str) -> Result<Self, TransportError> {
65        Self::connect(s).await
66    }
67
68    /// Connects to a transport with the given connector.
69    pub async fn connect_with<C: TransportConnect>(conn: C) -> Result<Self, TransportError> {
70        ClientBuilder::default().connect_with(conn).await.map(Self::new)
71    }
72
73    /// Connects to a boxed transport with the given connector.
74    #[deprecated(
75        since = "0.9.0",
76        note = "`RootProvider` is now always boxed, use `connect_with` instead"
77    )]
78    pub async fn connect_boxed<C: TransportConnect>(conn: C) -> Result<Self, TransportError> {
79        Self::connect_with(conn).await
80    }
81}
82
83impl<N: Network> RootProvider<N> {
84    /// Boxes the inner client.
85    #[deprecated(since = "0.9.0", note = "`RootProvider` is now always boxed")]
86    #[allow(clippy::missing_const_for_fn)]
87    pub fn boxed(self) -> Self {
88        self
89    }
90
91    /// Gets the subscription corresponding to the given RPC subscription ID.
92    #[cfg(feature = "pubsub")]
93    pub async fn get_subscription<R: alloy_json_rpc::RpcRecv>(
94        &self,
95        id: alloy_primitives::B256,
96    ) -> alloy_transport::TransportResult<Subscription<R>> {
97        self.pubsub_frontend()?.get_subscription(id).await.map(Subscription::from)
98    }
99
100    /// Unsubscribes from the subscription corresponding to the given RPC subscription ID.
101    #[cfg(feature = "pubsub")]
102    pub fn unsubscribe(&self, id: alloy_primitives::B256) -> alloy_transport::TransportResult<()> {
103        self.pubsub_frontend()?.unsubscribe(id)
104    }
105
106    #[cfg(feature = "pubsub")]
107    pub(crate) fn pubsub_frontend(&self) -> alloy_transport::TransportResult<&PubSubFrontend> {
108        self.inner
109            .client_ref()
110            .pubsub_frontend()
111            .ok_or_else(alloy_transport::TransportErrorKind::pubsub_unavailable)
112    }
113
114    #[inline]
115    pub(crate) fn get_heart(&self) -> &HeartbeatHandle<N> {
116        self.inner.heart.get_or_init(|| {
117            let new_blocks = NewBlocks::<N>::new(self.inner.weak_client());
118            let stream = new_blocks.into_stream();
119            Heartbeat::new(Box::pin(stream)).spawn()
120        })
121    }
122}
123
124/// The root provider manages the RPC client and the heartbeat. It is at the
125/// base of every provider stack.
126pub(crate) struct RootProviderInner<N: Network = Ethereum> {
127    client: RpcClient,
128    heart: OnceLock<HeartbeatHandle<N>>,
129    _network: PhantomData<N>,
130}
131
132impl<N: Network> Clone for RootProviderInner<N> {
133    fn clone(&self) -> Self {
134        Self { client: self.client.clone(), heart: self.heart.clone(), _network: PhantomData }
135    }
136}
137
138impl<N: Network> RootProviderInner<N> {
139    pub(crate) fn new(client: RpcClient) -> Self {
140        Self { client, heart: Default::default(), _network: PhantomData }
141    }
142
143    pub(crate) fn weak_client(&self) -> WeakClient {
144        self.client.get_weak()
145    }
146
147    pub(crate) fn client_ref(&self) -> ClientRef<'_> {
148        self.client.get_ref()
149    }
150}