linera_client/
chain_listener.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    sync::Arc,
7    time::Duration,
8};
9
10use futures::{
11    future::{join_all, select_all},
12    lock::Mutex,
13    FutureExt as _, StreamExt,
14};
15use linera_base::{
16    crypto::{CryptoHash, Signer},
17    data_types::{ChainDescription, Timestamp},
18    identifiers::{AccountOwner, BlobType, ChainId},
19    task::NonBlockingFuture,
20};
21use linera_core::{
22    client::{AbortOnDrop, ChainClient, ChainClientError},
23    node::NotificationStream,
24    worker::{Notification, Reason},
25    Environment,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, instrument, warn, Instrument as _};
30
31use crate::{
32    wallet::{UserChain, Wallet},
33    Error,
34};
35
// Configuration of the chain listener, exposed as command-line flags / environment
// variables through `clap`.
//
// NOTE: the `///` comments on the fields below are picked up by `clap` as help text, so
// reviewer notes in this struct deliberately use plain `//` comments.
#[derive(Debug, Default, Clone, clap::Args)]
pub struct ChainListenerConfig {
    // Checked at the top of `ChainListener::maybe_process_inbox`; when set, the listener
    // never processes inboxes on its own.
    /// Do not create blocks automatically to receive incoming messages. Instead, wait for
    /// an explicit mutation `processInbox`.
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // Milliseconds slept at the start of `ChainListener::process_notification`.
    /// Wait before processing any notification (useful for testing).
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // Milliseconds slept at the end of `ChainListener::process_notification` (only reached
    // when processing succeeded).
    /// Wait after processing any notification (useful for rate limiting).
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
62
/// The chain client type that belongs to the environment of a given [`ClientContext`].
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
64
65#[cfg_attr(not(web), trait_variant::make(Send))]
66#[allow(async_fn_in_trait)]
67pub trait ClientContext {
68    type Environment: linera_core::Environment;
69
70    fn wallet(&self) -> &Wallet;
71
72    fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
73
74    fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
75
76    fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
77        let chain = self
78            .wallet()
79            .get(chain_id)
80            .cloned()
81            .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
82        self.client().create_chain_client(
83            chain_id,
84            chain.block_hash,
85            chain.next_block_height,
86            chain.pending_proposal,
87            chain.owner,
88        )
89    }
90
91    async fn update_wallet_for_new_chain(
92        &mut self,
93        chain_id: ChainId,
94        owner: Option<AccountOwner>,
95        timestamp: Timestamp,
96    ) -> Result<(), Error>;
97
98    async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
99}
100
101#[allow(async_fn_in_trait)]
102pub trait ClientContextExt: ClientContext {
103    fn clients(&self) -> Vec<ContextChainClient<Self>> {
104        let chain_ids = self.wallet().chain_ids();
105        let mut clients = vec![];
106        for chain_id in chain_ids {
107            clients.push(self.make_chain_client(chain_id));
108        }
109        clients
110    }
111}
112
// Blanket implementation: every `ClientContext` automatically gets the `ClientContextExt`
// methods.
impl<T: ClientContext> ClientContextExt for T {}
114
/// A chain client together with the stream of notifications from the local node.
///
/// A background task listens to the validators and updates the local node, so any updates to
/// this chain will trigger a notification. The background task is terminated when this gets
/// dropped.
struct ListeningClient<C: ClientContext> {
    /// The chain client.
    client: ContextChainClient<C>,
    /// The abort handle for the task that listens to the validators.
    ///
    /// It is dropped explicitly in `stop` before the task is joined.
    abort_handle: AbortOnDrop,
    /// The listening task's join handle.
    join_handle: NonBlockingFuture<()>,
    /// The stream of notifications from the local node.
    ///
    /// It is wrapped in `Arc<Mutex<_>>` so that `ChainListener::next_action` can move a
    /// handle to it into a boxed future on every iteration.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// This is only `< u64::MAX` when the client is waiting for a timeout to process the inbox.
    ///
    /// It is reset to `u64::MAX` whenever an inbox processing attempt starts, and set to the
    /// round timeout if that attempt reports one.
    timeout: Timestamp,
}
132
133impl<C: ClientContext> ListeningClient<C> {
134    fn new(
135        client: ContextChainClient<C>,
136        abort_handle: AbortOnDrop,
137        join_handle: NonBlockingFuture<()>,
138        notification_stream: NotificationStream,
139    ) -> Self {
140        Self {
141            client,
142            abort_handle,
143            join_handle,
144            #[allow(clippy::arc_with_non_send_sync)] // Only `Send` with `futures-util/alloc`.
145            notification_stream: Arc::new(Mutex::new(notification_stream)),
146            timeout: Timestamp::from(u64::MAX),
147        }
148    }
149
150    async fn stop(self) {
151        drop(self.abort_handle);
152        if let Err(error) = self.join_handle.await {
153            warn!("Failed to join listening task: {error:?}");
154        }
155    }
156}
157
/// A `ChainListener` is a process that listens to notifications from validators and reacts
/// appropriately.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; locked whenever the wallet or the client is accessed.
    context: Arc<Mutex<C>>,
    /// The storage; used to read confirmed blocks and to obtain the clock for timeouts.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration.
    config: Arc<ChainListenerConfig>,
    /// The chains we are currently listening to, with their clients and notification streams.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Map from publishing chain to subscriber chains.
    /// Events emitted on the _publishing chain_ are of interest to the _subscriber chains_.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// When this token is cancelled, `run` stops all listening tasks and returns.
    cancellation_token: CancellationToken,
}
170
171impl<C: ClientContext> ChainListener<C> {
172    /// Creates a new chain listener given client chains.
173    pub fn new(
174        config: ChainListenerConfig,
175        context: Arc<Mutex<C>>,
176        storage: <C::Environment as Environment>::Storage,
177        cancellation_token: CancellationToken,
178    ) -> Self {
179        Self {
180            storage,
181            context,
182            config: Arc::new(config),
183            listening: Default::default(),
184            event_subscribers: Default::default(),
185            cancellation_token,
186        }
187    }
188
189    /// Runs the chain listener.
190    #[instrument(skip(self))]
191    pub async fn run(mut self) -> Result<(), Error> {
192        let chain_ids = {
193            let guard = self.context.lock().await;
194            let mut chain_ids = BTreeSet::from_iter(guard.wallet().chain_ids());
195            chain_ids.insert(guard.wallet().genesis_admin_chain());
196            chain_ids
197        };
198        self.listen_recursively(chain_ids).await?;
199        loop {
200            match self.next_action().await? {
201                Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
202                Action::Notification(notification) => {
203                    self.process_notification(notification).await?
204                }
205                Action::Stop => break,
206            }
207        }
208        join_all(self.listening.into_values().map(|client| client.stop())).await;
209        Ok(())
210    }
211
212    /// Processes a notification, updating local chains and validators as needed.
213    async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
214        Self::sleep(self.config.delay_before_ms).await;
215        match &notification.reason {
216            Reason::NewIncomingBundle { .. } => {
217                self.maybe_process_inbox(notification.chain_id).await?;
218            }
219            Reason::NewRound { .. } => self.update_validators(&notification).await?,
220            Reason::NewBlock { hash, .. } => {
221                self.update_validators(&notification).await?;
222                self.update_wallet(notification.chain_id).await?;
223                self.add_new_chains(*hash).await?;
224                let publishers = self
225                    .update_event_subscriptions(notification.chain_id)
226                    .await?;
227                if !publishers.is_empty() {
228                    self.listen_recursively(publishers).await?;
229                    self.maybe_process_inbox(notification.chain_id).await?;
230                }
231                self.process_new_events(notification.chain_id).await?;
232            }
233        }
234        Self::sleep(self.config.delay_after_ms).await;
235        Ok(())
236    }
237
238    /// If any new chains were created by the given block, and we have a key pair for them,
239    /// add them to the wallet and start listening for notifications.
240    async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
241        let block = self
242            .storage
243            .read_confirmed_block(hash)
244            .await?
245            .ok_or(ChainClientError::MissingConfirmedBlock(hash))?
246            .into_block();
247        let blobs = block.created_blobs().into_iter();
248        let new_chains = blobs
249            .filter_map(|(blob_id, blob)| {
250                if blob_id.blob_type == BlobType::ChainDescription {
251                    let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
252                        .expect("ChainDescription should deserialize correctly");
253                    let owners = chain_desc.config().ownership.all_owners().cloned();
254                    Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
255                } else {
256                    None
257                }
258            })
259            .collect::<Vec<_>>();
260        if new_chains.is_empty() {
261            return Ok(());
262        }
263        let mut new_ids = BTreeSet::new();
264        let mut context_guard = self.context.lock().await;
265        for (new_chain_id, owners) in new_chains {
266            for chain_owner in owners {
267                if context_guard
268                    .client()
269                    .signer()
270                    .contains_key(&chain_owner)
271                    .await
272                    .map_err(ChainClientError::signer_failure)?
273                {
274                    context_guard
275                        .update_wallet_for_new_chain(
276                            new_chain_id,
277                            Some(chain_owner),
278                            block.header.timestamp,
279                        )
280                        .await?;
281                    new_ids.insert(new_chain_id);
282                }
283            }
284        }
285        drop(context_guard);
286        self.listen_recursively(new_ids).await?;
287        Ok(())
288    }
289
290    /// Processes the inboxes of all chains that are subscribed to `chain_id`.
291    async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
292        let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
293            return Ok(());
294        };
295        for subscriber_id in subscribers {
296            self.maybe_process_inbox(subscriber_id).await?;
297        }
298        Ok(())
299    }
300
301    /// Starts listening for notifications about the given chains, and any chains that publish
302    /// event streams those chains are subscribed to.
303    async fn listen_recursively(&mut self, mut chain_ids: BTreeSet<ChainId>) -> Result<(), Error> {
304        while let Some(chain_id) = chain_ids.pop_first() {
305            chain_ids.extend(self.listen(chain_id).await?);
306        }
307        Ok(())
308    }
309
310    /// Starts listening for notifications about the given chain.
311    ///
312    /// Returns all publishing chains, that we also need to listen to.
313    async fn listen(&mut self, chain_id: ChainId) -> Result<BTreeSet<ChainId>, Error> {
314        if self.listening.contains_key(&chain_id) {
315            return Ok(BTreeSet::new());
316        }
317        let client = self.context.lock().await.make_chain_client(chain_id);
318        let (listener, abort_handle, notification_stream) = client.listen().await?;
319        let join_handle = linera_base::task::spawn(listener.in_current_span());
320        let listening_client =
321            ListeningClient::new(client, abort_handle, join_handle, notification_stream);
322        self.listening.insert(chain_id, listening_client);
323        let publishing_chains = self.update_event_subscriptions(chain_id).await?;
324        self.maybe_process_inbox(chain_id).await?;
325        Ok(publishing_chains)
326    }
327
328    /// Updates the event subscribers map, and returns all publishing chains we need to listen to.
329    async fn update_event_subscriptions(
330        &mut self,
331        chain_id: ChainId,
332    ) -> Result<BTreeSet<ChainId>, Error> {
333        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
334        if !listening_client.client.is_tracked() {
335            return Ok(BTreeSet::new());
336        }
337        let publishing_chains = listening_client.client.event_stream_publishers().await?;
338        for publisher_id in &publishing_chains {
339            self.event_subscribers
340                .entry(*publisher_id)
341                .or_default()
342                .insert(chain_id);
343        }
344        Ok(publishing_chains)
345    }
346
    /// Returns the next notification or timeout to process.
    ///
    /// Waits for whichever of the following completes first: cancellation of the token
    /// (`Action::Stop`), the earliest pending inbox timeout (`Action::ProcessInbox`), or a
    /// notification on any of the listened-to streams (`Action::Notification`).
    ///
    /// If a notification stream has ended, its chain is removed from `listening` and the
    /// wait starts over.
    ///
    /// NOTE(review): once the last chain has been removed, `next_timeout` panics on the next
    /// loop iteration. Also, `event_subscribers` is not pruned when a chain is removed here.
    async fn next_action(&mut self) -> Result<Action, Error> {
        loop {
            let (timeout_chain_id, timeout) = self.next_timeout()?;
            // One future per listened-to chain, in the key order of `listening`. They are
            // rebuilt on every iteration; each one locks its stream and waits for the next
            // item.
            let notification_futures = self
                .listening
                .values_mut()
                .map(|client| {
                    let stream = client.notification_stream.clone();
                    Box::pin(async move { stream.lock().await.next().await })
                })
                .collect::<Vec<_>>();
            futures::select! {
                () = self.cancellation_token.cancelled().fuse() => {
                    return Ok(Action::Stop);
                }
                () = self.storage.clock().sleep_until(timeout).fuse() => {
                    return Ok(Action::ProcessInbox(timeout_chain_id));
                }
                (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
                    let Some(notification) = maybe_notification else {
                        // `index` is the position in `notification_futures`, which was built
                        // in key order, so the `index`-th key identifies the closed stream.
                        let chain_id = *self.listening.keys().nth(index).unwrap();
                        self.listening.remove(&chain_id);
                        warn!("Notification stream for {chain_id} closed");
                        continue;
                    };
                    return Ok(Action::Notification(notification));
                }
            }
        }
    }
378
379    /// Returns the next timeout to process, and the chain to which it applies.
380    fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
381        let (chain_id, client) = self
382            .listening
383            .iter()
384            .min_by_key(|(_, client)| client.timeout)
385            .expect("No chains left to listen to");
386        Ok((*chain_id, client.timeout))
387    }
388
389    /// Updates the validators about the chain.
390    async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
391        let chain_id = notification.chain_id;
392        let listening_client = self.listening.get(&chain_id).expect("missing client");
393        if let Err(error) = listening_client.client.update_validators(None).await {
394            warn!(
395                "Failed to update validators about the local chain after \
396                 receiving {notification:?} with error: {error:?}"
397            );
398        }
399        Ok(())
400    }
401
402    /// Updates the wallet based on the client for this chain.
403    async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
404        let client = &self
405            .listening
406            .get(&chain_id)
407            .expect("missing client")
408            .client;
409        self.context.lock().await.update_wallet(client).await?;
410        Ok(())
411    }
412
413    /// Processes the inbox, unless `skip_process_inbox` is set.
414    ///
415    /// If no block can be produced because we are not the round leader, a timeout is returned
416    /// for when to retry; otherwise `u64::MAX` is returned.
417    ///
418    /// The wallet is persisted with any blocks that processing the inbox added. An error
419    /// is returned if persisting the wallet fails.
420    async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
421        if self.config.skip_process_inbox {
422            debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
423            return Ok(());
424        }
425        let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
426        if !listening_client.client.is_tracked() {
427            debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
428            return Ok(());
429        }
430        debug!("Processing inbox for {chain_id:.8}");
431        listening_client.timeout = Timestamp::from(u64::MAX);
432        match listening_client
433            .client
434            .process_inbox_without_prepare()
435            .await
436        {
437            Err(ChainClientError::CannotFindKeyForChain(chain_id)) => {
438                debug!(%chain_id, "Cannot find key for chain");
439            }
440            Err(error) => warn!(%error, "Failed to process inbox."),
441            Ok((certs, None)) => info!("Done processing inbox. {} blocks created.", certs.len()),
442            Ok((certs, Some(new_timeout))) => {
443                info!(
444                    "{} blocks created. Will try processing the inbox later based \
445                     on the given round timeout: {new_timeout:?}",
446                    certs.len(),
447                );
448                listening_client.timeout = new_timeout.timestamp;
449            }
450        }
451        let mut context_guard = self.context.lock().await;
452        context_guard
453            .update_wallet(&listening_client.client)
454            .await?;
455        Ok(())
456    }
457
458    /// Sleeps for the given number of milliseconds, if greater than 0.
459    async fn sleep(delay_ms: u64) {
460        if delay_ms > 0 {
461            linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
462        }
463    }
464}
465
/// What the main loop of `ChainListener::run` should do next, as decided by
/// `ChainListener::next_action`.
enum Action {
    /// The pending timeout of this chain has elapsed: try to process its inbox again.
    ProcessInbox(ChainId),
    /// A notification arrived on one of the notification streams.
    Notification(Notification),
    /// The cancellation token was cancelled: stop listening.
    Stop,
}