1use crate::{Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8 map::{B256HashMap, B256HashSet},
9 TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{stream::StreamExt, FutureExt, Stream};
13use std::{
14 collections::{BTreeMap, VecDeque},
15 fmt,
16 future::Future,
17 time::Duration,
18};
19use tokio::{
20 select,
21 sync::{mpsc, oneshot},
22};
23
24#[cfg(target_family = "wasm")]
25use wasmtimer::{
26 std::Instant,
27 tokio::{interval, sleep_until},
28};
29
30#[cfg(not(target_family = "wasm"))]
31use {
32 std::time::Instant,
33 tokio::time::{interval, sleep_until},
34};
35
/// Errors which may occur when registering, watching, or resolving a pending transaction.
#[derive(Debug, thiserror::Error)]
pub enum PendingTransactionError {
    /// The pending transaction could not be registered with the watcher.
    #[error("failed to register pending transaction to watch")]
    FailedToRegister,

    /// A transport-level error, forwarded transparently.
    #[error(transparent)]
    TransportError(#[from] TransportError),

    /// The oneshot channel carrying the watch result failed to deliver a value
    /// (the sending half was dropped before sending).
    #[error(transparent)]
    Recv(#[from] oneshot::error::RecvError),

    /// The watcher itself reported an error (see [`WatchTxError`]).
    #[error(transparent)]
    TxWatcher(#[from] WatchTxError),
}
55
/// A builder that pairs a [`PendingTransactionConfig`] with the provider used to watch it.
///
/// The builder is inert on its own: call `register`, `watch` or `get_receipt` to actually start
/// watching the transaction.
#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
#[derive(Debug)]
#[doc(alias = "PendingTxBuilder")]
pub struct PendingTransactionBuilder<N: Network> {
    /// What to watch: transaction hash, required confirmations and optional timeout.
    config: PendingTransactionConfig,
    /// The provider through which the transaction is watched and its receipt fetched.
    provider: RootProvider<N>,
}
96
97impl<N: Network> PendingTransactionBuilder<N> {
98 pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
100 Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
101 }
102
103 pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
105 Self { config, provider }
106 }
107
108 pub const fn inner(&self) -> &PendingTransactionConfig {
110 &self.config
111 }
112
113 pub fn into_inner(self) -> PendingTransactionConfig {
115 self.config
116 }
117
118 pub const fn provider(&self) -> &RootProvider<N> {
120 &self.provider
121 }
122
123 pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
125 (self.provider, self.config)
126 }
127
128 pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
130 f(&self);
131 self
132 }
133
134 #[doc(alias = "transaction_hash")]
136 pub const fn tx_hash(&self) -> &TxHash {
137 self.config.tx_hash()
138 }
139
140 #[doc(alias = "set_transaction_hash")]
142 pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
143 self.config.set_tx_hash(tx_hash);
144 }
145
146 #[doc(alias = "with_transaction_hash")]
148 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
149 self.config.tx_hash = tx_hash;
150 self
151 }
152
153 #[doc(alias = "confirmations")]
155 pub const fn required_confirmations(&self) -> u64 {
156 self.config.required_confirmations()
157 }
158
159 #[doc(alias = "set_confirmations")]
161 pub fn set_required_confirmations(&mut self, confirmations: u64) {
162 self.config.set_required_confirmations(confirmations);
163 }
164
165 #[doc(alias = "with_confirmations")]
167 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
168 self.config.required_confirmations = confirmations;
169 self
170 }
171
172 pub const fn timeout(&self) -> Option<Duration> {
174 self.config.timeout()
175 }
176
177 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
179 self.config.set_timeout(timeout);
180 }
181
182 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
184 self.config.timeout = timeout;
185 self
186 }
187
188 #[doc(alias = "build")]
198 pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
199 self.provider.watch_pending_transaction(self.config).await
200 }
201
202 pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
210 self.register().await?.await
211 }
212
213 pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
225 let hash = self.config.tx_hash;
226 let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
227
228 let mut interval = interval(self.provider.client().poll_interval());
231
232 loop {
233 let mut confirmed = false;
234
235 select! {
236 _ = interval.tick() => {},
237 res = &mut pending_tx => {
238 let _ = res?;
239 confirmed = true;
240 }
241 }
242
243 let receipt = self.provider.get_transaction_receipt(hash).await?;
245 if let Some(receipt) = receipt {
246 return Ok(receipt);
247 }
248
249 if confirmed {
250 return Err(RpcError::NullResp.into());
251 }
252 }
253 }
254}
255
/// Configuration for watching a pending transaction.
///
/// This type is purely data; attach a provider with `with_provider` to obtain a
/// [`PendingTransactionBuilder`] that can actually watch the transaction.
#[must_use = "this type does nothing unless you call `with_provider`"]
#[derive(Clone, Debug)]
#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
pub struct PendingTransactionConfig {
    /// The hash of the transaction to watch.
    #[doc(alias = "transaction_hash")]
    tx_hash: TxHash,

    /// The number of confirmations to wait for.
    required_confirmations: u64,

    /// Optional timeout for the watch; `None` means no timeout.
    timeout: Option<Duration>,
}
274
275impl PendingTransactionConfig {
276 pub const fn new(tx_hash: TxHash) -> Self {
278 Self { tx_hash, required_confirmations: 1, timeout: None }
279 }
280
281 #[doc(alias = "transaction_hash")]
283 pub const fn tx_hash(&self) -> &TxHash {
284 &self.tx_hash
285 }
286
287 #[doc(alias = "set_transaction_hash")]
289 pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
290 self.tx_hash = tx_hash;
291 }
292
293 #[doc(alias = "with_transaction_hash")]
295 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
296 self.tx_hash = tx_hash;
297 self
298 }
299
300 #[doc(alias = "confirmations")]
302 pub const fn required_confirmations(&self) -> u64 {
303 self.required_confirmations
304 }
305
306 #[doc(alias = "set_confirmations")]
308 pub fn set_required_confirmations(&mut self, confirmations: u64) {
309 self.required_confirmations = confirmations;
310 }
311
312 #[doc(alias = "with_confirmations")]
314 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
315 self.required_confirmations = confirmations;
316 self
317 }
318
319 pub const fn timeout(&self) -> Option<Duration> {
321 self.timeout
322 }
323
324 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
326 self.timeout = timeout;
327 }
328
329 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
331 self.timeout = timeout;
332 self
333 }
334
335 pub const fn with_provider<N: Network>(
337 self,
338 provider: RootProvider<N>,
339 ) -> PendingTransactionBuilder<N> {
340 PendingTransactionBuilder::from_config(provider, self)
341 }
342}
343
344impl From<TxHash> for PendingTransactionConfig {
345 fn from(tx_hash: TxHash) -> Self {
346 Self::new(tx_hash)
347 }
348}
349
/// Errors which the transaction watcher may report for a watched transaction.
#[derive(Debug, thiserror::Error)]
pub enum WatchTxError {
    /// The transaction was not confirmed before its configured timeout elapsed.
    #[error("transaction was not confirmed within the timeout")]
    Timeout,
}
357
/// The heartbeat-side state for one watched transaction.
#[doc(alias = "TransactionWatcher")]
struct TxWatcher {
    /// What is being watched and under which conditions.
    config: PendingTransactionConfig,
    /// The height of the block the transaction was seen in, if it has been seen.
    /// `None` while the transaction has not yet been observed in a block.
    received_at_block: Option<u64>,
    /// Channel used to deliver the final outcome to the matching [`PendingTransaction`].
    tx: oneshot::Sender<Result<(), WatchTxError>>,
}
367
368impl TxWatcher {
369 fn notify(self, result: Result<(), WatchTxError>) {
371 debug!(tx=%self.config.tx_hash, "notifying");
372 let _ = self.tx.send(result);
373 }
374}
375
/// A handle to a transaction being watched.
///
/// This type is a [`Future`]: awaiting it resolves to the transaction hash once the watcher
/// reports success, or to a [`PendingTransactionError`] otherwise.
#[doc(alias = "PendingTx", alias = "TxPending")]
pub struct PendingTransaction {
    /// The hash of the watched transaction.
    #[doc(alias = "transaction_hash")]
    pub(crate) tx_hash: TxHash,
    /// Receives the outcome of the watch from the watcher.
    pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
}
390
391impl fmt::Debug for PendingTransaction {
392 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
393 f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
394 }
395}
396
397impl PendingTransaction {
398 pub fn ready(tx_hash: TxHash) -> Self {
400 let (tx, rx) = oneshot::channel();
401 tx.send(Ok(())).ok(); Self { tx_hash, rx }
403 }
404
405 #[doc(alias = "transaction_hash")]
407 pub const fn tx_hash(&self) -> &TxHash {
408 &self.tx_hash
409 }
410}
411
412impl Future for PendingTransaction {
413 type Output = Result<TxHash, PendingTransactionError>;
414
415 fn poll(
416 mut self: std::pin::Pin<&mut Self>,
417 cx: &mut std::task::Context<'_>,
418 ) -> std::task::Poll<Self::Output> {
419 self.rx.poll_unpin(cx).map(|res| {
420 res??;
421 Ok(self.tx_hash)
422 })
423 }
424}
425
/// A cloneable handle used to submit transactions to a running [`Heartbeat`] task.
#[derive(Clone, Debug)]
pub(crate) struct HeartbeatHandle {
    /// Channel over which new watchers are sent to the heartbeat task.
    tx: mpsc::Sender<TxWatcher>,
}
431
432impl HeartbeatHandle {
433 #[doc(alias = "watch_transaction")]
435 pub(crate) async fn watch_tx(
436 &self,
437 config: PendingTransactionConfig,
438 received_at_block: Option<u64>,
439 ) -> Result<PendingTransaction, PendingTransactionConfig> {
440 let (tx, rx) = oneshot::channel();
441 let tx_hash = config.tx_hash;
442 match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
443 Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
444 Err(e) => Err(e.0.config),
445 }
446 }
447}
448
/// A task that consumes a stream of blocks and resolves watched transactions as they are
/// included, confirmed, or timed out.
pub(crate) struct Heartbeat<N, S> {
    /// The (fused) stream of incoming blocks.
    stream: futures::stream::Fuse<S>,

    /// Recently seen blocks, oldest first, as `(height, transaction hashes in that block)`.
    past_blocks: VecDeque<(u64, B256HashSet)>,

    /// Watchers for transactions that have not yet been seen in a block, keyed by hash.
    unconfirmed: B256HashMap<TxWatcher>,

    /// Watchers for transactions that have been seen and are waiting for more confirmations,
    /// keyed by the block height at which they count as confirmed.
    waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,

    /// Timeout deadlines, mapping each deadline to the hash of the transaction to give up on.
    reap_at: BTreeMap<Instant, B256>,

    /// Ties the heartbeat to a network type without storing a value of it.
    _network: std::marker::PhantomData<N>,
}
468
469impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
470 pub(crate) fn new(stream: S) -> Self {
472 Self {
473 stream: stream.fuse(),
474 past_blocks: Default::default(),
475 unconfirmed: Default::default(),
476 waiting_confs: Default::default(),
477 reap_at: Default::default(),
478 _network: Default::default(),
479 }
480 }
481
482 fn check_confirmations(&mut self, current_height: u64) {
484 let to_keep = self.waiting_confs.split_off(&(current_height + 1));
485 let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
486 for watcher in to_notify.into_values().flatten() {
487 watcher.notify(Ok(()));
488 }
489 }
490
491 fn next_reap(&self) -> Instant {
494 self.reap_at
495 .first_key_value()
496 .map(|(k, _)| *k)
497 .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
498 }
499
500 fn reap_timeouts(&mut self) {
502 let now = Instant::now();
503 let to_keep = self.reap_at.split_off(&now);
504 let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
505
506 for tx_hash in to_reap.values() {
507 if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
508 debug!(tx=%tx_hash, "reaped");
509 watcher.notify(Err(WatchTxError::Timeout));
510 }
511 }
512 }
513
514 fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
518 for waiters in self.waiting_confs.values_mut() {
519 *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
520 if let Some(received_at_block) = watcher.received_at_block {
521 if received_at_block >= new_height {
523 let hash = watcher.config.tx_hash;
524 debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed due to reorg");
525 self.unconfirmed.insert(hash, watcher);
526 return None;
527 }
528 }
529 Some(watcher)
530 }).collect();
531 }
532 }
533
534 fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
537 debug!(tx=%to_watch.config.tx_hash, "watching");
539 trace!(?to_watch.config, ?to_watch.received_at_block);
540 if let Some(received_at_block) = to_watch.received_at_block {
541 let current_block =
544 self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
545 self.add_to_waiting_list(to_watch, current_block);
546 return;
547 }
548
549 if let Some(timeout) = to_watch.config.timeout {
550 self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
551 }
552 for (block_height, txs) in self.past_blocks.iter().rev() {
555 if txs.contains(&to_watch.config.tx_hash) {
556 let confirmations = to_watch.config.required_confirmations;
557 let confirmed_at = *block_height + confirmations - 1;
558 let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
559
560 if confirmed_at <= current_height {
561 to_watch.notify(Ok(()));
562 } else {
563 debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
564 self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
565 }
566 return;
567 }
568 }
569
570 self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
571 }
572
573 fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
574 let confirmations = watcher.config.required_confirmations;
575 debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
576 self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
577 }
578
579 fn handle_new_block(&mut self, block: N::BlockResponse) {
583 let block_height = block.header().as_ref().number();
584 debug!(%block_height, "handling block");
585
586 const MAX_BLOCKS_TO_RETAIN: usize = 10;
593 if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
594 self.past_blocks.pop_front();
595 }
596 if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
597 if *last_height + 1 != block_height {
599 warn!(%block_height, last_height, "reorg detected");
601 self.move_reorg_to_unconfirmed(block_height);
602 self.past_blocks.retain(|(h, _)| *h < block_height);
604 }
605 }
606 self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
607
608 let to_check: Vec<_> = block
610 .transactions()
611 .hashes()
612 .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
613 .collect();
614 for mut watcher in to_check {
615 let confirmations = watcher.config.required_confirmations;
617 if confirmations <= 1 {
618 watcher.notify(Ok(()));
619 continue;
620 }
621 if let Some(set_block) = watcher.received_at_block {
625 warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
626 } else {
628 watcher.received_at_block = Some(block_height);
629 }
630 self.add_to_waiting_list(watcher, block_height);
631 }
632
633 self.check_confirmations(block_height);
634 }
635}
636
#[cfg(target_family = "wasm")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Starts the heartbeat as a background task and returns the handle used to talk to it.
    pub(crate) fn spawn(self) -> HeartbeatHandle {
        let (driver, handle) = self.consume();
        Spawnable::spawn_task(driver);
        handle
    }
}
646
647#[cfg(not(target_family = "wasm"))]
648impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
649 pub(crate) fn spawn(self) -> HeartbeatHandle {
651 let (task, handle) = self.consume();
652 task.spawn_task();
653 handle
654 }
655}
656
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Splits the heartbeat into the future that drives it and the handle that feeds it.
    ///
    /// The two are connected by a bounded channel with capacity 64. The returned future does
    /// nothing until it is polled.
    fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
        let (ix_tx, ixns) = mpsc::channel(64);
        (self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
    }

    /// The heartbeat's main loop.
    ///
    /// Each iteration waits for exactly one of: a new watch request, a new block, or the next
    /// timeout deadline; afterwards expired watchers are reaped. The loop ends when the request
    /// channel yields `None`.
    async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
        'shutdown: loop {
            {
                // Recomputed every iteration, because handling a request may register an
                // earlier deadline.
                let next_reap = self.next_reap();
                let sleep = std::pin::pin!(sleep_until(next_reap.into()));

                select! {
                    // `biased` makes the branches get polled in the order written, so watch
                    // requests take priority over new blocks.
                    biased;

                    // Watch for new transactions.
                    ix_opt = ixns.recv() => match ix_opt {
                        Some(to_watch) => self.handle_watch_ix(to_watch),
                        // The request channel is closed: shut the heartbeat down.
                        None => break 'shutdown,
                    },

                    // Wake up to handle new blocks. Once the stream ends, the pattern no longer
                    // matches and this branch is disabled.
                    Some(block) = self.stream.next() => {
                        self.handle_new_block(block);
                    },

                    // This arm ensures we always wake up to reap timeouts, even if there are no
                    // other events.
                    _ = sleep => {},
                }
            }

            // Always reap timeouts, whichever branch fired.
            self.reap_timeouts();
        }
    }
}