1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5 borrow::Cow,
6 ops::Deref,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Weak,
10 },
11 time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
15pub type WeakClient = Weak<RpcClientInner>;
17
18pub type ClientRef<'a> = &'a RpcClientInner;
20
21pub type NoParams = [(); 0];
23
24#[cfg(feature = "pubsub")]
25type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
27#[derive(Debug)]
34pub struct RpcClient(Arc<RpcClientInner>);
35
36impl Clone for RpcClient {
37 fn clone(&self) -> Self {
38 Self(Arc::clone(&self.0))
39 }
40}
41
42impl RpcClient {
43 pub const fn builder() -> ClientBuilder<Identity> {
45 ClientBuilder { builder: ServiceBuilder::new() }
46 }
47}
48
49impl RpcClient {
50 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
52 Self::new_maybe_pubsub(
53 t,
54 is_local,
55 #[cfg(feature = "pubsub")]
56 None,
57 )
58 }
59
60 pub fn mocked(asserter: Asserter) -> Self {
63 Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
64 }
65
66 #[cfg(feature = "reqwest")]
68 pub fn new_http(url: reqwest::Url) -> Self {
69 let http = alloy_transport_http::Http::new(url);
70 let is_local = http.guess_local();
71 Self::new(http, is_local)
72 }
73
74 fn new_maybe_pubsub(
76 t: impl IntoBoxTransport,
77 is_local: bool,
78 #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
79 ) -> Self {
80 Self(Arc::new(RpcClientInner::new_maybe_pubsub(
81 t,
82 is_local,
83 #[cfg(feature = "pubsub")]
84 pubsub,
85 )))
86 }
87
88 pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
99 where
100 F: FnOnce(T) -> R,
101 T: IntoBoxTransport,
102 R: IntoBoxTransport,
103 {
104 #[cfg(feature = "pubsub")]
105 {
106 let t = main_transport.clone().into_box_transport();
107 let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
108 Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
109 }
110
111 #[cfg(not(feature = "pubsub"))]
112 Self::new(layer(main_transport), is_local)
113 }
114
115 pub fn from_inner(inner: RpcClientInner) -> Self {
117 Self(Arc::new(inner))
118 }
119
120 pub const fn inner(&self) -> &Arc<RpcClientInner> {
122 &self.0
123 }
124
125 pub fn into_inner(self) -> Arc<RpcClientInner> {
127 self.0
128 }
129
130 pub fn get_weak(&self) -> WeakClient {
132 Arc::downgrade(&self.0)
133 }
134
135 pub fn get_ref(&self) -> ClientRef<'_> {
137 &self.0
138 }
139
140 pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
145 self.inner().set_poll_interval(poll_interval);
146 self
147 }
148
149 pub fn prepare_static_poller<Params, Resp>(
153 &self,
154 method: impl Into<Cow<'static, str>>,
155 params: Params,
156 ) -> PollerBuilder<Params, Resp>
157 where
158 Params: RpcSend + 'static,
159 Resp: RpcRecv + Clone,
160 {
161 PollerBuilder::new(self.get_weak(), method, params)
162 }
163
164 #[deprecated(since = "0.9.0", note = "`RpcClient` is now always boxed")]
166 #[expect(clippy::missing_const_for_fn)]
167 pub fn boxed(self) -> Self {
168 self
169 }
170
171 #[inline]
173 pub fn new_batch(&self) -> BatchRequest<'_> {
174 BatchRequest::new(&self.0)
175 }
176}
177
178impl Deref for RpcClient {
179 type Target = RpcClientInner;
180
181 #[inline]
182 fn deref(&self) -> &Self::Target {
183 &self.0
184 }
185}
186
187#[derive(Debug)]
200pub struct RpcClientInner {
201 pub(crate) transport: BoxTransport,
203 #[cfg(feature = "pubsub")]
210 pub(crate) pubsub: MaybePubsub,
211 pub(crate) is_local: bool,
213 pub(crate) id: AtomicU64,
215 pub(crate) poll_interval: AtomicU64,
217}
218
219impl RpcClientInner {
220 #[inline]
225 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
226 Self {
227 transport: t.into_box_transport(),
228 #[cfg(feature = "pubsub")]
229 pubsub: None,
230 is_local,
231 id: AtomicU64::new(0),
232 poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
233 }
234 }
235
236 pub(crate) fn new_maybe_pubsub(
239 t: impl IntoBoxTransport,
240 is_local: bool,
241 #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
242 ) -> Self {
243 Self {
244 #[cfg(feature = "pubsub")]
245 pubsub,
246 ..Self::new(t.into_box_transport(), is_local)
247 }
248 }
249
250 #[inline]
252 pub fn with_id(self, id: u64) -> Self {
253 Self { id: AtomicU64::new(id), ..self }
254 }
255
256 pub fn poll_interval(&self) -> Duration {
258 Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
259 }
260
261 pub fn set_poll_interval(&self, poll_interval: Duration) {
264 self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
265 }
266
267 #[inline]
269 pub const fn transport(&self) -> &BoxTransport {
270 &self.transport
271 }
272
273 #[inline]
275 pub fn transport_mut(&mut self) -> &mut BoxTransport {
276 &mut self.transport
277 }
278
279 #[inline]
281 pub fn into_transport(self) -> BoxTransport {
282 self.transport
283 }
284
285 #[cfg(feature = "pubsub")]
287 #[inline]
288 #[track_caller]
289 pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
290 if let Some(pubsub) = &self.pubsub {
291 return Some(pubsub);
292 }
293 self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
294 }
295
296 #[cfg(feature = "pubsub")]
302 #[inline]
303 #[track_caller]
304 pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
305 self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
306 }
307
308 #[inline]
314 pub fn make_request<Params: RpcSend>(
315 &self,
316 method: impl Into<Cow<'static, str>>,
317 params: Params,
318 ) -> Request<Params> {
319 Request::new(method, self.next_id(), params)
320 }
321
322 #[inline]
329 pub const fn is_local(&self) -> bool {
330 self.is_local
331 }
332
333 #[inline]
335 pub fn set_local(&mut self, is_local: bool) {
336 self.is_local = is_local;
337 }
338
339 #[inline]
341 fn increment_id(&self) -> u64 {
342 self.id.fetch_add(1, Ordering::Relaxed)
343 }
344
345 #[inline]
347 pub fn next_id(&self) -> Id {
348 self.increment_id().into()
349 }
350
351 #[doc(alias = "prepare")]
362 pub fn request<Params: RpcSend, Resp: RpcRecv>(
363 &self,
364 method: impl Into<Cow<'static, str>>,
365 params: Params,
366 ) -> RpcCall<Params, Resp> {
367 let request = self.make_request(method, params);
368 RpcCall::new(request, self.transport.clone())
369 }
370
371 pub fn request_noparams<Resp: RpcRecv>(
375 &self,
376 method: impl Into<Cow<'static, str>>,
377 ) -> RpcCall<NoParams, Resp> {
378 self.request(method, [])
379 }
380
381 #[deprecated(since = "0.9.0", note = "`RpcClientInner` is now always boxed")]
384 #[expect(clippy::missing_const_for_fn)]
385 pub fn boxed(self) -> Self {
386 self
387 }
388}
389
390#[cfg(feature = "pubsub")]
391mod pubsub_impl {
392 use super::*;
393 use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
394 use alloy_transport::TransportResult;
395
396 impl RpcClientInner {
397 pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
403 self.expect_pubsub_frontend().get_subscription(id).await.unwrap()
404 }
405
406 pub async fn get_subscription<T: serde::de::DeserializeOwned>(
412 &self,
413 id: alloy_primitives::B256,
414 ) -> Subscription<T> {
415 Subscription::from(self.get_raw_subscription(id).await)
416 }
417 }
418
419 impl RpcClient {
420 pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
422 ClientBuilder::default().pubsub(connect).await
423 }
424
425 #[track_caller]
436 pub fn channel_size(&self) -> usize {
437 self.expect_pubsub_frontend().channel_size()
438 }
439
440 #[track_caller]
446 pub fn set_channel_size(&self, size: usize) {
447 self.expect_pubsub_frontend().set_channel_size(size)
448 }
449 }
450}
451
452#[cfg(test)]
453mod tests {
454 use super::*;
455 use similar_asserts::assert_eq;
456
457 #[test]
458 fn test_client_with_poll_interval() {
459 let poll_interval = Duration::from_millis(5_000);
460 let client = RpcClient::new_http(reqwest::Url::parse("http://localhost").unwrap())
461 .with_poll_interval(poll_interval);
462 assert_eq!(client.poll_interval(), poll_interval);
463 }
464}