1use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall};
2use alloy_json_rpc::{Id, Request, RpcRecv, RpcSend};
3use alloy_transport::{mock::Asserter, BoxTransport, IntoBoxTransport};
4use std::{
5 borrow::Cow,
6 ops::Deref,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Weak,
10 },
11 time::Duration,
12};
13use tower::{layer::util::Identity, ServiceBuilder};
14
/// A weak (non-owning) reference to the shared [`RpcClientInner`].
///
/// Obtained from [`RpcClient::get_weak`]; holding one does not keep the client alive.
pub type WeakClient = Weak<RpcClientInner>;
17
/// A borrowed reference to an [`RpcClientInner`], as returned by [`RpcClient::get_ref`].
pub type ClientRef<'a> = &'a RpcClientInner;
20
/// Parameter type for requests that take no parameters: a zero-length array of unit.
///
/// Used by [`RpcClientInner::request_noparams`], which passes `[]` as the params.
pub type NoParams = [(); 0];
23
/// An optional pubsub frontend handle; only defined when the `pubsub` feature is enabled.
#[cfg(feature = "pubsub")]
type MaybePubsub = Option<alloy_pubsub::PubSubFrontend>;
26
/// A JSON-RPC client handle.
///
/// This is a newtype around an `Arc<RpcClientInner>`: cloning it only clones the `Arc`, so all
/// clones share the same transport, request-ID counter and poll interval. It dereferences to
/// [`RpcClientInner`], which is where requests are actually built.
#[derive(Debug)]
pub struct RpcClient(Arc<RpcClientInner>);
35
36impl Clone for RpcClient {
37 fn clone(&self) -> Self {
38 Self(Arc::clone(&self.0))
39 }
40}
41
42impl RpcClient {
43 pub const fn builder() -> ClientBuilder<Identity> {
45 ClientBuilder { builder: ServiceBuilder::new() }
46 }
47}
48
49impl RpcClient {
50 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
52 Self::new_maybe_pubsub(
53 t,
54 is_local,
55 #[cfg(feature = "pubsub")]
56 None,
57 )
58 }
59
60 pub fn mocked(asserter: Asserter) -> Self {
63 Self::new(alloy_transport::mock::MockTransport::new(asserter), true)
64 }
65
66 #[cfg(feature = "reqwest")]
68 pub fn new_http(url: reqwest::Url) -> Self {
69 let http = alloy_transport_http::Http::new(url);
70 let is_local = http.guess_local();
71 Self::new(http, is_local)
72 }
73
74 #[cfg(feature = "reqwest")]
76 pub fn new_http_with_client(client: reqwest::Client, url: reqwest::Url) -> Self {
77 let http = alloy_transport_http::Http::with_client(client, url);
78 let is_local = http.guess_local();
79 Self::new(http, is_local)
80 }
81
82 fn new_maybe_pubsub(
84 t: impl IntoBoxTransport,
85 is_local: bool,
86 #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
87 ) -> Self {
88 Self(Arc::new(RpcClientInner::new_maybe_pubsub(
89 t,
90 is_local,
91 #[cfg(feature = "pubsub")]
92 pubsub,
93 )))
94 }
95
96 pub(crate) fn new_layered<F, T, R>(is_local: bool, main_transport: T, layer: F) -> Self
107 where
108 F: FnOnce(T) -> R,
109 T: IntoBoxTransport,
110 R: IntoBoxTransport,
111 {
112 #[cfg(feature = "pubsub")]
113 {
114 let t = main_transport.clone().into_box_transport();
115 let maybe_pubsub = t.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>().cloned();
116 Self::new_maybe_pubsub(layer(main_transport), is_local, maybe_pubsub)
117 }
118
119 #[cfg(not(feature = "pubsub"))]
120 Self::new(layer(main_transport), is_local)
121 }
122
123 pub fn from_inner(inner: RpcClientInner) -> Self {
125 Self(Arc::new(inner))
126 }
127
128 pub const fn inner(&self) -> &Arc<RpcClientInner> {
130 &self.0
131 }
132
133 pub fn into_inner(self) -> Arc<RpcClientInner> {
135 self.0
136 }
137
138 pub fn get_weak(&self) -> WeakClient {
140 Arc::downgrade(&self.0)
141 }
142
143 pub fn get_ref(&self) -> ClientRef<'_> {
145 &self.0
146 }
147
148 pub fn with_poll_interval(self, poll_interval: Duration) -> Self {
153 self.inner().set_poll_interval(poll_interval);
154 self
155 }
156
157 pub fn prepare_static_poller<Params, Resp>(
161 &self,
162 method: impl Into<Cow<'static, str>>,
163 params: Params,
164 ) -> PollerBuilder<Params, Resp>
165 where
166 Params: RpcSend + 'static,
167 Resp: RpcRecv + Clone,
168 {
169 PollerBuilder::new(self.get_weak(), method, params)
170 }
171
172 #[deprecated(since = "0.9.0", note = "`RpcClient` is now always boxed")]
174 #[expect(clippy::missing_const_for_fn)]
175 pub fn boxed(self) -> Self {
176 self
177 }
178
179 #[inline]
181 pub fn new_batch(&self) -> BatchRequest<'_> {
182 BatchRequest::new(&self.0)
183 }
184}
185
186impl Deref for RpcClient {
187 type Target = RpcClientInner;
188
189 #[inline]
190 fn deref(&self) -> &Self::Target {
191 &self.0
192 }
193}
194
/// The state shared by all clones of an [`RpcClient`]: the boxed transport, an optional pubsub
/// frontend, the locality flag, the request-ID counter and the poll interval.
///
/// The ID counter and poll interval are atomics, so they can be updated through a shared
/// reference.
#[derive(Debug)]
pub struct RpcClientInner {
    /// The type-erased transport that requests are sent over.
    pub(crate) transport: BoxTransport,
    /// A pubsub frontend recorded separately from `transport`.
    ///
    /// `pubsub_frontend` consults this first and only then tries to downcast `transport`.
    #[cfg(feature = "pubsub")]
    pub(crate) pubsub: MaybePubsub,
    /// Whether the transport is considered local; selects the default poll interval in `new`.
    pub(crate) is_local: bool,
    /// Counter from which request IDs are drawn; incremented on every `next_id` call.
    pub(crate) id: AtomicU64,
    /// The poll interval, stored as a number of milliseconds.
    pub(crate) poll_interval: AtomicU64,
}
226
227impl RpcClientInner {
228 #[inline]
233 pub fn new(t: impl IntoBoxTransport, is_local: bool) -> Self {
234 Self {
235 transport: t.into_box_transport(),
236 #[cfg(feature = "pubsub")]
237 pubsub: None,
238 is_local,
239 id: AtomicU64::new(0),
240 poll_interval: if is_local { AtomicU64::new(250) } else { AtomicU64::new(7000) },
241 }
242 }
243
244 pub(crate) fn new_maybe_pubsub(
247 t: impl IntoBoxTransport,
248 is_local: bool,
249 #[cfg(feature = "pubsub")] pubsub: MaybePubsub,
250 ) -> Self {
251 Self {
252 #[cfg(feature = "pubsub")]
253 pubsub,
254 ..Self::new(t.into_box_transport(), is_local)
255 }
256 }
257
258 #[inline]
260 pub fn with_id(self, id: u64) -> Self {
261 Self { id: AtomicU64::new(id), ..self }
262 }
263
264 pub fn poll_interval(&self) -> Duration {
266 Duration::from_millis(self.poll_interval.load(Ordering::Relaxed))
267 }
268
269 pub fn set_poll_interval(&self, poll_interval: Duration) {
272 self.poll_interval.store(poll_interval.as_millis() as u64, Ordering::Relaxed);
273 }
274
275 #[inline]
277 pub const fn transport(&self) -> &BoxTransport {
278 &self.transport
279 }
280
281 #[inline]
283 pub const fn transport_mut(&mut self) -> &mut BoxTransport {
284 &mut self.transport
285 }
286
287 #[inline]
289 pub fn into_transport(self) -> BoxTransport {
290 self.transport
291 }
292
293 #[cfg(feature = "pubsub")]
295 #[inline]
296 #[track_caller]
297 pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> {
298 if let Some(pubsub) = &self.pubsub {
299 return Some(pubsub);
300 }
301 self.transport.as_any().downcast_ref::<alloy_pubsub::PubSubFrontend>()
302 }
303
304 #[cfg(feature = "pubsub")]
310 #[inline]
311 #[track_caller]
312 pub fn expect_pubsub_frontend(&self) -> &alloy_pubsub::PubSubFrontend {
313 self.pubsub_frontend().expect("called pubsub_frontend on a non-pubsub transport")
314 }
315
316 #[inline]
322 pub fn make_request<Params: RpcSend>(
323 &self,
324 method: impl Into<Cow<'static, str>>,
325 params: Params,
326 ) -> Request<Params> {
327 Request::new(method, self.next_id(), params)
328 }
329
330 #[inline]
337 pub const fn is_local(&self) -> bool {
338 self.is_local
339 }
340
341 #[inline]
343 pub const fn set_local(&mut self, is_local: bool) {
344 self.is_local = is_local;
345 }
346
347 #[inline]
349 fn increment_id(&self) -> u64 {
350 self.id.fetch_add(1, Ordering::Relaxed)
351 }
352
353 #[inline]
355 pub fn next_id(&self) -> Id {
356 self.increment_id().into()
357 }
358
359 #[doc(alias = "prepare")]
370 pub fn request<Params: RpcSend, Resp: RpcRecv>(
371 &self,
372 method: impl Into<Cow<'static, str>>,
373 params: Params,
374 ) -> RpcCall<Params, Resp> {
375 let request = self.make_request(method, params);
376 RpcCall::new(request, self.transport.clone())
377 }
378
379 pub fn request_noparams<Resp: RpcRecv>(
383 &self,
384 method: impl Into<Cow<'static, str>>,
385 ) -> RpcCall<NoParams, Resp> {
386 self.request(method, [])
387 }
388
389 #[deprecated(since = "0.9.0", note = "`RpcClientInner` is now always boxed")]
392 #[expect(clippy::missing_const_for_fn)]
393 pub fn boxed(self) -> Self {
394 self
395 }
396}
397
#[cfg(feature = "pubsub")]
mod pubsub_impl {
    use super::*;
    use alloy_pubsub::{PubSubConnect, RawSubscription, Subscription};
    use alloy_transport::TransportResult;

    impl RpcClientInner {
        /// Looks up the raw subscription registered under `id` on the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the client has no pubsub frontend, or if the frontend's lookup does not
        /// yield a subscription.
        pub async fn get_raw_subscription(&self, id: alloy_primitives::B256) -> RawSubscription {
            let frontend = self.expect_pubsub_frontend();
            frontend.get_subscription(id).await.unwrap()
        }

        /// Looks up the subscription registered under `id` and converts it into a typed
        /// [`Subscription`].
        ///
        /// # Panics
        ///
        /// Panics under the same conditions as [`Self::get_raw_subscription`].
        pub async fn get_subscription<T: serde::de::DeserializeOwned>(
            &self,
            id: alloy_primitives::B256,
        ) -> Subscription<T> {
            let raw = self.get_raw_subscription(id).await;
            raw.into()
        }
    }

    impl RpcClient {
        /// Connects to a pubsub transport using a default [`ClientBuilder`] and returns the
        /// resulting client.
        pub async fn connect_pubsub<C: PubSubConnect>(connect: C) -> TransportResult<Self> {
            let builder = ClientBuilder::default();
            builder.pubsub(connect).await
        }

        /// Returns the channel size reported by the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the client has no pubsub frontend.
        #[track_caller]
        pub fn channel_size(&self) -> usize {
            let frontend = self.expect_pubsub_frontend();
            frontend.channel_size()
        }

        /// Sets the channel size on the pubsub frontend.
        ///
        /// # Panics
        ///
        /// Panics if the client has no pubsub frontend.
        #[track_caller]
        pub fn set_channel_size(&self, size: usize) {
            let frontend = self.expect_pubsub_frontend();
            frontend.set_channel_size(size)
        }
    }
}
459
#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;

    /// The interval given to `with_poll_interval` must be the one `poll_interval` reports back.
    #[test]
    fn test_client_with_poll_interval() {
        let expected = Duration::from_millis(5_000);
        let url = reqwest::Url::parse("http://localhost").unwrap();
        let client = RpcClient::new_http(url).with_poll_interval(expected);
        assert_eq!(client.poll_interval(), expected);
    }
}