1use crate::WeakClient;
2use alloy_json_rpc::{RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use futures::{ready, stream::FusedStream, Future, FutureExt, Stream, StreamExt};
5use serde::Serialize;
6use serde_json::value::RawValue;
7use std::{
8 borrow::Cow,
9 marker::PhantomData,
10 ops::{Deref, DerefMut},
11 pin::Pin,
12 task::{Context, Poll},
13 time::Duration,
14};
15use tokio::sync::broadcast;
16use tokio_stream::wrappers::BroadcastStream;
17use tracing::Span;
18
19#[cfg(target_family = "wasm")]
20use wasmtimer::tokio::{sleep, Sleep};
21
22#[cfg(not(target_family = "wasm"))]
23use tokio::time::{sleep, Sleep};
24
25#[derive(Debug)]
63#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
64pub struct PollerBuilder<Params, Resp> {
65 client: WeakClient,
67
68 method: Cow<'static, str>,
70 params: Params,
71
72 channel_size: usize,
74 poll_interval: Duration,
75 limit: usize,
76
77 _pd: PhantomData<fn() -> Resp>,
78}
79
80impl<Params, Resp> PollerBuilder<Params, Resp>
81where
82 Params: RpcSend + 'static,
83 Resp: RpcRecv + Clone,
84{
85 pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
87 let poll_interval =
88 client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
89 Self {
90 client,
91 method: method.into(),
92 params,
93 channel_size: 16,
94 poll_interval,
95 limit: usize::MAX,
96 _pd: PhantomData,
97 }
98 }
99
100 pub const fn channel_size(&self) -> usize {
102 self.channel_size
103 }
104
105 pub const fn set_channel_size(&mut self, channel_size: usize) {
107 self.channel_size = channel_size;
108 }
109
110 pub const fn with_channel_size(mut self, channel_size: usize) -> Self {
112 self.set_channel_size(channel_size);
113 self
114 }
115
116 pub const fn limit(&self) -> usize {
118 self.limit
119 }
120
121 pub fn set_limit(&mut self, limit: Option<usize>) {
123 self.limit = limit.unwrap_or(usize::MAX);
124 }
125
126 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
128 self.set_limit(limit);
129 self
130 }
131
132 pub const fn poll_interval(&self) -> Duration {
134 self.poll_interval
135 }
136
137 pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
139 self.poll_interval = poll_interval;
140 }
141
142 pub const fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
144 self.set_poll_interval(poll_interval);
145 self
146 }
147
148 pub fn spawn(self) -> PollChannel<Resp> {
150 let (tx, rx) = broadcast::channel(self.channel_size);
151 self.into_future(tx).spawn_task();
152 rx.into()
153 }
154
155 async fn into_future(self, tx: broadcast::Sender<Resp>) {
156 let mut stream = self.into_stream();
157 while let Some(resp) = stream.next().await {
158 if tx.send(resp).is_err() {
159 debug!("channel closed");
160 break;
161 }
162 }
163 }
164
165 pub fn into_stream(self) -> PollerStream<Resp> {
170 PollerStream::new(self)
171 }
172
173 pub fn client(&self) -> WeakClient {
175 self.client.clone()
176 }
177}
178
179enum PollState<Resp> {
181 Paused,
183 Waiting,
185 Polling(
187 alloy_transport::Pbf<
188 'static,
189 Resp,
190 alloy_transport::RpcError<alloy_transport::TransportErrorKind>,
191 >,
192 ),
193 Sleeping(Pin<Box<Sleep>>),
195
196 Finished,
198}
199
200pub struct PollerStream<Resp, Output = Resp, Map = fn(Resp) -> Output> {
226 client: WeakClient,
227 method: Cow<'static, str>,
228 params: Box<RawValue>,
229 poll_interval: Duration,
230 limit: usize,
231 poll_count: usize,
232 state: PollState<Resp>,
233 span: Span,
234 map: Map,
235 _pd: PhantomData<fn() -> Output>,
236}
237
238impl<Resp, Output, Map> std::fmt::Debug for PollerStream<Resp, Output, Map> {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 f.debug_struct("PollerStream")
241 .field("method", &self.method)
242 .field("poll_interval", &self.poll_interval)
243 .field("limit", &self.limit)
244 .field("poll_count", &self.poll_count)
245 .finish_non_exhaustive()
246 }
247}
248
249impl<Resp> PollerStream<Resp> {
250 fn new<Params: Serialize>(builder: PollerBuilder<Params, Resp>) -> Self {
251 let span = debug_span!("poller", method = %builder.method);
252
253 let params = serde_json::value::to_raw_value(&builder.params).unwrap_or_else(|err| {
255 error!(%err, "failed to serialize params during initialization");
256 Box::<RawValue>::default()
258 });
259
260 Self {
261 client: builder.client,
262 method: builder.method,
263 params,
264 poll_interval: builder.poll_interval,
265 limit: builder.limit,
266 poll_count: 0,
267 state: PollState::Waiting,
268 span,
269 map: std::convert::identity,
270 _pd: PhantomData,
271 }
272 }
273
274 pub fn client(&self) -> WeakClient {
276 self.client.clone()
277 }
278
279 pub fn pause(&mut self) {
283 self.state = PollState::Paused;
284 }
285
286 pub fn unpause(&mut self) {
290 if matches!(self.state, PollState::Paused) {
291 self.state = PollState::Waiting;
292 }
293 }
294}
295
296impl<Resp, Output, Map> PollerStream<Resp, Output, Map>
297where
298 Map: Fn(Resp) -> Output,
299{
300 pub fn map<NewOutput, NewMap>(self, map: NewMap) -> PollerStream<Resp, NewOutput, NewMap>
302 where
303 NewMap: Fn(Resp) -> NewOutput,
304 {
305 PollerStream {
306 client: self.client,
307 method: self.method,
308 params: self.params,
309 poll_interval: self.poll_interval,
310 limit: self.limit,
311 poll_count: self.poll_count,
312 state: self.state,
313 span: self.span,
314 map,
315 _pd: PhantomData,
316 }
317 }
318}
319
320impl<Resp, Output, Map> Stream for PollerStream<Resp, Output, Map>
321where
322 Resp: RpcRecv + 'static,
323 Map: Fn(Resp) -> Output + Unpin,
324{
325 type Item = Output;
326
327 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
328 let this = self.get_mut();
329 let _guard = this.span.enter();
330
331 loop {
332 match &mut this.state {
333 PollState::Paused => return Poll::Pending,
334 PollState::Waiting => {
335 if this.poll_count >= this.limit {
337 debug!("poll limit reached");
338 this.state = PollState::Finished;
339 continue;
340 }
341
342 let Some(client) = this.client.upgrade() else {
344 debug!("client dropped");
345 this.state = PollState::Finished;
346 continue;
347 };
348
349 trace!("polling");
351 let method = this.method.clone();
352 let params = this.params.clone();
353 let fut = Box::pin(async move { client.request(method, params).await });
354 this.state = PollState::Polling(fut);
355 }
356 PollState::Polling(fut) => {
357 match ready!(fut.poll_unpin(cx)) {
358 Ok(resp) => {
359 this.poll_count += 1;
360 trace!(duration=?this.poll_interval, "sleeping");
362 let sleep = Box::pin(sleep(this.poll_interval));
363 this.state = PollState::Sleeping(sleep);
364 return Poll::Ready(Some((this.map)(resp)));
365 }
366 Err(err) => {
367 error!(%err, "failed to poll");
368
369 if let Some(resp) = err.as_error_resp() {
374 if resp.message.contains("filter not found") {
375 warn!("server has dropped the filter, stopping poller");
376 this.state = PollState::Finished;
377 continue;
378 }
379 }
380
381 trace!(duration=?this.poll_interval, "sleeping after error");
383
384 let sleep = Box::pin(sleep(this.poll_interval));
385 this.state = PollState::Sleeping(sleep);
386 }
387 }
388 }
389 PollState::Sleeping(sleep) => {
390 ready!(sleep.as_mut().poll(cx));
391 this.state = PollState::Waiting;
392 }
393 PollState::Finished => {
394 return Poll::Ready(None);
395 }
396 }
397 }
398 }
399}
400
401impl<Resp, Output, Map> FusedStream for PollerStream<Resp, Output, Map>
402where
403 Resp: RpcRecv + 'static,
404 Map: Fn(Resp) -> Output + Unpin,
405{
406 fn is_terminated(&self) -> bool {
407 matches!(self.state, PollState::Finished)
408 }
409}
410
411#[derive(Debug)]
423pub struct PollChannel<Resp> {
424 rx: broadcast::Receiver<Resp>,
425}
426
427impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
428 fn from(rx: broadcast::Receiver<Resp>) -> Self {
429 Self { rx }
430 }
431}
432
433impl<Resp> Deref for PollChannel<Resp> {
434 type Target = broadcast::Receiver<Resp>;
435
436 fn deref(&self) -> &Self::Target {
437 &self.rx
438 }
439}
440
441impl<Resp> DerefMut for PollChannel<Resp> {
442 fn deref_mut(&mut self) -> &mut Self::Target {
443 &mut self.rx
444 }
445}
446
447impl<Resp> PollChannel<Resp>
448where
449 Resp: RpcRecv + Clone,
450{
451 pub fn resubscribe(&self) -> Self {
453 Self { rx: self.rx.resubscribe() }
454 }
455
456 pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
459 self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
460 }
461
462 pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
465 self.rx.into()
466 }
467}
468
469#[cfg(test)]
470#[allow(clippy::missing_const_for_fn)]
471fn _assert_unpin() {
472 fn _assert<T: Unpin>() {}
473 _assert::<PollChannel<()>>();
474}