linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{btree_map::Entry, BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12    crypto::{CryptoHash, Signer},
13    data_types::{ChainDescription, Epoch, MessagePolicy, TimeDelta, Timestamp},
14    identifiers::{AccountOwner, BlobType, ChainId},
15    util::future::FutureSyncExt as _,
16    Task,
17};
18use linera_core::{
19    client::{
20        chain_client::{self, ChainClient},
21        AbortOnDrop, ListeningMode,
22    },
23    node::NotificationStream,
24    worker::{Notification, Reason},
25    Environment, Wallet,
26};
27use linera_storage::Storage as _;
28use tokio::sync::{mpsc::UnboundedReceiver, Notify};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info, instrument, warn, Instrument as _};
31
32use crate::error::{self, Error};
33
// Settings that control how the chain listener reacts to notifications.
//
// Every field can be set from the command line (`--listener-*`) or through the matching
// `LINERA_LISTENER_*` environment variable, and is (de)serialized under its camelCase name.
// Fields that are missing during deserialization fall back to their default value.
//
// NOTE(review): plain `//` comments are used for these remarks because `///` doc comments
// on this type may be consumed by the derive macros (e.g. as help text) — confirm before
// converting them.
#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
#[serde(rename_all = "camelCase")]
pub struct ChainListenerConfig {
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[serde(default)]
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    /// Wait before processing any notification (useful for testing).
    // The value is in milliseconds; note that the environment variable name carries no
    // `_MS` suffix, unlike the command-line flag.
    #[serde(default)]
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    /// Wait after processing any notification (useful for rate limiting).
    // The value is in milliseconds; note that the environment variable name carries no
    // `_MS` suffix, unlike the command-line flag.
    #[serde(default)]
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
64
/// The [`ChainClient`] type that belongs to the environment of the client context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
66
67#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
68#[allow(async_fn_in_trait)]
69pub trait ClientContext {
70    type Environment: linera_core::Environment;
71
72    fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
73
74    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
75
76    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
77
78    fn admin_chain_id(&self) -> ChainId {
79        self.client().admin_chain_id()
80    }
81
82    /// Gets the timing sender for benchmarking, if available.
83    #[cfg(not(web))]
84    fn timing_sender(
85        &self,
86    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
87
88    #[cfg(web)]
89    fn timing_sender(
90        &self,
91    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
92        None
93    }
94
95    fn make_chain_client(
96        &self,
97        chain_id: ChainId,
98    ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
99        async move {
100            let chain = self
101                .wallet()
102                .get(chain_id)
103                .make_sync()
104                .await
105                .map_err(error::Inner::wallet)?
106                .unwrap_or_default();
107            let follow_only = chain.is_follow_only();
108            Ok(self.client().create_chain_client(
109                chain_id,
110                chain.block_hash,
111                chain.next_block_height,
112                &chain.pending_fast_proposal,
113                chain.owner,
114                self.timing_sender(),
115                follow_only,
116            ))
117        }
118    }
119
120    async fn update_wallet_for_new_chain(
121        &mut self,
122        chain_id: ChainId,
123        owner: Option<AccountOwner>,
124        timestamp: Timestamp,
125        epoch: Epoch,
126    ) -> Result<(), Error>;
127
128    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
129}
130
131#[allow(async_fn_in_trait)]
132pub trait ClientContextExt: ClientContext {
133    async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
134        use futures::stream::TryStreamExt as _;
135        self.wallet()
136            .chain_ids()
137            .map_err(|e| error::Inner::wallet(e).into())
138            .and_then(|chain_id| self.make_chain_client(chain_id))
139            .try_collect()
140            .await
141    }
142}
143
144impl<T: ClientContext> ClientContextExt for T {}
145
/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
///
/// Use `stop` to shut down explicitly: it cancels the inbox task's token and cancels the
/// listener, background sync and inbox tasks.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// The abort handle for the task that listens to the validators.
    abort_handle: AbortOnDrop,
    /// The listening task.
    listener: Task<()>,
    /// The stream of notifications from the local node.
    ///
    /// It is shared behind a mutex so that a future polling the stream can own a handle to
    /// it instead of borrowing this struct.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// The background sync process.
    background_sync: Task<()>,
    /// Signal to wake the per-chain inbox processing task.
    inbox_notify: Arc<Notify>,
    /// The long-lived per-chain inbox processing task.
    inbox_task: Task<()>,
    /// Cancellation token for the per-chain inbox task (child of the global token).
    inbox_cancellation: CancellationToken,
}
169
170impl<C: ClientContext + 'static> ListeningClient<C> {
171    #[expect(clippy::too_many_arguments)]
172    fn new(
173        client: ContextChainClient<C>,
174        abort_handle: AbortOnDrop,
175        listener: Task<()>,
176        notification_stream: NotificationStream,
177        background_sync: Task<()>,
178        context: &Arc<Mutex<C>>,
179        config: &Arc<ChainListenerConfig>,
180        parent_cancellation: &CancellationToken,
181    ) -> Self {
182        let inbox_notify = Arc::new(Notify::new());
183        let inbox_cancellation = parent_cancellation.child_token();
184        let inbox_task =
185            Self::spawn_inbox_task(&client, context, config, &inbox_notify, &inbox_cancellation);
186        Self {
187            client,
188            abort_handle,
189            listener,
190            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
191            notification_stream: Arc::new(Mutex::new(notification_stream)),
192            background_sync,
193            inbox_notify,
194            inbox_task,
195            inbox_cancellation,
196        }
197    }
198
199    /// Respawns the per-chain inbox task with a fresh clone of the client.
200    /// The `inbox_notify` `Arc` is reused so no pending permits are lost.
201    fn respawn_inbox_task(
202        &mut self,
203        parent_cancellation: &CancellationToken,
204        context: &Arc<Mutex<C>>,
205        config: &Arc<ChainListenerConfig>,
206    ) {
207        self.inbox_cancellation.cancel();
208        self.inbox_cancellation = parent_cancellation.child_token();
209        self.inbox_task = Self::spawn_inbox_task(
210            &self.client,
211            context,
212            config,
213            &self.inbox_notify,
214            &self.inbox_cancellation,
215        );
216    }
217
218    fn spawn_inbox_task(
219        client: &ContextChainClient<C>,
220        context: &Arc<Mutex<C>>,
221        config: &Arc<ChainListenerConfig>,
222        inbox_notify: &Arc<Notify>,
223        inbox_cancellation: &CancellationToken,
224    ) -> Task<()> {
225        Task::spawn(inbox_processing_loop(
226            client.clone(),
227            Arc::clone(context),
228            Arc::clone(config),
229            Arc::clone(inbox_notify),
230            inbox_cancellation.clone(),
231        ))
232    }
233
234    async fn stop(self) {
235        // TODO(#4965): this is unnecessary: the join handle now also acts as an abort handle
236        drop(self.abort_handle);
237        self.inbox_cancellation.cancel();
238        futures::future::join3(
239            self.listener.cancel(),
240            self.background_sync.cancel(),
241            self.inbox_task.cancel(),
242        )
243        .await;
244    }
245}
246
/// Commands to the chain listener.
///
/// Commands are delivered through the listener's command channel and are handled between
/// notifications.
pub enum ListenerCommand {
    /// Command: start listening to the given chains. If the chain must produce blocks,
    /// an owner is required.
    ///
    /// Chains given without an owner are listened to in follow mode.
    Listen(BTreeMap<ChainId, Option<AccountOwner>>),
    /// Command: stop listening to the given chains.
    ///
    /// The listener also removes each stopped chain from the wallet.
    StopListening(BTreeSet<ChainId>),
    /// Command: set the message policies of some chain clients.
    ///
    /// The inbox processing task of each affected chain is respawned afterwards.
    SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
}
257
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; it is locked whenever the wallet or the client is accessed.
    context: Arc<Mutex<C>>,
    /// The storage, used to read the confirmed blocks that notifications refer to.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration, shared with the per-chain inbox processing tasks.
    config: Arc<ChainListenerConfig>,
    /// The clients of all chains we are currently listening to, by chain ID.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Cancelling this token stops the listener. The per-chain inbox tokens are children
    /// of it.
    cancellation_token: CancellationToken,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// The channel through which the listener can receive commands.
    command_receiver: UnboundedReceiver<ListenerCommand>,
    /// Whether to fully sync chains in the background.
    enable_background_sync: bool,
}
274
275impl<C: ClientContext + 'static> ChainListener<C> {
276    /// Creates a new chain listener given client chains.
277    pub fn new(
278        config: ChainListenerConfig,
279        context: Arc<Mutex<C>>,
280        storage: <C::Environment as Environment>::Storage,
281        cancellation_token: CancellationToken,
282        command_receiver: UnboundedReceiver<ListenerCommand>,
283        enable_background_sync: bool,
284    ) -> Self {
285        Self {
286            storage,
287            context,
288            config: Arc::new(config),
289            listening: Default::default(),
290            cancellation_token,
291            event_subscribers: Default::default(),
292            command_receiver,
293            enable_background_sync,
294        }
295    }
296
297    /// Runs the chain listener.
298    #[instrument(skip(self))]
299    pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
300        let chain_ids = {
301            let guard = self.context.lock().await;
302            let admin_chain_id = guard.admin_chain_id();
303            guard
304                .make_chain_client(admin_chain_id)
305                .await?
306                .synchronize_chain_state(admin_chain_id)
307                .await?;
308            let mut chain_ids: BTreeMap<_, _> = guard
309                .wallet()
310                .items()
311                .collect::<Vec<_>>()
312                .await
313                .into_iter()
314                .map(|result| {
315                    let (chain_id, chain) = result?;
316                    let mode = if chain.is_follow_only() {
317                        ListeningMode::FollowChain
318                    } else {
319                        ListeningMode::FullChain
320                    };
321                    Ok((chain_id, mode))
322                })
323                .collect::<Result<BTreeMap<_, _>, _>>()
324                .map_err(
325                    |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
326                        crate::error::Inner::Wallet(Box::new(e) as _)
327                    },
328                )?;
329            // If the admin chain is not in the wallet, add it as follow-only since we
330            // typically don't own it.
331            chain_ids
332                .entry(admin_chain_id)
333                .or_insert(ListeningMode::FollowChain);
334            chain_ids
335        };
336
337        Ok(async move {
338            self.listen_recursively(chain_ids).await?;
339            loop {
340                match self.next_action().await? {
341                    Action::Stop => break,
342                    Action::Notification(notification) => {
343                        self.process_notification(notification).await?
344                    }
345                }
346            }
347            future::join_all(self.listening.into_values().map(|client| client.stop())).await;
348            Ok(())
349        })
350    }
351
352    /// Processes a notification, updating local chains and validators as needed.
353    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
354        Self::sleep(self.config.delay_before_ms).await;
355        let Some(listening_client) = self.listening.get(&notification.chain_id) else {
356            warn!(
357                ?notification,
358                "ChainListener::process_notification: got a notification without listening to the chain"
359            );
360            return Ok(());
361        };
362        let Some(listening_mode) = listening_client.client.listening_mode() else {
363            warn!(
364                ?notification,
365                "ChainListener::process_notification: chain has no listening mode"
366            );
367            return Ok(());
368        };
369
370        if !listening_mode.is_relevant(&notification.reason) {
371            debug!(
372                reason = ?notification.reason,
373                "ChainListener: ignoring notification due to listening mode"
374            );
375            return Ok(());
376        }
377        match &notification.reason {
378            Reason::NewIncomingBundle { .. } => {
379                self.maybe_notify_inbox_processing(notification.chain_id);
380            }
381            Reason::NewRound { .. } => {
382                self.update_validators(&notification).await?;
383            }
384            Reason::NewBlock { hash, .. } => {
385                self.update_wallet(notification.chain_id).await?;
386                if listening_mode.is_full() {
387                    self.add_new_chains(*hash).await?;
388                    let publishers = self
389                        .update_event_subscriptions(notification.chain_id)
390                        .await?;
391                    if !publishers.is_empty() {
392                        self.listen_recursively(publishers).await?;
393                        self.maybe_notify_inbox_processing(notification.chain_id);
394                    }
395                }
396                self.process_new_events(notification.chain_id);
397            }
398            Reason::NewEvents { .. } => {
399                self.process_new_events(notification.chain_id);
400            }
401            Reason::BlockExecuted { .. } => {}
402        }
403        Self::sleep(self.config.delay_after_ms).await;
404        Ok(())
405    }
406
407    /// If any new chains were created by the given block, and we have a key pair for them,
408    /// add them to the wallet and start listening for notifications. (This is not done for
409    /// fallback owners, as those would have to monitor all chains anyway.)
410    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
411        let block = Arc::unwrap_or_clone(
412            self.storage
413                .read_confirmed_block(hash)
414                .await?
415                .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?,
416        )
417        .into_block();
418        let parent_chain_id = block.header.chain_id;
419        let blobs = block.created_blobs().into_iter();
420        let new_chains = blobs
421            .filter_map(|(blob_id, blob)| {
422                if blob_id.blob_type == BlobType::ChainDescription {
423                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
424                        .expect("ChainDescription should deserialize correctly");
425                    Some((ChainId(blob_id.hash), chain_desc))
426                } else {
427                    None
428                }
429            })
430            .collect::<Vec<_>>();
431        if new_chains.is_empty() {
432            return Ok(());
433        }
434        let mut new_ids = BTreeMap::new();
435        let mut context_guard = self.context.lock().await;
436        for (new_chain_id, chain_desc) in new_chains {
437            for chain_owner in chain_desc.config().ownership.all_owners() {
438                if context_guard.client().has_key_for(chain_owner).await? {
439                    context_guard
440                        .update_wallet_for_new_chain(
441                            new_chain_id,
442                            Some(*chain_owner),
443                            block.header.timestamp,
444                            block.header.epoch,
445                        )
446                        .await?;
447                    context_guard
448                        .client()
449                        .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
450                    new_ids.insert(new_chain_id, ListeningMode::FullChain);
451                }
452            }
453        }
454        // Re-process the parent chain's outboxes now that the new chains are tracked.
455        // This ensures cross-chain messages to newly created chains are delivered.
456        if !new_ids.is_empty() {
457            context_guard
458                .client()
459                .retry_pending_cross_chain_requests(parent_chain_id)
460                .await?;
461        }
462        drop(context_guard);
463        self.listen_recursively(new_ids).await?;
464        Ok(())
465    }
466
467    /// Notifies all chains subscribed to `chain_id` to process their inboxes.
468    fn process_new_events(&self, chain_id: ChainId) {
469        let Some(subscribers) = self.event_subscribers.get(&chain_id) else {
470            return;
471        };
472        for subscriber_id in subscribers {
473            self.maybe_notify_inbox_processing(*subscriber_id);
474        }
475    }
476
477    /// Starts listening for notifications about the given chains, and any chains that publish
478    /// event streams those chains are subscribed to.
479    async fn listen_recursively(
480        &mut self,
481        mut chain_ids: BTreeMap<ChainId, ListeningMode>,
482    ) -> Result<(), Error> {
483        while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
484            for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
485                match chain_ids.entry(new_chain_id) {
486                    Entry::Vacant(vacant) => {
487                        vacant.insert(new_listening_mode);
488                    }
489                    Entry::Occupied(mut occupied) => {
490                        occupied.get_mut().extend(Some(new_listening_mode));
491                    }
492                }
493            }
494        }
495
496        Ok(())
497    }
498
499    /// Background task that syncs received certificates in small batches.
500    /// This discovers unacknowledged sender blocks gradually without overwhelming the system.
501    #[instrument(skip(context))]
502    async fn background_sync_received_certificates(
503        context: Arc<Mutex<C>>,
504        chain_id: ChainId,
505    ) -> Result<(), Error> {
506        info!("Starting background certificate sync");
507        let client = context.lock().await.make_chain_client(chain_id).await?;
508
509        Ok(client.find_received_certificates().await?)
510    }
511
512    /// Starts listening for notifications about the given chain.
513    ///
514    /// Returns all publishing chains, that we also need to listen to.
515    async fn listen(
516        &mut self,
517        chain_id: ChainId,
518        listening_mode: ListeningMode,
519    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
520        let context_guard = self.context.lock().await;
521        let existing_mode = context_guard.client().chain_mode(chain_id);
522        // If we already have a listener with a sufficient mode, nothing to do.
523        if self.listening.contains_key(&chain_id)
524            && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
525        {
526            return Ok(BTreeMap::new());
527        }
528        // Extend the mode in the central map.
529        context_guard
530            .client()
531            .extend_chain_mode(chain_id, listening_mode);
532        drop(context_guard);
533
534        // Start background tasks to sync received certificates, if enabled.
535        let background_sync_task = self.start_background_sync(chain_id).await;
536        let client = self
537            .context
538            .lock()
539            .await
540            .make_chain_client(chain_id)
541            .await?;
542        let (listener, abort_handle, notification_stream) = client.listen().await?;
543        let listening_client = ListeningClient::new(
544            client,
545            abort_handle,
546            Task::spawn(listener.in_current_span()),
547            notification_stream,
548            background_sync_task,
549            &self.context,
550            &self.config,
551            &self.cancellation_token,
552        );
553        self.listening.insert(chain_id, listening_client);
554        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
555        self.maybe_notify_inbox_processing(chain_id);
556        Ok(publishing_chains)
557    }
558
559    async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
560        if !self.enable_background_sync
561            || !self
562                .context
563                .lock()
564                .await
565                .client()
566                .chain_mode(chain_id)
567                .is_some_and(|m| m.is_full())
568        {
569            return Task::ready(());
570        }
571
572        let context = Arc::clone(&self.context);
573        Task::spawn(async move {
574            if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
575                warn!("Background sync failed for chain {chain_id}: {e}");
576            }
577        })
578    }
579
580    fn remove_event_subscriber(&mut self, chain_id: ChainId) {
581        self.event_subscribers.retain(|_, subscribers| {
582            subscribers.remove(&chain_id);
583            !subscribers.is_empty()
584        });
585    }
586
587    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
588    async fn update_event_subscriptions(
589        &mut self,
590        chain_id: ChainId,
591    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
592        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
593        if !listening_client.client.is_tracked() {
594            return Ok(BTreeMap::new());
595        }
596        let app_filter = listening_client
597            .client
598            .options()
599            .message_policy
600            .process_events_from_application_ids
601            .clone();
602        let publishing_chains: BTreeMap<_, _> = listening_client
603            .client
604            .event_stream_publishers()
605            .await?
606            .into_iter()
607            .filter_map(|(chain_id, streams)| {
608                let streams = if let Some(app_set) = &app_filter {
609                    streams
610                        .into_iter()
611                        .filter(|s| app_set.contains(&s.application_id))
612                        .collect::<BTreeSet<_>>()
613                } else {
614                    streams
615                };
616                if streams.is_empty() {
617                    None
618                } else {
619                    Some((chain_id, ListeningMode::EventsOnly(streams)))
620                }
621            })
622            .collect();
623        for publisher_id in publishing_chains.keys() {
624            self.event_subscribers
625                .entry(*publisher_id)
626                .or_default()
627                .insert(chain_id);
628        }
629        Ok(publishing_chains)
630    }
631
632    /// Returns the next notification to process, or a stop signal.
633    async fn next_action(&mut self) -> Result<Action, Error> {
634        loop {
635            let notification_futures = self
636                .listening
637                .values_mut()
638                .map(|client| {
639                    let stream = client.notification_stream.clone();
640                    Box::pin(async move { stream.lock().await.next().await })
641                })
642                .collect::<Vec<_>>();
643            futures::select! {
644                () = self.cancellation_token.cancelled().fuse() => {
645                    return Ok(Action::Stop);
646                }
647                command = self.command_receiver.recv().then(async |maybe_command| {
648                    if let Some(command) = maybe_command {
649                        command
650                    } else {
651                        std::future::pending().await
652                    }
653                }).fuse() => {
654                    match command {
655                        ListenerCommand::Listen(new_chains) => {
656                            debug!(?new_chains, "received command to listen to new chains");
657                            let listening_modes = self.update_wallet_for_listening(new_chains).await?;
658                            self.listen_recursively(listening_modes).await?;
659                        }
660                        ListenerCommand::StopListening(chains) => {
661                            debug!(?chains, "received command to stop listening to chains");
662                            for chain_id in chains {
663                                debug!(%chain_id, "stopping the listener for chain");
664                                let Some(listening_client) = self.listening.remove(&chain_id) else {
665                                    error!(%chain_id, "attempted to drop a non-existent listener");
666                                    continue;
667                                };
668                                self.remove_event_subscriber(chain_id);
669                                listening_client.stop().await;
670                                if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
671                                    error!(%error, %chain_id, "error removing a chain from the wallet");
672                                }
673                            }
674                        }
675                        ListenerCommand::SetMessagePolicy(policies) => {
676                            debug!(?policies, "received command to set message policies");
677                            for (chain_id, policy) in policies {
678                                let Some(listening_client) = self.listening.get_mut(&chain_id) else {
679                                    error!(
680                                        %chain_id,
681                                        "attempted to set the message policy of a non-existent \
682                                        listener"
683                                    );
684                                    continue;
685                                };
686                                listening_client.client.options_mut().message_policy = policy;
687                                listening_client.respawn_inbox_task(
688                                    &self.cancellation_token,
689                                    &self.context,
690                                    &self.config,
691                                );
692                            }
693                        }
694                    }
695                }
696                (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
697                    let Some(notification) = maybe_notification else {
698                        let chain_id = *self.listening.keys().nth(index).unwrap();
699                        warn!("Notification stream for {chain_id} closed");
700                        let Some(listening_client) = self.listening.remove(&chain_id) else {
701                            error!(%chain_id, "attempted to drop a non-existent listener");
702                            continue;
703                        };
704                        self.remove_event_subscriber(chain_id);
705                        listening_client.stop().await;
706                        continue;
707                    };
708                    return Ok(Action::Notification(notification));
709                }
710            }
711        }
712    }
713
714    /// Updates the validators about the chain.
715    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
716        let chain_id = notification.chain_id;
717        let listening_client = self.listening.get(&chain_id).expect("missing client");
718        let latest_block = if let Reason::NewBlock { hash, .. } = &notification.reason {
719            listening_client.client.read_certificate(*hash).await.ok()
720        } else {
721            None
722        };
723        if let Err(error) = listening_client
724            .client
725            .update_validators(None, latest_block)
726            .await
727        {
728            warn!(
729                "Failed to update validators about the local chain after \
730                 receiving {notification:?} with error: {error:?}"
731            );
732        }
733        Ok(())
734    }
735
736    /// Updates the wallet based on the client for this chain.
737    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
738        let client = &self
739            .listening
740            .get(&chain_id)
741            .expect("missing client")
742            .client;
743        self.context.lock().await.update_wallet(client).await?;
744        Ok(())
745    }
746
747    /// Updates the wallet with the set of chains we're supposed to start listening to,
748    /// and returns the appropriate listening modes based on whether we have the private
749    /// keys corresponding to the given chains' owners.
750    async fn update_wallet_for_listening(
751        &self,
752        new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
753    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
754        let mut chains = BTreeMap::new();
755        let context_guard = self.context.lock().await;
756        for (chain_id, owner) in new_chains {
757            if let Some(owner) = owner {
758                if context_guard
759                    .client()
760                    .signer()
761                    .contains_key(&owner)
762                    .await
763                    .map_err(chain_client::Error::signer_failure)?
764                {
765                    // Try to modify existing chain entry, setting the owner.
766                    let modified = context_guard
767                        .wallet()
768                        .modify(chain_id, |chain| chain.owner = Some(owner))
769                        .await
770                        .map_err(error::Inner::wallet)?;
771                    // If the chain didn't exist, insert a new entry.
772                    if modified.is_none() {
773                        let chain_description = context_guard
774                            .client()
775                            .get_chain_description(chain_id)
776                            .await?;
777                        let timestamp = chain_description.timestamp();
778                        let epoch = chain_description.config().epoch;
779                        context_guard
780                            .wallet()
781                            .insert(
782                                chain_id,
783                                linera_core::wallet::Chain {
784                                    owner: Some(owner),
785                                    timestamp,
786                                    epoch: Some(epoch),
787                                    ..Default::default()
788                                },
789                            )
790                            .await
791                            .map_err(error::Inner::wallet)?;
792                    }
793
794                    chains.insert(chain_id, ListeningMode::FullChain);
795                }
796            } else {
797                chains.insert(chain_id, ListeningMode::FollowChain);
798            }
799        }
800        Ok(chains)
801    }
802
803    /// Signals the per-chain inbox processing task to wake up and process the inbox.
804    fn maybe_notify_inbox_processing(&self, chain_id: ChainId) {
805        if let Some(listening_client) = self.listening.get(&chain_id) {
806            listening_client.inbox_notify.notify_one();
807        }
808    }
809
810    /// Sleeps for the given number of milliseconds, if greater than 0.
811    async fn sleep(delay_ms: u64) {
812        if delay_ms > 0 {
813            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
814        }
815    }
816}
817
818/// Per-chain inbox processing loop. Runs as a long-lived tokio task. Wakes on
819/// `inbox_notify` signals and processes the inbox, handling round-leader timeouts
820/// internally. Multiple notifications while busy collapse into a single permit.
821async fn inbox_processing_loop<C: ClientContext>(
822    client: ContextChainClient<C>,
823    context: Arc<Mutex<C>>,
824    config: Arc<ChainListenerConfig>,
825    inbox_notify: Arc<Notify>,
826    cancellation_token: CancellationToken,
827) {
828    let chain_id = client.chain_id();
829    loop {
830        futures::select! {
831            () = cancellation_token.cancelled().fuse() => break,
832            () = inbox_notify.notified().fuse() => {
833                if config.skip_process_inbox {
834                    debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
835                    continue;
836                }
837                if !client.is_tracked() {
838                    debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
839                    continue;
840                }
841                if client.preferred_owner().is_none() {
842                    debug!("Not processing inbox for follow-only chain {chain_id:.8}");
843                    continue;
844                }
845                debug!("Processing inbox for {chain_id:.8}");
846
847                // Inner loop handles round-leader timeouts: if we can't produce a block
848                // because we're not the leader, sleep until the timeout then retry.
849                // A new notification or cancellation can interrupt the sleep.
850                loop {
851                    match client.process_inbox_without_prepare().await {
852                        Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
853                            debug!(%chain_id, "Cannot find key for chain");
854                            break;
855                        }
856                        Err(error) => {
857                            warn!(%error, "Failed to process inbox");
858                            break;
859                        }
860                        Ok((certs, None)) => {
861                            if certs.is_empty() {
862                                debug!(%chain_id, "done processing inbox: no blocks created");
863                            } else {
864                                info!(
865                                    %chain_id,
866                                    created_block_count = %certs.len(),
867                                    "done processing inbox",
868                                );
869                            }
870                            break;
871                        }
872                        Ok((certs, Some(new_timeout))) => {
873                            info!(
874                                %chain_id,
875                                created_block_count = %certs.len(),
876                                timeout = %new_timeout,
877                                "waiting for round timeout before continuing to process the inbox",
878                            );
879                            let delta = new_timeout.timestamp.delta_since(Timestamp::now());
880                            if delta > TimeDelta::ZERO {
881                                futures::select! {
882                                    () = cancellation_token.cancelled().fuse() => return,
883                                    () = linera_base::time::timer::sleep(delta.as_duration()).fuse() => {},
884                                    () = inbox_notify.notified().fuse() => {},
885                                }
886                            }
887                        }
888                    }
889                }
890
891                if let Err(error) = context.lock().await.update_wallet(&client).await {
892                    warn!(%error, "Failed to update wallet after inbox processing");
893                }
894            }
895        }
896    }
897}
898
/// An action for the listener to take.
///
/// NOTE(review): the code producing and consuming this enum is not visible in
/// this part of the file — confirm the exact semantics there.
enum Action {
    /// Carries a `Notification` to be handled.
    Notification(Notification),
    /// Carries no data; presumably asks the listener to stop — confirm.
    Stop,
}