alloy_rpc_client/
client.rs

1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5    borrow::Cow,
6    ops::Deref,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Weak,
10    },
11    time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
15/// An [`RpcClient`] in a [`Weak`] reference.
16pub type WeakClient = Weak<RpcClientInner>;
17
18/// A borrowed [`RpcClient`].
19pub type ClientRef<'a> = &'a RpcClientInner;
20
21/// Parameter type of a JSON-RPC request with no parameters.
22pub type NoParams = [(); 0];
23
24#[cfg(feature = "pubsub")]
25type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
27/// A JSON-RPC client.
28///
29/// [`RpcClient`] should never be instantiated directly. Instead, use
30/// [`ClientBuilder`].
31///
32/// [`ClientBuilder`]: crate::ClientBuilder
33#[derive(Debug)]
34pub struct RpcClient(Arc<RpcClientInner>);
35
36impl Clone for RpcClient {
37    fn clone(&self) -> Self {
38        Self(Arc::clone(&self.0))
39    }
40}
41
42impl RpcClient {
43    /// Create a new [`ClientBuilder`].
44    pub const fn builder() -> ClientBuilder<Identity> {
45        ClientBuilder { builder: ServiceBuilder::new() }
46    }
47}
48
49impl RpcClient {
50    /// Creates a new [`RpcClient`] with the given transport.
51    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
52        Self::new_maybe_pubsub(
53            t,
54            is_local,
55            #[cfg(feature = "pubsub")]
56            None,
57        )
58    }
59
60    /// Create a new [`RpcClient`] with a transport that returns mocked responses from the given
61    /// [`Asserter`].
62    pub fn mocked(asserter: Asserter) -> Self {
63        Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
64    }
65
66    /// Create a new [`RpcClient`] with an HTTP transport.
67    #[cfg(feature = "reqwest")]
68    pub fn new_http(url: reqwest::Url) -> Self {
69        let http = alloy_transport_http::Http::new(url);
70        let is_local = http.guess_local();
71        Self::new(http, is_local)
72    }
73
74    /// Creates a new [`RpcClient`] with the given transport and a `MaybePubsub`.
75    fn new_maybe_pubsub(
76        t: impl IntoBoxTransport,
77        is_local: bool,
78        #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
79    ) -> Self {
80        Self(Arc::new(RpcClientInner::new_maybe_pubsub(
81            t,
82            is_local,
83            #[cfg(feature = "pubsub")]
84            pubsub,
85        )))
86    }
87
88    /// Creates the [`RpcClient`] with the `main_transport` (ipc, ws, http) and a `layer` closure.
89    ///
90    /// The `layer` fn is intended to be [`tower::ServiceBuilder::service`] that layers the
91    /// transport services. The `main_transport` is expected to the type that actually emits the
92    /// request object: `PubSubFrontend`. This exists so that we can intercept the
93    /// `PubSubFrontend` which we need for [`RpcClientInner::pubsub_frontend`].
94    ///
95    /// This workaround exists because due to how [`tower::ServiceBuilder::service`] collapses into
96    /// a [`BoxTransport`] we wouldn't be obtain the `MaybePubsub` by downcasting the layered
97    /// `transport`.
98    pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
99    where
100        F: FnOnce(T) -> R,
101        T: IntoBoxTransport,
102        R: IntoBoxTransport,
103    {
104        #[cfg(feature = "pubsub")]
105        {
106            let t = main_transport.clone().into_box_transport();
107            let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
108            Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
109        }
110
111        #[cfg(not(feature = "pubsub"))]
112        Self::new(layer(main_transport), is_local)
113    }
114
115    /// Creates a new [`RpcClient`] with the given inner client.
116    pub fn from_inner(inner: RpcClientInner) -> Self {
117        Self(Arc::new(inner))
118    }
119
120    /// Get a reference to the client.
121    pub const fn inner(&self) -> &Arc<RpcClientInner> {
122        &self.0
123    }
124
125    /// Convert the client into its inner type.
126    pub fn into_inner(self) -> Arc<RpcClientInner> {
127        self.0
128    }
129
130    /// Get a [`Weak`] reference to the client.
131    pub fn get_weak(&self) -> WeakClient {
132        Arc::downgrade(&self.0)
133    }
134
135    /// Borrow the client.
136    pub fn get_ref(&self) -> ClientRef<'_> {
137        &self.0
138    }
139
140    /// Sets the poll interval for the client in milliseconds.
141    ///
142    /// Note: This will only set the poll interval for the client if it is the only reference to the
143    /// inner client. If the reference is held by many, then it will not update the poll interval.
144    pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
145        self.inner().set_poll_interval(poll_interval);
146        self
147    }
148
149    /// Build a poller that polls a method with the given parameters.
150    ///
151    /// See [`PollerBuilder`] for examples and more details.
152    pub fn prepare_static_poller<Params, Resp>(
153        &self,
154        method: impl Into<Cow<'static, str>>,
155        params: Params,
156    ) -> PollerBuilder<Params, Resp>
157    where
158        Params: RpcSend + 'static,
159        Resp: RpcRecv + Clone,
160    {
161        PollerBuilder::new(self.get_weak(), method, params)
162    }
163
164    /// Boxes the transport.
165    #[deprecated(since = "0.9.0", note = "`RpcClient` is now always boxed")]
166    #[expect(clippy::missing_const_for_fn)]
167    pub fn boxed(self) -> Self {
168        self
169    }
170
171    /// Create a new [`BatchRequest`] builder.
172    #[inline]
173    pub fn new_batch(&self) -> BatchRequest<'_> {
174        BatchRequest::new(&self.0)
175    }
176}
177
178impl Deref for RpcClient {
179    type Target = RpcClientInner;
180
181    #[inline]
182    fn deref(&self) -> &Self::Target {
183        &self.0
184    }
185}
186
187/// A JSON-RPC client.
188///
189/// This struct manages a [`BoxTransport`] and a request ID counter. It is used to
190/// build [`RpcCall`] and [`BatchRequest`] objects. The client delegates
191/// transport access to the calls.
192///
193/// ### Note
194///
195/// IDs are allocated sequentially, starting at 0. IDs are reserved via
196/// [`RpcClientInner::next_id`]. Note that allocated IDs may not be used. There
197/// is no guarantee that a prepared [`RpcCall`] will be sent, or that a sent
198/// call will receive a response.
199#[derive(Debug)]
200pub struct RpcClientInner {
201    /// The underlying transport.
202    pub(crate) transport: BoxTransport,
203    /// Stores a handle to the PubSub service if pubsub.
204    ///
205    /// We store this _transport_ because if built through the [`ClientBuilder`] with an additional
206    /// layer the actual transport can be an arbitrary type and we would be unable to obtain the
207    /// `PubSubFrontend` by downcasting the `transport`. For example
208    /// `RetryTransport<PubSubFrontend>`.
209    #[cfg(feature = "pubsub")]
210    pub(crate) pubsub: MaybePubsub,
211    /// `true` if the transport is local.
212    pub(crate) is_local: bool,
213    /// The next request ID to use.
214    pub(crate) id: AtomicU64,
215    /// The poll interval for the client in milliseconds.
216    pub(crate) poll_interval: AtomicU64,
217}
218
219impl RpcClientInner {
220    /// Create a new [`RpcClient`] with the given transport.
221    ///
222    /// Note: Sets the poll interval to 250ms for local transports and 7s for remote transports by
223    /// default.
224    #[inline]
225    pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
226        Self {
227            transport: t.into_box_transport(),
228            #[cfg(feature = "pubsub")]
229            pubsub: None,
230            is_local,
231            id: AtomicU64::new(0),
232            poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
233        }
234    }
235
236    /// Create a new [`RpcClient`] with the given transport and an optional handle to the
237    /// `PubSubFrontend`.
238    pub(crate) fn new_maybe_pubsub(
239        t: impl IntoBoxTransport,
240        is_local: bool,
241        #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
242    ) -> Self {
243        Self {
244            #[cfg(feature = "pubsub")]
245            pubsub,
246            ..Self::new(t.into_box_transport(), is_local)
247        }
248    }
249
250    /// Sets the starting ID for the client.
251    #[inline]
252    pub fn with_id(self, id: u64) -> Self {
253        Self { id: AtomicU64::new(id), ..self }
254    }
255
256    /// Returns the default poll interval (milliseconds) for the client.
257    pub fn poll_interval(&self) -> Duration {
258        Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
259    }
260
261    /// Set the poll interval for the client in milliseconds. Default:
262    /// 7s for remote and 250ms for local transports.
263    pub fn set_poll_interval(&self, poll_interval: Duration) {
264        self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
265    }
266
267    /// Returns a reference to the underlying transport.
268    #[inline]
269    pub const fn transport(&self) -> &BoxTransport {
270        &self.transport
271    }
272
273    /// Returns a mutable reference to the underlying transport.
274    #[inline]
275    pub fn transport_mut(&mut self) -> &mut BoxTransport {
276        &mut self.transport
277    }
278
279    /// Consumes the client and returns the underlying transport.
280    #[inline]
281    pub fn into_transport(self) -> BoxTransport {
282        self.transport
283    }
284
285    /// Returns a reference to the pubsub frontend if the transport supports it.
286    #[cfg(feature = "pubsub")]
287    #[inline]
288    #[track_caller]
289    pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
290        if let Some(pubsub) = &self.pubsub {
291            return Some(pubsub);
292        }
293        self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
294    }
295
296    /// Returns a reference to the pubsub frontend if the transport supports it.
297    ///
298    /// # Panics
299    ///
300    /// Panics if the transport does not support pubsub.
301    #[cfg(feature = "pubsub")]
302    #[inline]
303    #[track_caller]
304    pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
305        self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
306    }
307
308    /// Build a `JsonRpcRequest` with the given method and params.
309    ///
310    /// This function reserves an ID for the request, however the request is not sent.
311    ///
312    /// To send a request, use [`RpcClientInner::request`] and await the returned [`RpcCall`].
313    #[inline]
314    pub fn make_request<Params: RpcSend>(
315        &self,
316        method: impl Into<Cow<'static, str>>,
317        params: Params,
318    ) -> Request<Params> {
319        Request::new(method, self.next_id(), params)
320    }
321
322    /// `true` if the client believes the transport is local.
323    ///
324    /// This can be used to optimize remote API usage, or to change program
325    /// behavior on local endpoints. When the client is instantiated by parsing
326    /// a URL or other external input, this value is set on a best-efforts
327    /// basis and may be incorrect.
328    #[inline]
329    pub const fn is_local(&self) -> bool {
330        self.is_local
331    }
332
333    /// Set the `is_local` flag.
334    #[inline]
335    pub fn set_local(&mut self, is_local: bool) {
336        self.is_local = is_local;
337    }
338
339    /// Reserve a request ID value. This is used to generate request IDs.
340    #[inline]
341    fn increment_id(&self) -> u64 {
342        self.id.fetch_add(1, Ordering::Relaxed)
343    }
344
345    /// Reserve a request ID u64.
346    #[inline]
347    pub fn next_id(&self) -> Id {
348        self.increment_id().into()
349    }
350
351    /// Prepares an [`RpcCall`].
352    ///
353    /// This function reserves an ID for the request, however the request is not sent.
354    /// To send a request, await the returned [`RpcCall`].
355    ///
356    /// # Note
357    ///
358    /// Serialization is done lazily. It will not be performed until the call is awaited.
359    /// This means that if a serializer error occurs, it will not be caught until the call is
360    /// awaited.
361    #[doc(alias = "prepare")]
362    pub fn request<Params: RpcSend, Resp: RpcRecv>(
363        &self,
364        method: impl Into<Cow<'static, str>>,
365        params: Params,
366    ) -> RpcCall<Params, Resp> {
367        let request = self.make_request(method, params);
368        RpcCall::new(request, self.transport.clone())
369    }
370
371    /// Prepares an [`RpcCall`] with no parameters.
372    ///
373    /// See [`request`](Self::request) for more details.
374    pub fn request_noparams<Resp: RpcRecv>(
375        &self,
376        method: impl Into<Cow<'static, str>>,
377    ) -> RpcCall<NoParams, Resp> {
378        self.request(method, [])
379    }
380
381    /// Type erase the service in the transport, allowing it to be used in a
382    /// generic context.
383    #[deprecated(since = "0.9.0", note = "`RpcClientInner` is now always boxed")]
384    #[expect(clippy::missing_const_for_fn)]
385    pub fn boxed(self) -> Self {
386        self
387    }
388}
389
390#[cfg(feature = "pubsub")]
391mod pubsub_impl {
392    use super::*;
393    use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
394    use alloy_transport::TransportResult;
395
396    impl RpcClientInner {
397        /// Get a [`RawSubscription`] for the given subscription ID.
398        ///
399        /// # Panics
400        ///
401        /// Panics if the transport does not support pubsub.
402        pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
403            self.expect_pubsub_frontend().get_subscription(id).await.unwrap()
404        }
405
406        /// Get a [`Subscription`] for the given subscription ID.
407        ///
408        /// # Panics
409        ///
410        /// Panics if the transport does not support pubsub.
411        pub async fn get_subscription<T: serde::de::DeserializeOwned>(
412            &self,
413            id: alloy_primitives::B256,
414        ) -> Subscription<T> {
415            Subscription::from(self.get_raw_subscription(id).await)
416        }
417    }
418
419    impl RpcClient {
420        /// Connect to a transport via a [`PubSubConnect`] implementor.
421        pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
422            ClientBuilder::default().pubsub(connect).await
423        }
424
425        /// Get the currently configured channel size. This is the number of items
426        /// to buffer in new subscription channels. Defaults to 16. See
427        /// [`tokio::sync::broadcast`] for a description of relevant
428        /// behavior.
429        ///
430        /// [`tokio::sync::broadcast`]: https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html
431        ///
432        /// # Panics
433        ///
434        /// Panics if the transport does not support pubsub.
435        #[track_caller]
436        pub fn channel_size(&self) -> usize {
437            self.expect_pubsub_frontend().channel_size()
438        }
439
440        /// Set the channel size.
441        ///
442        /// # Panics
443        ///
444        /// Panics if the transport does not support pubsub.
445        #[track_caller]
446        pub fn set_channel_size(&self, size: usize) {
447            self.expect_pubsub_frontend().set_channel_size(size)
448        }
449    }
450}
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455    use similar_asserts::assert_eq;
456
457    #[test]
458    fn test_client_with_poll_interval() {
459        let poll_interval = Duration::from_millis(5_000);
460        let client = RpcClient::new_http(reqwest::Url::parse("http://localhost").unwrap())
461            .with_poll_interval(poll_interval);
462        assert_eq!(client.poll_interval(), poll_interval);
463    }
464}