alloy_rpc_client/
poller.rs

1use crate::WeakClient;
2use alloy_json_rpc::{RpcError, RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use serde::Serialize;
7use serde_json::value::RawValue;
8use std::{
9    borrow::Cow,
10    marker::PhantomData,
11    ops::{Deref, DerefMut},
12    time::Duration,
13};
14use tokio::sync::broadcast;
15use tokio_stream::wrappers::BroadcastStream;
16use tracing_futures::Instrument;
17
18#[cfg(target_family = "wasm")]
19use wasmtimer::tokio::sleep;
20
21#[cfg(not(target_family = "wasm"))]
22use tokio::time::sleep;
23
24/// The number of retries for polling a request.
25const MAX_RETRIES: usize = 3;
26
27/// A poller task builder.
28///
29/// This builder is used to create a poller task that repeatedly polls a method on a client and
30/// sends the responses to a channel. By default, this is done every 10 seconds, with a channel size
31/// of 16, and no limit on the number of successful polls. This is all configurable.
32///
33/// The builder is consumed using the [`spawn`](Self::spawn) method, which returns a channel to
34/// receive the responses. The task will continue to poll until either the client or the channel is
35/// dropped.
36///
37/// The channel can be converted into a stream using the [`into_stream`](PollChannel::into_stream)
38/// method.
39///
40/// Alternatively, [`into_stream`](Self::into_stream) on the builder can be used to directly return
41/// a stream of responses on the current thread, instead of spawning a task.
42///
43/// # Examples
44///
45/// Poll `eth_blockNumber` every 5 seconds:
46///
47/// ```no_run
48/// # async fn example(client: alloy_rpc_client::RpcClient) -> Result<(), Box<dyn std::error::Error>> {
49/// use alloy_primitives::U64;
50/// use alloy_rpc_client::PollerBuilder;
51/// use futures_util::StreamExt;
52///
53/// let poller: PollerBuilder<alloy_rpc_client::NoParams, U64> = client
54///     .prepare_static_poller("eth_blockNumber", [])
55///     .with_poll_interval(std::time::Duration::from_secs(5));
56/// let mut stream = poller.into_stream();
57/// while let Some(block_number) = stream.next().await {
58///    println!("polled block number: {block_number}");
59/// }
60/// # Ok(())
61/// # }
62/// ```
63// TODO: make this be able to be spawned on the current thread instead of forcing a task.
64#[derive(Debug)]
65#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
66pub struct PollerBuilder<Params, Resp> {
67    /// The client to poll with.
68    client: WeakClient,
69
70    /// Request Method
71    method: Cow<'static, str>,
72    params: Params,
73
74    // config options
75    channel_size: usize,
76    poll_interval: Duration,
77    limit: usize,
78
79    _pd: PhantomData<fn() -> Resp>,
80}
81
82impl<Params, Resp> PollerBuilder<Params, Resp>
83where
84    Params: RpcSend + 'static,
85    Resp: RpcRecv + Clone,
86{
87    /// Create a new poller task.
88    pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
89        let poll_interval =
90            client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
91        Self {
92            client,
93            method: method.into(),
94            params,
95            channel_size: 16,
96            poll_interval,
97            limit: usize::MAX,
98            _pd: PhantomData,
99        }
100    }
101
102    /// Returns the channel size for the poller task.
103    pub const fn channel_size(&self) -> usize {
104        self.channel_size
105    }
106
107    /// Sets the channel size for the poller task.
108    pub fn set_channel_size(&mut self, channel_size: usize) {
109        self.channel_size = channel_size;
110    }
111
112    /// Sets the channel size for the poller task.
113    pub fn with_channel_size(mut self, channel_size: usize) -> Self {
114        self.set_channel_size(channel_size);
115        self
116    }
117
118    /// Returns the limit on the number of successful polls.
119    pub const fn limit(&self) -> usize {
120        self.limit
121    }
122
123    /// Sets a limit on the number of successful polls.
124    pub fn set_limit(&mut self, limit: Option<usize>) {
125        self.limit = limit.unwrap_or(usize::MAX);
126    }
127
128    /// Sets a limit on the number of successful polls.
129    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
130        self.set_limit(limit);
131        self
132    }
133
134    /// Returns the duration between polls.
135    pub const fn poll_interval(&self) -> Duration {
136        self.poll_interval
137    }
138
139    /// Sets the duration between polls.
140    pub fn set_poll_interval(&mut self, poll_interval: Duration) {
141        self.poll_interval = poll_interval;
142    }
143
144    /// Sets the duration between polls.
145    pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
146        self.set_poll_interval(poll_interval);
147        self
148    }
149
150    /// Starts the poller in a new task, returning a channel to receive the responses on.
151    pub fn spawn(self) -> PollChannel<Resp> {
152        let (tx, rx) = broadcast::channel(self.channel_size);
153        self.into_future(tx).spawn_task();
154        rx.into()
155    }
156
157    async fn into_future(self, tx: broadcast::Sender<Resp>) {
158        let mut stream = self.into_stream();
159        while let Some(resp) = stream.next().await {
160            if tx.send(resp).is_err() {
161                debug!("channel closed");
162                break;
163            }
164        }
165    }
166
167    /// Starts the poller and returns the stream of responses.
168    ///
169    /// Note that this does not spawn the poller on a separate task, thus all responses will be
170    /// polled on the current thread once this stream is polled.
171    pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
172        Box::pin(self.into_local_stream())
173    }
174
175    fn into_local_stream(self) -> impl Stream<Item = Resp> {
176        let span = debug_span!("poller", method = %self.method);
177        stream! {
178        let mut params = ParamsOnce::Typed(self.params);
179        let mut retries = MAX_RETRIES;
180        'outer: for _ in 0..self.limit {
181            let Some(client) = self.client.upgrade() else {
182                debug!("client dropped");
183                break;
184            };
185
186            // Avoid serializing the params more than once.
187            let params = match params.get() {
188                Ok(p) => p,
189                Err(err) => {
190                    error!(%err, "failed to serialize params");
191                    break;
192                }
193            };
194
195            loop {
196                trace!("polling");
197                match client.request(self.method.clone(), params).await {
198                    Ok(resp) => yield resp,
199                    Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
200                        debug!(%err, "failed to poll, retrying");
201                        retries -= 1;
202                        continue;
203                    }
204                    Err(err) => {
205                        error!(%err, "failed to poll");
206                        break 'outer;
207                    }
208                }
209                break;
210            }
211
212            trace!(duration=?self.poll_interval, "sleeping");
213            sleep(self.poll_interval).await;
214        }
215        }
216        .instrument(span)
217    }
218
219    /// Returns the [`WeakClient`] associated with the poller.
220    pub fn client(&self) -> WeakClient {
221        self.client.clone()
222    }
223}
224
225/// A channel yielding responses from a poller task.
226///
227/// This stream is backed by a coroutine, and will continue to produce responses
228/// until the poller task is dropped. The poller task is dropped when all
229/// [`RpcClient`] instances are dropped, or when all listening `PollChannel` are
230/// dropped.
231///
232/// The poller task also ignores errors from the server and deserialization
233/// errors, and will continue to poll until the client is dropped.
234///
235/// [`RpcClient`]: crate::RpcClient
236#[derive(Debug)]
237pub struct PollChannel<Resp> {
238    rx: broadcast::Receiver<Resp>,
239}
240
241impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
242    fn from(rx: broadcast::Receiver<Resp>) -> Self {
243        Self { rx }
244    }
245}
246
247impl<Resp> Deref for PollChannel<Resp> {
248    type Target = broadcast::Receiver<Resp>;
249
250    fn deref(&self) -> &Self::Target {
251        &self.rx
252    }
253}
254
255impl<Resp> DerefMut for PollChannel<Resp> {
256    fn deref_mut(&mut self) -> &mut Self::Target {
257        &mut self.rx
258    }
259}
260
261impl<Resp> PollChannel<Resp>
262where
263    Resp: RpcRecv + Clone,
264{
265    /// Resubscribe to the poller task.
266    pub fn resubscribe(&self) -> Self {
267        Self { rx: self.rx.resubscribe() }
268    }
269
270    /// Converts the poll channel into a stream.
271    // TODO: can we name this type?
272    pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
273        self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
274    }
275
276    /// Converts the poll channel into a stream that also yields
277    /// [lag errors](tokio_stream::wrappers::errors::BroadcastStreamRecvError).
278    pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
279        self.rx.into()
280    }
281}
282
283// Serializes the parameters only once.
284enum ParamsOnce<P> {
285    Typed(P),
286    Serialized(Box<RawValue>),
287}
288
289impl<P: Serialize> ParamsOnce<P> {
290    #[inline]
291    fn get(&mut self) -> serde_json::Result<&RawValue> {
292        match self {
293            Self::Typed(_) => self.init(),
294            Self::Serialized(p) => Ok(p),
295        }
296    }
297
298    #[cold]
299    fn init(&mut self) -> serde_json::Result<&RawValue> {
300        let Self::Typed(p) = self else { unreachable!() };
301        let v = serde_json::value::to_raw_value(p)?;
302        *self = Self::Serialized(v);
303        let Self::Serialized(v) = self else { unreachable!() };
304        Ok(v)
305    }
306}
307
308#[cfg(test)]
309#[allow(clippy::missing_const_for_fn)]
310fn _assert_unpin() {
311    fn _assert<T: Unpin>() {}
312    _assert::<PollChannel<()>>();
313}