Skip to main content

linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{btree_map::Entry, BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12    crypto::{CryptoHash, Signer},
13    data_types::{ChainDescription, Epoch, MessagePolicy, TimeDelta, Timestamp},
14    identifiers::{AccountOwner, BlobType, ChainId},
15    ownership::ChainOwnership,
16    util::future::FutureSyncExt as _,
17    Task,
18};
19use linera_core::{
20    client::{
21        chain_client::{self, ChainClient},
22        AbortOnDrop, ListeningMode,
23    },
24    node::NotificationStream,
25    worker::{Notification, Reason},
26    Environment, Wallet,
27};
28use linera_storage::{Arc as CacheArc, Storage as _};
29use tokio::sync::{mpsc::UnboundedReceiver, Notify};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, instrument, warn, Instrument as _};
32
33use crate::error::{self, Error};
34
35/// The configuration for the chain listener.
36#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
37#[serde(rename_all = "camelCase")]
38pub struct ChainListenerConfig {
39    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
40    /// an explicit mutation `processInbox`.
41    #[serde(default)]
42    #[arg(
43        long = "listener-skip-process-inbox",
44        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
45    )]
46    pub skip_process_inbox: bool,
47
48    /// Wait before processing any notification (useful for testing).
49    #[serde(default)]
50    #[arg(
51        long = "listener-delay-before-ms",
52        default_value = "0",
53        env = "LINERA_LISTENER_DELAY_BEFORE"
54    )]
55    pub delay_before_ms: u64,
56
57    /// Wait after processing any notification (useful for rate limiting).
58    #[serde(default)]
59    #[arg(
60        long = "listener-delay-after-ms",
61        default_value = "0",
62        env = "LINERA_LISTENER_DELAY_AFTER"
63    )]
64    pub delay_after_ms: u64,
65}
66
67type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
68
69/// The context in which a chain listener operates, providing access to the wallet, storage and client.
70#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
71#[allow(async_fn_in_trait)]
72pub trait ClientContext {
73    /// The execution environment used by the client.
74    type Environment: linera_core::Environment;
75
76    /// Returns a reference to the wallet.
77    fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
78
79    /// Returns a reference to the storage.
80    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
81
82    /// Returns a reference to the client.
83    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
84
85    /// Returns the ID of the admin chain.
86    fn admin_chain_id(&self) -> ChainId {
87        self.client().admin_chain_id()
88    }
89
90    /// Gets the timing sender for benchmarking, if available.
91    #[cfg(not(web))]
92    fn timing_sender(
93        &self,
94    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
95
96    /// Gets the timing sender for benchmarking, if available.
97    #[cfg(web)]
98    fn timing_sender(
99        &self,
100    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
101        None
102    }
103
104    /// Creates a chain client for the chain with the given ID, using the wallet's state.
105    fn make_chain_client(
106        &self,
107        chain_id: ChainId,
108    ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
109        async move {
110            let chain = self
111                .wallet()
112                .get(chain_id)
113                .make_sync()
114                .await
115                .map_err(error::Inner::wallet)?
116                .unwrap_or_default();
117            let follow_only = chain.is_follow_only();
118            Ok(self.client().create_chain_client(
119                chain_id,
120                chain.block_hash,
121                chain.next_block_height,
122                &chain.pending_fast_proposal,
123                chain.owner,
124                self.timing_sender(),
125                follow_only,
126            ))
127        }
128    }
129
130    /// Adds a newly created chain to the wallet.
131    async fn update_wallet_for_new_chain(
132        &mut self,
133        chain_id: ChainId,
134        owner: Option<AccountOwner>,
135        timestamp: Timestamp,
136        epoch: Epoch,
137    ) -> Result<(), Error>;
138
139    /// Updates the wallet with the latest state of the given chain client.
140    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
141}
142
143/// Extension methods for [`ClientContext`].
144#[allow(async_fn_in_trait)]
145pub trait ClientContextExt: ClientContext {
146    /// Returns a chain client for every chain in the wallet.
147    async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
148        use futures::stream::TryStreamExt as _;
149        self.wallet()
150            .chain_ids()
151            .map_err(|e| error::Inner::wallet(e).into())
152            .and_then(|chain_id| self.make_chain_client(chain_id))
153            .try_collect()
154            .await
155    }
156
157    /// Returns the subset of `owners` for which we have a key pair in the wallet.
158    ///
159    /// Duplicate inputs are treated as one.
160    async fn owners_with_key(
161        &self,
162        owners: impl IntoIterator<Item = AccountOwner>,
163    ) -> Result<BTreeSet<AccountOwner>, Error> {
164        let mut result = BTreeSet::new();
165        for owner in owners.into_iter().collect::<BTreeSet<_>>() {
166            if self.client().has_key_for(&owner).await? {
167                result.insert(owner);
168            }
169        }
170        Ok(result)
171    }
172
173    /// Returns the unique owner from `owners` for which we have a key pair in the wallet.
174    ///
175    /// Returns `None` when zero or multiple distinct owners match.
176    async fn unique_owner_with_key(
177        &self,
178        owners: impl IntoIterator<Item = AccountOwner>,
179    ) -> Result<Option<AccountOwner>, Error> {
180        let with_key = self.owners_with_key(owners).await?;
181        Ok(if with_key.len() == 1 {
182            with_key.into_iter().next()
183        } else {
184            None
185        })
186    }
187
188    /// Sets the preferred owner of `chain_client`'s chain if the current one is no longer
189    /// in `ownership` and we have a key pair for exactly one of the new owners.
190    async fn maybe_auto_assign_preferred_owner(
191        &self,
192        chain_client: &mut ContextChainClient<Self>,
193        ownership: &ChainOwnership,
194    ) -> Result<(), Error> {
195        let chain_id = chain_client.chain_id();
196        let old_owner = chain_client.preferred_owner();
197        if old_owner.is_some_and(|o| ownership.all_owners().any(|n| *n == o)) {
198            return Ok(());
199        }
200        let Some(new_owner) = self
201            .unique_owner_with_key(ownership.all_owners().copied())
202            .await?
203        else {
204            return Ok(());
205        };
206        info!(
207            %chain_id, ?old_owner, %new_owner,
208            "Auto-assigning preferred owner from wallet key pair",
209        );
210        chain_client.set_preferred_owner(new_owner);
211        self.wallet()
212            .modify(chain_id, |chain| chain.owner = Some(new_owner))
213            .await
214            .map_err(error::Inner::wallet)?;
215        Ok(())
216    }
217}
218
219impl<T: ClientContext> ClientContextExt for T {}
220
221/// A chain client together with the stream of notifications from the local node.
222///
223/// A background task listens to the validators and updates the local node, so any updates to
224/// this chain will trigger a notification. The background task is terminated when this gets
225/// dropped.
226struct ListeningClient<C: ClientContext> {
227    /// The chain client.
228    client: ContextChainClient<C>,
229    /// The abort handle for the task that listens to the validators.
230    abort_handle: AbortOnDrop,
231    /// The listening task.
232    listener: Task<()>,
233    /// The stream of notifications from the local node.
234    notification_stream: Arc<Mutex<NotificationStream>>,
235    /// The background sync process.
236    background_sync: Task<()>,
237    /// Signal to wake the per-chain inbox processing task.
238    inbox_notify: Arc<Notify>,
239    /// The long-lived per-chain inbox processing task.
240    inbox_task: Task<()>,
241    /// Cancellation token for the per-chain inbox task (child of the global token).
242    inbox_cancellation: CancellationToken,
243}
244
245impl<C: ClientContext + 'static> ListeningClient<C> {
246    #[expect(clippy::too_many_arguments)]
247    fn new(
248        client: ContextChainClient<C>,
249        abort_handle: AbortOnDrop,
250        listener: Task<()>,
251        notification_stream: NotificationStream,
252        background_sync: Task<()>,
253        context: &Arc<Mutex<C>>,
254        config: &Arc<ChainListenerConfig>,
255        parent_cancellation: &CancellationToken,
256    ) -> Self {
257        let inbox_notify = Arc::new(Notify::new());
258        let inbox_cancellation = parent_cancellation.child_token();
259        let inbox_task =
260            Self::spawn_inbox_task(&client, context, config, &inbox_notify, &inbox_cancellation);
261        Self {
262            client,
263            abort_handle,
264            listener,
265            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
266            notification_stream: Arc::new(Mutex::new(notification_stream)),
267            background_sync,
268            inbox_notify,
269            inbox_task,
270            inbox_cancellation,
271        }
272    }
273
274    /// Respawns the per-chain inbox task with a fresh clone of the client.
275    /// The `inbox_notify` `Arc` is reused so no pending permits are lost.
276    fn respawn_inbox_task(
277        &mut self,
278        parent_cancellation: &CancellationToken,
279        context: &Arc<Mutex<C>>,
280        config: &Arc<ChainListenerConfig>,
281    ) {
282        self.inbox_cancellation.cancel();
283        self.inbox_cancellation = parent_cancellation.child_token();
284        self.inbox_task = Self::spawn_inbox_task(
285            &self.client,
286            context,
287            config,
288            &self.inbox_notify,
289            &self.inbox_cancellation,
290        );
291    }
292
293    fn spawn_inbox_task(
294        client: &ContextChainClient<C>,
295        context: &Arc<Mutex<C>>,
296        config: &Arc<ChainListenerConfig>,
297        inbox_notify: &Arc<Notify>,
298        inbox_cancellation: &CancellationToken,
299    ) -> Task<()> {
300        Task::spawn(inbox_processing_loop(
301            client.clone(),
302            Arc::clone(context),
303            Arc::clone(config),
304            Arc::clone(inbox_notify),
305            inbox_cancellation.clone(),
306        ))
307    }
308
309    async fn stop(self) {
310        // TODO(#4965): this is unnecessary: the join handle now also acts as an abort handle
311        drop(self.abort_handle);
312        self.inbox_cancellation.cancel();
313        futures::future::join3(
314            self.listener.cancel(),
315            self.background_sync.cancel(),
316            self.inbox_task.cancel(),
317        )
318        .await;
319    }
320}
321
322/// Commands to the chain listener.
323pub enum ListenerCommand {
324    /// Command: start listening to the given chains. If the chain must produce blocks,
325    /// an owner is required.
326    Listen(BTreeMap<ChainId, Option<AccountOwner>>),
327    /// Command: stop listening to the given chains.
328    StopListening(BTreeSet<ChainId>),
329    /// Command: set the message policies of some chain clients.
330    SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
331}
332
333/// A `ChainListener` is a process that listens to notifications from validators and reacts
334/// appropriately.
335pub struct ChainListener<C: ClientContext> {
336    context: Arc<Mutex<C>>,
337    storage: <C::Environment as Environment>::Storage,
338    config: Arc<ChainListenerConfig>,
339    listening: BTreeMap<ChainId, ListeningClient<C>>,
340    cancellation_token: CancellationToken,
341    /// Map from publishing chain to subscriber chains.
342    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
343    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
344    /// The channel through which the listener can receive commands.
345    command_receiver: UnboundedReceiver<ListenerCommand>,
346    /// Whether to fully sync chains in the background.
347    enable_background_sync: bool,
348}
349
350impl<C: ClientContext + 'static> ChainListener<C> {
351    /// Creates a new chain listener given client chains.
352    pub fn new(
353        config: ChainListenerConfig,
354        context: Arc<Mutex<C>>,
355        storage: <C::Environment as Environment>::Storage,
356        cancellation_token: CancellationToken,
357        command_receiver: UnboundedReceiver<ListenerCommand>,
358        enable_background_sync: bool,
359    ) -> Self {
360        Self {
361            storage,
362            context,
363            config: Arc::new(config),
364            listening: Default::default(),
365            cancellation_token,
366            event_subscribers: Default::default(),
367            command_receiver,
368            enable_background_sync,
369        }
370    }
371
372    /// Runs the chain listener.
373    #[instrument(skip(self))]
374    pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
375        let chain_ids = {
376            let guard = self.context.lock().await;
377            let admin_chain_id = guard.admin_chain_id();
378            guard
379                .make_chain_client(admin_chain_id)
380                .await?
381                .synchronize_chain_state(admin_chain_id)
382                .await?;
383            let mut chain_ids: BTreeMap<_, _> = guard
384                .wallet()
385                .items()
386                .collect::<Vec<_>>()
387                .await
388                .into_iter()
389                .map(|result| {
390                    let (chain_id, chain) = result?;
391                    let mode = if chain.is_follow_only() {
392                        ListeningMode::FollowChain
393                    } else {
394                        ListeningMode::FullChain
395                    };
396                    Ok((chain_id, mode))
397                })
398                .collect::<Result<BTreeMap<_, _>, _>>()
399                .map_err(
400                    |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
401                        crate::error::Inner::Wallet(Box::new(e) as _)
402                    },
403                )?;
404            // If the admin chain is not in the wallet, add it as follow-only since we
405            // typically don't own it.
406            chain_ids
407                .entry(admin_chain_id)
408                .or_insert(ListeningMode::FollowChain);
409            chain_ids
410        };
411
412        Ok(async move {
413            self.listen_recursively(chain_ids).await?;
414            loop {
415                match self.next_action().await? {
416                    Action::Stop => break,
417                    Action::Notification(notification) => {
418                        self.process_notification(notification).await?
419                    }
420                }
421            }
422            future::join_all(self.listening.into_values().map(|client| client.stop())).await;
423            Ok(())
424        })
425    }
426
427    /// Processes a notification, updating local chains and validators as needed.
428    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
429        Self::sleep(self.config.delay_before_ms).await;
430        let Some(listening_client) = self.listening.get(&notification.chain_id) else {
431            warn!(
432                ?notification,
433                "ChainListener::process_notification: got a notification without listening to the chain"
434            );
435            return Ok(());
436        };
437        let Some(listening_mode) = listening_client.client.listening_mode() else {
438            warn!(
439                ?notification,
440                "ChainListener::process_notification: chain has no listening mode"
441            );
442            return Ok(());
443        };
444
445        if !listening_mode.is_relevant(&notification.reason) {
446            debug!(
447                reason = ?notification.reason,
448                "ChainListener: ignoring notification due to listening mode"
449            );
450            return Ok(());
451        }
452        match &notification.reason {
453            Reason::NewIncomingBundle { .. } => {
454                self.maybe_notify_inbox_processing(notification.chain_id);
455            }
456            Reason::NewRound { .. } => {
457                self.update_validators(&notification).await?;
458            }
459            Reason::NewBlock { hash, .. } => {
460                self.update_wallet(notification.chain_id).await?;
461                if listening_mode.is_full() {
462                    self.add_new_chains(*hash).await?;
463                    let publishers = self
464                        .update_event_subscriptions(notification.chain_id)
465                        .await?;
466                    if !publishers.is_empty() {
467                        self.listen_recursively(publishers).await?;
468                        self.maybe_notify_inbox_processing(notification.chain_id);
469                    }
470                }
471                self.process_new_events(notification.chain_id);
472            }
473            Reason::NewEvents { .. } => {
474                self.process_new_events(notification.chain_id);
475            }
476            Reason::BlockExecuted { .. } => {}
477        }
478        Self::sleep(self.config.delay_after_ms).await;
479        Ok(())
480    }
481
482    /// If any new chains were created by the given block, and we have a key pair for at
483    /// least one of their owners, add them to the wallet and start listening for
484    /// notifications. The preferred owner is assigned only when we hold a key pair for
485    /// exactly one of the chain's owners. (Fallback owners are ignored, as those would
486    /// have to monitor all chains anyway.)
487    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
488        let block = CacheArc::unwrap_or_clone(
489            self.storage
490                .read_confirmed_block(hash)
491                .await?
492                .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?,
493        )
494        .into_block();
495        let parent_chain_id = block.header.chain_id;
496        let blobs = block.created_blobs().into_iter();
497        let new_chains = blobs
498            .filter_map(|(blob_id, blob)| {
499                if blob_id.blob_type == BlobType::ChainDescription {
500                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
501                        .expect("ChainDescription should deserialize correctly");
502                    Some((ChainId(blob_id.hash), chain_desc))
503                } else {
504                    None
505                }
506            })
507            .collect::<Vec<_>>();
508        if new_chains.is_empty() {
509            return Ok(());
510        }
511        let mut new_ids = BTreeMap::new();
512        let mut context_guard = self.context.lock().await;
513        for (new_chain_id, chain_desc) in new_chains {
514            let with_key = context_guard
515                .owners_with_key(chain_desc.config().ownership.all_owners().copied())
516                .await?;
517            if with_key.is_empty() {
518                continue;
519            }
520            let owner = if with_key.len() == 1 {
521                with_key.into_iter().next()
522            } else {
523                None
524            };
525            context_guard
526                .update_wallet_for_new_chain(
527                    new_chain_id,
528                    owner,
529                    block.header.timestamp,
530                    block.header.epoch,
531                )
532                .await?;
533            context_guard
534                .client()
535                .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
536            new_ids.insert(new_chain_id, ListeningMode::FullChain);
537        }
538        // Re-process the parent chain's outboxes now that the new chains are tracked.
539        // This ensures cross-chain messages to newly created chains are delivered.
540        if !new_ids.is_empty() {
541            context_guard
542                .client()
543                .retry_pending_cross_chain_requests(parent_chain_id)
544                .await?;
545        }
546        drop(context_guard);
547        self.listen_recursively(new_ids).await?;
548        Ok(())
549    }
550
551    /// Notifies all chains subscribed to `chain_id` to process their inboxes.
552    fn process_new_events(&self, chain_id: ChainId) {
553        let Some(subscribers) = self.event_subscribers.get(&chain_id) else {
554            return;
555        };
556        for subscriber_id in subscribers {
557            self.maybe_notify_inbox_processing(*subscriber_id);
558        }
559    }
560
561    /// Starts listening for notifications about the given chains, and any chains that publish
562    /// event streams those chains are subscribed to.
563    async fn listen_recursively(
564        &mut self,
565        mut chain_ids: BTreeMap<ChainId, ListeningMode>,
566    ) -> Result<(), Error> {
567        while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
568            for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
569                match chain_ids.entry(new_chain_id) {
570                    Entry::Vacant(vacant) => {
571                        vacant.insert(new_listening_mode);
572                    }
573                    Entry::Occupied(mut occupied) => {
574                        occupied.get_mut().extend(Some(new_listening_mode));
575                    }
576                }
577            }
578        }
579
580        Ok(())
581    }
582
583    /// Background task that syncs received certificates in small batches.
584    /// This discovers unacknowledged sender blocks gradually without overwhelming the system.
585    #[instrument(skip(context))]
586    async fn background_sync_received_certificates(
587        context: Arc<Mutex<C>>,
588        chain_id: ChainId,
589    ) -> Result<(), Error> {
590        info!("Starting background certificate sync");
591        let client = context.lock().await.make_chain_client(chain_id).await?;
592
593        Ok(client.find_received_certificates().await?)
594    }
595
596    /// Starts listening for notifications about the given chain.
597    ///
598    /// Returns all publishing chains, that we also need to listen to.
599    async fn listen(
600        &mut self,
601        chain_id: ChainId,
602        listening_mode: ListeningMode,
603    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
604        let context_guard = self.context.lock().await;
605        let existing_mode = context_guard.client().chain_mode(chain_id);
606        // If we already have a listener with a sufficient mode, nothing to do.
607        if self.listening.contains_key(&chain_id)
608            && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
609        {
610            return Ok(BTreeMap::new());
611        }
612        // Extend the mode in the central map.
613        context_guard
614            .client()
615            .extend_chain_mode(chain_id, listening_mode);
616        drop(context_guard);
617
618        // Start background tasks to sync received certificates, if enabled.
619        let background_sync_task = self.start_background_sync(chain_id).await;
620        let client = self
621            .context
622            .lock()
623            .await
624            .make_chain_client(chain_id)
625            .await?;
626        let (listener, abort_handle, notification_stream) = client.listen().await?;
627        let listening_client = ListeningClient::new(
628            client,
629            abort_handle,
630            Task::spawn(listener.in_current_span()),
631            notification_stream,
632            background_sync_task,
633            &self.context,
634            &self.config,
635            &self.cancellation_token,
636        );
637        self.listening.insert(chain_id, listening_client);
638        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
639        self.maybe_notify_inbox_processing(chain_id);
640        Ok(publishing_chains)
641    }
642
643    async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
644        if !self.enable_background_sync
645            || !self
646                .context
647                .lock()
648                .await
649                .client()
650                .chain_mode(chain_id)
651                .is_some_and(|m| m.is_full())
652        {
653            return Task::ready(());
654        }
655
656        let context = Arc::clone(&self.context);
657        Task::spawn(async move {
658            if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
659                warn!("Background sync failed for chain {chain_id}: {e}");
660            }
661        })
662    }
663
664    /// Removes `chain_id` as a subscriber from every publisher, and returns the publishers that
665    /// no longer have any subscribers as a result.
666    fn remove_event_subscriber(&mut self, chain_id: ChainId) -> Vec<ChainId> {
667        let mut orphaned = Vec::new();
668        self.event_subscribers.retain(|publisher_id, subscribers| {
669            subscribers.remove(&chain_id);
670            if subscribers.is_empty() {
671                orphaned.push(*publisher_id);
672                false
673            } else {
674                true
675            }
676        });
677        orphaned
678    }
679
680    /// Stops listening to a publisher chain whose last subscriber went away, provided the chain is
681    /// only tracked because of event subscriptions (i.e. its mode is `EventsOnly`). Wallet chains
682    /// (`FullChain`/`FollowChain`) are left untouched, as is any chain we're not listening to.
683    async fn stop_tracking_publisher(&mut self, publisher_id: ChainId) {
684        let mode = self.context.lock().await.client().chain_mode(publisher_id);
685        if !matches!(mode, Some(ListeningMode::EventsOnly(_))) {
686            return;
687        }
688        let Some(listening_client) = self.listening.remove(&publisher_id) else {
689            return;
690        };
691        self.context
692            .lock()
693            .await
694            .client()
695            .remove_chain_mode(publisher_id);
696        listening_client.stop().await;
697        debug!(%publisher_id, "stopped tracking publisher chain after its last subscriber went away");
698    }
699
700    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
701    async fn update_event_subscriptions(
702        &mut self,
703        chain_id: ChainId,
704    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
705        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
706        if !listening_client.client.is_tracked() {
707            return Ok(BTreeMap::new());
708        }
709        let app_filter = listening_client
710            .client
711            .options()
712            .message_policy
713            .process_events_from_application_ids
714            .clone();
715        let publishing_chains: BTreeMap<_, _> = listening_client
716            .client
717            .event_stream_publishers()
718            .await?
719            .into_iter()
720            .filter_map(|(chain_id, streams)| {
721                let streams = if let Some(app_set) = &app_filter {
722                    streams
723                        .into_iter()
724                        .filter(|s| app_set.contains(&s.application_id))
725                        .collect::<BTreeSet<_>>()
726                } else {
727                    streams
728                };
729                if streams.is_empty() {
730                    None
731                } else {
732                    Some((chain_id, ListeningMode::EventsOnly(streams)))
733                }
734            })
735            .collect();
736        for publisher_id in publishing_chains.keys() {
737            self.event_subscribers
738                .entry(*publisher_id)
739                .or_default()
740                .insert(chain_id);
741        }
742        // Detect publishers this chain no longer subscribes to (e.g. an application called
743        // `unsubscribe_from_events`), and stop tracking any that were only tracked on its behalf.
744        let removed_publishers = self
745            .event_subscribers
746            .iter()
747            .filter(|(publisher_id, subscribers)| {
748                subscribers.contains(&chain_id) && !publishing_chains.contains_key(publisher_id)
749            })
750            .map(|(publisher_id, _)| *publisher_id)
751            .collect::<Vec<_>>();
752        for publisher_id in removed_publishers {
753            let orphaned = {
754                let Some(subscribers) = self.event_subscribers.get_mut(&publisher_id) else {
755                    continue;
756                };
757                subscribers.remove(&chain_id);
758                subscribers.is_empty()
759            };
760            if orphaned {
761                self.event_subscribers.remove(&publisher_id);
762                self.stop_tracking_publisher(publisher_id).await;
763            }
764        }
765        Ok(publishing_chains)
766    }
767
768    /// Returns the next notification to process, or a stop signal.
769    async fn next_action(&mut self) -> Result<Action, Error> {
770        loop {
771            let notification_futures = self
772                .listening
773                .values_mut()
774                .map(|client| {
775                    let stream = client.notification_stream.clone();
776                    Box::pin(async move { stream.lock().await.next().await })
777                })
778                .collect::<Vec<_>>();
779            futures::select! {
780                () = self.cancellation_token.cancelled().fuse() => {
781                    return Ok(Action::Stop);
782                }
783                command = self.command_receiver.recv().then(async |maybe_command| {
784                    if let Some(command) = maybe_command {
785                        command
786                    } else {
787                        std::future::pending().await
788                    }
789                }).fuse() => {
790                    match command {
791                        ListenerCommand::Listen(new_chains) => {
792                            debug!(?new_chains, "received command to listen to new chains");
793                            let listening_modes = self.update_wallet_for_listening(new_chains).await?;
794                            self.listen_recursively(listening_modes).await?;
795                        }
796                        ListenerCommand::StopListening(chains) => {
797                            debug!(?chains, "received command to stop listening to chains");
798                            for chain_id in chains {
799                                debug!(%chain_id, "stopping the listener for chain");
800                                let Some(listening_client) = self.listening.remove(&chain_id) else {
801                                    error!(%chain_id, "attempted to drop a non-existent listener");
802                                    continue;
803                                };
804                                let orphaned = self.remove_event_subscriber(chain_id);
805                                listening_client.stop().await;
806                                for publisher_id in orphaned {
807                                    self.stop_tracking_publisher(publisher_id).await;
808                                }
809                                if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
810                                    error!(%error, %chain_id, "error removing a chain from the wallet");
811                                }
812                            }
813                        }
814                        ListenerCommand::SetMessagePolicy(policies) => {
815                            debug!(?policies, "received command to set message policies");
816                            for (chain_id, policy) in policies {
817                                let Some(listening_client) = self.listening.get_mut(&chain_id) else {
818                                    error!(
819                                        %chain_id,
820                                        "attempted to set the message policy of a non-existent \
821                                        listener"
822                                    );
823                                    continue;
824                                };
825                                listening_client.client.options_mut().message_policy = policy;
826                                listening_client.respawn_inbox_task(
827                                    &self.cancellation_token,
828                                    &self.context,
829                                    &self.config,
830                                );
831                            }
832                        }
833                    }
834                }
835                (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
836                    let Some(notification) = maybe_notification else {
837                        let chain_id = *self.listening.keys().nth(index).unwrap();
838                        warn!("Notification stream for {chain_id} closed");
839                        let Some(listening_client) = self.listening.remove(&chain_id) else {
840                            error!(%chain_id, "attempted to drop a non-existent listener");
841                            continue;
842                        };
843                        let orphaned = self.remove_event_subscriber(chain_id);
844                        listening_client.stop().await;
845                        for publisher_id in orphaned {
846                            self.stop_tracking_publisher(publisher_id).await;
847                        }
848                        continue;
849                    };
850                    return Ok(Action::Notification(notification));
851                }
852            }
853        }
854    }
855
856    /// Updates the validators about the chain.
857    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
858        let chain_id = notification.chain_id;
859        let listening_client = self.listening.get(&chain_id).expect("missing client");
860        let latest_block = if let Reason::NewBlock { hash, .. } = &notification.reason {
861            listening_client.client.read_certificate(*hash).await.ok()
862        } else {
863            None
864        };
865        if let Err(error) = listening_client
866            .client
867            .update_validators(None, latest_block)
868            .await
869        {
870            warn!(
871                "Failed to update validators about the local chain after \
872                 receiving {notification:?} with error: {error:?}"
873            );
874        }
875        Ok(())
876    }
877
878    /// Updates the wallet based on the client for this chain.
879    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
880        let client = &self
881            .listening
882            .get(&chain_id)
883            .expect("missing client")
884            .client;
885        self.context.lock().await.update_wallet(client).await?;
886        Ok(())
887    }
888
889    /// Updates the wallet with the set of chains we're supposed to start listening to,
890    /// and returns the appropriate listening modes based on whether we have the private
891    /// keys corresponding to the given chains' owners.
892    async fn update_wallet_for_listening(
893        &self,
894        new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
895    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
896        let mut chains = BTreeMap::new();
897        let context_guard = self.context.lock().await;
898        for (chain_id, owner) in new_chains {
899            if let Some(owner) = owner {
900                if context_guard
901                    .client()
902                    .signer()
903                    .contains_key(&owner)
904                    .await
905                    .map_err(chain_client::Error::signer_failure)?
906                {
907                    // Try to modify existing chain entry, setting the owner.
908                    let modified = context_guard
909                        .wallet()
910                        .modify(chain_id, |chain| chain.owner = Some(owner))
911                        .await
912                        .map_err(error::Inner::wallet)?;
913                    // If the chain didn't exist, insert a new entry.
914                    if modified.is_none() {
915                        let chain_description = context_guard
916                            .client()
917                            .get_chain_description(chain_id)
918                            .await?;
919                        let timestamp = chain_description.timestamp();
920                        let epoch = chain_description.config().epoch;
921                        context_guard
922                            .wallet()
923                            .insert(
924                                chain_id,
925                                linera_core::wallet::Chain {
926                                    owner: Some(owner),
927                                    timestamp,
928                                    epoch: Some(epoch),
929                                    ..Default::default()
930                                },
931                            )
932                            .await
933                            .map_err(error::Inner::wallet)?;
934                    }
935
936                    chains.insert(chain_id, ListeningMode::FullChain);
937                }
938            } else {
939                chains.insert(chain_id, ListeningMode::FollowChain);
940            }
941        }
942        Ok(chains)
943    }
944
945    /// Signals the per-chain inbox processing task to wake up and process the inbox.
946    fn maybe_notify_inbox_processing(&self, chain_id: ChainId) {
947        if let Some(listening_client) = self.listening.get(&chain_id) {
948            listening_client.inbox_notify.notify_one();
949        }
950    }
951
952    /// Sleeps for the given number of milliseconds, if greater than 0.
953    async fn sleep(delay_ms: u64) {
954        if delay_ms > 0 {
955            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
956        }
957    }
958}
959
960/// Per-chain inbox processing loop. Runs as a long-lived tokio task. Wakes on
961/// `inbox_notify` signals and processes the inbox, handling round-leader timeouts
962/// internally. Multiple notifications while busy collapse into a single permit.
963async fn inbox_processing_loop<C: ClientContext>(
964    client: ContextChainClient<C>,
965    context: Arc<Mutex<C>>,
966    config: Arc<ChainListenerConfig>,
967    inbox_notify: Arc<Notify>,
968    cancellation_token: CancellationToken,
969) {
970    let chain_id = client.chain_id();
971    loop {
972        futures::select! {
973            () = cancellation_token.cancelled().fuse() => break,
974            () = inbox_notify.notified().fuse() => {
975                if config.skip_process_inbox {
976                    debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
977                    continue;
978                }
979                if !client.is_tracked() {
980                    debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
981                    continue;
982                }
983                if client.preferred_owner().is_none() {
984                    debug!("Not processing inbox for follow-only chain {chain_id:.8}");
985                    continue;
986                }
987                debug!("Processing inbox for {chain_id:.8}");
988
989                // Inner loop handles round-leader timeouts: if we can't produce a block
990                // because we're not the leader, sleep until the timeout then retry.
991                // A new notification or cancellation can interrupt the sleep.
992                loop {
993                    match client.process_inbox_without_prepare().await {
994                        Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
995                            debug!(%chain_id, "Cannot find key for chain");
996                            break;
997                        }
998                        Err(error) => {
999                            warn!(%error, "Failed to process inbox");
1000                            break;
1001                        }
1002                        Ok((certs, None)) => {
1003                            if certs.is_empty() {
1004                                debug!(%chain_id, "done processing inbox: no blocks created");
1005                            } else {
1006                                info!(
1007                                    %chain_id,
1008                                    created_block_count = %certs.len(),
1009                                    "done processing inbox",
1010                                );
1011                            }
1012                            break;
1013                        }
1014                        Ok((certs, Some(new_timeout))) => {
1015                            info!(
1016                                %chain_id,
1017                                created_block_count = %certs.len(),
1018                                timeout = %new_timeout,
1019                                "waiting for round timeout before continuing to process the inbox",
1020                            );
1021                            let delta = new_timeout.timestamp.delta_since(Timestamp::now());
1022                            if delta > TimeDelta::ZERO {
1023                                futures::select! {
1024                                    () = cancellation_token.cancelled().fuse() => return,
1025                                    () = linera_base::time::timer::sleep(delta.as_duration()).fuse() => {},
1026                                    () = inbox_notify.notified().fuse() => {},
1027                                }
1028                            }
1029                        }
1030                    }
1031                }
1032
1033                if let Err(error) = context.lock().await.update_wallet(&client).await {
1034                    warn!(%error, "Failed to update wallet after inbox processing");
1035                }
1036            }
1037        }
1038    }
1039}
1040
1041enum Action {
1042    Notification(Notification),
1043    Stop,
1044}