1use std::{
5 collections::{btree_map::Entry, BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{
11 future::{join_all, select_all},
12 lock::Mutex,
13 Future, FutureExt as _, StreamExt,
14};
15use linera_base::{
16 crypto::{CryptoHash, Signer},
17 data_types::{ChainDescription, Epoch, Timestamp},
18 identifiers::{AccountOwner, BlobType, ChainId},
19 task::NonBlockingFuture,
20};
21use linera_core::{
22 client::{
23 chain_client::{self, ChainClient},
24 AbortOnDrop, ListeningMode,
25 },
26 node::NotificationStream,
27 worker::{Notification, Reason},
28 Environment,
29};
30use linera_storage::{Clock as _, Storage as _};
31use tokio_util::sync::CancellationToken;
32use tracing::{debug, info, instrument, warn, Instrument as _};
33
34use crate::{
35 wallet::{UserChain, Wallet},
36 Error,
37};
38
// Settings for the chain listener, exposed as `--listener-*` command-line flags with
// `LINERA_LISTENER_*` environment variables as an alternative source.
// (Kept as a plain comment so it cannot leak into clap's generated help; the `///` comments on
// the fields below are the per-flag descriptions.)
#[derive(Debug, Default, Clone, clap::Args, serde::Serialize)]
pub struct ChainListenerConfig {
    /// Do not process inboxes automatically: when set, the listener skips inbox processing
    /// for every chain.
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    /// Wait this many milliseconds before processing each notification.
    // NOTE: the environment variable has no `_MS` suffix although the value is in milliseconds.
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    /// Wait this many milliseconds after processing each notification.
    // NOTE: the environment variable has no `_MS` suffix although the value is in milliseconds.
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
65
/// Shorthand for the [`ChainClient`] type that uses the environment of the client context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
67
68#[cfg_attr(not(web), trait_variant::make(Send))]
69#[allow(async_fn_in_trait)]
70pub trait ClientContext {
71 type Environment: linera_core::Environment;
72
73 fn wallet(&self) -> &Wallet;
74
75 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
76
77 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
78
79 #[cfg(not(web))]
81 fn timing_sender(
82 &self,
83 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
84
85 #[cfg(web)]
86 fn timing_sender(
87 &self,
88 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
89 None
90 }
91
92 fn make_chain_client(&self, chain_id: ChainId) -> ChainClient<Self::Environment> {
93 let chain = self
94 .wallet()
95 .get(chain_id)
96 .cloned()
97 .unwrap_or_else(|| UserChain::make_other(chain_id, Timestamp::from(0)));
98 self.client().create_chain_client(
99 chain_id,
100 chain.block_hash,
101 chain.next_block_height,
102 chain.pending_proposal,
103 chain.owner,
104 self.timing_sender(),
105 )
106 }
107
108 async fn update_wallet_for_new_chain(
109 &mut self,
110 chain_id: ChainId,
111 owner: Option<AccountOwner>,
112 timestamp: Timestamp,
113 epoch: Epoch,
114 ) -> Result<(), Error>;
115
116 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
117}
118
119#[allow(async_fn_in_trait)]
120pub trait ClientContextExt: ClientContext {
121 fn clients(&self) -> Vec<ContextChainClient<Self>> {
122 let chain_ids = self.wallet().chain_ids();
123 let mut clients = vec![];
124 for chain_id in chain_ids {
125 clients.push(self.make_chain_client(chain_id));
126 }
127 clients
128 }
129}
130
131impl<T: ClientContext> ClientContextExt for T {}
132
/// Per-chain state kept by [`ChainListener`]: the chain client together with the handles and
/// the notification stream obtained when listening to the chain was started.
struct ListeningClient<C: ClientContext> {
    /// The chain client of the chain being listened to.
    client: ContextChainClient<C>,
    /// The abort handle returned by `ChainClient::listen`; `stop` drops it before joining the
    /// listener task.
    abort_handle: AbortOnDrop,
    /// The join handle of the spawned listener task.
    join_handle: NonBlockingFuture<()>,
    /// The chain's notification stream, shared behind a mutex so that `next_action` can poll
    /// it from a freshly built future on every iteration.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// When the inbox should be processed again; `u64::MAX` means no retry is scheduled.
    timeout: Timestamp,
    /// The mode in which this chain is being listened to.
    listening_mode: ListeningMode,
}
152
153impl<C: ClientContext> ListeningClient<C> {
154 fn new(
155 client: ContextChainClient<C>,
156 abort_handle: AbortOnDrop,
157 join_handle: NonBlockingFuture<()>,
158 notification_stream: NotificationStream,
159 listening_mode: ListeningMode,
160 ) -> Self {
161 Self {
162 client,
163 abort_handle,
164 join_handle,
165 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
167 timeout: Timestamp::from(u64::MAX),
168 listening_mode,
169 }
170 }
171
172 async fn stop(self) {
173 drop(self.abort_handle);
174 if let Err(error) = self.join_handle.await {
175 warn!("Failed to join listening task: {error:?}");
176 }
177 }
178}
179
/// Listens to a set of chains, reacts to their notifications and processes their inboxes.
///
/// Created with [`ChainListener::new`] and driven by [`ChainListener::run`].
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; locked whenever the wallet or a new chain client is needed.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks and for its clock.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration.
    config: Arc<ChainListenerConfig>,
    /// The chains currently being listened to, with their clients and notification streams.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Maps each publisher chain to the chains subscribed to its event streams.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// Cancelling this token makes the listener loop stop.
    cancellation_token: CancellationToken,
}
192
193impl<C: ClientContext + 'static> ChainListener<C> {
194 pub fn new(
196 config: ChainListenerConfig,
197 context: Arc<Mutex<C>>,
198 storage: <C::Environment as Environment>::Storage,
199 cancellation_token: CancellationToken,
200 ) -> Self {
201 Self {
202 storage,
203 context,
204 config: Arc::new(config),
205 listening: Default::default(),
206 event_subscribers: Default::default(),
207 cancellation_token,
208 }
209 }
210
211 #[instrument(skip(self))]
213 pub async fn run(
214 mut self,
215 enable_background_sync: bool,
216 ) -> Result<impl Future<Output = Result<(), Error>>, Error> {
217 let chain_ids = {
218 let guard = self.context.lock().await;
219 let admin_chain_id = guard.wallet().genesis_admin_chain();
220 guard
221 .make_chain_client(admin_chain_id)
222 .synchronize_chain_state(admin_chain_id)
223 .await?;
224 BTreeMap::from_iter(
225 guard
226 .wallet()
227 .chain_ids()
228 .into_iter()
229 .chain([admin_chain_id])
230 .map(|chain_id| (chain_id, ListeningMode::FullChain)),
231 )
232 };
233
234 if enable_background_sync {
237 let context = Arc::clone(&self.context);
238 let cancellation_token = self.cancellation_token.clone();
239 for chain_id in chain_ids.keys() {
240 let context = Arc::clone(&context);
241 let cancellation_token = cancellation_token.clone();
242 let chain_id = *chain_id;
243 drop(linera_base::task::spawn(async move {
244 if let Err(e) = Self::background_sync_received_certificates(
245 context,
246 chain_id,
247 cancellation_token,
248 )
249 .await
250 {
251 warn!("Background sync failed for chain {chain_id}: {e}");
252 }
253 }));
254 }
255 }
256
257 Ok(async {
258 self.listen_recursively(chain_ids).await?;
259 loop {
260 match self.next_action().await? {
261 Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
262 Action::Notification(notification) => {
263 self.process_notification(notification).await?
264 }
265 Action::Stop => break,
266 }
267 }
268 join_all(self.listening.into_values().map(|client| client.stop())).await;
269 Ok(())
270 })
271 }
272
273 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
275 Self::sleep(self.config.delay_before_ms).await;
276 let Some(listening_mode) = self
277 .listening
278 .get(¬ification.chain_id)
279 .map(|listening_client| &listening_client.listening_mode)
280 else {
281 warn!(
282 ?notification,
283 "ChainListener::process_notification: got a notification without listening to the chain"
284 );
285 return Ok(());
286 };
287
288 match ¬ification.reason {
289 Reason::NewIncomingBundle { .. } => {
290 self.maybe_process_inbox(notification.chain_id).await?;
291 }
292 Reason::NewRound { .. } => self.update_validators(¬ification).await?,
293 Reason::NewBlock { hash, .. } => {
294 if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
295 debug!("ChainListener::process_notification: ignoring notification due to listening mode");
296 return Ok(());
297 }
298 self.update_wallet(notification.chain_id).await?;
299 self.add_new_chains(*hash).await?;
300 let publishers = self
301 .update_event_subscriptions(notification.chain_id)
302 .await?;
303 if !publishers.is_empty() {
304 self.listen_recursively(publishers).await?;
305 self.maybe_process_inbox(notification.chain_id).await?;
306 }
307 self.process_new_events(notification.chain_id).await?;
308 }
309 Reason::NewEvents { event_streams, .. } => {
310 let should_process = match listening_mode {
311 ListeningMode::FullChain => true,
312 ListeningMode::EventsOnly(relevant_events) => {
313 relevant_events.intersection(event_streams).count() != 0
314 }
315 };
316 if !should_process {
317 debug!(
318 ?notification,
319 ?listening_mode,
320 "ChainListener::process_notification: ignoring notification due to no relevant events",
321 );
322 return Ok(());
323 }
324 self.process_new_events(notification.chain_id).await?;
325 }
326 Reason::BlockExecuted { .. } => {}
327 }
328 Self::sleep(self.config.delay_after_ms).await;
329 Ok(())
330 }
331
332 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
335 let block = self
336 .storage
337 .read_confirmed_block(hash)
338 .await?
339 .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?
340 .into_block();
341 let blobs = block.created_blobs().into_iter();
342 let new_chains = blobs
343 .filter_map(|(blob_id, blob)| {
344 if blob_id.blob_type == BlobType::ChainDescription {
345 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
346 .expect("ChainDescription should deserialize correctly");
347 let owners = chain_desc.config().ownership.all_owners().cloned();
348 Some((ChainId(blob_id.hash), owners.collect::<Vec<_>>()))
349 } else {
350 None
351 }
352 })
353 .collect::<Vec<_>>();
354 if new_chains.is_empty() {
355 return Ok(());
356 }
357 let mut new_ids = BTreeMap::new();
358 let mut context_guard = self.context.lock().await;
359 for (new_chain_id, owners) in new_chains {
360 for chain_owner in owners {
361 if context_guard
362 .client()
363 .signer()
364 .contains_key(&chain_owner)
365 .await
366 .map_err(chain_client::Error::signer_failure)?
367 {
368 context_guard
369 .update_wallet_for_new_chain(
370 new_chain_id,
371 Some(chain_owner),
372 block.header.timestamp,
373 block.header.epoch,
374 )
375 .await?;
376 new_ids.insert(new_chain_id, ListeningMode::FullChain);
377 }
378 }
379 }
380 drop(context_guard);
381 self.listen_recursively(new_ids).await?;
382 Ok(())
383 }
384
385 async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
387 let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
388 return Ok(());
389 };
390 for subscriber_id in subscribers {
391 self.maybe_process_inbox(subscriber_id).await?;
392 }
393 Ok(())
394 }
395
396 async fn listen_recursively(
399 &mut self,
400 mut chain_ids: BTreeMap<ChainId, ListeningMode>,
401 ) -> Result<(), Error> {
402 while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
403 for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
404 match chain_ids.entry(new_chain_id) {
405 Entry::Vacant(vacant) => {
406 vacant.insert(new_listening_mode);
407 }
408 Entry::Occupied(mut occupied) => {
409 occupied.get_mut().extend(Some(new_listening_mode));
410 }
411 }
412 }
413 }
414
415 Ok(())
416 }
417
418 #[instrument(skip(context, cancellation_token))]
421 async fn background_sync_received_certificates(
422 context: Arc<Mutex<C>>,
423 chain_id: ChainId,
424 cancellation_token: CancellationToken,
425 ) -> Result<(), Error> {
426 info!("Starting background certificate sync for chain {chain_id}");
427 let client = context.lock().await.make_chain_client(chain_id);
428
429 Ok(client
430 .find_received_certificates(Some(cancellation_token))
431 .await?)
432 }
433
434 async fn listen(
438 &mut self,
439 chain_id: ChainId,
440 mut listening_mode: ListeningMode,
441 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
442 if self
443 .listening
444 .get(&chain_id)
445 .is_some_and(|existing_client| existing_client.listening_mode >= listening_mode)
446 {
447 return Ok(BTreeMap::new());
448 }
449 listening_mode.extend(
450 self.listening
451 .get(&chain_id)
452 .map(|existing_client| existing_client.listening_mode.clone()),
453 );
454 let client = self.context.lock().await.make_chain_client(chain_id);
455 let (listener, abort_handle, notification_stream) =
456 client.listen(listening_mode.clone()).await?;
457 let join_handle = linera_base::task::spawn(listener.in_current_span());
458 let listening_client = ListeningClient::new(
459 client,
460 abort_handle,
461 join_handle,
462 notification_stream,
463 listening_mode,
464 );
465 self.listening.insert(chain_id, listening_client);
466 let publishing_chains = self.update_event_subscriptions(chain_id).await?;
467 self.maybe_process_inbox(chain_id).await?;
468 Ok(publishing_chains)
469 }
470
471 async fn update_event_subscriptions(
473 &mut self,
474 chain_id: ChainId,
475 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
476 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
477 if !listening_client.client.is_tracked() {
478 return Ok(BTreeMap::new());
479 }
480 let publishing_chains: BTreeMap<_, _> = listening_client
481 .client
482 .event_stream_publishers()
483 .await?
484 .into_iter()
485 .map(|(chain_id, streams)| (chain_id, ListeningMode::EventsOnly(streams)))
486 .collect();
487 for publisher_id in publishing_chains.keys() {
488 self.event_subscribers
489 .entry(*publisher_id)
490 .or_default()
491 .insert(chain_id);
492 }
493 Ok(publishing_chains)
494 }
495
496 async fn next_action(&mut self) -> Result<Action, Error> {
498 loop {
499 let (timeout_chain_id, timeout) = self.next_timeout()?;
500 let notification_futures = self
501 .listening
502 .values_mut()
503 .map(|client| {
504 let stream = client.notification_stream.clone();
505 Box::pin(async move { stream.lock().await.next().await })
506 })
507 .collect::<Vec<_>>();
508 futures::select! {
509 () = self.cancellation_token.cancelled().fuse() => {
510 return Ok(Action::Stop);
511 }
512 () = self.storage.clock().sleep_until(timeout).fuse() => {
513 return Ok(Action::ProcessInbox(timeout_chain_id));
514 }
515 (maybe_notification, index, _) = select_all(notification_futures).fuse() => {
516 let Some(notification) = maybe_notification else {
517 let chain_id = *self.listening.keys().nth(index).unwrap();
518 self.listening.remove(&chain_id);
519 warn!("Notification stream for {chain_id} closed");
520 continue;
521 };
522 return Ok(Action::Notification(notification));
523 }
524 }
525 }
526 }
527
528 fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
530 let (chain_id, client) = self
531 .listening
532 .iter()
533 .min_by_key(|(_, client)| client.timeout)
534 .expect("No chains left to listen to");
535 Ok((*chain_id, client.timeout))
536 }
537
538 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
540 let chain_id = notification.chain_id;
541 let listening_client = self.listening.get(&chain_id).expect("missing client");
542 if let Err(error) = listening_client.client.update_validators(None).await {
543 warn!(
544 "Failed to update validators about the local chain after \
545 receiving {notification:?} with error: {error:?}"
546 );
547 }
548 Ok(())
549 }
550
551 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
553 let client = &self
554 .listening
555 .get(&chain_id)
556 .expect("missing client")
557 .client;
558 self.context.lock().await.update_wallet(client).await?;
559 Ok(())
560 }
561
562 async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
570 if self.config.skip_process_inbox {
571 debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
572 return Ok(());
573 }
574 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
575 if !listening_client.client.is_tracked() {
576 debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
577 return Ok(());
578 }
579 if listening_client.client.preferred_owner().is_none() {
580 debug!("Not processing inbox for non-owned chain {chain_id:.8}");
581 return Ok(());
582 }
583 debug!("Processing inbox for {chain_id:.8}");
584 listening_client.timeout = Timestamp::from(u64::MAX);
585 match listening_client
586 .client
587 .process_inbox_without_prepare()
588 .await
589 {
590 Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
591 debug!(%chain_id, "Cannot find key for chain");
592 }
593 Err(error) => warn!(%error, "Failed to process inbox."),
594 Ok((certs, None)) => info!(
595 "Done processing inbox of chain. {} blocks created on chain {chain_id}.",
596 certs.len()
597 ),
598 Ok((certs, Some(new_timeout))) => {
599 info!(
600 "{} blocks created on chain {chain_id}. Will try processing the inbox later \
601 based on the round timeout: {new_timeout:?}",
602 certs.len(),
603 );
604 listening_client.timeout = new_timeout.timestamp;
605 }
606 }
607 let mut context_guard = self.context.lock().await;
608 context_guard
609 .update_wallet(&listening_client.client)
610 .await?;
611 Ok(())
612 }
613
614 async fn sleep(delay_ms: u64) {
616 if delay_ms > 0 {
617 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
618 }
619 }
620}
621
/// What the listener loop should do next, as decided by `ChainListener::next_action`.
enum Action {
    /// The timeout of the given chain has elapsed; process its inbox.
    ProcessInbox(ChainId),
    /// A notification arrived from one of the chains being listened to.
    Notification(Notification),
    /// The cancellation token was triggered; stop listening.
    Stop,
}