Skip to main content

linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{btree_map::Entry, BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12    crypto::{CryptoHash, Signer},
13    data_types::{ChainDescription, Epoch, MessagePolicy, TimeDelta, Timestamp},
14    identifiers::{AccountOwner, BlobType, ChainId},
15    ownership::ChainOwnership,
16    util::future::FutureSyncExt as _,
17    Task,
18};
19use linera_core::{
20    client::{
21        chain_client::{self, ChainClient},
22        AbortOnDrop, ListeningMode,
23    },
24    node::NotificationStream,
25    worker::{Notification, Reason},
26    Environment, Wallet,
27};
28use linera_storage::{Arc as CacheArc, Storage as _};
29use tokio::sync::{mpsc::UnboundedReceiver, Notify};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, instrument, warn, Instrument as _};
32
33use crate::error::{self, Error};
34
// Settings that control how the chain listener reacts to notifications.
//
// Each option can be set on the command line (`long = ...`) or through the environment
// variable named in `env = ...`, and is (de)serialized with camelCase field names.
//
// NOTE(review): the `///` comments on the fields are picked up by the `clap` derive as CLI
// help text, so they are user-facing; maintainer notes below deliberately use `//`.
#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
#[serde(rename_all = "camelCase")]
pub struct ChainListenerConfig {
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[serde(default)]
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    /// Wait before processing any notification (useful for testing).
    // Value is in milliseconds; `process_notification` sleeps this long before doing
    // anything else.
    #[serde(default)]
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    /// Wait after processing any notification (useful for rate limiting).
    // Value is in milliseconds. `process_notification` only sleeps this long when it
    // reaches its end; notifications that are dropped early (unknown chain, no listening
    // mode, irrelevant reason) return before this delay.
    #[serde(default)]
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
65
/// The [`ChainClient`] type specialized to the environment of the client context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
67
/// The interface through which the chain listener accesses the wallet, the storage and the
/// client used to create per-chain clients.
///
/// On non-`web` builds the trait is processed by `trait_variant::make(Send + Sync)`.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
#[allow(async_fn_in_trait)]
pub trait ClientContext {
    /// The environment that provides the wallet and storage types used by this context.
    type Environment: linera_core::Environment;

    /// Returns the environment's wallet.
    fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;

    /// Returns the environment's storage.
    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;

    /// Returns the shared client from which chain clients are created.
    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;

    /// Returns the admin chain ID, as reported by [`ClientContext::client`].
    fn admin_chain_id(&self) -> ChainId {
        self.client().admin_chain_id()
    }

    /// Gets the timing sender for benchmarking, if available.
    #[cfg(not(web))]
    fn timing_sender(
        &self,
    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;

    /// Gets the timing sender for benchmarking; the `web` default provides none.
    #[cfg(web)]
    fn timing_sender(
        &self,
    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
        None
    }

    /// Creates a chain client for `chain_id` from the chain's wallet entry.
    ///
    /// If the wallet has no entry for `chain_id`, a default entry is used instead.
    ///
    /// # Errors
    ///
    /// Returns a wallet error if the wallet lookup fails.
    fn make_chain_client(
        &self,
        chain_id: ChainId,
    ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
        async move {
            // A chain that is missing from the wallet is not an error: fall back to the
            // default wallet entry.
            let chain = self
                .wallet()
                .get(chain_id)
                .make_sync()
                .await
                .map_err(error::Inner::wallet)?
                .unwrap_or_default();
            let follow_only = chain.is_follow_only();
            Ok(self.client().create_chain_client(
                chain_id,
                chain.block_hash,
                chain.next_block_height,
                &chain.pending_fast_proposal,
                chain.owner,
                self.timing_sender(),
                follow_only,
            ))
        }
    }

    /// Records a newly created chain in the wallet.
    ///
    /// The chain listener calls this for chains created by a block, passing that block's
    /// timestamp and epoch, and `owner` set only when we hold a key pair for exactly one of
    /// the chain's owners.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
        epoch: Epoch,
    ) -> Result<(), Error>;

    /// Updates the wallet based on the given chain client.
    ///
    /// The chain listener calls this when it is notified of a new block on the client's
    /// chain.
    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
}
131
132#[allow(async_fn_in_trait)]
133pub trait ClientContextExt: ClientContext {
134    async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
135        use futures::stream::TryStreamExt as _;
136        self.wallet()
137            .chain_ids()
138            .map_err(|e| error::Inner::wallet(e).into())
139            .and_then(|chain_id| self.make_chain_client(chain_id))
140            .try_collect()
141            .await
142    }
143
144    /// Returns the subset of `owners` for which we have a key pair in the wallet.
145    ///
146    /// Duplicate inputs are treated as one.
147    async fn owners_with_key(
148        &self,
149        owners: impl IntoIterator<Item = AccountOwner>,
150    ) -> Result<BTreeSet<AccountOwner>, Error> {
151        let mut result = BTreeSet::new();
152        for owner in owners.into_iter().collect::<BTreeSet<_>>() {
153            if self.client().has_key_for(&owner).await? {
154                result.insert(owner);
155            }
156        }
157        Ok(result)
158    }
159
160    /// Returns the unique owner from `owners` for which we have a key pair in the wallet.
161    ///
162    /// Returns `None` when zero or multiple distinct owners match.
163    async fn unique_owner_with_key(
164        &self,
165        owners: impl IntoIterator<Item = AccountOwner>,
166    ) -> Result<Option<AccountOwner>, Error> {
167        let with_key = self.owners_with_key(owners).await?;
168        Ok(if with_key.len() == 1 {
169            with_key.into_iter().next()
170        } else {
171            None
172        })
173    }
174
175    /// Sets the preferred owner of `chain_client`'s chain if the current one is no longer
176    /// in `ownership` and we have a key pair for exactly one of the new owners.
177    async fn maybe_auto_assign_preferred_owner(
178        &self,
179        chain_client: &mut ContextChainClient<Self>,
180        ownership: &ChainOwnership,
181    ) -> Result<(), Error> {
182        let chain_id = chain_client.chain_id();
183        let old_owner = chain_client.preferred_owner();
184        if old_owner.is_some_and(|o| ownership.all_owners().any(|n| *n == o)) {
185            return Ok(());
186        }
187        let Some(new_owner) = self
188            .unique_owner_with_key(ownership.all_owners().copied())
189            .await?
190        else {
191            return Ok(());
192        };
193        info!(
194            %chain_id, ?old_owner, %new_owner,
195            "Auto-assigning preferred owner from wallet key pair",
196        );
197        chain_client.set_preferred_owner(new_owner);
198        self.wallet()
199            .modify(chain_id, |chain| chain.owner = Some(new_owner))
200            .await
201            .map_err(error::Inner::wallet)?;
202        Ok(())
203    }
204}
205
// Blanket implementation: every `ClientContext` automatically gets the `ClientContextExt`
// helper methods.
impl<T: ClientContext> ClientContextExt for T {}
207
/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
///
/// Besides the validator listener, each listening client also owns a background sync task
/// and a long-lived inbox processing task; `stop` shuts all of them down.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// The abort handle for the task that listens to the validators.
    abort_handle: AbortOnDrop,
    /// The listening task.
    listener: Task<()>,
    /// The stream of notifications from the local node.
    ///
    /// Shared behind an `Arc<Mutex<_>>` so that a handle to it can be cloned into the future
    /// that waits for the next notification.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// The background sync process.
    background_sync: Task<()>,
    /// Signal to wake the per-chain inbox processing task.
    ///
    /// The same `Notify` is handed to every respawned inbox task.
    inbox_notify: Arc<Notify>,
    /// The long-lived per-chain inbox processing task.
    inbox_task: Task<()>,
    /// Cancellation token for the per-chain inbox task (child of the global token).
    inbox_cancellation: CancellationToken,
}
231
232impl<C: ClientContext + 'static> ListeningClient<C> {
233    #[expect(clippy::too_many_arguments)]
234    fn new(
235        client: ContextChainClient<C>,
236        abort_handle: AbortOnDrop,
237        listener: Task<()>,
238        notification_stream: NotificationStream,
239        background_sync: Task<()>,
240        context: &Arc<Mutex<C>>,
241        config: &Arc<ChainListenerConfig>,
242        parent_cancellation: &CancellationToken,
243    ) -> Self {
244        let inbox_notify = Arc::new(Notify::new());
245        let inbox_cancellation = parent_cancellation.child_token();
246        let inbox_task =
247            Self::spawn_inbox_task(&client, context, config, &inbox_notify, &inbox_cancellation);
248        Self {
249            client,
250            abort_handle,
251            listener,
252            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
253            notification_stream: Arc::new(Mutex::new(notification_stream)),
254            background_sync,
255            inbox_notify,
256            inbox_task,
257            inbox_cancellation,
258        }
259    }
260
261    /// Respawns the per-chain inbox task with a fresh clone of the client.
262    /// The `inbox_notify` `Arc` is reused so no pending permits are lost.
263    fn respawn_inbox_task(
264        &mut self,
265        parent_cancellation: &CancellationToken,
266        context: &Arc<Mutex<C>>,
267        config: &Arc<ChainListenerConfig>,
268    ) {
269        self.inbox_cancellation.cancel();
270        self.inbox_cancellation = parent_cancellation.child_token();
271        self.inbox_task = Self::spawn_inbox_task(
272            &self.client,
273            context,
274            config,
275            &self.inbox_notify,
276            &self.inbox_cancellation,
277        );
278    }
279
280    fn spawn_inbox_task(
281        client: &ContextChainClient<C>,
282        context: &Arc<Mutex<C>>,
283        config: &Arc<ChainListenerConfig>,
284        inbox_notify: &Arc<Notify>,
285        inbox_cancellation: &CancellationToken,
286    ) -> Task<()> {
287        Task::spawn(inbox_processing_loop(
288            client.clone(),
289            Arc::clone(context),
290            Arc::clone(config),
291            Arc::clone(inbox_notify),
292            inbox_cancellation.clone(),
293        ))
294    }
295
296    async fn stop(self) {
297        // TODO(#4965): this is unnecessary: the join handle now also acts as an abort handle
298        drop(self.abort_handle);
299        self.inbox_cancellation.cancel();
300        futures::future::join3(
301            self.listener.cancel(),
302            self.background_sync.cancel(),
303            self.inbox_task.cancel(),
304        )
305        .await;
306    }
307}
308
/// Commands to the chain listener.
///
/// Commands are received through the listener's command channel and handled between
/// notifications.
pub enum ListenerCommand {
    /// Command: start listening to the given chains. If the chain must produce blocks,
    /// an owner is required.
    Listen(BTreeMap<ChainId, Option<AccountOwner>>),
    /// Command: stop listening to the given chains.
    ///
    /// Each chain's listening client is stopped and the chain is removed from the wallet.
    StopListening(BTreeSet<ChainId>),
    /// Command: set the message policies of some chain clients.
    ///
    /// The inbox task of each affected chain is respawned after the policy is changed.
    SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
}
319
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context (wallet, client), locked whenever it is accessed.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration, shared with the per-chain inbox tasks.
    config: Arc<ChainListenerConfig>,
    /// The chains we are currently listening to, with their clients and background tasks.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Token that stops the listener when cancelled; per-chain inbox tokens are its children.
    cancellation_token: CancellationToken,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// The channel through which the listener can receive commands.
    command_receiver: UnboundedReceiver<ListenerCommand>,
    /// Whether to fully sync chains in the background.
    enable_background_sync: bool,
}
336
337impl<C: ClientContext + 'static> ChainListener<C> {
338    /// Creates a new chain listener given client chains.
339    pub fn new(
340        config: ChainListenerConfig,
341        context: Arc<Mutex<C>>,
342        storage: <C::Environment as Environment>::Storage,
343        cancellation_token: CancellationToken,
344        command_receiver: UnboundedReceiver<ListenerCommand>,
345        enable_background_sync: bool,
346    ) -> Self {
347        Self {
348            storage,
349            context,
350            config: Arc::new(config),
351            listening: Default::default(),
352            cancellation_token,
353            event_subscribers: Default::default(),
354            command_receiver,
355            enable_background_sync,
356        }
357    }
358
    /// Runs the chain listener.
    ///
    /// This works in two stages. Awaiting `run` itself synchronizes the admin chain and reads
    /// the wallet to decide which chains to listen to, and in which mode. The future that is
    /// returned then does the actual work: it starts listening to those chains (and,
    /// recursively, to the chains publishing events they are subscribed to), processes
    /// notifications until a stop action is received, and finally stops all listening
    /// clients.
    ///
    /// # Errors
    ///
    /// The first stage fails if the admin chain client cannot be created or synchronized, or
    /// if reading the wallet fails. The returned future fails with the first error from
    /// listening or from processing a notification.
    #[instrument(skip(self))]
    pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
        let chain_ids = {
            // The context stays locked for this whole block.
            let guard = self.context.lock().await;
            let admin_chain_id = guard.admin_chain_id();
            guard
                .make_chain_client(admin_chain_id)
                .await?
                .synchronize_chain_state(admin_chain_id)
                .await?;
            // Follow-only wallet chains are merely followed; all others are fully listened to.
            let mut chain_ids: BTreeMap<_, _> = guard
                .wallet()
                .items()
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|result| {
                    let (chain_id, chain) = result?;
                    let mode = if chain.is_follow_only() {
                        ListeningMode::FollowChain
                    } else {
                        ListeningMode::FullChain
                    };
                    Ok((chain_id, mode))
                })
                .collect::<Result<BTreeMap<_, _>, _>>()
                .map_err(
                    |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
                        crate::error::Inner::Wallet(Box::new(e) as _)
                    },
                )?;
            // If the admin chain is not in the wallet, add it as follow-only since we
            // typically don't own it.
            chain_ids
                .entry(admin_chain_id)
                .or_insert(ListeningMode::FollowChain);
            chain_ids
        };

        Ok(async move {
            self.listen_recursively(chain_ids).await?;
            // Main loop: handle notifications one at a time until asked to stop.
            loop {
                match self.next_action().await? {
                    Action::Stop => break,
                    Action::Notification(notification) => {
                        self.process_notification(notification).await?
                    }
                }
            }
            // Shut down all per-chain background tasks before returning.
            future::join_all(self.listening.into_values().map(|client| client.stop())).await;
            Ok(())
        })
    }
413
    /// Processes a notification, updating local chains and validators as needed.
    ///
    /// The configured `delay_before_ms` is always applied first. Notifications for chains we
    /// are not listening to, for chains without a listening mode, or whose reason is not
    /// relevant to the chain's listening mode are dropped (and the function returns without
    /// applying `delay_after_ms`). Otherwise the reaction depends on the reason:
    ///
    /// * `NewIncomingBundle`: wakes the chain's inbox processing.
    /// * `NewRound`: updates the validators about the chain.
    /// * `NewBlock`: updates the wallet; in full listening mode also picks up chains created
    ///   by the block and refreshes event subscriptions; then notifies the chain's event
    ///   subscribers.
    /// * `NewEvents`: notifies the chain's event subscribers.
    /// * `BlockExecuted`: nothing to do.
    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
        Self::sleep(self.config.delay_before_ms).await;
        let Some(listening_client) = self.listening.get(&notification.chain_id) else {
            warn!(
                ?notification,
                "ChainListener::process_notification: got a notification without listening to the chain"
            );
            return Ok(());
        };
        let Some(listening_mode) = listening_client.client.listening_mode() else {
            warn!(
                ?notification,
                "ChainListener::process_notification: chain has no listening mode"
            );
            return Ok(());
        };

        if !listening_mode.is_relevant(&notification.reason) {
            debug!(
                reason = ?notification.reason,
                "ChainListener: ignoring notification due to listening mode"
            );
            return Ok(());
        }
        match &notification.reason {
            Reason::NewIncomingBundle { .. } => {
                self.maybe_notify_inbox_processing(notification.chain_id);
            }
            Reason::NewRound { .. } => {
                self.update_validators(&notification).await?;
            }
            Reason::NewBlock { hash, .. } => {
                self.update_wallet(notification.chain_id).await?;
                if listening_mode.is_full() {
                    self.add_new_chains(*hash).await?;
                    let publishers = self
                        .update_event_subscriptions(notification.chain_id)
                        .await?;
                    // Only when there are publishers: start listening to them and wake this
                    // chain's inbox processing.
                    if !publishers.is_empty() {
                        self.listen_recursively(publishers).await?;
                        self.maybe_notify_inbox_processing(notification.chain_id);
                    }
                }
                self.process_new_events(notification.chain_id);
            }
            Reason::NewEvents { .. } => {
                self.process_new_events(notification.chain_id);
            }
            Reason::BlockExecuted { .. } => {}
        }
        Self::sleep(self.config.delay_after_ms).await;
        Ok(())
    }
468
    /// If any new chains were created by the given block, and we have a key pair for at
    /// least one of their owners, add them to the wallet and start listening for
    /// notifications. The preferred owner is assigned only when we hold a key pair for
    /// exactly one of the chain's owners. (Fallback owners are ignored, as those would
    /// have to monitor all chains anyway.)
    ///
    /// # Errors
    ///
    /// Fails if the confirmed block with the given `hash` cannot be read or is missing from
    /// storage, or if checking keys, updating the wallet, retrying cross-chain requests or
    /// starting to listen fails.
    ///
    /// # Panics
    ///
    /// Panics if a `ChainDescription` blob created by the block cannot be deserialized.
    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
        let block = CacheArc::unwrap_or_clone(
            self.storage
                .read_confirmed_block(hash)
                .await?
                .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?,
        )
        .into_block();
        let parent_chain_id = block.header.chain_id;
        let blobs = block.created_blobs().into_iter();
        // Every `ChainDescription` blob created by the block describes a new chain, whose ID
        // is built from the blob's hash.
        let new_chains = blobs
            .filter_map(|(blob_id, blob)| {
                if blob_id.blob_type == BlobType::ChainDescription {
                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
                        .expect("ChainDescription should deserialize correctly");
                    Some((ChainId(blob_id.hash), chain_desc))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        if new_chains.is_empty() {
            return Ok(());
        }
        let mut new_ids = BTreeMap::new();
        // The context stays locked until the explicit `drop` below.
        let mut context_guard = self.context.lock().await;
        for (new_chain_id, chain_desc) in new_chains {
            let with_key = context_guard
                .owners_with_key(chain_desc.config().ownership.all_owners().copied())
                .await?;
            // Skip chains for which we hold no owner key.
            if with_key.is_empty() {
                continue;
            }
            // With several matching keys there is no obvious choice, so no preferred owner is
            // set.
            let owner = if with_key.len() == 1 {
                with_key.into_iter().next()
            } else {
                None
            };
            context_guard
                .update_wallet_for_new_chain(
                    new_chain_id,
                    owner,
                    block.header.timestamp,
                    block.header.epoch,
                )
                .await?;
            context_guard
                .client()
                .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
            new_ids.insert(new_chain_id, ListeningMode::FullChain);
        }
        // Re-process the parent chain's outboxes now that the new chains are tracked.
        // This ensures cross-chain messages to newly created chains are delivered.
        if !new_ids.is_empty() {
            context_guard
                .client()
                .retry_pending_cross_chain_requests(parent_chain_id)
                .await?;
        }
        // Release the context before listening, which locks it again.
        drop(context_guard);
        self.listen_recursively(new_ids).await?;
        Ok(())
    }
537
538    /// Notifies all chains subscribed to `chain_id` to process their inboxes.
539    fn process_new_events(&self, chain_id: ChainId) {
540        let Some(subscribers) = self.event_subscribers.get(&chain_id) else {
541            return;
542        };
543        for subscriber_id in subscribers {
544            self.maybe_notify_inbox_processing(*subscriber_id);
545        }
546    }
547
548    /// Starts listening for notifications about the given chains, and any chains that publish
549    /// event streams those chains are subscribed to.
550    async fn listen_recursively(
551        &mut self,
552        mut chain_ids: BTreeMap<ChainId, ListeningMode>,
553    ) -> Result<(), Error> {
554        while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
555            for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
556                match chain_ids.entry(new_chain_id) {
557                    Entry::Vacant(vacant) => {
558                        vacant.insert(new_listening_mode);
559                    }
560                    Entry::Occupied(mut occupied) => {
561                        occupied.get_mut().extend(Some(new_listening_mode));
562                    }
563                }
564            }
565        }
566
567        Ok(())
568    }
569
570    /// Background task that syncs received certificates in small batches.
571    /// This discovers unacknowledged sender blocks gradually without overwhelming the system.
572    #[instrument(skip(context))]
573    async fn background_sync_received_certificates(
574        context: Arc<Mutex<C>>,
575        chain_id: ChainId,
576    ) -> Result<(), Error> {
577        info!("Starting background certificate sync");
578        let client = context.lock().await.make_chain_client(chain_id).await?;
579
580        Ok(client.find_received_certificates().await?)
581    }
582
    /// Starts listening for notifications about the given chain.
    ///
    /// Returns all publishing chains, that we also need to listen to.
    ///
    /// If we already listen to the chain with a mode that covers `listening_mode`, this does
    /// nothing and returns an empty map. Otherwise the chain's mode is extended, a new
    /// listening client (with its background tasks) is created and stored under `chain_id`
    /// (replacing any previous entry), and the chain's inbox processing is woken up.
    async fn listen(
        &mut self,
        chain_id: ChainId,
        listening_mode: ListeningMode,
    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
        let context_guard = self.context.lock().await;
        let existing_mode = context_guard.client().chain_mode(chain_id);
        // If we already have a listener with a sufficient mode, nothing to do.
        if self.listening.contains_key(&chain_id)
            && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
        {
            return Ok(BTreeMap::new());
        }
        // Extend the mode in the central map.
        context_guard
            .client()
            .extend_chain_mode(chain_id, listening_mode);
        // Release the context: the calls below lock it again.
        drop(context_guard);

        // Start background tasks to sync received certificates, if enabled.
        let background_sync_task = self.start_background_sync(chain_id).await;
        let client = self
            .context
            .lock()
            .await
            .make_chain_client(chain_id)
            .await?;
        let (listener, abort_handle, notification_stream) = client.listen().await?;
        let listening_client = ListeningClient::new(
            client,
            abort_handle,
            Task::spawn(listener.in_current_span()),
            notification_stream,
            background_sync_task,
            &self.context,
            &self.config,
            &self.cancellation_token,
        );
        self.listening.insert(chain_id, listening_client);
        // The client must be registered first: `update_event_subscriptions` looks it up.
        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
        self.maybe_notify_inbox_processing(chain_id);
        Ok(publishing_chains)
    }
629
630    async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
631        if !self.enable_background_sync
632            || !self
633                .context
634                .lock()
635                .await
636                .client()
637                .chain_mode(chain_id)
638                .is_some_and(|m| m.is_full())
639        {
640            return Task::ready(());
641        }
642
643        let context = Arc::clone(&self.context);
644        Task::spawn(async move {
645            if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
646                warn!("Background sync failed for chain {chain_id}: {e}");
647            }
648        })
649    }
650
651    fn remove_event_subscriber(&mut self, chain_id: ChainId) {
652        self.event_subscribers.retain(|_, subscribers| {
653            subscribers.remove(&chain_id);
654            !subscribers.is_empty()
655        });
656    }
657
658    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
659    async fn update_event_subscriptions(
660        &mut self,
661        chain_id: ChainId,
662    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
663        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
664        if !listening_client.client.is_tracked() {
665            return Ok(BTreeMap::new());
666        }
667        let app_filter = listening_client
668            .client
669            .options()
670            .message_policy
671            .process_events_from_application_ids
672            .clone();
673        let publishing_chains: BTreeMap<_, _> = listening_client
674            .client
675            .event_stream_publishers()
676            .await?
677            .into_iter()
678            .filter_map(|(chain_id, streams)| {
679                let streams = if let Some(app_set) = &app_filter {
680                    streams
681                        .into_iter()
682                        .filter(|s| app_set.contains(&s.application_id))
683                        .collect::<BTreeSet<_>>()
684                } else {
685                    streams
686                };
687                if streams.is_empty() {
688                    None
689                } else {
690                    Some((chain_id, ListeningMode::EventsOnly(streams)))
691                }
692            })
693            .collect();
694        for publisher_id in publishing_chains.keys() {
695            self.event_subscribers
696                .entry(*publisher_id)
697                .or_default()
698                .insert(chain_id);
699        }
700        Ok(publishing_chains)
701    }
702
703    /// Returns the next notification to process, or a stop signal.
704    async fn next_action(&mut self) -> Result<Action, Error> {
705        loop {
706            let notification_futures = self
707                .listening
708                .values_mut()
709                .map(|client| {
710                    let stream = client.notification_stream.clone();
711                    Box::pin(async move { stream.lock().await.next().await })
712                })
713                .collect::<Vec<_>>();
714            futures::select! {
715                () = self.cancellation_token.cancelled().fuse() => {
716                    return Ok(Action::Stop);
717                }
718                command = self.command_receiver.recv().then(async |maybe_command| {
719                    if let Some(command) = maybe_command {
720                        command
721                    } else {
722                        std::future::pending().await
723                    }
724                }).fuse() => {
725                    match command {
726                        ListenerCommand::Listen(new_chains) => {
727                            debug!(?new_chains, "received command to listen to new chains");
728                            let listening_modes = self.update_wallet_for_listening(new_chains).await?;
729                            self.listen_recursively(listening_modes).await?;
730                        }
731                        ListenerCommand::StopListening(chains) => {
732                            debug!(?chains, "received command to stop listening to chains");
733                            for chain_id in chains {
734                                debug!(%chain_id, "stopping the listener for chain");
735                                let Some(listening_client) = self.listening.remove(&chain_id) else {
736                                    error!(%chain_id, "attempted to drop a non-existent listener");
737                                    continue;
738                                };
739                                self.remove_event_subscriber(chain_id);
740                                listening_client.stop().await;
741                                if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
742                                    error!(%error, %chain_id, "error removing a chain from the wallet");
743                                }
744                            }
745                        }
746                        ListenerCommand::SetMessagePolicy(policies) => {
747                            debug!(?policies, "received command to set message policies");
748                            for (chain_id, policy) in policies {
749                                let Some(listening_client) = self.listening.get_mut(&chain_id) else {
750                                    error!(
751                                        %chain_id,
752                                        "attempted to set the message policy of a non-existent \
753                                        listener"
754                                    );
755                                    continue;
756                                };
757                                listening_client.client.options_mut().message_policy = policy;
758                                listening_client.respawn_inbox_task(
759                                    &self.cancellation_token,
760                                    &self.context,
761                                    &self.config,
762                                );
763                            }
764                        }
765                    }
766                }
767                (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
768                    let Some(notification) = maybe_notification else {
769                        let chain_id = *self.listening.keys().nth(index).unwrap();
770                        warn!("Notification stream for {chain_id} closed");
771                        let Some(listening_client) = self.listening.remove(&chain_id) else {
772                            error!(%chain_id, "attempted to drop a non-existent listener");
773                            continue;
774                        };
775                        self.remove_event_subscriber(chain_id);
776                        listening_client.stop().await;
777                        continue;
778                    };
779                    return Ok(Action::Notification(notification));
780                }
781            }
782        }
783    }
784
785    /// Updates the validators about the chain.
786    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
787        let chain_id = notification.chain_id;
788        let listening_client = self.listening.get(&chain_id).expect("missing client");
789        let latest_block = if let Reason::NewBlock { hash, .. } = &notification.reason {
790            listening_client.client.read_certificate(*hash).await.ok()
791        } else {
792            None
793        };
794        if let Err(error) = listening_client
795            .client
796            .update_validators(None, latest_block)
797            .await
798        {
799            warn!(
800                "Failed to update validators about the local chain after \
801                 receiving {notification:?} with error: {error:?}"
802            );
803        }
804        Ok(())
805    }
806
807    /// Updates the wallet based on the client for this chain.
808    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
809        let client = &self
810            .listening
811            .get(&chain_id)
812            .expect("missing client")
813            .client;
814        self.context.lock().await.update_wallet(client).await?;
815        Ok(())
816    }
817
818    /// Updates the wallet with the set of chains we're supposed to start listening to,
819    /// and returns the appropriate listening modes based on whether we have the private
820    /// keys corresponding to the given chains' owners.
821    async fn update_wallet_for_listening(
822        &self,
823        new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
824    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
825        let mut chains = BTreeMap::new();
826        let context_guard = self.context.lock().await;
827        for (chain_id, owner) in new_chains {
828            if let Some(owner) = owner {
829                if context_guard
830                    .client()
831                    .signer()
832                    .contains_key(&owner)
833                    .await
834                    .map_err(chain_client::Error::signer_failure)?
835                {
836                    // Try to modify existing chain entry, setting the owner.
837                    let modified = context_guard
838                        .wallet()
839                        .modify(chain_id, |chain| chain.owner = Some(owner))
840                        .await
841                        .map_err(error::Inner::wallet)?;
842                    // If the chain didn't exist, insert a new entry.
843                    if modified.is_none() {
844                        let chain_description = context_guard
845                            .client()
846                            .get_chain_description(chain_id)
847                            .await?;
848                        let timestamp = chain_description.timestamp();
849                        let epoch = chain_description.config().epoch;
850                        context_guard
851                            .wallet()
852                            .insert(
853                                chain_id,
854                                linera_core::wallet::Chain {
855                                    owner: Some(owner),
856                                    timestamp,
857                                    epoch: Some(epoch),
858                                    ..Default::default()
859                                },
860                            )
861                            .await
862                            .map_err(error::Inner::wallet)?;
863                    }
864
865                    chains.insert(chain_id, ListeningMode::FullChain);
866                }
867            } else {
868                chains.insert(chain_id, ListeningMode::FollowChain);
869            }
870        }
871        Ok(chains)
872    }
873
874    /// Signals the per-chain inbox processing task to wake up and process the inbox.
875    fn maybe_notify_inbox_processing(&self, chain_id: ChainId) {
876        if let Some(listening_client) = self.listening.get(&chain_id) {
877            listening_client.inbox_notify.notify_one();
878        }
879    }
880
881    /// Sleeps for the given number of milliseconds, if greater than 0.
882    async fn sleep(delay_ms: u64) {
883        if delay_ms > 0 {
884            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
885        }
886    }
887}
888
889/// Per-chain inbox processing loop. Runs as a long-lived tokio task. Wakes on
890/// `inbox_notify` signals and processes the inbox, handling round-leader timeouts
891/// internally. Multiple notifications while busy collapse into a single permit.
892async fn inbox_processing_loop<C: ClientContext>(
893    client: ContextChainClient<C>,
894    context: Arc<Mutex<C>>,
895    config: Arc<ChainListenerConfig>,
896    inbox_notify: Arc<Notify>,
897    cancellation_token: CancellationToken,
898) {
899    let chain_id = client.chain_id();
900    loop {
901        futures::select! {
902            () = cancellation_token.cancelled().fuse() => break,
903            () = inbox_notify.notified().fuse() => {
904                if config.skip_process_inbox {
905                    debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
906                    continue;
907                }
908                if !client.is_tracked() {
909                    debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
910                    continue;
911                }
912                if client.preferred_owner().is_none() {
913                    debug!("Not processing inbox for follow-only chain {chain_id:.8}");
914                    continue;
915                }
916                debug!("Processing inbox for {chain_id:.8}");
917
918                // Inner loop handles round-leader timeouts: if we can't produce a block
919                // because we're not the leader, sleep until the timeout then retry.
920                // A new notification or cancellation can interrupt the sleep.
921                loop {
922                    match client.process_inbox_without_prepare().await {
923                        Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
924                            debug!(%chain_id, "Cannot find key for chain");
925                            break;
926                        }
927                        Err(error) => {
928                            warn!(%error, "Failed to process inbox");
929                            break;
930                        }
931                        Ok((certs, None)) => {
932                            if certs.is_empty() {
933                                debug!(%chain_id, "done processing inbox: no blocks created");
934                            } else {
935                                info!(
936                                    %chain_id,
937                                    created_block_count = %certs.len(),
938                                    "done processing inbox",
939                                );
940                            }
941                            break;
942                        }
943                        Ok((certs, Some(new_timeout))) => {
944                            info!(
945                                %chain_id,
946                                created_block_count = %certs.len(),
947                                timeout = %new_timeout,
948                                "waiting for round timeout before continuing to process the inbox",
949                            );
950                            let delta = new_timeout.timestamp.delta_since(Timestamp::now());
951                            if delta > TimeDelta::ZERO {
952                                futures::select! {
953                                    () = cancellation_token.cancelled().fuse() => return,
954                                    () = linera_base::time::timer::sleep(delta.as_duration()).fuse() => {},
955                                    () = inbox_notify.notified().fuse() => {},
956                                }
957                            }
958                        }
959                    }
960                }
961
962                if let Err(error) = context.lock().await.update_wallet(&client).await {
963                    warn!(%error, "Failed to update wallet after inbox processing");
964                }
965            }
966        }
967    }
968}
969
970enum Action {
971    Notification(Notification),
972    Stop,
973}