alloy_provider/
heart.rs

1//! Block heartbeat and pending transaction watcher.
2
3use crate::{Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8    map::{B256HashMap, B256HashSet},
9    TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{stream::StreamExt, FutureExt, Stream};
13use std::{
14    collections::{BTreeMap, VecDeque},
15    fmt,
16    future::Future,
17    time::Duration,
18};
19use tokio::{
20    select,
21    sync::{mpsc, oneshot},
22};
23
24#[cfg(target_family = "wasm")]
25use wasmtimer::{
26    std::Instant,
27    tokio::{interval, sleep_until},
28};
29
30#[cfg(not(target_family = "wasm"))]
31use {
32    std::time::Instant,
33    tokio::time::{interval, sleep_until},
34};
35
/// Errors which may occur when watching a pending transaction.
///
/// Every variant except [`FailedToRegister`](Self::FailedToRegister) wraps an underlying error
/// and is marked `transparent`, so its message is the message of the wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum PendingTransactionError {
    /// Failed to register pending transaction in heartbeat.
    #[error("failed to register pending transaction to watch")]
    FailedToRegister,

    /// Underlying transport error.
    #[error(transparent)]
    TransportError(#[from] TransportError),

    /// Error occurred while getting response from the heartbeat.
    ///
    /// This is the error of the oneshot channel a [`PendingTransaction`] waits on.
    #[error(transparent)]
    Recv(#[from] oneshot::error::RecvError),

    /// Errors that may occur when watching a transaction.
    #[error(transparent)]
    TxWatcher(#[from] WatchTxError),
}
55
/// A builder for configuring a pending transaction watcher.
///
/// The builder couples a [`PendingTransactionConfig`] (what to watch and for how long) with the
/// [`RootProvider`] that performs the watching.
///
/// # Examples
///
/// Send and wait for a transaction to be confirmed 2 times, with a timeout of 60 seconds:
///
/// ```no_run
/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
/// // Send a transaction, and configure the pending transaction.
/// let builder = provider.send_transaction(tx)
///     .await?
///     .with_required_confirmations(2)
///     .with_timeout(Some(std::time::Duration::from_secs(60)));
/// // Register the pending transaction with the provider.
/// let pending_tx = builder.register().await?;
/// // Wait for the transaction to be confirmed 2 times.
/// let tx_hash = pending_tx.await?;
/// # Ok(())
/// # }
/// ```
///
/// This can also be more concisely written using `watch`:
/// ```no_run
/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
/// let tx_hash = provider.send_transaction(tx)
///     .await?
///     .with_required_confirmations(2)
///     .with_timeout(Some(std::time::Duration::from_secs(60)))
///     .watch()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
#[derive(Debug)]
#[doc(alias = "PendingTxBuilder")]
pub struct PendingTransactionBuilder<N: Network> {
    /// What to watch: transaction hash, required confirmations and optional timeout.
    config: PendingTransactionConfig,
    /// The provider used to register the watch and to fetch the receipt.
    provider: RootProvider<N>,
}
96
97impl<N: Network> PendingTransactionBuilder<N> {
98    /// Creates a new pending transaction builder.
99    pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
100        Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
101    }
102
103    /// Creates a new pending transaction builder from the given configuration.
104    pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
105        Self { config, provider }
106    }
107
108    /// Returns the inner configuration.
109    pub const fn inner(&self) -> &PendingTransactionConfig {
110        &self.config
111    }
112
113    /// Consumes this builder, returning the inner configuration.
114    pub fn into_inner(self) -> PendingTransactionConfig {
115        self.config
116    }
117
118    /// Returns the provider.
119    pub const fn provider(&self) -> &RootProvider<N> {
120        &self.provider
121    }
122
123    /// Consumes this builder, returning the provider and the configuration.
124    pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
125        (self.provider, self.config)
126    }
127
128    /// Calls a function with a reference to the value.
129    pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
130        f(&self);
131        self
132    }
133
134    /// Returns the transaction hash.
135    #[doc(alias = "transaction_hash")]
136    pub const fn tx_hash(&self) -> &TxHash {
137        self.config.tx_hash()
138    }
139
140    /// Sets the transaction hash.
141    #[doc(alias = "set_transaction_hash")]
142    pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
143        self.config.set_tx_hash(tx_hash);
144    }
145
146    /// Sets the transaction hash.
147    #[doc(alias = "with_transaction_hash")]
148    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
149        self.config.tx_hash = tx_hash;
150        self
151    }
152
153    /// Returns the number of confirmations to wait for.
154    #[doc(alias = "confirmations")]
155    pub const fn required_confirmations(&self) -> u64 {
156        self.config.required_confirmations()
157    }
158
159    /// Sets the number of confirmations to wait for.
160    #[doc(alias = "set_confirmations")]
161    pub fn set_required_confirmations(&mut self, confirmations: u64) {
162        self.config.set_required_confirmations(confirmations);
163    }
164
165    /// Sets the number of confirmations to wait for.
166    #[doc(alias = "with_confirmations")]
167    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
168        self.config.required_confirmations = confirmations;
169        self
170    }
171
172    /// Returns the timeout.
173    pub const fn timeout(&self) -> Option<Duration> {
174        self.config.timeout()
175    }
176
177    /// Sets the timeout.
178    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
179        self.config.set_timeout(timeout);
180    }
181
182    /// Sets the timeout.
183    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
184        self.config.timeout = timeout;
185        self
186    }
187
188    /// Registers the watching configuration with the provider.
189    ///
190    /// This does not wait for the transaction to be confirmed, but returns a [`PendingTransaction`]
191    /// that can be awaited at a later moment.
192    ///
193    /// See:
194    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
195    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
196    ///   confirmed.
197    #[doc(alias = "build")]
198    pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
199        self.provider.watch_pending_transaction(self.config).await
200    }
201
202    /// Waits for the transaction to confirm with the given number of confirmations.
203    ///
204    /// See:
205    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
206    ///   confirmed.
207    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
208    ///   confirmed.
209    pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
210        self.register().await?.await
211    }
212
213    /// Waits for the transaction to confirm with the given number of confirmations, and
214    /// then fetches its receipt.
215    ///
216    /// Note that this method will call `eth_getTransactionReceipt` on the [**root
217    /// provider**](RootProvider), and not on a specific network provider. This means that any
218    /// overrides or customizations made to the network provider will not be used.
219    ///
220    /// See:
221    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
222    ///   confirmed.
223    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
224    pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
225        let hash = self.config.tx_hash;
226        let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
227
228        // FIXME: this is a hotfix to prevent a race condition where the heartbeat would miss the
229        // block the tx was mined in
230        let mut interval = interval(self.provider.client().poll_interval());
231
232        loop {
233            let mut confirmed = false;
234
235            select! {
236                _ = interval.tick() => {},
237                res = &mut pending_tx => {
238                    let _ = res?;
239                    confirmed = true;
240                }
241            }
242
243            // try to fetch the receipt
244            let receipt = self.provider.get_transaction_receipt(hash).await?;
245            if let Some(receipt) = receipt {
246                return Ok(receipt);
247            }
248
249            if confirmed {
250                return Err(RpcError::NullResp.into());
251            }
252        }
253    }
254}
255
/// Configuration for watching a pending transaction.
///
/// This type can be used to create a [`PendingTransactionBuilder`], but in general it is only used
/// internally.
///
/// A configuration created with [`PendingTransactionConfig::new`] requires one confirmation and
/// has no timeout.
#[must_use = "this type does nothing unless you call `with_provider`"]
#[derive(Clone, Debug)]
#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
pub struct PendingTransactionConfig {
    /// The transaction hash to watch for.
    #[doc(alias = "transaction_hash")]
    tx_hash: TxHash,

    /// Require a number of confirmations.
    required_confirmations: u64,

    /// Optional timeout for the transaction; `None` means no timeout is configured.
    timeout: Option<Duration>,
}
274
275impl PendingTransactionConfig {
276    /// Create a new watch for a transaction.
277    pub const fn new(tx_hash: TxHash) -> Self {
278        Self { tx_hash, required_confirmations: 1, timeout: None }
279    }
280
281    /// Returns the transaction hash.
282    #[doc(alias = "transaction_hash")]
283    pub const fn tx_hash(&self) -> &TxHash {
284        &self.tx_hash
285    }
286
287    /// Sets the transaction hash.
288    #[doc(alias = "set_transaction_hash")]
289    pub fn set_tx_hash(&mut self, tx_hash: TxHash) {
290        self.tx_hash = tx_hash;
291    }
292
293    /// Sets the transaction hash.
294    #[doc(alias = "with_transaction_hash")]
295    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
296        self.tx_hash = tx_hash;
297        self
298    }
299
300    /// Returns the number of confirmations to wait for.
301    #[doc(alias = "confirmations")]
302    pub const fn required_confirmations(&self) -> u64 {
303        self.required_confirmations
304    }
305
306    /// Sets the number of confirmations to wait for.
307    #[doc(alias = "set_confirmations")]
308    pub fn set_required_confirmations(&mut self, confirmations: u64) {
309        self.required_confirmations = confirmations;
310    }
311
312    /// Sets the number of confirmations to wait for.
313    #[doc(alias = "with_confirmations")]
314    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
315        self.required_confirmations = confirmations;
316        self
317    }
318
319    /// Returns the timeout.
320    pub const fn timeout(&self) -> Option<Duration> {
321        self.timeout
322    }
323
324    /// Sets the timeout.
325    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
326        self.timeout = timeout;
327    }
328
329    /// Sets the timeout.
330    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
331        self.timeout = timeout;
332        self
333    }
334
335    /// Wraps this configuration with a provider to expose watching methods.
336    pub const fn with_provider<N: Network>(
337        self,
338        provider: RootProvider<N>,
339    ) -> PendingTransactionBuilder<N> {
340        PendingTransactionBuilder::from_config(provider, self)
341    }
342}
343
344impl From<TxHash> for PendingTransactionConfig {
345    fn from(tx_hash: TxHash) -> Self {
346        Self::new(tx_hash)
347    }
348}
349
/// Errors which may occur in heartbeat when watching a transaction.
///
/// These are sent by the heartbeat task to the waiting [`PendingTransaction`].
#[derive(Debug, thiserror::Error)]
pub enum WatchTxError {
    /// Transaction was not confirmed after configured timeout.
    #[error("transaction was not confirmed within the timeout")]
    Timeout,
}
357
/// The type sent by the [`HeartbeatHandle`] to the [`Heartbeat`] background task.
///
/// It bundles what to watch with the channel used to report the outcome.
#[doc(alias = "TransactionWatcher")]
struct TxWatcher {
    /// Hash, required confirmations and optional timeout of the watched transaction.
    config: PendingTransactionConfig,
    /// The block at which the transaction was received. To be filled once known.
    /// Invariant: any confirmed transaction in [`Heartbeat`] has this value set.
    received_at_block: Option<u64>,
    /// One-shot sender on which the final outcome (success or [`WatchTxError`]) is reported.
    tx: oneshot::Sender<Result<(), WatchTxError>>,
}
367
368impl TxWatcher {
369    /// Notify the waiter.
370    fn notify(self, result: Result<(), WatchTxError>) {
371        debug!(tx=%self.config.tx_hash, "notifying");
372        let _ = self.tx.send(result);
373    }
374}
375
/// Represents a transaction that is yet to be confirmed a specified number of times.
///
/// This struct is a future created by [`PendingTransactionBuilder`] that resolves to the
/// transaction hash once the underlying transaction has been confirmed the specified number of
/// times in the network.
///
/// The future resolves to an error if the watcher reports one (for example a timeout) or if the
/// notification channel fails.
#[doc(alias = "PendingTx", alias = "TxPending")]
pub struct PendingTransaction {
    /// The transaction hash.
    #[doc(alias = "transaction_hash")]
    pub(crate) tx_hash: TxHash,
    /// The receiver for the notification.
    // TODO: send a receipt?
    pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
}
390
391impl fmt::Debug for PendingTransaction {
392    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
393        f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
394    }
395}
396
397impl PendingTransaction {
398    /// Creates a ready pending transaction.
399    pub fn ready(tx_hash: TxHash) -> Self {
400        let (tx, rx) = oneshot::channel();
401        tx.send(Ok(())).ok(); // Make sure that the receiver is notified already.
402        Self { tx_hash, rx }
403    }
404
405    /// Returns this transaction's hash.
406    #[doc(alias = "transaction_hash")]
407    pub const fn tx_hash(&self) -> &TxHash {
408        &self.tx_hash
409    }
410}
411
412impl Future for PendingTransaction {
413    type Output = Result<TxHash, PendingTransactionError>;
414
415    fn poll(
416        mut self: std::pin::Pin<&mut Self>,
417        cx: &mut std::task::Context<'_>,
418    ) -> std::task::Poll<Self::Output> {
419        self.rx.poll_unpin(cx).map(|res| {
420            res??;
421            Ok(self.tx_hash)
422        })
423    }
424}
425
/// A handle to the heartbeat task.
///
/// The handle is cloneable; every clone sends watch requests over the same channel.
#[derive(Clone, Debug)]
pub(crate) struct HeartbeatHandle {
    /// Sending half of the channel on which watch requests are delivered to the task.
    tx: mpsc::Sender<TxWatcher>,
}
431
432impl HeartbeatHandle {
433    /// Watch for a transaction to be confirmed with the given config.
434    #[doc(alias = "watch_transaction")]
435    pub(crate) async fn watch_tx(
436        &self,
437        config: PendingTransactionConfig,
438        received_at_block: Option<u64>,
439    ) -> Result<PendingTransaction, PendingTransactionConfig> {
440        let (tx, rx) = oneshot::channel();
441        let tx_hash = config.tx_hash;
442        match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
443            Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
444            Err(e) => Err(e.0.config),
445        }
446    }
447}
448
/// A heartbeat task that receives blocks and watches for transactions.
///
/// A watcher lives in `unconfirmed` until its transaction shows up in a block, then (if more than
/// one confirmation is required) in `waiting_confs` until the chain reaches the target height.
pub(crate) struct Heartbeat<N, S> {
    /// The stream of incoming blocks to watch.
    stream: futures::stream::Fuse<S>,

    /// Lookbehind blocks, oldest first, as pairs of block number and the set of transaction
    /// hashes contained in that block.
    past_blocks: VecDeque<(u64, B256HashSet)>,

    /// Transactions to watch for, keyed by transaction hash.
    unconfirmed: B256HashMap<TxWatcher>,

    /// Ordered map of transactions waiting for confirmations, keyed by the block height at which
    /// they are notified.
    waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,

    /// Ordered map of transactions to reap at a certain time.
    reap_at: BTreeMap<Instant, B256>,

    /// Ties the heartbeat to a network type without storing a value of it.
    _network: std::marker::PhantomData<N>,
}
468
469impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
470    /// Create a new heartbeat task.
471    pub(crate) fn new(stream: S) -> Self {
472        Self {
473            stream: stream.fuse(),
474            past_blocks: Default::default(),
475            unconfirmed: Default::default(),
476            waiting_confs: Default::default(),
477            reap_at: Default::default(),
478            _network: Default::default(),
479        }
480    }
481
482    /// Check if any transactions have enough confirmations to notify.
483    fn check_confirmations(&mut self, current_height: u64) {
484        let to_keep = self.waiting_confs.split_off(&(current_height + 1));
485        let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
486        for watcher in to_notify.into_values().flatten() {
487            watcher.notify(Ok(()));
488        }
489    }
490
491    /// Get the next time to reap a transaction. If no reaps, this is a very
492    /// long time from now (i.e. will not be woken).
493    fn next_reap(&self) -> Instant {
494        self.reap_at
495            .first_key_value()
496            .map(|(k, _)| *k)
497            .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
498    }
499
500    /// Reap any timeout
501    fn reap_timeouts(&mut self) {
502        let now = Instant::now();
503        let to_keep = self.reap_at.split_off(&now);
504        let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
505
506        for tx_hash in to_reap.values() {
507            if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
508                debug!(tx=%tx_hash, "reaped");
509                watcher.notify(Err(WatchTxError::Timeout));
510            }
511        }
512    }
513
514    /// Reap transactions overridden by the reorg.
515    /// Accepts new chain height as an argument, and drops any subscriptions
516    /// that were received in blocks affected by the reorg (e.g. >= new_height).
517    fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
518        for waiters in self.waiting_confs.values_mut() {
519            *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
520                if let Some(received_at_block) = watcher.received_at_block {
521                    // All blocks after and _including_ the new height are reaped.
522                    if received_at_block >= new_height {
523                        let hash = watcher.config.tx_hash;
524                        debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed due to reorg");
525                        self.unconfirmed.insert(hash, watcher);
526                        return None;
527                    }
528                }
529                Some(watcher)
530            }).collect();
531        }
532    }
533
534    /// Handle a watch instruction by adding it to the watch list, and
535    /// potentially adding it to our `reap_at` list.
536    fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
537        // Start watching for the transaction.
538        debug!(tx=%to_watch.config.tx_hash, "watching");
539        trace!(?to_watch.config, ?to_watch.received_at_block);
540        if let Some(received_at_block) = to_watch.received_at_block {
541            // Transaction is already confirmed, we just need to wait for the required
542            // confirmations.
543            let current_block =
544                self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
545            self.add_to_waiting_list(to_watch, current_block);
546            return;
547        }
548
549        if let Some(timeout) = to_watch.config.timeout {
550            self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
551        }
552        // Transaction may be confirmed already, check the lookbehind history first.
553        // If so, insert it into the waiting list.
554        for (block_height, txs) in self.past_blocks.iter().rev() {
555            if txs.contains(&to_watch.config.tx_hash) {
556                let confirmations = to_watch.config.required_confirmations;
557                let confirmed_at = *block_height + confirmations - 1;
558                let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
559
560                if confirmed_at <= current_height {
561                    to_watch.notify(Ok(()));
562                } else {
563                    debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
564                    self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
565                }
566                return;
567            }
568        }
569
570        self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
571    }
572
573    fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
574        let confirmations = watcher.config.required_confirmations;
575        debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
576        self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
577    }
578
579    /// Handle a new block by checking if any of the transactions we're
580    /// watching are in it, and if so, notifying the watcher. Also updates
581    /// the latest block.
582    fn handle_new_block(&mut self, block: N::BlockResponse) {
583        let block_height = block.header().as_ref().number();
584        debug!(%block_height, "handling block");
585
586        // Add the block the lookbehind.
587        // The value is chosen arbitrarily to not have a huge memory footprint but still
588        // catch most cases where user subscribes for an already mined transaction.
589        // Note that we expect provider to check whether transaction is already mined
590        // before subscribing, so here we only need to consider time before sending a notification
591        // and processing it.
592        const MAX_BLOCKS_TO_RETAIN: usize = 10;
593        if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
594            self.past_blocks.pop_front();
595        }
596        if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
597            // Check that the chain is continuous.
598            if *last_height + 1 != block_height {
599                // Move all the transactions that were reset by the reorg to the unconfirmed list.
600                warn!(%block_height, last_height, "reorg detected");
601                self.move_reorg_to_unconfirmed(block_height);
602                // Remove past blocks that are now invalid.
603                self.past_blocks.retain(|(h, _)| *h < block_height);
604            }
605        }
606        self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
607
608        // Check if we are watching for any of the transactions in this block.
609        let to_check: Vec<_> = block
610            .transactions()
611            .hashes()
612            .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
613            .collect();
614        for mut watcher in to_check {
615            // If `confirmations` is not more than 1 we can notify the watcher immediately.
616            let confirmations = watcher.config.required_confirmations;
617            if confirmations <= 1 {
618                watcher.notify(Ok(()));
619                continue;
620            }
621            // Otherwise add it to the waiting list.
622
623            // Set the block at which the transaction was received.
624            if let Some(set_block) = watcher.received_at_block {
625                warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
626                // We don't override the set value.
627            } else {
628                watcher.received_at_block = Some(block_height);
629            }
630            self.add_to_waiting_list(watcher, block_height);
631        }
632
633        self.check_confirmations(block_height);
634    }
635}
636
#[cfg(target_family = "wasm")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Starts the heartbeat as a background task and returns the [`HeartbeatHandle`] used to
    /// send it watch requests.
    pub(crate) fn spawn(self) -> HeartbeatHandle {
        let (heartbeat_fut, heartbeat_handle) = self.consume();
        heartbeat_fut.spawn_task();
        heartbeat_handle
    }
}
646
647#[cfg(not(target_family = "wasm"))]
648impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
649    /// Spawn the heartbeat task, returning a [`HeartbeatHandle`].
650    pub(crate) fn spawn(self) -> HeartbeatHandle {
651        let (task, handle) = self.consume();
652        task.spawn_task();
653        handle
654    }
655}
656
657impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
658    fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
659        let (ix_tx, ixns) = mpsc::channel(64);
660        (self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
661    }
662
663    async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
664        'shutdown: loop {
665            {
666                let next_reap = self.next_reap();
667                let sleep = std::pin::pin!(sleep_until(next_reap.into()));
668
669                // We bias the select so that we always handle new messages
670                // before checking blocks, and reap timeouts are last.
671                select! {
672                    biased;
673
674                    // Watch for new transactions.
675                    ix_opt = ixns.recv() => match ix_opt {
676                        Some(to_watch) => self.handle_watch_ix(to_watch),
677                        None => break 'shutdown, // ix channel is closed
678                    },
679
680                    // Wake up to handle new blocks.
681                    Some(block) = self.stream.next() => {
682                        self.handle_new_block(block);
683                    },
684
685                    // This arm ensures we always wake up to reap timeouts,
686                    // even if there are no other events.
687                    _ = sleep => {},
688                }
689            }
690
691            // Always reap timeouts
692            self.reap_timeouts();
693        }
694    }
695}