alloy_rpc_client/
poller.rs

1use crate::WeakClient;
2use alloy_json_rpc::{RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use futures::{ready, stream::FusedStream, Future, FutureExt, Stream, StreamExt};
5use serde::Serialize;
6use serde_json::value::RawValue;
7use std::{
8    borrow::Cow,
9    marker::PhantomData,
10    ops::{Deref, DerefMut},
11    pin::Pin,
12    task::{Context, Poll},
13    time::Duration,
14};
15use tokio::sync::broadcast;
16use tokio_stream::wrappers::BroadcastStream;
17use tracing::Span;
18
19#[cfg(target_family = "wasm")]
20use wasmtimer::tokio::{sleep, Sleep};
21
22#[cfg(not(target_family = "wasm"))]
23use tokio::time::{sleep, Sleep};
24
25/// A poller task builder.
26///
27/// This builder is used to create a poller task that repeatedly polls a method on a client and
28/// sends the responses to a channel. By default, this is done every 10 seconds, with a channel size
29/// of 16, and no limit on the number of successful polls. This is all configurable.
30///
31/// The builder is consumed using the [`spawn`](Self::spawn) method, which returns a channel to
32/// receive the responses. The task will continue to poll until either the client or the channel is
33/// dropped.
34///
35/// The channel can be converted into a stream using the [`into_stream`](PollChannel::into_stream)
36/// method.
37///
38/// Alternatively, [`into_stream`](Self::into_stream) on the builder can be used to directly return
39/// a stream of responses on the current thread, instead of spawning a task.
40///
41/// # Examples
42///
43/// Poll `eth_blockNumber` every 5 seconds:
44///
45/// ```no_run
46/// # async fn example(client: alloy_rpc_client::RpcClient) -> Result<(), Box<dyn std::error::Error>> {
47/// use alloy_primitives::U64;
48/// use alloy_rpc_client::PollerBuilder;
49/// use futures_util::StreamExt;
50///
51/// let poller: PollerBuilder<(), U64> = client
52///     .prepare_static_poller("eth_blockNumber", ())
53///     .with_poll_interval(std::time::Duration::from_secs(5));
54/// let mut stream = poller.into_stream();
55/// while let Some(block_number) = stream.next().await {
56///    println!("polled block number: {block_number}");
57/// }
58/// # Ok(())
59/// # }
60/// ```
61// TODO: make this be able to be spawned on the current thread instead of forcing a task.
62#[derive(Debug)]
63#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
64pub struct PollerBuilder<Params, Resp> {
65    /// The client to poll with.
66    client: WeakClient,
67
68    /// Request Method
69    method: Cow<'static, str>,
70    params: Params,
71
72    // config options
73    channel_size: usize,
74    poll_interval: Duration,
75    limit: usize,
76
77    _pd: PhantomData<fn() -> Resp>,
78}
79
80impl<Params, Resp> PollerBuilder<Params, Resp>
81where
82    Params: RpcSend + 'static,
83    Resp: RpcRecv + Clone,
84{
85    /// Create a new poller task.
86    pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
87        let poll_interval =
88            client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
89        Self {
90            client,
91            method: method.into(),
92            params,
93            channel_size: 16,
94            poll_interval,
95            limit: usize::MAX,
96            _pd: PhantomData,
97        }
98    }
99
100    /// Returns the channel size for the poller task.
101    pub const fn channel_size(&self) -> usize {
102        self.channel_size
103    }
104
105    /// Sets the channel size for the poller task.
106    pub const fn set_channel_size(&mut self, channel_size: usize) {
107        self.channel_size = channel_size;
108    }
109
110    /// Sets the channel size for the poller task.
111    pub const fn with_channel_size(mut self, channel_size: usize) -> Self {
112        self.set_channel_size(channel_size);
113        self
114    }
115
116    /// Returns the limit on the number of successful polls.
117    pub const fn limit(&self) -> usize {
118        self.limit
119    }
120
121    /// Sets a limit on the number of successful polls.
122    pub fn set_limit(&mut self, limit: Option<usize>) {
123        self.limit = limit.unwrap_or(usize::MAX);
124    }
125
126    /// Sets a limit on the number of successful polls.
127    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
128        self.set_limit(limit);
129        self
130    }
131
132    /// Returns the duration between polls.
133    pub const fn poll_interval(&self) -> Duration {
134        self.poll_interval
135    }
136
137    /// Sets the duration between polls.
138    pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
139        self.poll_interval = poll_interval;
140    }
141
142    /// Sets the duration between polls.
143    pub const fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
144        self.set_poll_interval(poll_interval);
145        self
146    }
147
148    /// Starts the poller in a new task, returning a channel to receive the responses on.
149    pub fn spawn(self) -> PollChannel<Resp> {
150        let (tx, rx) = broadcast::channel(self.channel_size);
151        self.into_future(tx).spawn_task();
152        rx.into()
153    }
154
155    async fn into_future(self, tx: broadcast::Sender<Resp>) {
156        let mut stream = self.into_stream();
157        while let Some(resp) = stream.next().await {
158            if tx.send(resp).is_err() {
159                debug!("channel closed");
160                break;
161            }
162        }
163    }
164
165    /// Starts the poller and returns the stream of responses.
166    ///
167    /// Note that this does not spawn the poller on a separate task, thus all responses will be
168    /// polled on the current thread once this stream is polled.
169    pub fn into_stream(self) -> PollerStream<Resp> {
170        PollerStream::new(self)
171    }
172
173    /// Returns the [`WeakClient`] associated with the poller.
174    pub fn client(&self) -> WeakClient {
175        self.client.clone()
176    }
177}
178
179/// State for the polling stream.
180enum PollState<Resp> {
181    /// Poller is paused
182    Paused,
183    /// Waiting to start the next poll.
184    Waiting,
185    /// Currently polling for a response.
186    Polling(
187        alloy_transport::Pbf<
188            'static,
189            Resp,
190            alloy_transport::RpcError<alloy_transport::TransportErrorKind>,
191        >,
192    ),
193    /// Sleeping between polls.
194    Sleeping(Pin<Box<Sleep>>),
195
196    /// Polling has finished due to an error.
197    Finished,
198}
199
200/// A stream of responses from polling an RPC method.
201///
202/// This stream polls the given RPC method at the specified interval and yields the responses.
203///
204/// # Examples
205///
206/// ```no_run
207/// # async fn example(client: alloy_rpc_client::RpcClient) -> Result<(), Box<dyn std::error::Error>> {
208/// use alloy_primitives::U64;
209/// use futures_util::StreamExt;
210///
211/// // Create a poller that fetches block numbers
212/// let poller = client
213///     .prepare_static_poller("eth_blockNumber", ())
214///     .with_poll_interval(std::time::Duration::from_secs(1));
215///
216/// // Convert the block number to a more useful format
217/// let mut stream = poller.into_stream().map(|block_num: U64| block_num.to::<u64>());
218///
219/// while let Some(block_number) = stream.next().await {
220///     println!("Current block: {}", block_number);
221/// }
222/// # Ok(())
223/// # }
224/// ```
225pub struct PollerStream<Resp, Output = Resp, Map = fn(Resp) -> Output> {
226    client: WeakClient,
227    method: Cow<'static, str>,
228    params: Box<RawValue>,
229    poll_interval: Duration,
230    limit: usize,
231    poll_count: usize,
232    state: PollState<Resp>,
233    span: Span,
234    map: Map,
235    _pd: PhantomData<fn() -> Output>,
236}
237
238impl<Resp, Output, Map> std::fmt::Debug for PollerStream<Resp, Output, Map> {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        f.debug_struct("PollerStream")
241            .field("method", &self.method)
242            .field("poll_interval", &self.poll_interval)
243            .field("limit", &self.limit)
244            .field("poll_count", &self.poll_count)
245            .finish_non_exhaustive()
246    }
247}
248
249impl<Resp> PollerStream<Resp> {
250    fn new<Params: Serialize>(builder: PollerBuilder<Params, Resp>) -> Self {
251        let span = debug_span!("poller", method = %builder.method);
252
253        // Serialize params once
254        let params = serde_json::value::to_raw_value(&builder.params).unwrap_or_else(|err| {
255            error!(%err, "failed to serialize params during initialization");
256            // Return empty params, stream will terminate on first poll
257            Box::<RawValue>::default()
258        });
259
260        Self {
261            client: builder.client,
262            method: builder.method,
263            params,
264            poll_interval: builder.poll_interval,
265            limit: builder.limit,
266            poll_count: 0,
267            state: PollState::Waiting,
268            span,
269            map: std::convert::identity,
270            _pd: PhantomData,
271        }
272    }
273
274    /// Get a reference to the [`WeakClient`] used by this poller.
275    pub fn client(&self) -> WeakClient {
276        self.client.clone()
277    }
278
279    /// Pauses the poller until it's unpaused.
280    ///
281    /// While paused the poller will not initiate new rpc requests
282    pub fn pause(&mut self) {
283        self.state = PollState::Paused;
284    }
285
286    /// Unpauses the poller.
287    ///
288    /// The poller will initiate new rpc requests once polled.
289    pub fn unpause(&mut self) {
290        if matches!(self.state, PollState::Paused) {
291            self.state = PollState::Waiting;
292        }
293    }
294}
295
296impl<Resp, Output, Map> PollerStream<Resp, Output, Map>
297where
298    Map: Fn(Resp) -> Output,
299{
300    /// Maps the responses using the provided function.
301    pub fn map<NewOutput, NewMap>(self, map: NewMap) -> PollerStream<Resp, NewOutput, NewMap>
302    where
303        NewMap: Fn(Resp) -> NewOutput,
304    {
305        PollerStream {
306            client: self.client,
307            method: self.method,
308            params: self.params,
309            poll_interval: self.poll_interval,
310            limit: self.limit,
311            poll_count: self.poll_count,
312            state: self.state,
313            span: self.span,
314            map,
315            _pd: PhantomData,
316        }
317    }
318}
319
320impl<Resp, Output, Map> Stream for PollerStream<Resp, Output, Map>
321where
322    Resp: RpcRecv + 'static,
323    Map: Fn(Resp) -> Output + Unpin,
324{
325    type Item = Output;
326
327    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
328        let this = self.get_mut();
329        let _guard = this.span.enter();
330
331        loop {
332            match &mut this.state {
333                PollState::Paused => return Poll::Pending,
334                PollState::Waiting => {
335                    // Check if we've reached the limit
336                    if this.poll_count >= this.limit {
337                        debug!("poll limit reached");
338                        this.state = PollState::Finished;
339                        continue;
340                    }
341
342                    // Check if client is still alive
343                    let Some(client) = this.client.upgrade() else {
344                        debug!("client dropped");
345                        this.state = PollState::Finished;
346                        continue;
347                    };
348
349                    // Start polling
350                    trace!("polling");
351                    let method = this.method.clone();
352                    let params = this.params.clone();
353                    let fut = Box::pin(async move { client.request(method, params).await });
354                    this.state = PollState::Polling(fut);
355                }
356                PollState::Polling(fut) => {
357                    match ready!(fut.poll_unpin(cx)) {
358                        Ok(resp) => {
359                            this.poll_count += 1;
360                            // Start sleeping before next poll
361                            trace!(duration=?this.poll_interval, "sleeping");
362                            let sleep = Box::pin(sleep(this.poll_interval));
363                            this.state = PollState::Sleeping(sleep);
364                            return Poll::Ready(Some((this.map)(resp)));
365                        }
366                        Err(err) => {
367                            error!(%err, "failed to poll");
368
369                            // If the error is a filter not found error, stop
370                            // the poller. Error codes are not consistent
371                            // across reth/geth/nethermind, so we check
372                            // just the message.
373                            if let Some(resp) = err.as_error_resp() {
374                                if resp.message.contains("filter not found") {
375                                    warn!("server has dropped the filter, stopping poller");
376                                    this.state = PollState::Finished;
377                                    continue;
378                                }
379                            }
380
381                            // Start sleeping before retry
382                            trace!(duration=?this.poll_interval, "sleeping after error");
383
384                            let sleep = Box::pin(sleep(this.poll_interval));
385                            this.state = PollState::Sleeping(sleep);
386                        }
387                    }
388                }
389                PollState::Sleeping(sleep) => {
390                    ready!(sleep.as_mut().poll(cx));
391                    this.state = PollState::Waiting;
392                }
393                PollState::Finished => {
394                    return Poll::Ready(None);
395                }
396            }
397        }
398    }
399}
400
401impl<Resp, Output, Map> FusedStream for PollerStream<Resp, Output, Map>
402where
403    Resp: RpcRecv + 'static,
404    Map: Fn(Resp) -> Output + Unpin,
405{
406    fn is_terminated(&self) -> bool {
407        matches!(self.state, PollState::Finished)
408    }
409}
410
411/// A channel yielding responses from a poller task.
412///
413/// This stream is backed by a coroutine, and will continue to produce responses
414/// until the poller task is dropped. The poller task is dropped when all
415/// [`RpcClient`] instances are dropped, or when all listening `PollChannel` are
416/// dropped.
417///
418/// The poller task also ignores errors from the server and deserialization
419/// errors, and will continue to poll until the client is dropped.
420///
421/// [`RpcClient`]: crate::RpcClient
422#[derive(Debug)]
423pub struct PollChannel<Resp> {
424    rx: broadcast::Receiver<Resp>,
425}
426
427impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
428    fn from(rx: broadcast::Receiver<Resp>) -> Self {
429        Self { rx }
430    }
431}
432
433impl<Resp> Deref for PollChannel<Resp> {
434    type Target = broadcast::Receiver<Resp>;
435
436    fn deref(&self) -> &Self::Target {
437        &self.rx
438    }
439}
440
441impl<Resp> DerefMut for PollChannel<Resp> {
442    fn deref_mut(&mut self) -> &mut Self::Target {
443        &mut self.rx
444    }
445}
446
447impl<Resp> PollChannel<Resp>
448where
449    Resp: RpcRecv + Clone,
450{
451    /// Resubscribe to the poller task.
452    pub fn resubscribe(&self) -> Self {
453        Self { rx: self.rx.resubscribe() }
454    }
455
456    /// Converts the poll channel into a stream.
457    // TODO: can we name this type?
458    pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
459        self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
460    }
461
462    /// Converts the poll channel into a stream that also yields
463    /// [lag errors](tokio_stream::wrappers::errors::BroadcastStreamRecvError).
464    pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
465        self.rx.into()
466    }
467}
468
/// Compile-time check that `PollChannel` is `Unpin`; never meant to be called.
#[cfg(test)]
#[allow(clippy::missing_const_for_fn)]
fn _assert_unpin() {
    fn _requires_unpin<T>()
    where
        T: Unpin,
    {
    }

    _requires_unpin::<PollChannel<()>>();
}