linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{
11    future::{join_all, select_all},
12    lock::Mutex,
13    FutureExt as _, StreamExt,
14};
15use linera_base::{
16    crypto::{CryptoHash, Signer},
17    data_types::{ChainDescription, Timestamp},
18    identifiers::{AccountOwner, BlobId, BlobType, ChainId},
19    task::NonBlockingFuture,
20};
21use linera_core::{
22    client::{AbortOnDrop, ChainClient, ChainClientError},
23    node::NotificationStream,
24    worker::{Notification, Reason},
25    Environment,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, instrument, warn, Instrument as _};
30
31use crate::{
32    wallet::{UserChain, Wallet},
33    Error,
34};
35
// Command-line and environment configuration of the chain listener.
//
// NOTE(review): with `clap`'s derive API the `///` comments on the fields below are used
// as help text, so they are user-facing; the explanatory notes added here deliberately use
// plain `//` comments to stay out of the CLI output.
#[derive(Debug, Default, Clone, clap::Args)]
pub struct ChainListenerConfig {
    // When set, `maybe_process_inbox` returns early without creating any blocks.
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // Value is in milliseconds (it is passed to `Duration::from_millis`), even though the
    // environment variable name carries no `_MS` suffix.
    /// Wait before processing any notification (useful for testing).
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // Value is in milliseconds (it is passed to `Duration::from_millis`), even though the
    // environment variable name carries no `_MS` suffix.
    /// Wait after processing any notification (useful for rate limiting).
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
62
/// The [`ChainClient`] type that belongs to the environment of the client context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
64
65#[cfg_attr(not(web), trait_variant::make(Send))]
66#[allow(async_fn_in_trait)]
67pub trait ClientContext {
68    type Environment: linera_core::Environment;
69
70    fn wallet(&self) -> &Wallet;
71
72    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
73
74    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
75
76    fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
77        let chain = self
78            .wallet()
79            .get(chain_id)
80            .cloned()
81            .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
82        self.client().create_chain_client(
83            chain_id,
84            chain.block_hash,
85            chain.next_block_height,
86            chain.pending_proposal,
87            chain.owner,
88        )
89    }
90
91    async fn update_wallet_for_new_chain(
92        &mut self,
93        chain_id: ChainId,
94        owner: Option<AccountOwner>,
95        timestamp: Timestamp,
96    ) -> Result<(), Error>;
97
98    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
99}
100
101#[allow(async_fn_in_trait)]
102pub trait ClientContextExt: ClientContext {
103    fn clients(&self) -> Vec<ContextChainClient<Self>> {
104        let chain_ids = self.wallet().chain_ids();
105        let mut clients = vec![];
106        for chain_id in chain_ids {
107            clients.push(self.make_chain_client(chain_id));
108        }
109        clients
110    }
111
112    async fn chain_description(&mut self, chain_id: ChainId) -> Result<ChainDescription, Error> {
113        let blob_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
114
115        let blob = match self.storage().read_blob(blob_id).await {
116            Ok(Some(blob)) => blob,
117            Ok(None) => {
118                // we're missing the blob describing the chain we're assigning - try to
119                // get it
120                self.client().ensure_has_chain_description(chain_id).await?
121            }
122            Err(err) => {
123                return Err(err.into());
124            }
125        };
126
127        Ok(bcs::from_bytes(blob.bytes())?)
128    }
129}
130
// Blanket implementation: every `ClientContext` gets the `ClientContextExt` default methods.
impl<T: ClientContext> ClientContextExt for T {}
132
/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// The abort handle for the task that listens to the validators.
    abort_handle: AbortOnDrop,
    /// The listening task's join handle.
    join_handle: NonBlockingFuture<()>,
    /// The stream of notifications from the local node.
    ///
    /// It is kept behind `Arc<Mutex<_>>` so that `ChainListener::next_action` can move a
    /// cloned handle into the future that waits for the next notification.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// This is only `< u64::MAX` when the client is waiting for a timeout to process the inbox.
    ///
    /// `ChainListener::next_timeout` picks the smallest value among all listening clients.
    timeout: Timestamp,
}
150
151impl<C: ClientContext> ListeningClient<C> {
152    fn new(
153        client: ContextChainClient<C>,
154        abort_handle: AbortOnDrop,
155        join_handle: NonBlockingFuture<()>,
156        notification_stream: NotificationStream,
157    ) -> Self {
158        Self {
159            client,
160            abort_handle,
161            join_handle,
162            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
163            notification_stream: Arc::new(Mutex::new(notification_stream)),
164            timeout: Timestamp::from(u64::MAX),
165        }
166    }
167
168    async fn stop(self) {
169        drop(self.abort_handle);
170        if let Err(error) = self.join_handle.await {
171            warn!("Failed to join listening task: {error:?}");
172        }
173    }
174}
175
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; it is locked whenever chain clients are created or the
    /// wallet is updated.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks and for the clock that drives timeouts.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration.
    config: Arc<ChainListenerConfig>,
    /// The chains we are currently listening to, by chain ID.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// When this token is cancelled, `run` stops all listening clients and returns.
    cancellation_token: CancellationToken,
}
188
189impl<C: ClientContext> ChainListener<C> {
190    /// Creates a new chain listener given client chains.
191    pub fn new(
192        config: ChainListenerConfig,
193        context: Arc<Mutex<C>>,
194        storage: <C::Environment as Environment>::Storage,
195        cancellation_token: CancellationToken,
196    ) -> Self {
197        Self {
198            storage,
199            context,
200            config: Arc::new(config),
201            listening: Default::default(),
202            event_subscribers: Default::default(),
203            cancellation_token,
204        }
205    }
206
207    /// Runs the chain listener.
208    #[instrument(skip(self))]
209    pub async fn run(mut self) -> Result<(), Error> {
210        let chain_ids = {
211            let guard = self.context.lock().await;
212            let mut chain_ids = BTreeSet::from_iter(guard.wallet().chain_ids());
213            chain_ids.insert(guard.wallet().genesis_admin_chain());
214            chain_ids
215        };
216        self.listen_recursively(chain_ids).await?;
217        loop {
218            match self.next_action().await? {
219                Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
220                Action::Notification(notification) => {
221                    self.process_notification(notification).await?
222                }
223                Action::Stop => break,
224            }
225        }
226        join_all(self.listening.into_values().map(|client| client.stop())).await;
227        Ok(())
228    }
229
230    /// Processes a notification, updating local chains and validators as needed.
231    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
232        Self::sleep(self.config.delay_before_ms).await;
233        match &notification.reason {
234            Reason::NewIncomingBundle { .. } => {
235                self.maybe_process_inbox(notification.chain_id).await?;
236            }
237            Reason::NewRound { .. } => self.update_validators(&notification).await?,
238            Reason::NewBlock { hash, .. } => {
239                self.update_validators(&notification).await?;
240                self.update_wallet(notification.chain_id).await?;
241                self.add_new_chains(*hash).await?;
242                let publishers = self
243                    .update_event_subscriptions(notification.chain_id)
244                    .await?;
245                if !publishers.is_empty() {
246                    self.listen_recursively(publishers).await?;
247                    self.maybe_process_inbox(notification.chain_id).await?;
248                }
249                self.process_new_events(notification.chain_id).await?;
250            }
251        }
252        Self::sleep(self.config.delay_after_ms).await;
253        Ok(())
254    }
255
256    /// If any new chains were created by the given block, and we have a key pair for them,
257    /// add them to the wallet and start listening for notifications.
258    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
259        let block = self.storage.read_confirmed_block(hash).await?.into_block();
260        let blobs = block.created_blobs().into_iter();
261        let new_chains = blobs
262            .filter_map(|(blob_id, blob)| {
263                if blob_id.blob_type == BlobType::ChainDescription {
264                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
265                        .expect("ChainDescription should deserialize correctly");
266                    let owners = chain_desc.config().ownership.all_owners().cloned();
267                    Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
268                } else {
269                    None
270                }
271            })
272            .collect::<Vec<_>>();
273        if new_chains.is_empty() {
274            return Ok(());
275        }
276        let mut new_ids = BTreeSet::new();
277        let mut context_guard = self.context.lock().await;
278        for (new_chain_id, owners) in new_chains {
279            for chain_owner in owners {
280                if context_guard
281                    .client()
282                    .signer()
283                    .contains_key(&chain_owner)
284                    .await
285                    .map_err(ChainClientError::signer_failure)?
286                {
287                    context_guard
288                        .update_wallet_for_new_chain(
289                            new_chain_id,
290                            Some(chain_owner),
291                            block.header.timestamp,
292                        )
293                        .await?;
294                    new_ids.insert(new_chain_id);
295                }
296            }
297        }
298        drop(context_guard);
299        self.listen_recursively(new_ids).await?;
300        Ok(())
301    }
302
303    /// Processes the inboxes of all chains that are subscribed to `chain_id`.
304    async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
305        let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
306            return Ok(());
307        };
308        for subscriber_id in subscribers {
309            self.maybe_process_inbox(subscriber_id).await?;
310        }
311        Ok(())
312    }
313
314    /// Starts listening for notifications about the given chains, and any chains that publish
315    /// event streams those chains are subscribed to.
316    async fn listen_recursively(&mut self, mut chain_ids: BTreeSet<ChainId>) -> Result<(), Error> {
317        while let Some(chain_id) = chain_ids.pop_first() {
318            chain_ids.extend(self.listen(chain_id).await?);
319        }
320        Ok(())
321    }
322
323    /// Starts listening for notifications about the given chain.
324    ///
325    /// Returns all publishing chains, that we also need to listen to.
326    async fn listen(&mut self, chain_id: ChainId) -> Result<BTreeSet<ChainId>, Error> {
327        if self.listening.contains_key(&chain_id) {
328            return Ok(BTreeSet::new());
329        }
330        let client = self.context.lock().await.make_chain_client(chain_id);
331        let (listener, abort_handle, notification_stream) = client.listen().await?;
332        if client.is_tracked() {
333            client.synchronize_from_validators().await?;
334        } else {
335            client.synchronize_chain_state(chain_id).await?;
336        }
337        let join_handle = linera_base::task::spawn(listener.in_current_span());
338        let listening_client =
339            ListeningClient::new(client, abort_handle, join_handle, notification_stream);
340        self.listening.insert(chain_id, listening_client);
341        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
342        self.maybe_process_inbox(chain_id).await?;
343        Ok(publishing_chains)
344    }
345
346    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
347    async fn update_event_subscriptions(
348        &mut self,
349        chain_id: ChainId,
350    ) -> Result<BTreeSet<ChainId>, Error> {
351        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
352        if !listening_client.client.is_tracked() {
353            return Ok(BTreeSet::new());
354        }
355        let publishing_chains = listening_client.client.event_stream_publishers().await?;
356        for publisher_id in &publishing_chains {
357            self.event_subscribers
358                .entry(*publisher_id)
359                .or_default()
360                .insert(chain_id);
361        }
362        Ok(publishing_chains)
363    }
364
365    /// Returns the next notification or timeout to process.
366    async fn next_action(&mut self) -> Result<Action, Error> {
367        loop {
368            let (timeout_chain_id, timeout) = self.next_timeout()?;
369            let notification_futures = self
370                .listening
371                .values_mut()
372                .map(|client| {
373                    let stream = client.notification_stream.clone();
374                    Box::pin(async move { stream.lock().await.next().await })
375                })
376                .collect::<Vec<_>>();
377            futures::select! {
378                () = self.cancellation_token.cancelled().fuse() => {
379                    return Ok(Action::Stop);
380                }
381                () = self.storage.clock().sleep_until(timeout).fuse() => {
382                    return Ok(Action::ProcessInbox(timeout_chain_id));
383                }
384                (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
385                    let Some(notification) = maybe_notification else {
386                        let chain_id = *self.listening.keys().nth(index).unwrap();
387                        self.listening.remove(&chain_id);
388                        warn!("Notification stream for {chain_id} closed");
389                        continue;
390                    };
391                    return Ok(Action::Notification(notification));
392                }
393            }
394        }
395    }
396
397    /// Returns the next timeout to process, and the chain to which it applies.
398    fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
399        let (chain_id, client) = self
400            .listening
401            .iter()
402            .min_by_key(|(_, client)| client.timeout)
403            .expect("No chains left to listen to");
404        Ok((*chain_id, client.timeout))
405    }
406
407    /// Updates the validators about the chain.
408    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
409        let chain_id = notification.chain_id;
410        let listening_client = self.listening.get(&chain_id).expect("missing client");
411        if let Err(error) = listening_client.client.update_validators(None).await {
412            warn!(
413                "Failed to update validators about the local chain after \
414                 receiving {notification:?} with error: {error:?}"
415            );
416        }
417        Ok(())
418    }
419
420    /// Updates the wallet based on the client for this chain.
421    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
422        let client = &self
423            .listening
424            .get(&chain_id)
425            .expect("missing client")
426            .client;
427        self.context.lock().await.update_wallet(client).await?;
428        Ok(())
429    }
430
431    /// Processes the inbox, unless `skip_process_inbox` is set.
432    ///
433    /// If no block can be produced because we are not the round leader, a timeout is returned
434    /// for when to retry; otherwise `u64::MAX` is returned.
435    ///
436    /// The wallet is persisted with any blocks that processing the inbox added. An error
437    /// is returned if persisting the wallet fails.
438    async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
439        if self.config.skip_process_inbox {
440            debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
441            return Ok(());
442        }
443        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
444        if !listening_client.client.is_tracked() {
445            debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
446            return Ok(());
447        }
448        debug!("Processing inbox for {chain_id:.8}");
449        listening_client.timeout = Timestamp::from(u64::MAX);
450        match listening_client
451            .client
452            .process_inbox_without_prepare()
453            .await
454        {
455            Err(ChainClientError::CannotFindKeyForChain(chain_id)) => {
456                debug!(%chain_id, "Cannot find key for chain");
457            }
458            Err(error) => warn!(%error, "Failed to process inbox."),
459            Ok((certs, None)) => info!("Done processing inbox. {} blocks created.", certs.len()),
460            Ok((certs, Some(new_timeout))) => {
461                info!(
462                    "{} blocks created. Will try processing the inbox later based \
463                     on the given round timeout: {new_timeout:?}",
464                    certs.len(),
465                );
466                listening_client.timeout = new_timeout.timestamp;
467            }
468        }
469        let mut context_guard = self.context.lock().await;
470        context_guard
471            .update_wallet(&listening_client.client)
472            .await?;
473        Ok(())
474    }
475
476    /// Sleeps for the given number of milliseconds, if greater than 0.
477    async fn sleep(delay_ms: u64) {
478        if delay_ms > 0 {
479            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
480        }
481    }
482}
483
/// What the chain listener's main loop has to do next, as decided by `next_action`.
enum Action {
    /// The pending timeout of the given chain elapsed; try to process its inbox.
    ProcessInbox(ChainId),
    /// A notification arrived on one of the notification streams.
    Notification(Notification),
    /// The cancellation token was cancelled; leave the main loop.
    Stop,
}
488}