linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{btree_map::Entry, BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{
11    future::{join_all, select_all},
12    lock::Mutex,
13    Future, FutureExt as _, StreamExt,
14};
15use linera_base::{
16    crypto::{CryptoHash, Signer},
17    data_types::{ChainDescription, Epoch, Timestamp},
18    identifiers::{AccountOwner, BlobType, ChainId},
19    task::NonBlockingFuture,
20};
21use linera_core::{
22    client::{AbortOnDrop, ChainClient, ChainClientError, ListeningMode},
23    node::NotificationStream,
24    worker::{Notification, Reason},
25    Environment,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, instrument, warn, Instrument as _};
30
31use crate::{
32    wallet::{UserChain, Wallet},
33    Error,
34};
35
// Settings for the chain listener. Each field can be set through the command-line flag
// or the environment variable named in its `#[arg(...)]` attribute.
//
// NOTE: the comments added here are plain `//` comments on purpose. The `///` comments on
// the fields are picked up by clap's derive as user-facing help text, so they are left
// exactly as they were.
#[derive(Debug, Default, Clone, clap::Args, serde::Serialize)]
pub struct ChainListenerConfig {
    // Checked first in `ChainListener::maybe_process_inbox`; when set, that method returns
    // without touching the inbox.
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // In milliseconds; slept at the start of `ChainListener::process_notification`.
    // A value of 0 (the default) means no delay.
    /// Wait before processing any notification (useful for testing).
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // In milliseconds; slept at the end of `ChainListener::process_notification`, but only
    // on the paths that do not return early. A value of 0 (the default) means no delay.
    /// Wait after processing any notification (useful for rate limiting).
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
62
/// The [`ChainClient`] type that belongs to a given [`ClientContext`] implementation `C`,
/// i.e. a chain client over that context's `Environment`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
64
65#[cfg_attr(not(web), trait_variant::make(Send))]
66#[allow(async_fn_in_trait)]
67pub trait ClientContext {
68    type Environment: linera_core::Environment;
69
70    fn wallet(&self) -> &Wallet;
71
72    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
73
74    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
75
76    /// Gets the timing sender for benchmarking, if available.
77    #[cfg(not(web))]
78    fn timing_sender(
79        &self,
80    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
81
82    #[cfg(web)]
83    fn timing_sender(
84        &self,
85    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
86        None
87    }
88
89    fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
90        let chain = self
91            .wallet()
92            .get(chain_id)
93            .cloned()
94            .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
95        self.client().create_chain_client(
96            chain_id,
97            chain.block_hash,
98            chain.next_block_height,
99            chain.pending_proposal,
100            chain.owner,
101            self.timing_sender(),
102        )
103    }
104
105    async fn update_wallet_for_new_chain(
106        &mut self,
107        chain_id: ChainId,
108        owner: Option<AccountOwner>,
109        timestamp: Timestamp,
110        epoch: Epoch,
111    ) -> Result<(), Error>;
112
113    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
114}
115
116#[allow(async_fn_in_trait)]
117pub trait ClientContextExt: ClientContext {
118    fn clients(&self) -> Vec<ContextChainClient<Self>> {
119        let chain_ids = self.wallet().chain_ids();
120        let mut clients = vec![];
121        for chain_id in chain_ids {
122            clients.push(self.make_chain_client(chain_id));
123        }
124        clients
125    }
126}
127
128impl<T: ClientContext> ClientContextExt for T {}
129
130/// A chain client together with the stream of notifications from the local node.
131///
132/// A background task listens to the validators and updates the local node, so any updates to
133/// this chain will trigger a notification. The background task is terminated when this gets
134/// dropped.
135struct ListeningClient<C: ClientContext> {
136    /// The chain client.
137    client: ContextChainClient<C>,
138    /// The abort handle for the task that listens to the validators.
139    abort_handle: AbortOnDrop,
140    /// The listening task's join handle.
141    join_handle: NonBlockingFuture<()>,
142    /// The stream of notifications from the local node.
143    notification_stream: Arc<Mutex<NotificationStream>>,
144    /// This is only `< u64::MAX` when the client is waiting for a timeout to process the inbox.
145    timeout: Timestamp,
146    /// The mode of listening to this chain.
147    listening_mode: ListeningMode,
148}
149
150impl<C: ClientContext> ListeningClient<C> {
151    fn new(
152        client: ContextChainClient<C>,
153        abort_handle: AbortOnDrop,
154        join_handle: NonBlockingFuture<()>,
155        notification_stream: NotificationStream,
156        listening_mode: ListeningMode,
157    ) -> Self {
158        Self {
159            client,
160            abort_handle,
161            join_handle,
162            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
163            notification_stream: Arc::new(Mutex::new(notification_stream)),
164            timeout: Timestamp::from(u64::MAX),
165            listening_mode,
166        }
167    }
168
169    async fn stop(self) {
170        drop(self.abort_handle);
171        if let Err(error) = self.join_handle.await {
172            warn!("Failed to join listening task: {error:?}");
173        }
174    }
175}
176
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
///
/// It is created with [`ChainListener::new`] and driven by [`ChainListener::run`].
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; locked whenever a chain client is created or the wallet
    /// is updated.
    context: Arc<Mutex<C>>,
    /// Storage handle, used to read confirmed blocks and to access the clock for timeouts.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration (inbox skipping and processing delays).
    config: Arc<ChainListenerConfig>,
    /// The chains currently being listened to, with their clients and notification streams.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// When cancelled, the main loop stops and all listening tasks are shut down.
    cancellation_token: CancellationToken,
}
189
190impl<C: ClientContext + 'static> ChainListener<C> {
191    /// Creates a new chain listener given client chains.
192    pub fn new(
193        config: ChainListenerConfig,
194        context: Arc<Mutex<C>>,
195        storage: <C::Environment as Environment>::Storage,
196        cancellation_token: CancellationToken,
197    ) -> Self {
198        Self {
199            storage,
200            context,
201            config: Arc::new(config),
202            listening: Default::default(),
203            event_subscribers: Default::default(),
204            cancellation_token,
205        }
206    }
207
208    /// Runs the chain listener.
209    #[instrument(skip(self))]
210    pub async fn run(
211        mut self,
212        enable_background_sync: bool,
213    ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
214        let chain_ids = {
215            let guard = self.context.lock().await;
216            let admin_chain_id = guard.wallet().genesis_admin_chain();
217            guard
218                .make_chain_client(admin_chain_id)
219                .synchronize_chain_state(admin_chain_id)
220                .await?;
221            BTreeMap::from_iter(
222                guard
223                    .wallet()
224                    .chain_ids()
225                    .into_iter()
226                    .chain([admin_chain_id])
227                    .map(|chain_id| (chain_id, ListeningMode::FullChain)),
228            )
229        };
230
231        // Start background tasks to sync received certificates for each chain,
232        // if enabled.
233        if enable_background_sync {
234            let context = Arc::clone(&self.context);
235            let cancellation_token = self.cancellation_token.clone();
236            for chain_id in chain_ids.keys() {
237                let context = Arc::clone(&context);
238                let cancellation_token = cancellation_token.clone();
239                let chain_id = *chain_id;
240                drop(linera_base::task::spawn(async move {
241                    if let Err(e) = Self::background_sync_received_certificates(
242                        context,
243                        chain_id,
244                        cancellation_token,
245                    )
246                    .await
247                    {
248                        warn!("Background sync failed for chain {chain_id}: {e}");
249                    }
250                }));
251            }
252        }
253
254        Ok(async {
255            self.listen_recursively(chain_ids).await?;
256            loop {
257                match self.next_action().await? {
258                    Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
259                    Action::Notification(notification) => {
260                        self.process_notification(notification).await?
261                    }
262                    Action::Stop => break,
263                }
264            }
265            join_all(self.listening.into_values().map(|client| client.stop())).await;
266            Ok(())
267        })
268    }
269
270    /// Processes a notification, updating local chains and validators as needed.
271    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
272        Self::sleep(self.config.delay_before_ms).await;
273        let Some(listening_mode) = self
274            .listening
275            .get(&notification.chain_id)
276            .map(|listening_client| &listening_client.listening_mode)
277        else {
278            warn!(
279                ?notification,
280                "ChainListener::process_notification: got a notification without listening to the chain"
281            );
282            return Ok(());
283        };
284
285        match &notification.reason {
286            Reason::NewIncomingBundle { .. } => {
287                self.maybe_process_inbox(notification.chain_id).await?;
288            }
289            Reason::NewRound { .. } => self.update_validators(&notification).await?,
290            Reason::NewBlock { hash, .. } => {
291                if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
292                    debug!("ChainListener::process_notification: ignoring notification due to listening mode");
293                    return Ok(());
294                }
295                self.update_wallet(notification.chain_id).await?;
296                self.add_new_chains(*hash).await?;
297                let publishers = self
298                    .update_event_subscriptions(notification.chain_id)
299                    .await?;
300                if !publishers.is_empty() {
301                    self.listen_recursively(publishers).await?;
302                    self.maybe_process_inbox(notification.chain_id).await?;
303                }
304                self.process_new_events(notification.chain_id).await?;
305            }
306            Reason::NewEvents { event_streams, .. } => {
307                let should_process = match listening_mode {
308                    ListeningMode::FullChain => true,
309                    ListeningMode::EventsOnly(relevant_events) => {
310                        relevant_events.intersection(event_streams).count() != 0
311                    }
312                };
313                if !should_process {
314                    debug!(
315                        ?notification,
316                        ?listening_mode,
317                        "ChainListener::process_notification: ignoring notification due to no relevant events",
318                    );
319                    return Ok(());
320                }
321                self.process_new_events(notification.chain_id).await?;
322            }
323        }
324        Self::sleep(self.config.delay_after_ms).await;
325        Ok(())
326    }
327
328    /// If any new chains were created by the given block, and we have a key pair for them,
329    /// add them to the wallet and start listening for notifications.
330    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
331        let block = self
332            .storage
333            .read_confirmed_block(hash)
334            .await?
335            .ok_or(ChainClientError::MissingConfirmedBlock(hash))?
336            .into_block();
337        let blobs = block.created_blobs().into_iter();
338        let new_chains = blobs
339            .filter_map(|(blob_id, blob)| {
340                if blob_id.blob_type == BlobType::ChainDescription {
341                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
342                        .expect("ChainDescription should deserialize correctly");
343                    let owners = chain_desc.config().ownership.all_owners().cloned();
344                    Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
345                } else {
346                    None
347                }
348            })
349            .collect::<Vec<_>>();
350        if new_chains.is_empty() {
351            return Ok(());
352        }
353        let mut new_ids = BTreeMap::new();
354        let mut context_guard = self.context.lock().await;
355        for (new_chain_id, owners) in new_chains {
356            for chain_owner in owners {
357                if context_guard
358                    .client()
359                    .signer()
360                    .contains_key(&chain_owner)
361                    .await
362                    .map_err(ChainClientError::signer_failure)?
363                {
364                    context_guard
365                        .update_wallet_for_new_chain(
366                            new_chain_id,
367                            Some(chain_owner),
368                            block.header.timestamp,
369                            block.header.epoch,
370                        )
371                        .await?;
372                    new_ids.insert(new_chain_id, ListeningMode::FullChain);
373                }
374            }
375        }
376        drop(context_guard);
377        self.listen_recursively(new_ids).await?;
378        Ok(())
379    }
380
381    /// Processes the inboxes of all chains that are subscribed to `chain_id`.
382    async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
383        let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
384            return Ok(());
385        };
386        for subscriber_id in subscribers {
387            self.maybe_process_inbox(subscriber_id).await?;
388        }
389        Ok(())
390    }
391
392    /// Starts listening for notifications about the given chains, and any chains that publish
393    /// event streams those chains are subscribed to.
394    async fn listen_recursively(
395        &mut self,
396        mut chain_ids: BTreeMap<ChainId, ListeningMode>,
397    ) -> Result<(), Error> {
398        while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
399            for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
400                match chain_ids.entry(new_chain_id) {
401                    Entry::Vacant(vacant) => {
402                        vacant.insert(new_listening_mode);
403                    }
404                    Entry::Occupied(mut occupied) => {
405                        occupied.get_mut().extend(Some(new_listening_mode));
406                    }
407                }
408            }
409        }
410
411        Ok(())
412    }
413
414    /// Background task that syncs received certificates in small batches.
415    /// This discovers unacknowledged sender blocks gradually without overwhelming the system.
416    #[instrument(skip(context, cancellation_token))]
417    async fn background_sync_received_certificates(
418        context: Arc<Mutex<C>>,
419        chain_id: ChainId,
420        cancellation_token: CancellationToken,
421    ) -> Result<(), Error> {
422        info!("Starting background certificate sync for chain {chain_id}");
423        let client = context.lock().await.make_chain_client(chain_id);
424
425        Ok(client
426            .find_received_certificates(Some(cancellation_token))
427            .await?)
428    }
429
430    /// Starts listening for notifications about the given chain.
431    ///
432    /// Returns all publishing chains, that we also need to listen to.
433    async fn listen(
434        &mut self,
435        chain_id: ChainId,
436        mut listening_mode: ListeningMode,
437    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
438        if self
439            .listening
440            .get(&chain_id)
441            .is_some_and(|existing_client| existing_client.listening_mode >= listening_mode)
442        {
443            return Ok(BTreeMap::new());
444        }
445        listening_mode.extend(
446            self.listening
447                .get(&chain_id)
448                .map(|existing_client| existing_client.listening_mode.clone()),
449        );
450        let client = self.context.lock().await.make_chain_client(chain_id);
451        let (listener, abort_handle, notification_stream) =
452            client.listen(listening_mode.clone()).await?;
453        let join_handle = linera_base::task::spawn(listener.in_current_span());
454        let listening_client = ListeningClient::new(
455            client,
456            abort_handle,
457            join_handle,
458            notification_stream,
459            listening_mode,
460        );
461        self.listening.insert(chain_id, listening_client);
462        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
463        self.maybe_process_inbox(chain_id).await?;
464        Ok(publishing_chains)
465    }
466
467    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
468    async fn update_event_subscriptions(
469        &mut self,
470        chain_id: ChainId,
471    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
472        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
473        if !listening_client.client.is_tracked() {
474            return Ok(BTreeMap::new());
475        }
476        let publishing_chains: BTreeMap<_, _> = listening_client
477            .client
478            .event_stream_publishers()
479            .await?
480            .into_iter()
481            .map(|(chain_id, streams)| (chain_id, ListeningMode::EventsOnly(streams)))
482            .collect();
483        for publisher_id in publishing_chains.keys() {
484            self.event_subscribers
485                .entry(*publisher_id)
486                .or_default()
487                .insert(chain_id);
488        }
489        Ok(publishing_chains)
490    }
491
492    /// Returns the next notification or timeout to process.
493    async fn next_action(&mut self) -> Result<Action, Error> {
494        loop {
495            let (timeout_chain_id, timeout) = self.next_timeout()?;
496            let notification_futures = self
497                .listening
498                .values_mut()
499                .map(|client| {
500                    let stream = client.notification_stream.clone();
501                    Box::pin(async move { stream.lock().await.next().await })
502                })
503                .collect::<Vec<_>>();
504            futures::select! {
505                () = self.cancellation_token.cancelled().fuse() => {
506                    return Ok(Action::Stop);
507                }
508                () = self.storage.clock().sleep_until(timeout).fuse() => {
509                    return Ok(Action::ProcessInbox(timeout_chain_id));
510                }
511                (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
512                    let Some(notification) = maybe_notification else {
513                        let chain_id = *self.listening.keys().nth(index).unwrap();
514                        self.listening.remove(&chain_id);
515                        warn!("Notification stream for {chain_id} closed");
516                        continue;
517                    };
518                    return Ok(Action::Notification(notification));
519                }
520            }
521        }
522    }
523
524    /// Returns the next timeout to process, and the chain to which it applies.
525    fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
526        let (chain_id, client) = self
527            .listening
528            .iter()
529            .min_by_key(|(_, client)| client.timeout)
530            .expect("No chains left to listen to");
531        Ok((*chain_id, client.timeout))
532    }
533
534    /// Updates the validators about the chain.
535    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
536        let chain_id = notification.chain_id;
537        let listening_client = self.listening.get(&chain_id).expect("missing client");
538        if let Err(error) = listening_client.client.update_validators(None).await {
539            warn!(
540                "Failed to update validators about the local chain after \
541                 receiving {notification:?} with error: {error:?}"
542            );
543        }
544        Ok(())
545    }
546
547    /// Updates the wallet based on the client for this chain.
548    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
549        let client = &self
550            .listening
551            .get(&chain_id)
552            .expect("missing client")
553            .client;
554        self.context.lock().await.update_wallet(client).await?;
555        Ok(())
556    }
557
558    /// Processes the inbox, unless `skip_process_inbox` is set.
559    ///
560    /// If no block can be produced because we are not the round leader, a timeout is returned
561    /// for when to retry; otherwise `u64::MAX` is returned.
562    ///
563    /// The wallet is persisted with any blocks that processing the inbox added. An error
564    /// is returned if persisting the wallet fails.
565    async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
566        if self.config.skip_process_inbox {
567            debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
568            return Ok(());
569        }
570        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
571        if !listening_client.client.is_tracked() {
572            debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
573            return Ok(());
574        }
575        debug!("Processing inbox for {chain_id:.8}");
576        listening_client.timeout = Timestamp::from(u64::MAX);
577        match listening_client
578            .client
579            .process_inbox_without_prepare()
580            .await
581        {
582            Err(ChainClientError::CannotFindKeyForChain(chain_id)) => {
583                debug!(%chain_id, "Cannot find key for chain");
584            }
585            Err(error) => warn!(%error, "Failed to process inbox."),
586            Ok((certs, None)) => info!(
587                "Done processing inbox of chain. {} blocks created on chain {chain_id}.",
588                certs.len()
589            ),
590            Ok((certs, Some(new_timeout))) => {
591                info!(
592                    "{} blocks created on chain {chain_id}. Will try processing the inbox later \
593                    based on the round timeout: {new_timeout:?}",
594                    certs.len(),
595                );
596                listening_client.timeout = new_timeout.timestamp;
597            }
598        }
599        let mut context_guard = self.context.lock().await;
600        context_guard
601            .update_wallet(&listening_client.client)
602            .await?;
603        Ok(())
604    }
605
606    /// Sleeps for the given number of milliseconds, if greater than 0.
607    async fn sleep(delay_ms: u64) {
608        if delay_ms > 0 {
609            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
610        }
611    }
612}
613
/// The next step for the listener's main loop, as produced by `ChainListener::next_action`.
enum Action {
    /// The inbox timeout of this chain has elapsed; its inbox should be processed again.
    ProcessInbox(ChainId),
    /// A notification arrived on one of the notification streams.
    Notification(Notification),
    /// The cancellation token was triggered; the main loop should end.
    Stop,
}