alloy_rpc_client/
poller.rs1use crate::WeakClient;
2use alloy_json_rpc::{RpcError, RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use serde::Serialize;
7use serde_json::value::RawValue;
8use std::{
9 borrow::Cow,
10 marker::PhantomData,
11 ops::{Deref, DerefMut},
12 time::Duration,
13};
14use tokio::sync::broadcast;
15use tokio_stream::wrappers::BroadcastStream;
16use tracing_futures::Instrument;
17
18#[cfg(target_family = "wasm")]
19use wasmtimer::tokio::sleep;
20
21#[cfg(not(target_family = "wasm"))]
22use tokio::time::sleep;
23
/// Number of times a poll is retried after a recoverable transport error before the poller
/// gives up.
const MAX_RETRIES: usize = 3;
26
27#[derive(Debug)]
65#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
66pub struct PollerBuilder<Params, Resp> {
67 client: WeakClient,
69
70 method: Cow<'static, str>,
72 params: Params,
73
74 channel_size: usize,
76 poll_interval: Duration,
77 limit: usize,
78
79 _pd: PhantomData<fn() -> Resp>,
80}
81
82impl<Params, Resp> PollerBuilder<Params, Resp>
83where
84 Params: RpcSend + 'static,
85 Resp: RpcRecv + Clone,
86{
87 pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
89 let poll_interval =
90 client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
91 Self {
92 client,
93 method: method.into(),
94 params,
95 channel_size: 16,
96 poll_interval,
97 limit: usize::MAX,
98 _pd: PhantomData,
99 }
100 }
101
102 pub const fn channel_size(&self) -> usize {
104 self.channel_size
105 }
106
107 pub fn set_channel_size(&mut self, channel_size: usize) {
109 self.channel_size = channel_size;
110 }
111
112 pub fn with_channel_size(mut self, channel_size: usize) -> Self {
114 self.set_channel_size(channel_size);
115 self
116 }
117
118 pub const fn limit(&self) -> usize {
120 self.limit
121 }
122
123 pub fn set_limit(&mut self, limit: Option<usize>) {
125 self.limit = limit.unwrap_or(usize::MAX);
126 }
127
128 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
130 self.set_limit(limit);
131 self
132 }
133
134 pub const fn poll_interval(&self) -> Duration {
136 self.poll_interval
137 }
138
139 pub fn set_poll_interval(&mut self, poll_interval: Duration) {
141 self.poll_interval = poll_interval;
142 }
143
144 pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
146 self.set_poll_interval(poll_interval);
147 self
148 }
149
150 pub fn spawn(self) -> PollChannel<Resp> {
152 let (tx, rx) = broadcast::channel(self.channel_size);
153 self.into_future(tx).spawn_task();
154 rx.into()
155 }
156
157 async fn into_future(self, tx: broadcast::Sender<Resp>) {
158 let mut stream = self.into_stream();
159 while let Some(resp) = stream.next().await {
160 if tx.send(resp).is_err() {
161 debug!("channel closed");
162 break;
163 }
164 }
165 }
166
167 pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
172 Box::pin(self.into_local_stream())
173 }
174
175 fn into_local_stream(self) -> impl Stream<Item = Resp> {
176 let span = debug_span!("poller", method = %self.method);
177 stream! {
178 let mut params = ParamsOnce::Typed(self.params);
179 let mut retries = MAX_RETRIES;
180 'outer: for _ in 0..self.limit {
181 let Some(client) = self.client.upgrade() else {
182 debug!("client dropped");
183 break;
184 };
185
186 let params = match params.get() {
188 Ok(p) => p,
189 Err(err) => {
190 error!(%err, "failed to serialize params");
191 break;
192 }
193 };
194
195 loop {
196 trace!("polling");
197 match client.request(self.method.clone(), params).await {
198 Ok(resp) => yield resp,
199 Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
200 debug!(%err, "failed to poll, retrying");
201 retries -= 1;
202 continue;
203 }
204 Err(err) => {
205 error!(%err, "failed to poll");
206 break 'outer;
207 }
208 }
209 break;
210 }
211
212 trace!(duration=?self.poll_interval, "sleeping");
213 sleep(self.poll_interval).await;
214 }
215 }
216 .instrument(span)
217 }
218
219 pub fn client(&self) -> WeakClient {
221 self.client.clone()
222 }
223}
224
225#[derive(Debug)]
237pub struct PollChannel<Resp> {
238 rx: broadcast::Receiver<Resp>,
239}
240
241impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
242 fn from(rx: broadcast::Receiver<Resp>) -> Self {
243 Self { rx }
244 }
245}
246
247impl<Resp> Deref for PollChannel<Resp> {
248 type Target = broadcast::Receiver<Resp>;
249
250 fn deref(&self) -> &Self::Target {
251 &self.rx
252 }
253}
254
255impl<Resp> DerefMut for PollChannel<Resp> {
256 fn deref_mut(&mut self) -> &mut Self::Target {
257 &mut self.rx
258 }
259}
260
261impl<Resp> PollChannel<Resp>
262where
263 Resp: RpcRecv + Clone,
264{
265 pub fn resubscribe(&self) -> Self {
267 Self { rx: self.rx.resubscribe() }
268 }
269
270 pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
273 self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
274 }
275
276 pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
279 self.rx.into()
280 }
281}
282
283enum ParamsOnce<P> {
285 Typed(P),
286 Serialized(Box<RawValue>),
287}
288
289impl<P: Serialize> ParamsOnce<P> {
290 #[inline]
291 fn get(&mut self) -> serde_json::Result<&RawValue> {
292 match self {
293 Self::Typed(_) => self.init(),
294 Self::Serialized(p) => Ok(p),
295 }
296 }
297
298 #[cold]
299 fn init(&mut self) -> serde_json::Result<&RawValue> {
300 let Self::Typed(p) = self else { unreachable!() };
301 let v = serde_json::value::to_raw_value(p)?;
302 *self = Self::Serialized(v);
303 let Self::Serialized(v) = self else { unreachable!() };
304 Ok(v)
305 }
306}
307
/// Compile-time check that `PollChannel` is `Unpin`; never meant to be called.
#[cfg(test)]
#[allow(clippy::missing_const_for_fn)]
fn _assert_unpin() {
    fn _requires_unpin<T>()
    where
        T: Unpin,
    {
    }
    _requires_unpin::<PollChannel<()>>();
}