alloy_rpc_client/
client.rs

1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5    borrow::Cow,
6    ops::Deref,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Weak,
10    },
11    time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
/// A non-owning [`Weak`] handle to the shared [`RpcClientInner`] behind an [`RpcClient`].
///
/// Produced by [`RpcClient::get_weak`]. Holding one does not keep the client alive.
pub type WeakClient = Weak<RpcClientInner>;
17
/// A borrowed reference to the [`RpcClientInner`] that backs an [`RpcClient`].
pub type ClientRef<'a> = &'a RpcClientInner;
20
/// Parameter type of a JSON-RPC request with no parameters.
///
/// This is a zero-length array of units; [`RpcClientInner::request_noparams`] passes `[]` as
/// the parameter value.
pub type NoParams = [(); 0];
23
/// Optional handle to a pubsub frontend; `None` when no frontend was captured at construction.
#[cfg(feature = "pubsub")]
type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
/// A JSON-RPC client.
///
/// This is a thin handle around an [`Arc`]-shared [`RpcClientInner`]: cloning it only clones
/// the `Arc`, and it dereferences to [`RpcClientInner`].
///
/// [`RpcClient`] should never be instantiated directly. Instead, use
/// [`ClientBuilder`].
///
/// [`ClientBuilder`]: crate::ClientBuilder
#[derive(Debug)]
pub struct RpcClient(Arc<RpcClientInner>);
35
36impl Clone for RpcClient {
37    fn clone(&self) -> Self {
38        Self(Arc::clone(&self.0))
39    }
40}
41
42impl RpcClient {
43    /// Create a new [`ClientBuilder`].
44    pub const fn builder() -> ClientBuilder<Identity> {
45        ClientBuilder { builder: ServiceBuilder::new() }
46    }
47}
48
49impl RpcClient {
50    /// Creates a new [`RpcClient`] with the given transport.
51    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
52        Self::new_maybe_pubsub(
53            t,
54            is_local,
55            #[cfg(feature = "pubsub")]
56            None,
57        )
58    }
59
60    /// Create a new [`RpcClient`] with a transport that returns mocked responses from the given
61    /// [`Asserter`].
62    pub fn mocked(asserter: Asserter) -> Self {
63        Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
64    }
65
66    /// Create a new [`RpcClient`] with an HTTP transport.
67    #[cfg(feature = "reqwest")]
68    pub fn new_http(url: reqwest::Url) -> Self {
69        let http = alloy_transport_http::Http::new(url);
70        let is_local = http.guess_local();
71        Self::new(http, is_local)
72    }
73
74    /// Create a new [`RpcClient`] with an HTTP transport using a pre-built [`reqwest::Client`].
75    #[cfg(feature = "reqwest")]
76    pub fn new_http_with_client(client: reqwest::Client, url: reqwest::Url) -> Self {
77        let http = alloy_transport_http::Http::with_client(client, url);
78        let is_local = http.guess_local();
79        Self::new(http, is_local)
80    }
81
82    /// Creates a new [`RpcClient`] with the given transport and a `MaybePubsub`.
83    fn new_maybe_pubsub(
84        t: impl IntoBoxTransport,
85        is_local: bool,
86        #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
87    ) -> Self {
88        Self(Arc::new(RpcClientInner::new_maybe_pubsub(
89            t,
90            is_local,
91            #[cfg(feature = "pubsub")]
92            pubsub,
93        )))
94    }
95
96    /// Creates the [`RpcClient`] with the `main_transport` (ipc, ws, http) and a `layer` closure.
97    ///
98    /// The `layer` fn is intended to be [`tower::ServiceBuilder::service`] that layers the
99    /// transport services. The `main_transport` is expected to the type that actually emits the
100    /// request object: `PubSubFrontend`. This exists so that we can intercept the
101    /// `PubSubFrontend` which we need for [`RpcClientInner::pubsub_frontend`].
102    ///
103    /// This workaround exists because due to how [`tower::ServiceBuilder::service`] collapses into
104    /// a [`BoxTransport`] we wouldn't be obtain the `MaybePubsub` by downcasting the layered
105    /// `transport`.
106    pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
107    where
108        F: FnOnce(T) -> R,
109        T: IntoBoxTransport,
110        R: IntoBoxTransport,
111    {
112        #[cfg(feature = "pubsub")]
113        {
114            let t = main_transport.clone().into_box_transport();
115            let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
116            Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
117        }
118
119        #[cfg(not(feature = "pubsub"))]
120        Self::new(layer(main_transport), is_local)
121    }
122
123    /// Creates a new [`RpcClient`] with the given inner client.
124    pub fn from_inner(inner: RpcClientInner) -> Self {
125        Self(Arc::new(inner))
126    }
127
128    /// Get a reference to the client.
129    pub const fn inner(&self) -> &Arc<RpcClientInner> {
130        &self.0
131    }
132
133    /// Convert the client into its inner type.
134    pub fn into_inner(self) -> Arc<RpcClientInner> {
135        self.0
136    }
137
138    /// Get a [`Weak`] reference to the client.
139    pub fn get_weak(&self) -> WeakClient {
140        Arc::downgrade(&self.0)
141    }
142
143    /// Borrow the client.
144    pub fn get_ref(&self) -> ClientRef<'_> {
145        &self.0
146    }
147
148    /// Sets the poll interval for the client in milliseconds.
149    ///
150    /// Note: This will only set the poll interval for the client if it is the only reference to the
151    /// inner client. If the reference is held by many, then it will not update the poll interval.
152    pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
153        self.inner().set_poll_interval(poll_interval);
154        self
155    }
156
157    /// Build a poller that polls a method with the given parameters.
158    ///
159    /// See [`PollerBuilder`] for examples and more details.
160    pub fn prepare_static_poller<Params, Resp>(
161        &self,
162        method: impl Into<Cow<'static, str>>,
163        params: Params,
164    ) -> PollerBuilder<Params, Resp>
165    where
166        Params: RpcSend + 'static,
167        Resp: RpcRecv + Clone,
168    {
169        PollerBuilder::new(self.get_weak(), method, params)
170    }
171
172    /// Boxes the transport.
173    #[deprecated(since = "0.9.0", note = "`RpcClient` is now always boxed")]
174    #[expect(clippy::missing_const_for_fn)]
175    pub fn boxed(self) -> Self {
176        self
177    }
178
179    /// Create a new [`BatchRequest`] builder.
180    #[inline]
181    pub fn new_batch(&self) -> BatchRequest<'_> {
182        BatchRequest::new(&self.0)
183    }
184}
185
186impl Deref for RpcClient {
187    type Target = RpcClientInner;
188
189    #[inline]
190    fn deref(&self) -> &Self::Target {
191        &self.0
192    }
193}
194
/// A JSON-RPC client.
///
/// This struct manages a [`BoxTransport`] and a request ID counter. It is used to
/// build [`RpcCall`] and [`BatchRequest`] objects. The client delegates
/// transport access to the calls.
///
/// ### Note
///
/// IDs are allocated sequentially, starting at 0 unless a different start was chosen with
/// [`RpcClientInner::with_id`]. IDs are reserved via [`RpcClientInner::next_id`]. Note that
/// allocated IDs may not be used. There is no guarantee that a prepared [`RpcCall`] will be
/// sent, or that a sent call will receive a response.
#[derive(Debug)]
pub struct RpcClientInner {
    /// The underlying transport.
    pub(crate) transport: BoxTransport,
    /// Stores a handle to the PubSub service, if there is one.
    ///
    /// We store this handle separately because, if the client is built through the
    /// [`ClientBuilder`] with an additional layer, the actual transport can be an arbitrary type
    /// and we would be unable to obtain the `PubSubFrontend` by downcasting the `transport`. For
    /// example `RetryTransport<PubSubFrontend>`.
    #[cfg(feature = "pubsub")]
    pub(crate) pubsub: MaybePubsub,
    /// `true` if the transport is local.
    pub(crate) is_local: bool,
    /// The next request ID to use. Atomic so IDs can be reserved through a shared reference.
    pub(crate) id: AtomicU64,
    /// The poll interval for the client in milliseconds. Atomic so it can be updated through a
    /// shared reference.
    pub(crate) poll_interval: AtomicU64,
}
226
227impl RpcClientInner {
228    /// Create a new [`RpcClient`] with the given transport.
229    ///
230    /// Note: Sets the poll interval to 250ms for local transports and 7s for remote transports by
231    /// default.
232    #[inline]
233    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
234        Self {
235            transport: t.into_box_transport(),
236            #[cfg(feature = "pubsub")]
237            pubsub: None,
238            is_local,
239            id: AtomicU64::new(0),
240            poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
241        }
242    }
243
244    /// Create a new [`RpcClient`] with the given transport and an optional handle to the
245    /// `PubSubFrontend`.
246    pub(crate) fn new_maybe_pubsub(
247        t: impl IntoBoxTransport,
248        is_local: bool,
249        #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
250    ) -> Self {
251        Self {
252            #[cfg(feature = "pubsub")]
253            pubsub,
254            ..Self::new(t.into_box_transport(), is_local)
255        }
256    }
257
258    /// Sets the starting ID for the client.
259    #[inline]
260    pub fn with_id(self, id: u64) -> Self {
261        Self { id: AtomicU64::new(id), ..self }
262    }
263
264    /// Returns the default poll interval (milliseconds) for the client.
265    pub fn poll_interval(&self) -> Duration {
266        Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
267    }
268
269    /// Set the poll interval for the client in milliseconds. Default:
270    /// 7s for remote and 250ms for local transports.
271    pub fn set_poll_interval(&self, poll_interval: Duration) {
272        self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
273    }
274
275    /// Returns a reference to the underlying transport.
276    #[inline]
277    pub const fn transport(&self) -> &BoxTransport {
278        &self.transport
279    }
280
281    /// Returns a mutable reference to the underlying transport.
282    #[inline]
283    pub const fn transport_mut(&mut self) -> &mut BoxTransport {
284        &mut self.transport
285    }
286
287    /// Consumes the client and returns the underlying transport.
288    #[inline]
289    pub fn into_transport(self) -> BoxTransport {
290        self.transport
291    }
292
293    /// Returns a reference to the pubsub frontend if the transport supports it.
294    #[cfg(feature = "pubsub")]
295    #[inline]
296    #[track_caller]
297    pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
298        if let Some(pubsub) = &self.pubsub {
299            return Some(pubsub);
300        }
301        self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
302    }
303
304    /// Returns a reference to the pubsub frontend if the transport supports it.
305    ///
306    /// # Panics
307    ///
308    /// Panics if the transport does not support pubsub.
309    #[cfg(feature = "pubsub")]
310    #[inline]
311    #[track_caller]
312    pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
313        self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
314    }
315
316    /// Build a `JsonRpcRequest` with the given method and params.
317    ///
318    /// This function reserves an ID for the request, however the request is not sent.
319    ///
320    /// To send a request, use [`RpcClientInner::request`] and await the returned [`RpcCall`].
321    #[inline]
322    pub fn make_request<Params: RpcSend>(
323        &self,
324        method: impl Into<Cow<'static, str>>,
325        params: Params,
326    ) -> Request<Params> {
327        Request::new(method, self.next_id(), params)
328    }
329
330    /// `true` if the client believes the transport is local.
331    ///
332    /// This can be used to optimize remote API usage, or to change program
333    /// behavior on local endpoints. When the client is instantiated by parsing
334    /// a URL or other external input, this value is set on a best-efforts
335    /// basis and may be incorrect.
336    #[inline]
337    pub const fn is_local(&self) -> bool {
338        self.is_local
339    }
340
341    /// Set the `is_local` flag.
342    #[inline]
343    pub const fn set_local(&mut self, is_local: bool) {
344        self.is_local = is_local;
345    }
346
347    /// Reserve a request ID value. This is used to generate request IDs.
348    #[inline]
349    fn increment_id(&self) -> u64 {
350        self.id.fetch_add(1, Ordering::Relaxed)
351    }
352
353    /// Reserve a request ID u64.
354    #[inline]
355    pub fn next_id(&self) -> Id {
356        self.increment_id().into()
357    }
358
359    /// Prepares an [`RpcCall`].
360    ///
361    /// This function reserves an ID for the request, however the request is not sent.
362    /// To send a request, await the returned [`RpcCall`].
363    ///
364    /// # Note
365    ///
366    /// Serialization is done lazily. It will not be performed until the call is awaited.
367    /// This means that if a serializer error occurs, it will not be caught until the call is
368    /// awaited.
369    #[doc(alias = "prepare")]
370    pub fn request<Params: RpcSend, Resp: RpcRecv>(
371        &self,
372        method: impl Into<Cow<'static, str>>,
373        params: Params,
374    ) -> RpcCall<Params, Resp> {
375        let request = self.make_request(method, params);
376        RpcCall::new(request, self.transport.clone())
377    }
378
379    /// Prepares an [`RpcCall`] with no parameters.
380    ///
381    /// See [`request`](Self::request) for more details.
382    pub fn request_noparams<Resp: RpcRecv>(
383        &self,
384        method: impl Into<Cow<'static, str>>,
385    ) -> RpcCall<NoParams, Resp> {
386        self.request(method, [])
387    }
388
389    /// Type erase the service in the transport, allowing it to be used in a
390    /// generic context.
391    #[deprecated(since = "0.9.0", note = "`RpcClientInner` is now always boxed")]
392    #[expect(clippy::missing_const_for_fn)]
393    pub fn boxed(self) -> Self {
394        self
395    }
396}
397
#[cfg(feature = "pubsub")]
mod pubsub_impl {
    use super::*;
    use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
    use alloy_transport::TransportResult;

    impl RpcClientInner {
        /// Looks up the [`RawSubscription`] registered under the subscription ID `id`.
        ///
        /// # Panics
        ///
        /// Panics if the transport does not support pubsub, or if the frontend's lookup does not
        /// yield a subscription (the result is unwrapped).
        pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
            let frontend = self.expect_pubsub_frontend();
            let lookup = frontend.get_subscription(id).await;
            lookup.unwrap()
        }

        /// Looks up the subscription with ID `id` and converts it into a typed
        /// [`Subscription`].
        ///
        /// # Panics
        ///
        /// Panics under the same conditions as [`RpcClientInner::get_raw_subscription`].
        pub async fn get_subscription<T: serde::de::DeserializeOwned>(
            &self,
            id: alloy_primitives::B256,
        ) -> Subscription<T> {
            let raw = self.get_raw_subscription(id).await;
            Subscription::from(raw)
        }
    }

    impl RpcClient {
        /// Builds a client from a [`PubSubConnect`] implementer using a default
        /// [`ClientBuilder`].
        pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
            let builder = ClientBuilder::default();
            builder.pubsub(connect).await
        }

        /// Returns the currently configured channel size, i.e. the number of items
        /// to buffer in new subscription channels. Defaults to 16. See
        /// [`tokio::sync::broadcast`] for a description of relevant
        /// behavior.
        ///
        /// [`tokio::sync::broadcast`]: https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html
        ///
        /// # Panics
        ///
        /// Panics if the transport does not support pubsub.
        #[track_caller]
        pub fn channel_size(&self) -> usize {
            let frontend = self.expect_pubsub_frontend();
            frontend.channel_size()
        }

        /// Updates the channel size on the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the transport does not support pubsub.
        #[track_caller]
        pub fn set_channel_size(&self, size: usize) {
            let frontend = self.expect_pubsub_frontend();
            frontend.set_channel_size(size)
        }
    }
}
459
#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;

    /// `with_poll_interval` must replace the default interval.
    ///
    /// Uses the mocked transport so this test builds regardless of which optional features are
    /// enabled.
    #[test]
    fn test_mocked_client_with_poll_interval() {
        let poll_interval = Duration::from_millis(5_000);
        let client = RpcClient::mocked(Asserter::new()).with_poll_interval(poll_interval);
        assert_eq!(client.poll_interval(), poll_interval);
    }

    /// Same check through the HTTP constructor, which only exists with the `reqwest` feature.
    #[cfg(feature = "reqwest")]
    #[test]
    fn test_client_with_poll_interval() {
        let poll_interval = Duration::from_millis(5_000);
        let client = RpcClient::new_http(reqwest::Url::parse("http://localhost").unwrap())
            .with_poll_interval(poll_interval);
        assert_eq!(client.poll_interval(), poll_interval);
    }
}