linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{btree_map::Entry, BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{
11    future::{join_all, select_all},
12    lock::Mutex,
13    Future, FutureExt as _, StreamExt,
14};
15use linera_base::{
16    crypto::{CryptoHash, Signer},
17    data_types::{ChainDescription, Epoch, Timestamp},
18    identifiers::{AccountOwner, BlobType, ChainId},
19    task::NonBlockingFuture,
20};
21use linera_core::{
22    client::{
23        chain_client::{self, ChainClient},
24        AbortOnDrop, ListeningMode,
25    },
26    node::NotificationStream,
27    worker::{Notification, Reason},
28    Environment,
29};
30use linera_storage::{Clock as _, Storage as _};
31use tokio_util::sync::CancellationToken;
32use tracing::{debug, info, instrument, warn, Instrument as _};
33
34use crate::{
35    wallet::{UserChain, Wallet},
36    Error,
37};
38
// Configuration of the chain listener, parsed by `clap` from command-line flags with
// environment-variable fallbacks. clap uses the `///` field comments below as `--help`
// text, so maintainer notes in this struct are kept in plain `//` comments.
// The derived `Default` (`false` / `0`) agrees with the clap defaults.
#[derive(Debug, Default, Clone, clap::Args, serde::Serialize)]
pub struct ChainListenerConfig {
    // When set, `ChainListener::maybe_process_inbox` returns immediately without
    // creating any block.
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // Milliseconds slept at the start of `ChainListener::process_notification`.
    // NOTE(review): the env var lacks the `_MS` suffix used by the flag; the value is
    // still interpreted as milliseconds.
    /// Wait before processing any notification (useful for testing).
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // Milliseconds slept at the end of `ChainListener::process_notification`; its
    // early-return paths (ignored notifications) skip this sleep.
    // NOTE(review): same missing `_MS` suffix on the env var as above.
    /// Wait after processing any notification (useful for rate limiting).
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
65
/// The chain client type used with a given [`ClientContext`]: a [`ChainClient`] over the
/// context's environment.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
67
68#[cfg_attr(not(web), trait_variant::make(Send))]
69#[allow(async_fn_in_trait)]
70pub trait ClientContext {
71    type Environment: linera_core::Environment;
72
73    fn wallet(&self) -> &Wallet;
74
75    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
76
77    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
78
79    /// Gets the timing sender for benchmarking, if available.
80    #[cfg(not(web))]
81    fn timing_sender(
82        &self,
83    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
84
85    #[cfg(web)]
86    fn timing_sender(
87        &self,
88    ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
89        None
90    }
91
92    fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
93        let chain = self
94            .wallet()
95            .get(chain_id)
96            .cloned()
97            .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
98        self.client().create_chain_client(
99            chain_id,
100            chain.block_hash,
101            chain.next_block_height,
102            chain.pending_proposal,
103            chain.owner,
104            self.timing_sender(),
105        )
106    }
107
108    async fn update_wallet_for_new_chain(
109        &mut self,
110        chain_id: ChainId,
111        owner: Option<AccountOwner>,
112        timestamp: Timestamp,
113        epoch: Epoch,
114    ) -> Result<(), Error>;
115
116    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
117}
118
119#[allow(async_fn_in_trait)]
120pub trait ClientContextExt: ClientContext {
121    fn clients(&self) -> Vec<ContextChainClient<Self>> {
122        let chain_ids = self.wallet().chain_ids();
123        let mut clients = vec![];
124        for chain_id in chain_ids {
125            clients.push(self.make_chain_client(chain_id));
126        }
127        clients
128    }
129}
130
// Every `ClientContext` gets the `ClientContextExt` helpers through this blanket impl.
impl<T: ClientContext> ClientContextExt for T {}
132
/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
// NOTE(review): struct fields are dropped in declaration order, so `abort_handle` is dropped
// before `join_handle`; keep that in mind before reordering the fields.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// The abort handle for the task that listens to the validators; dropping it terminates
    /// the task.
    abort_handle: AbortOnDrop,
    /// The listening task's join handle, awaited by `stop`.
    join_handle: NonBlockingFuture<()>,
    /// The stream of notifications from the local node. It is shared behind
    /// `Arc<Mutex<_>>` so that `ChainListener::next_action` can poll it from futures that
    /// do not borrow the listener.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// When to retry processing this chain's inbox. This is only `< u64::MAX` when the
    /// client is waiting for a timeout to process the inbox.
    timeout: Timestamp,
    /// The mode of listening to this chain: the full chain, or only some event streams.
    listening_mode: ListeningMode,
}
152
153impl<C: ClientContext> ListeningClient<C> {
154    fn new(
155        client: ContextChainClient<C>,
156        abort_handle: AbortOnDrop,
157        join_handle: NonBlockingFuture<()>,
158        notification_stream: NotificationStream,
159        listening_mode: ListeningMode,
160    ) -> Self {
161        Self {
162            client,
163            abort_handle,
164            join_handle,
165            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
166            notification_stream: Arc::new(Mutex::new(notification_stream)),
167            timeout: Timestamp::from(u64::MAX),
168            listening_mode,
169        }
170    }
171
172    async fn stop(self) {
173        drop(self.abort_handle);
174        if let Err(error) = self.join_handle.await {
175            warn!("Failed to join listening task: {error:?}");
176        }
177    }
178}
179
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
///
/// It keeps one [`ListeningClient`] per listened chain. It processes inboxes (unless
/// configured not to), keeps the wallet up to date, and also listens to the chains
/// publishing event streams that listened chains subscribe to.
pub struct ChainListener<C: ClientContext> {
    /// The client context; `run` also hands clones of this `Arc` to the background sync tasks.
    context: Arc<Mutex<C>>,
    /// Storage, used to read confirmed blocks and as the clock for inbox retry timeouts.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration.
    config: Arc<ChainListenerConfig>,
    /// The chains currently being listened to, keyed by chain ID.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// Cancelling this token makes the listening loop stop; it is also passed to the
    /// background sync tasks.
    cancellation_token: CancellationToken,
}
192
193impl<C: ClientContext + 'static> ChainListener<C> {
194    /// Creates a new chain listener given client chains.
195    pub fn new(
196        config: ChainListenerConfig,
197        context: Arc<Mutex<C>>,
198        storage: <C::Environment as Environment>::Storage,
199        cancellation_token: CancellationToken,
200    ) -> Self {
201        Self {
202            storage,
203            context,
204            config: Arc::new(config),
205            listening: Default::default(),
206            event_subscribers: Default::default(),
207            cancellation_token,
208        }
209    }
210
211    /// Runs the chain listener.
212    #[instrument(skip(self))]
213    pub async fn run(
214        mut self,
215        enable_background_sync: bool,
216    ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
217        let chain_ids = {
218            let guard = self.context.lock().await;
219            let admin_chain_id = guard.wallet().genesis_admin_chain();
220            guard
221                .make_chain_client(admin_chain_id)
222                .synchronize_chain_state(admin_chain_id)
223                .await?;
224            BTreeMap::from_iter(
225                guard
226                    .wallet()
227                    .chain_ids()
228                    .into_iter()
229                    .chain([admin_chain_id])
230                    .map(|chain_id| (chain_id, ListeningMode::FullChain)),
231            )
232        };
233
234        // Start background tasks to sync received certificates for each chain,
235        // if enabled.
236        if enable_background_sync {
237            let context = Arc::clone(&self.context);
238            let cancellation_token = self.cancellation_token.clone();
239            for chain_id in chain_ids.keys() {
240                let context = Arc::clone(&context);
241                let cancellation_token = cancellation_token.clone();
242                let chain_id = *chain_id;
243                drop(linera_base::task::spawn(async move {
244                    if let Err(e) = Self::background_sync_received_certificates(
245                        context,
246                        chain_id,
247                        cancellation_token,
248                    )
249                    .await
250                    {
251                        warn!("Background sync failed for chain {chain_id}: {e}");
252                    }
253                }));
254            }
255        }
256
257        Ok(async {
258            self.listen_recursively(chain_ids).await?;
259            loop {
260                match self.next_action().await? {
261                    Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
262                    Action::Notification(notification) => {
263                        self.process_notification(notification).await?
264                    }
265                    Action::Stop => break,
266                }
267            }
268            join_all(self.listening.into_values().map(|client| client.stop())).await;
269            Ok(())
270        })
271    }
272
    /// Processes a notification, updating local chains and validators as needed.
    ///
    /// First sleeps for `delay_before_ms`. A notification about a chain we are not
    /// listening to is logged and ignored. Otherwise it is handled according to its reason:
    /// - `NewIncomingBundle`: processes the chain's inbox.
    /// - `NewRound`: updates the validators about the chain.
    /// - `NewBlock`: ignored for chains listened to in `EventsOnly` mode. Otherwise:
    ///   - updates the wallet;
    ///   - adds any chains created by the block for which we hold an owner key;
    ///   - refreshes the chain's event subscriptions. If that yields publishers, it
    ///     listens to them and processes the chain's inbox again;
    ///   - finally processes the inboxes of the chain's subscribers.
    /// - `NewEvents`: processes the inboxes of the chain's subscribers. This is skipped
    ///   when the chain is listened to in `EventsOnly` mode and none of the notified
    ///   streams are among the relevant ones.
    /// - `BlockExecuted`: nothing.
    ///
    /// Finally sleeps for `delay_after_ms`.
    ///
    /// # Errors
    ///
    /// Propagates errors from processing inboxes, updating the wallet, adding new chains,
    /// refreshing event subscriptions or listening to publishers.
    // NOTE(review): the early returns for ignored notifications skip the `delay_after_ms`
    // sleep; confirm that is intended for rate limiting.
    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
        Self::sleep(self.config.delay_before_ms).await;
        // `listening_mode` borrows `self.listening`; every arm below only reads it before
        // calling any `&mut self` method.
        let Some(listening_mode) = self
            .listening
            .get(&notification.chain_id)
            .map(|listening_client| &listening_client.listening_mode)
        else {
            warn!(
                ?notification,
                "ChainListener::process_notification: got a notification without listening to the chain"
            );
            return Ok(());
        };

        match &notification.reason {
            Reason::NewIncomingBundle { .. } => {
                self.maybe_process_inbox(notification.chain_id).await?;
            }
            Reason::NewRound { .. } => self.update_validators(&notification).await?,
            Reason::NewBlock { hash, .. } => {
                if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
                    debug!("ChainListener::process_notification: ignoring notification due to listening mode");
                    return Ok(());
                }
                self.update_wallet(notification.chain_id).await?;
                self.add_new_chains(*hash).await?;
                let publishers = self
                    .update_event_subscriptions(notification.chain_id)
                    .await?;
                // Newly reported publishers must be listened to; then retry this chain's inbox.
                if !publishers.is_empty() {
                    self.listen_recursively(publishers).await?;
                    self.maybe_process_inbox(notification.chain_id).await?;
                }
                self.process_new_events(notification.chain_id).await?;
            }
            Reason::NewEvents { event_streams, .. } => {
                // Full-chain listeners always react; events-only listeners only when one of
                // the notified streams is one they care about.
                let should_process = match listening_mode {
                    ListeningMode::FullChain => true,
                    ListeningMode::EventsOnly(relevant_events) => {
                        relevant_events.intersection(event_streams).count() != 0
                    }
                };
                if !should_process {
                    debug!(
                        ?notification,
                        ?listening_mode,
                        "ChainListener::process_notification: ignoring notification due to no relevant events",
                    );
                    return Ok(());
                }
                self.process_new_events(notification.chain_id).await?;
            }
            Reason::BlockExecuted { .. } => {}
        }
        Self::sleep(self.config.delay_after_ms).await;
        Ok(())
    }
331
332    /// If any new chains were created by the given block, and we have a key pair for them,
333    /// add them to the wallet and start listening for notifications.
334    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
335        let block = self
336            .storage
337            .read_confirmed_block(hash)
338            .await?
339            .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?
340            .into_block();
341        let blobs = block.created_blobs().into_iter();
342        let new_chains = blobs
343            .filter_map(|(blob_id, blob)| {
344                if blob_id.blob_type == BlobType::ChainDescription {
345                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
346                        .expect("ChainDescription should deserialize correctly");
347                    let owners = chain_desc.config().ownership.all_owners().cloned();
348                    Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
349                } else {
350                    None
351                }
352            })
353            .collect::<Vec<_>>();
354        if new_chains.is_empty() {
355            return Ok(());
356        }
357        let mut new_ids = BTreeMap::new();
358        let mut context_guard = self.context.lock().await;
359        for (new_chain_id, owners) in new_chains {
360            for chain_owner in owners {
361                if context_guard
362                    .client()
363                    .signer()
364                    .contains_key(&chain_owner)
365                    .await
366                    .map_err(chain_client::Error::signer_failure)?
367                {
368                    context_guard
369                        .update_wallet_for_new_chain(
370                            new_chain_id,
371                            Some(chain_owner),
372                            block.header.timestamp,
373                            block.header.epoch,
374                        )
375                        .await?;
376                    new_ids.insert(new_chain_id, ListeningMode::FullChain);
377                }
378            }
379        }
380        drop(context_guard);
381        self.listen_recursively(new_ids).await?;
382        Ok(())
383    }
384
385    /// Processes the inboxes of all chains that are subscribed to `chain_id`.
386    async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
387        let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
388            return Ok(());
389        };
390        for subscriber_id in subscribers {
391            self.maybe_process_inbox(subscriber_id).await?;
392        }
393        Ok(())
394    }
395
396    /// Starts listening for notifications about the given chains, and any chains that publish
397    /// event streams those chains are subscribed to.
398    async fn listen_recursively(
399        &mut self,
400        mut chain_ids: BTreeMap<ChainId, ListeningMode>,
401    ) -> Result<(), Error> {
402        while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
403            for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
404                match chain_ids.entry(new_chain_id) {
405                    Entry::Vacant(vacant) => {
406                        vacant.insert(new_listening_mode);
407                    }
408                    Entry::Occupied(mut occupied) => {
409                        occupied.get_mut().extend(Some(new_listening_mode));
410                    }
411                }
412            }
413        }
414
415        Ok(())
416    }
417
418    /// Background task that syncs received certificates in small batches.
419    /// This discovers unacknowledged sender blocks gradually without overwhelming the system.
420    #[instrument(skip(context, cancellation_token))]
421    async fn background_sync_received_certificates(
422        context: Arc<Mutex<C>>,
423        chain_id: ChainId,
424        cancellation_token: CancellationToken,
425    ) -> Result<(), Error> {
426        info!("Starting background certificate sync for chain {chain_id}");
427        let client = context.lock().await.make_chain_client(chain_id);
428
429        Ok(client
430            .find_received_certificates(Some(cancellation_token))
431            .await?)
432    }
433
434    /// Starts listening for notifications about the given chain.
435    ///
436    /// Returns all publishing chains, that we also need to listen to.
437    async fn listen(
438        &mut self,
439        chain_id: ChainId,
440        mut listening_mode: ListeningMode,
441    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
442        if self
443            .listening
444            .get(&chain_id)
445            .is_some_and(|existing_client| existing_client.listening_mode >= listening_mode)
446        {
447            return Ok(BTreeMap::new());
448        }
449        listening_mode.extend(
450            self.listening
451                .get(&chain_id)
452                .map(|existing_client| existing_client.listening_mode.clone()),
453        );
454        let client = self.context.lock().await.make_chain_client(chain_id);
455        let (listener, abort_handle, notification_stream) =
456            client.listen(listening_mode.clone()).await?;
457        let join_handle = linera_base::task::spawn(listener.in_current_span());
458        let listening_client = ListeningClient::new(
459            client,
460            abort_handle,
461            join_handle,
462            notification_stream,
463            listening_mode,
464        );
465        self.listening.insert(chain_id, listening_client);
466        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
467        self.maybe_process_inbox(chain_id).await?;
468        Ok(publishing_chains)
469    }
470
471    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
472    async fn update_event_subscriptions(
473        &mut self,
474        chain_id: ChainId,
475    ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
476        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
477        if !listening_client.client.is_tracked() {
478            return Ok(BTreeMap::new());
479        }
480        let publishing_chains: BTreeMap<_, _> = listening_client
481            .client
482            .event_stream_publishers()
483            .await?
484            .into_iter()
485            .map(|(chain_id, streams)| (chain_id, ListeningMode::EventsOnly(streams)))
486            .collect();
487        for publisher_id in publishing_chains.keys() {
488            self.event_subscribers
489                .entry(*publisher_id)
490                .or_default()
491                .insert(chain_id);
492        }
493        Ok(publishing_chains)
494    }
495
496    /// Returns the next notification or timeout to process.
497    async fn next_action(&mut self) -> Result<Action, Error> {
498        loop {
499            let (timeout_chain_id, timeout) = self.next_timeout()?;
500            let notification_futures = self
501                .listening
502                .values_mut()
503                .map(|client| {
504                    let stream = client.notification_stream.clone();
505                    Box::pin(async move { stream.lock().await.next().await })
506                })
507                .collect::<Vec<_>>();
508            futures::select! {
509                () = self.cancellation_token.cancelled().fuse() => {
510                    return Ok(Action::Stop);
511                }
512                () = self.storage.clock().sleep_until(timeout).fuse() => {
513                    return Ok(Action::ProcessInbox(timeout_chain_id));
514                }
515                (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
516                    let Some(notification) = maybe_notification else {
517                        let chain_id = *self.listening.keys().nth(index).unwrap();
518                        self.listening.remove(&chain_id);
519                        warn!("Notification stream for {chain_id} closed");
520                        continue;
521                    };
522                    return Ok(Action::Notification(notification));
523                }
524            }
525        }
526    }
527
528    /// Returns the next timeout to process, and the chain to which it applies.
529    fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
530        let (chain_id, client) = self
531            .listening
532            .iter()
533            .min_by_key(|(_, client)| client.timeout)
534            .expect("No chains left to listen to");
535        Ok((*chain_id, client.timeout))
536    }
537
538    /// Updates the validators about the chain.
539    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
540        let chain_id = notification.chain_id;
541        let listening_client = self.listening.get(&chain_id).expect("missing client");
542        if let Err(error) = listening_client.client.update_validators(None).await {
543            warn!(
544                "Failed to update validators about the local chain after \
545                 receiving {notification:?} with error: {error:?}"
546            );
547        }
548        Ok(())
549    }
550
551    /// Updates the wallet based on the client for this chain.
552    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
553        let client = &self
554            .listening
555            .get(&chain_id)
556            .expect("missing client")
557            .client;
558        self.context.lock().await.update_wallet(client).await?;
559        Ok(())
560    }
561
562    /// Processes the inbox, unless `skip_process_inbox` is set.
563    ///
564    /// If no block can be produced because we are not the round leader, a timeout is returned
565    /// for when to retry; otherwise `u64::MAX` is returned.
566    ///
567    /// The wallet is persisted with any blocks that processing the inbox added. An error
568    /// is returned if persisting the wallet fails.
569    async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
570        if self.config.skip_process_inbox {
571            debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
572            return Ok(());
573        }
574        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
575        if !listening_client.client.is_tracked() {
576            debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
577            return Ok(());
578        }
579        if listening_client.client.preferred_owner().is_none() {
580            debug!("Not processing inbox for non-owned chain {chain_id:.8}");
581            return Ok(());
582        }
583        debug!("Processing inbox for {chain_id:.8}");
584        listening_client.timeout = Timestamp::from(u64::MAX);
585        match listening_client
586            .client
587            .process_inbox_without_prepare()
588            .await
589        {
590            Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
591                debug!(%chain_id, "Cannot find key for chain");
592            }
593            Err(error) => warn!(%error, "Failed to process inbox."),
594            Ok((certs, None)) => info!(
595                "Done processing inbox of chain. {} blocks created on chain {chain_id}.",
596                certs.len()
597            ),
598            Ok((certs, Some(new_timeout))) => {
599                info!(
600                    "{} blocks created on chain {chain_id}. Will try processing the inbox later \
601                    based on the round timeout: {new_timeout:?}",
602                    certs.len(),
603                );
604                listening_client.timeout = new_timeout.timestamp;
605            }
606        }
607        let mut context_guard = self.context.lock().await;
608        context_guard
609            .update_wallet(&listening_client.client)
610            .await?;
611        Ok(())
612    }
613
614    /// Sleeps for the given number of milliseconds, if greater than 0.
615    async fn sleep(delay_ms: u64) {
616        if delay_ms > 0 {
617            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
618        }
619    }
620}
621
/// What the chain listener's main loop should do next.
enum Action {
    /// Process this chain's inbox, because its inbox retry timeout has elapsed.
    ProcessInbox(ChainId),
    /// Handle a notification received from one of the listened chains.
    Notification(Notification),
    /// Stop listening; produced once the cancellation token is cancelled.
    Stop,
}