1use std::{
5 collections::{btree_map::Entry, BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{
11 future::{join_all, select_all},
12 lock::Mutex,
13 Future, FutureExt as _, StreamExt,
14};
15use linera_base::{
16 crypto::{CryptoHash, Signer},
17 data_types::{ChainDescription, Epoch, Timestamp},
18 identifiers::{AccountOwner, BlobType, ChainId},
19 task::NonBlockingFuture,
20};
21use linera_core::{
22 client::{AbortOnDrop, ChainClient, ChainClientError, ListeningMode},
23 node::NotificationStream,
24 worker::{Notification, Reason},
25 Environment,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, instrument, warn, Instrument as _};
30
31use crate::{
32 wallet::{UserChain, Wallet},
33 Error,
34};
35
36#[derive(Debug, Default, Clone, clap::Args, serde::Serialize)]
37pub struct ChainListenerConfig {
38 #[arg(
41 long = "listener-skip-process-inbox",
42 env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
43 )]
44 pub skip_process_inbox: bool,
45
46 #[arg(
48 long = "listener-delay-before-ms",
49 default_value = "0",
50 env = "LINERA_LISTENER_DELAY_BEFORE"
51 )]
52 pub delay_before_ms: u64,
53
54 #[arg(
56 long = "listener-delay-after-ms",
57 default_value = "0",
58 env = "LINERA_LISTENER_DELAY_AFTER"
59 )]
60 pub delay_after_ms: u64,
61}
62
63type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
64
65#[cfg_attr(not(web), trait_variant::make(Send))]
66#[allow(async_fn_in_trait)]
67pub trait ClientContext {
68 type Environment: linera_core::Environment;
69
70 fn wallet(&self) -> &Wallet;
71
72 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
73
74 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
75
76 #[cfg(not(web))]
78 fn timing_sender(
79 &self,
80 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
81
82 #[cfg(web)]
83 fn timing_sender(
84 &self,
85 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
86 None
87 }
88
89 fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
90 let chain = self
91 .wallet()
92 .get(chain_id)
93 .cloned()
94 .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
95 self.client().create_chain_client(
96 chain_id,
97 chain.block_hash,
98 chain.next_block_height,
99 chain.pending_proposal,
100 chain.owner,
101 self.timing_sender(),
102 )
103 }
104
105 async fn update_wallet_for_new_chain(
106 &mut self,
107 chain_id: ChainId,
108 owner: Option<AccountOwner>,
109 timestamp: Timestamp,
110 epoch: Epoch,
111 ) -> Result<(), Error>;
112
113 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
114}
115
116#[allow(async_fn_in_trait)]
117pub trait ClientContextExt: ClientContext {
118 fn clients(&self) -> Vec<ContextChainClient<Self>> {
119 let chain_ids = self.wallet().chain_ids();
120 let mut clients = vec![];
121 for chain_id in chain_ids {
122 clients.push(self.make_chain_client(chain_id));
123 }
124 clients
125 }
126}
127
128impl<T: ClientContext> ClientContextExt for T {}
129
130struct ListeningClient<C: ClientContext> {
136 client: ContextChainClient<C>,
138 abort_handle: AbortOnDrop,
140 join_handle: NonBlockingFuture<()>,
142 notification_stream: Arc<Mutex<NotificationStream>>,
144 timeout: Timestamp,
146 listening_mode: ListeningMode,
148}
149
150impl<C: ClientContext> ListeningClient<C> {
151 fn new(
152 client: ContextChainClient<C>,
153 abort_handle: AbortOnDrop,
154 join_handle: NonBlockingFuture<()>,
155 notification_stream: NotificationStream,
156 listening_mode: ListeningMode,
157 ) -> Self {
158 Self {
159 client,
160 abort_handle,
161 join_handle,
162 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
164 timeout: Timestamp::from(u64::MAX),
165 listening_mode,
166 }
167 }
168
169 async fn stop(self) {
170 drop(self.abort_handle);
171 if let Err(error) = self.join_handle.await {
172 warn!("Failed to join listening task: {error:?}");
173 }
174 }
175}
176
177pub struct ChainListener<C: ClientContext> {
180 context: Arc<Mutex<C>>,
181 storage: <C::Environment as Environment>::Storage,
182 config: Arc<ChainListenerConfig>,
183 listening: BTreeMap<ChainId, ListeningClient<C>>,
184 event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
187 cancellation_token: CancellationToken,
188}
189
190impl<C: ClientContext + 'static> ChainListener<C> {
191 pub fn new(
193 config: ChainListenerConfig,
194 context: Arc<Mutex<C>>,
195 storage: <C::Environment as Environment>::Storage,
196 cancellation_token: CancellationToken,
197 ) -> Self {
198 Self {
199 storage,
200 context,
201 config: Arc::new(config),
202 listening: Default::default(),
203 event_subscribers: Default::default(),
204 cancellation_token,
205 }
206 }
207
208 #[instrument(skip(self))]
210 pub async fn run(
211 mut self,
212 enable_background_sync: bool,
213 ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
214 let chain_ids = {
215 let guard = self.context.lock().await;
216 let admin_chain_id = guard.wallet().genesis_admin_chain();
217 guard
218 .make_chain_client(admin_chain_id)
219 .synchronize_chain_state(admin_chain_id)
220 .await?;
221 BTreeMap::from_iter(
222 guard
223 .wallet()
224 .chain_ids()
225 .into_iter()
226 .chain([admin_chain_id])
227 .map(|chain_id| (chain_id, ListeningMode::FullChain)),
228 )
229 };
230
231 if enable_background_sync {
234 let context = Arc::clone(&self.context);
235 let cancellation_token = self.cancellation_token.clone();
236 for chain_id in chain_ids.keys() {
237 let context = Arc::clone(&context);
238 let cancellation_token = cancellation_token.clone();
239 let chain_id = *chain_id;
240 drop(linera_base::task::spawn(async move {
241 if let Err(e) = Self::background_sync_received_certificates(
242 context,
243 chain_id,
244 cancellation_token,
245 )
246 .await
247 {
248 warn!("Background sync failed for chain {chain_id}: {e}");
249 }
250 }));
251 }
252 }
253
254 Ok(async {
255 self.listen_recursively(chain_ids).await?;
256 loop {
257 match self.next_action().await? {
258 Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
259 Action::Notification(notification) => {
260 self.process_notification(notification).await?
261 }
262 Action::Stop => break,
263 }
264 }
265 join_all(self.listening.into_values().map(|client| client.stop())).await;
266 Ok(())
267 })
268 }
269
270 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
272 Self::sleep(self.config.delay_before_ms).await;
273 let Some(listening_mode) = self
274 .listening
275 .get(¬ification.chain_id)
276 .map(|listening_client| &listening_client.listening_mode)
277 else {
278 warn!(
279 ?notification,
280 "ChainListener::process_notification: got a notification without listening to the chain"
281 );
282 return Ok(());
283 };
284
285 match ¬ification.reason {
286 Reason::NewIncomingBundle { .. } => {
287 self.maybe_process_inbox(notification.chain_id).await?;
288 }
289 Reason::NewRound { .. } => self.update_validators(¬ification).await?,
290 Reason::NewBlock { hash, .. } => {
291 if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
292 debug!("ChainListener::process_notification: ignoring notification due to listening mode");
293 return Ok(());
294 }
295 self.update_wallet(notification.chain_id).await?;
296 self.add_new_chains(*hash).await?;
297 let publishers = self
298 .update_event_subscriptions(notification.chain_id)
299 .await?;
300 if !publishers.is_empty() {
301 self.listen_recursively(publishers).await?;
302 self.maybe_process_inbox(notification.chain_id).await?;
303 }
304 self.process_new_events(notification.chain_id).await?;
305 }
306 Reason::NewEvents { event_streams, .. } => {
307 let should_process = match listening_mode {
308 ListeningMode::FullChain => true,
309 ListeningMode::EventsOnly(relevant_events) => {
310 relevant_events.intersection(event_streams).count() != 0
311 }
312 };
313 if !should_process {
314 debug!(
315 ?notification,
316 ?listening_mode,
317 "ChainListener::process_notification: ignoring notification due to no relevant events",
318 );
319 return Ok(());
320 }
321 self.process_new_events(notification.chain_id).await?;
322 }
323 }
324 Self::sleep(self.config.delay_after_ms).await;
325 Ok(())
326 }
327
328 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
331 let block = self
332 .storage
333 .read_confirmed_block(hash)
334 .await?
335 .ok_or(ChainClientError::MissingConfirmedBlock(hash))?
336 .into_block();
337 let blobs = block.created_blobs().into_iter();
338 let new_chains = blobs
339 .filter_map(|(blob_id, blob)| {
340 if blob_id.blob_type == BlobType::ChainDescription {
341 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
342 .expect("ChainDescription should deserialize correctly");
343 let owners = chain_desc.config().ownership.all_owners().cloned();
344 Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
345 } else {
346 None
347 }
348 })
349 .collect::<Vec<_>>();
350 if new_chains.is_empty() {
351 return Ok(());
352 }
353 let mut new_ids = BTreeMap::new();
354 let mut context_guard = self.context.lock().await;
355 for (new_chain_id, owners) in new_chains {
356 for chain_owner in owners {
357 if context_guard
358 .client()
359 .signer()
360 .contains_key(&chain_owner)
361 .await
362 .map_err(ChainClientError::signer_failure)?
363 {
364 context_guard
365 .update_wallet_for_new_chain(
366 new_chain_id,
367 Some(chain_owner),
368 block.header.timestamp,
369 block.header.epoch,
370 )
371 .await?;
372 new_ids.insert(new_chain_id, ListeningMode::FullChain);
373 }
374 }
375 }
376 drop(context_guard);
377 self.listen_recursively(new_ids).await?;
378 Ok(())
379 }
380
381 async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
383 let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
384 return Ok(());
385 };
386 for subscriber_id in subscribers {
387 self.maybe_process_inbox(subscriber_id).await?;
388 }
389 Ok(())
390 }
391
392 async fn listen_recursively(
395 &mut self,
396 mut chain_ids: BTreeMap<ChainId, ListeningMode>,
397 ) -> Result<(), Error> {
398 while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
399 for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
400 match chain_ids.entry(new_chain_id) {
401 Entry::Vacant(vacant) => {
402 vacant.insert(new_listening_mode);
403 }
404 Entry::Occupied(mut occupied) => {
405 occupied.get_mut().extend(Some(new_listening_mode));
406 }
407 }
408 }
409 }
410
411 Ok(())
412 }
413
414 #[instrument(skip(context, cancellation_token))]
417 async fn background_sync_received_certificates(
418 context: Arc<Mutex<C>>,
419 chain_id: ChainId,
420 cancellation_token: CancellationToken,
421 ) -> Result<(), Error> {
422 info!("Starting background certificate sync for chain {chain_id}");
423 let client = context.lock().await.make_chain_client(chain_id);
424
425 Ok(client
426 .find_received_certificates(Some(cancellation_token))
427 .await?)
428 }
429
430 async fn listen(
434 &mut self,
435 chain_id: ChainId,
436 mut listening_mode: ListeningMode,
437 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
438 if self
439 .listening
440 .get(&chain_id)
441 .is_some_and(|existing_client| existing_client.listening_mode >= listening_mode)
442 {
443 return Ok(BTreeMap::new());
444 }
445 listening_mode.extend(
446 self.listening
447 .get(&chain_id)
448 .map(|existing_client| existing_client.listening_mode.clone()),
449 );
450 let client = self.context.lock().await.make_chain_client(chain_id);
451 let (listener, abort_handle, notification_stream) =
452 client.listen(listening_mode.clone()).await?;
453 let join_handle = linera_base::task::spawn(listener.in_current_span());
454 let listening_client = ListeningClient::new(
455 client,
456 abort_handle,
457 join_handle,
458 notification_stream,
459 listening_mode,
460 );
461 self.listening.insert(chain_id, listening_client);
462 let publishing_chains = self.update_event_subscriptions(chain_id).await?;
463 self.maybe_process_inbox(chain_id).await?;
464 Ok(publishing_chains)
465 }
466
467 async fn update_event_subscriptions(
469 &mut self,
470 chain_id: ChainId,
471 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
472 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
473 if !listening_client.client.is_tracked() {
474 return Ok(BTreeMap::new());
475 }
476 let publishing_chains: BTreeMap<_, _> = listening_client
477 .client
478 .event_stream_publishers()
479 .await?
480 .into_iter()
481 .map(|(chain_id, streams)| (chain_id, ListeningMode::EventsOnly(streams)))
482 .collect();
483 for publisher_id in publishing_chains.keys() {
484 self.event_subscribers
485 .entry(*publisher_id)
486 .or_default()
487 .insert(chain_id);
488 }
489 Ok(publishing_chains)
490 }
491
492 async fn next_action(&mut self) -> Result<Action, Error> {
494 loop {
495 let (timeout_chain_id, timeout) = self.next_timeout()?;
496 let notification_futures = self
497 .listening
498 .values_mut()
499 .map(|client| {
500 let stream = client.notification_stream.clone();
501 Box::pin(async move { stream.lock().await.next().await })
502 })
503 .collect::<Vec<_>>();
504 futures::select! {
505 () = self.cancellation_token.cancelled().fuse() => {
506 return Ok(Action::Stop);
507 }
508 () = self.storage.clock().sleep_until(timeout).fuse() => {
509 return Ok(Action::ProcessInbox(timeout_chain_id));
510 }
511 (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
512 let Some(notification) = maybe_notification else {
513 let chain_id = *self.listening.keys().nth(index).unwrap();
514 self.listening.remove(&chain_id);
515 warn!("Notification stream for {chain_id} closed");
516 continue;
517 };
518 return Ok(Action::Notification(notification));
519 }
520 }
521 }
522 }
523
524 fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
526 let (chain_id, client) = self
527 .listening
528 .iter()
529 .min_by_key(|(_, client)| client.timeout)
530 .expect("No chains left to listen to");
531 Ok((*chain_id, client.timeout))
532 }
533
534 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
536 let chain_id = notification.chain_id;
537 let listening_client = self.listening.get(&chain_id).expect("missing client");
538 if let Err(error) = listening_client.client.update_validators(None).await {
539 warn!(
540 "Failed to update validators about the local chain after \
541 receiving {notification:?} with error: {error:?}"
542 );
543 }
544 Ok(())
545 }
546
547 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
549 let client = &self
550 .listening
551 .get(&chain_id)
552 .expect("missing client")
553 .client;
554 self.context.lock().await.update_wallet(client).await?;
555 Ok(())
556 }
557
558 async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
566 if self.config.skip_process_inbox {
567 debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
568 return Ok(());
569 }
570 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
571 if !listening_client.client.is_tracked() {
572 debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
573 return Ok(());
574 }
575 debug!("Processing inbox for {chain_id:.8}");
576 listening_client.timeout = Timestamp::from(u64::MAX);
577 match listening_client
578 .client
579 .process_inbox_without_prepare()
580 .await
581 {
582 Err(ChainClientError::CannotFindKeyForChain(chain_id)) => {
583 debug!(%chain_id, "Cannot find key for chain");
584 }
585 Err(error) => warn!(%error, "Failed to process inbox."),
586 Ok((certs, None)) => info!(
587 "Done processing inbox of chain. {} blocks created on chain {chain_id}.",
588 certs.len()
589 ),
590 Ok((certs, Some(new_timeout))) => {
591 info!(
592 "{} blocks created on chain {chain_id}. Will try processing the inbox later \
593 based on the round timeout: {new_timeout:?}",
594 certs.len(),
595 );
596 listening_client.timeout = new_timeout.timestamp;
597 }
598 }
599 let mut context_guard = self.context.lock().await;
600 context_guard
601 .update_wallet(&listening_client.client)
602 .await?;
603 Ok(())
604 }
605
606 async fn sleep(delay_ms: u64) {
608 if delay_ms > 0 {
609 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
610 }
611 }
612}
613
614enum Action {
615 ProcessInbox(ChainId),
616 Notification(Notification),
617 Stop,
618}