linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{btree_map::Entry, BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12    crypto::{CryptoHash, Signer},
13    data_types::{ChainDescription, Epoch, MessagePolicy, Timestamp},
14    identifiers::{AccountOwner, BlobType, ChainId},
15    util::future::FutureSyncExt as _,
16    Task,
17};
18use linera_core::{
19    client::{
20        chain_client::{self, ChainClient},
21        AbortOnDrop, ListeningMode,
22    },
23    node::NotificationStream,
24    worker::{Notification, Reason},
25    Environment, Wallet,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio::sync::mpsc::UnboundedReceiver;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info, instrument, warn, Instrument as _};
31
32use crate::error::{self, Error};
33
// Runtime options for the chain listener.
//
// Each field can be set through a command-line flag or an environment variable (see the
// `#[arg(...)]` attributes) and is (de)serialized with camelCase keys; a field missing
// from the serialized form falls back to its default value (`#[serde(default)]`).
//
// NOTE: plain `//` comments are used for this header on purpose: the derive macros may
// pick up `///` doc comments as user-facing text, so the field docs below are left as is.
#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
#[serde(rename_all = "camelCase")]
pub struct ChainListenerConfig {
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[serde(default)]
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    /// Wait before processing any notification (useful for testing).
    // The value is a number of milliseconds; `0` disables the delay.
    #[serde(default)]
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    /// Wait after processing any notification (useful for rate limiting).
    // The value is a number of milliseconds; `0` disables the delay.
    #[serde(default)]
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
64
/// Shorthand for the [`ChainClient`] type that belongs to the environment of the client
/// context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
66
67#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
68#[allow(async_fn_in_trait)]
69pub trait ClientContext {
70    type Environment: linera_core::Environment;
71
72    fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
73
74    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
75
76    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
77
78    fn admin_chain_id(&self) -> ChainId {
79        self.client().admin_chain_id()
80    }
81
82    /// Gets the timing sender for benchmarking, if available.
83    #[cfg(not(web))]
84    fn timing_sender(
85        &self,
86    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
87
88    #[cfg(web)]
89    fn timing_sender(
90        &self,
91    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
92        None
93    }
94
95    fn make_chain_client(
96        &self,
97        chain_id: ChainId,
98    ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
99        async move {
100            let chain = self
101                .wallet()
102                .get(chain_id)
103                .make_sync()
104                .await
105                .map_err(error::Inner::wallet)?
106                .unwrap_or_default();
107            let follow_only = chain.is_follow_only();
108            Ok(self.client().create_chain_client(
109                chain_id,
110                chain.block_hash,
111                chain.next_block_height,
112                chain.pending_proposal,
113                chain.owner,
114                self.timing_sender(),
115                follow_only,
116            ))
117        }
118    }
119
120    async fn update_wallet_for_new_chain(
121        &mut self,
122        chain_id: ChainId,
123        owner: Option<AccountOwner>,
124        timestamp: Timestamp,
125        epoch: Epoch,
126    ) -> Result<(), Error>;
127
128    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
129}
130
131#[allow(async_fn_in_trait)]
132pub trait ClientContextExt: ClientContext {
133    async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
134        use futures::stream::TryStreamExt as _;
135        self.wallet()
136            .chain_ids()
137            .map_err(|e| error::Inner::wallet(e).into())
138            .and_then(|chain_id| self.make_chain_client(chain_id))
139            .try_collect()
140            .await
141    }
142}
143
// Blanket implementation: every `ClientContext` gets the `ClientContextExt` methods.
impl<T: ClientContext> ClientContextExt for T {}
145
/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// The abort handle for the task that listens to the validators.
    abort_handle: AbortOnDrop,
    /// The listening task.
    listener: Task<()>,
    /// The stream of notifications from the local node.
    // Wrapped in `Arc<Mutex<_>>` so that `next_action` can hand a clone to the future
    // that polls this stream.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// This is only `< u64::MAX` when the client is waiting for a timeout to process the inbox.
    // Initialized to `u64::MAX` and reset to it whenever inbox processing is attempted.
    timeout: Timestamp,
    /// The background sync process.
    background_sync: Task<()>,
}
165
166impl<C: ClientContext> ListeningClient<C> {
167    fn new(
168        client: ContextChainClient<C>,
169        abort_handle: AbortOnDrop,
170        listener: Task<()>,
171        notification_stream: NotificationStream,
172        background_sync: Task<()>,
173    ) -> Self {
174        Self {
175            client,
176            abort_handle,
177            listener,
178            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
179            notification_stream: Arc::new(Mutex::new(notification_stream)),
180            timeout: Timestamp::from(u64::MAX),
181            background_sync,
182        }
183    }
184
185    async fn stop(self) {
186        // TODO(#4965): this is unnecessary: the join handle now also acts as an abort handle
187        drop(self.abort_handle);
188        futures::future::join(self.listener.cancel(), self.background_sync.cancel()).await;
189    }
190}
191
192/// Commands to the chain listener.
193pub enum ListenerCommand {
194    /// Command: start listening to the given chains. If the chain must produce blocks,
195    /// an owner is required.
196    Listen(BTreeMap<ChainId, Option<AccountOwner>>),
197    /// Command: stop listening to the given chains.
198    StopListening(BTreeSet<ChainId>),
199    /// Command: set the message policies of some chain clients.
200    SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
201}
202
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; locked whenever the wallet or the core client is used.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks and to access the clock.
    storage: <C::Environment as Environment>::Storage,
    /// The listener options (delays, whether to process inboxes automatically).
    config: Arc<ChainListenerConfig>,
    /// The chains we are currently listening to, with their clients and tasks.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Cancelling this token makes the listener stop.
    cancellation_token: CancellationToken,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// The channel through which the listener can receive commands.
    command_receiver: UnboundedReceiver<ListenerCommand>,
    /// Whether to fully sync chains in the background.
    enable_background_sync: bool,
}
219
220impl<C: ClientContext + 'static> ChainListener<C> {
221    /// Creates a new chain listener given client chains.
222    pub fn new(
223        config: ChainListenerConfig,
224        context: Arc<Mutex<C>>,
225        storage: <C::Environment as Environment>::Storage,
226        cancellation_token: CancellationToken,
227        command_receiver: UnboundedReceiver<ListenerCommand>,
228        enable_background_sync: bool,
229    ) -> Self {
230        Self {
231            storage,
232            context,
233            config: Arc::new(config),
234            listening: Default::default(),
235            cancellation_token,
236            event_subscribers: Default::default(),
237            command_receiver,
238            enable_background_sync,
239        }
240    }
241
242    /// Runs the chain listener.
243    #[instrument(skip(self))]
244    pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
245        let chain_ids = {
246            let guard = self.context.lock().await;
247            let admin_chain_id = guard.admin_chain_id();
248            guard
249                .make_chain_client(admin_chain_id)
250                .await?
251                .synchronize_chain_state(admin_chain_id)
252                .await?;
253            let mut chain_ids: BTreeMap<_, _> = guard
254                .wallet()
255                .items()
256                .collect::<Vec<_>>()
257                .await
258                .into_iter()
259                .map(|result| {
260                    let (chain_id, chain) = result?;
261                    let mode = if chain.is_follow_only() {
262                        ListeningMode::FollowChain
263                    } else {
264                        ListeningMode::FullChain
265                    };
266                    Ok((chain_id, mode))
267                })
268                .collect::<Result<BTreeMap<_, _>, _>>()
269                .map_err(
270                    |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
271                        crate::error::Inner::Wallet(Box::new(e) as _)
272                    },
273                )?;
274            // If the admin chain is not in the wallet, add it as follow-only since we
275            // typically don't own it.
276            chain_ids
277                .entry(admin_chain_id)
278                .or_insert(ListeningMode::FollowChain);
279            chain_ids
280        };
281
282        Ok(async move {
283            self.listen_recursively(chain_ids).await?;
284            loop {
285                match self.next_action().await? {
286                    Action::Stop => break,
287                    Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
288                    Action::Notification(notification) => {
289                        self.process_notification(notification).await?
290                    }
291                }
292            }
293            future::join_all(self.listening.into_values().map(|client| client.stop())).await;
294            Ok(())
295        })
296    }
297
298    /// Processes a notification, updating local chains and validators as needed.
299    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
300        Self::sleep(self.config.delay_before_ms).await;
301        let Some(listening_client) = self.listening.get(&notification.chain_id) else {
302            warn!(
303                ?notification,
304                "ChainListener::process_notification: got a notification without listening to the chain"
305            );
306            return Ok(());
307        };
308        let Some(listening_mode) = listening_client.client.listening_mode() else {
309            warn!(
310                ?notification,
311                "ChainListener::process_notification: chain has no listening mode"
312            );
313            return Ok(());
314        };
315
316        if !listening_mode.is_relevant(&notification.reason) {
317            debug!(
318                reason = ?notification.reason,
319                "ChainListener: ignoring notification due to listening mode"
320            );
321            return Ok(());
322        }
323        match &notification.reason {
324            Reason::NewIncomingBundle { .. } => {
325                self.maybe_process_inbox(notification.chain_id).await?;
326            }
327            Reason::NewRound { .. } => {
328                self.update_validators(&notification).await?;
329            }
330            Reason::NewBlock { hash, .. } => {
331                self.update_wallet(notification.chain_id).await?;
332                if listening_mode.is_full() {
333                    self.add_new_chains(*hash).await?;
334                    let publishers = self
335                        .update_event_subscriptions(notification.chain_id)
336                        .await?;
337                    if !publishers.is_empty() {
338                        self.listen_recursively(publishers).await?;
339                        self.maybe_process_inbox(notification.chain_id).await?;
340                    }
341                    self.process_new_events(notification.chain_id).await?;
342                }
343            }
344            Reason::NewEvents { .. } => {
345                self.process_new_events(notification.chain_id).await?;
346            }
347            Reason::BlockExecuted { .. } => {}
348        }
349        Self::sleep(self.config.delay_after_ms).await;
350        Ok(())
351    }
352
353    /// If any new chains were created by the given block, and we have a key pair for them,
354    /// add them to the wallet and start listening for notifications. (This is not done for
355    /// fallback owners, as those would have to monitor all chains anyway.)
356    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
357        let block = self
358            .storage
359            .read_confirmed_block(hash)
360            .await?
361            .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?
362            .into_block();
363        let parent_chain_id = block.header.chain_id;
364        let blobs = block.created_blobs().into_iter();
365        let new_chains = blobs
366            .filter_map(|(blob_id, blob)| {
367                if blob_id.blob_type == BlobType::ChainDescription {
368                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
369                        .expect("ChainDescription should deserialize correctly");
370                    Some((ChainId(blob_id.hash), chain_desc))
371                } else {
372                    None
373                }
374            })
375            .collect::<Vec<_>>();
376        if new_chains.is_empty() {
377            return Ok(());
378        }
379        let mut new_ids = BTreeMap::new();
380        let mut context_guard = self.context.lock().await;
381        for (new_chain_id, chain_desc) in new_chains {
382            for chain_owner in chain_desc.config().ownership.all_owners() {
383                if context_guard.client().has_key_for(chain_owner).await? {
384                    context_guard
385                        .update_wallet_for_new_chain(
386                            new_chain_id,
387                            Some(*chain_owner),
388                            block.header.timestamp,
389                            block.header.epoch,
390                        )
391                        .await?;
392                    context_guard
393                        .client()
394                        .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
395                    new_ids.insert(new_chain_id, ListeningMode::FullChain);
396                }
397            }
398        }
399        // Re-process the parent chain's outboxes now that the new chains are tracked.
400        // This ensures cross-chain messages to newly created chains are delivered.
401        if !new_ids.is_empty() {
402            context_guard
403                .client()
404                .local_node
405                .retry_pending_cross_chain_requests(parent_chain_id)
406                .await?;
407        }
408        drop(context_guard);
409        self.listen_recursively(new_ids).await?;
410        Ok(())
411    }
412
413    /// Processes the inboxes of all chains that are subscribed to `chain_id`.
414    async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
415        let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
416            return Ok(());
417        };
418        for subscriber_id in subscribers {
419            self.maybe_process_inbox(subscriber_id).await?;
420        }
421        Ok(())
422    }
423
424    /// Starts listening for notifications about the given chains, and any chains that publish
425    /// event streams those chains are subscribed to.
426    async fn listen_recursively(
427        &mut self,
428        mut chain_ids: BTreeMap<ChainId, ListeningMode>,
429    ) -> Result<(), Error> {
430        while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
431            for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
432                match chain_ids.entry(new_chain_id) {
433                    Entry::Vacant(vacant) => {
434                        vacant.insert(new_listening_mode);
435                    }
436                    Entry::Occupied(mut occupied) => {
437                        occupied.get_mut().extend(Some(new_listening_mode));
438                    }
439                }
440            }
441        }
442
443        Ok(())
444    }
445
446    /// Background task that syncs received certificates in small batches.
447    /// This discovers unacknowledged sender blocks gradually without overwhelming the system.
448    #[instrument(skip(context))]
449    async fn background_sync_received_certificates(
450        context: Arc<Mutex<C>>,
451        chain_id: ChainId,
452    ) -> Result<(), Error> {
453        info!("Starting background certificate sync for chain {chain_id}");
454        let client = context.lock().await.make_chain_client(chain_id).await?;
455
456        Ok(client.find_received_certificates().await?)
457    }
458
459    /// Starts listening for notifications about the given chain.
460    ///
461    /// Returns all publishing chains, that we also need to listen to.
462    async fn listen(
463        &mut self,
464        chain_id: ChainId,
465        listening_mode: ListeningMode,
466    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
467        let context_guard = self.context.lock().await;
468        let existing_mode = context_guard.client().chain_mode(chain_id);
469        // If we already have a listener with a sufficient mode, nothing to do.
470        if self.listening.contains_key(&chain_id)
471            && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
472        {
473            return Ok(BTreeMap::new());
474        }
475        // Extend the mode in the central map.
476        context_guard
477            .client()
478            .extend_chain_mode(chain_id, listening_mode);
479        drop(context_guard);
480
481        // Start background tasks to sync received certificates, if enabled.
482        let background_sync_task = self.start_background_sync(chain_id).await;
483        let client = self
484            .context
485            .lock()
486            .await
487            .make_chain_client(chain_id)
488            .await?;
489        let (listener, abort_handle, notification_stream) = client.listen().await?;
490        let listening_client = ListeningClient::new(
491            client,
492            abort_handle,
493            Task::spawn(listener.in_current_span()),
494            notification_stream,
495            background_sync_task,
496        );
497        self.listening.insert(chain_id, listening_client);
498        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
499        self.maybe_process_inbox(chain_id).await?;
500        Ok(publishing_chains)
501    }
502
503    async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
504        if !self.enable_background_sync
505            || !self
506                .context
507                .lock()
508                .await
509                .client()
510                .chain_mode(chain_id)
511                .is_some_and(|m| m.is_full())
512        {
513            return Task::ready(());
514        }
515
516        let context = Arc::clone(&self.context);
517        Task::spawn(async move {
518            if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
519                warn!("Background sync failed for chain {chain_id}: {e}");
520            }
521        })
522    }
523
524    fn remove_event_subscriber(&mut self, chain_id: ChainId) {
525        self.event_subscribers.retain(|_, subscribers| {
526            subscribers.remove(&chain_id);
527            !subscribers.is_empty()
528        });
529    }
530
531    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
532    async fn update_event_subscriptions(
533        &mut self,
534        chain_id: ChainId,
535    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
536        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
537        if !listening_client.client.is_tracked() {
538            return Ok(BTreeMap::new());
539        }
540        let publishing_chains: BTreeMap<_, _> = listening_client
541            .client
542            .event_stream_publishers()
543            .await?
544            .into_iter()
545            .map(|(chain_id, streams)| (chain_id, ListeningMode::EventsOnly(streams)))
546            .collect();
547        for publisher_id in publishing_chains.keys() {
548            self.event_subscribers
549                .entry(*publisher_id)
550                .or_default()
551                .insert(chain_id);
552        }
553        Ok(publishing_chains)
554    }
555
    /// Returns the next notification or timeout to process.
    ///
    /// Waits for whichever of the following happens first: the cancellation token is
    /// triggered (`Action::Stop`), the earliest inbox timeout expires
    /// (`Action::ProcessInbox`), a command arrives, or one of the notification streams
    /// yields an item (`Action::Notification`). Commands and closed notification streams
    /// are handled here, after which the wait starts over.
    ///
    /// # Panics
    ///
    /// Panics (in `next_timeout`) if there is no chain left to listen to.
    async fn next_action(&mut self) -> Result<Action, Error> {
        loop {
            let (timeout_chain_id, timeout) = self.next_timeout()?;
            // One future per listened chain, in the iteration order of `self.listening`.
            let notification_futures = self
                .listening
                .values_mut()
                .map(|client| {
                    let stream = client.notification_stream.clone();
                    Box::pin(async move { stream.lock().await.next().await })
                })
                .collect::<Vec<_>>();
            futures::select! {
                () = self.cancellation_token.cancelled().fuse() => {
                    return Ok(Action::Stop);
                }
                () = self.storage.clock().sleep_until(timeout).fuse() => {
                    return Ok(Action::ProcessInbox(timeout_chain_id));
                }
                // If the command channel is closed, this branch never completes, so the
                // other branches keep being served.
                command = self.command_receiver.recv().then(async |maybe_command| {
                    if let Some(command) = maybe_command {
                        command
                    } else {
                        std::future::pending().await
                    }
                }).fuse() => {
                    match command {
                        ListenerCommand::Listen(new_chains) => {
                            debug!(?new_chains, "received command to listen to new chains");
                            let listening_modes = self.update_wallet_for_listening(new_chains).await?;
                            self.listen_recursively(listening_modes).await?;
                        }
                        ListenerCommand::StopListening(chains) => {
                            debug!(?chains, "received command to stop listening to chains");
                            for chain_id in chains {
                                debug!(%chain_id, "stopping the listener for chain");
                                let Some(listening_client) = self.listening.remove(&chain_id) else {
                                    error!(%chain_id, "attempted to drop a non-existent listener");
                                    continue;
                                };
                                self.remove_event_subscriber(chain_id);
                                listening_client.stop().await;
                                // A failure to remove the chain from the wallet is only logged.
                                if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
                                    error!(%error, %chain_id, "error removing a chain from the wallet");
                                }
                            }
                        }
                        ListenerCommand::SetMessagePolicy(policies) => {
                            debug!(?policies, "received command to set message policies");
                            for (chain_id, policy) in policies {
                                let Some(listening_client) = self.listening.get_mut(&chain_id) else {
                                    error!(
                                        %chain_id,
                                        "attempted to set the message policy of a non-existent \
                                        listener"
                                    );
                                    continue;
                                };
                                listening_client.client.options_mut().message_policy = policy;
                            }
                        }
                    }
                }
                (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
                    let Some(notification) = maybe_notification else {
                        // The stream ended. `index` is the position of the finished future,
                        // which was built from the entry at the same position in
                        // `self.listening`, so it identifies the chain.
                        let chain_id = *self.listening.keys().nth(index).unwrap();
                        warn!("Notification stream for {chain_id} closed");
                        let Some(listening_client) = self.listening.remove(&chain_id) else {
                            error!(%chain_id, "attempted to drop a non-existent listener");
                            continue;
                        };
                        self.remove_event_subscriber(chain_id);
                        listening_client.stop().await;
                        continue;
                    };
                    return Ok(Action::Notification(notification));
                }
            }
        }
    }
636
637    /// Returns the next timeout to process, and the chain to which it applies.
638    fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
639        let (chain_id, client) = self
640            .listening
641            .iter()
642            .min_by_key(|(_, client)| client.timeout)
643            .expect("No chains left to listen to");
644        Ok((*chain_id, client.timeout))
645    }
646
647    /// Updates the validators about the chain.
648    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
649        let chain_id = notification.chain_id;
650        let listening_client = self.listening.get(&chain_id).expect("missing client");
651        let latest_block = if let Reason::NewBlock { hash, .. } = &notification.reason {
652            listening_client.client.read_certificate(*hash).await.ok()
653        } else {
654            None
655        };
656        if let Err(error) = listening_client
657            .client
658            .update_validators(None, latest_block)
659            .await
660        {
661            warn!(
662                "Failed to update validators about the local chain after \
663                 receiving {notification:?} with error: {error:?}"
664            );
665        }
666        Ok(())
667    }
668
669    /// Updates the wallet based on the client for this chain.
670    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
671        let client = &self
672            .listening
673            .get(&chain_id)
674            .expect("missing client")
675            .client;
676        self.context.lock().await.update_wallet(client).await?;
677        Ok(())
678    }
679
680    /// Updates the wallet with the set of chains we're supposed to start listening to,
681    /// and returns the appropriate listening modes based on whether we have the private
682    /// keys corresponding to the given chains' owners.
683    async fn update_wallet_for_listening(
684        &self,
685        new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
686    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
687        let mut chains = BTreeMap::new();
688        let context_guard = self.context.lock().await;
689        for (chain_id, owner) in new_chains {
690            if let Some(owner) = owner {
691                if context_guard
692                    .client()
693                    .signer()
694                    .contains_key(&owner)
695                    .await
696                    .map_err(chain_client::Error::signer_failure)?
697                {
698                    // Try to modify existing chain entry, setting the owner.
699                    let modified = context_guard
700                        .wallet()
701                        .modify(chain_id, |chain| chain.owner = Some(owner))
702                        .await
703                        .map_err(error::Inner::wallet)?;
704                    // If the chain didn't exist, insert a new entry.
705                    if modified.is_none() {
706                        let chain_description = context_guard
707                            .client()
708                            .get_chain_description(chain_id)
709                            .await?;
710                        let timestamp = chain_description.timestamp();
711                        let epoch = chain_description.config().epoch;
712                        context_guard
713                            .wallet()
714                            .insert(
715                                chain_id,
716                                linera_core::wallet::Chain {
717                                    owner: Some(owner),
718                                    timestamp,
719                                    epoch: Some(epoch),
720                                    ..Default::default()
721                                },
722                            )
723                            .await
724                            .map_err(error::Inner::wallet)?;
725                    }
726
727                    chains.insert(chain_id, ListeningMode::FullChain);
728                }
729            } else {
730                chains.insert(chain_id, ListeningMode::FollowChain);
731            }
732        }
733        Ok(chains)
734    }
735
736    /// Processes the inbox, unless `skip_process_inbox` is set.
737    ///
738    /// If no block can be produced because we are not the round leader, a timeout is returned
739    /// for when to retry; otherwise `u64::MAX` is returned.
740    ///
741    /// The wallet is persisted with any blocks that processing the inbox added. An error
742    /// is returned if persisting the wallet fails.
743    async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
744        if self.config.skip_process_inbox {
745            debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
746            return Ok(());
747        }
748        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
749        if !listening_client.client.is_tracked() {
750            debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
751            return Ok(());
752        }
753        if listening_client.client.preferred_owner().is_none() {
754            debug!("Not processing inbox for non-owned chain {chain_id:.8}");
755            return Ok(());
756        }
757        debug!("Processing inbox for {chain_id:.8}");
758        listening_client.timeout = Timestamp::from(u64::MAX);
759        match listening_client
760            .client
761            .process_inbox_without_prepare()
762            .await
763        {
764            Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
765                debug!(%chain_id, "Cannot find key for chain");
766            }
767            Err(error) => warn!(%error, "Failed to process inbox."),
768            Ok((certs, None)) if certs.is_empty() => debug!(
769                %chain_id,
770                "done processing inbox: no blocks created",
771            ),
772            Ok((certs, None)) => info!(
773                %chain_id,
774                created_block_count = %certs.len(),
775                "done processing inbox",
776            ),
777            Ok((certs, Some(new_timeout))) => {
778                info!(
779                    %chain_id,
780                    created_block_count = %certs.len(),
781                    timeout = %new_timeout,
782                    "waiting for round timeout before continuing to process the inbox",
783                );
784                listening_client.timeout = new_timeout.timestamp;
785            }
786        }
787        let mut context_guard = self.context.lock().await;
788        context_guard
789            .update_wallet(&listening_client.client)
790            .await?;
791        Ok(())
792    }
793
794    /// Sleeps for the given number of milliseconds, if greater than 0.
795    async fn sleep(delay_ms: u64) {
796        if delay_ms > 0 {
797            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
798        }
799    }
800}
801
/// What the chain listener's main loop should do next, as decided by `next_action`.
enum Action {
    /// The inbox timeout of the given chain has expired: try to process its inbox.
    ProcessInbox(ChainId),
    /// A notification arrived from one of the chains we are listening to.
    Notification(Notification),
    /// The cancellation token was triggered: stop the listener.
    Stop,
}