1use std::{
5 collections::{btree_map::Entry, BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12 crypto::{CryptoHash, Signer},
13 data_types::{ChainDescription, Epoch, MessagePolicy, Timestamp},
14 identifiers::{AccountOwner, BlobType, ChainId},
15 util::future::FutureSyncExt as _,
16 Task,
17};
18use linera_core::{
19 client::{
20 chain_client::{self, ChainClient},
21 AbortOnDrop, ListeningMode,
22 },
23 node::NotificationStream,
24 worker::{Notification, Reason},
25 Environment, Wallet,
26};
27use linera_storage::{Clock as _, Storage as _};
28use tokio::sync::mpsc::UnboundedReceiver;
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info, instrument, warn, Instrument as _};
31
32use crate::error::{self, Error};
33
// Configuration of the chain listener. It can be filled in from command-line flags or
// environment variables (`clap::Args`) and is (de)serialized with camelCase keys.
//
// NOTE(review): plain `//` comments are used on purpose: `///` doc comments on a
// `clap`-derived type are turned into help text, which would change the CLI output.
#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
#[serde(rename_all = "camelCase")]
pub struct ChainListenerConfig {
    // When set, `ChainListener::maybe_process_inbox` returns early and never processes an
    // inbox on its own.
    #[serde(default)]
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // Delay, in milliseconds, slept at the very start of handling each notification.
    // Zero (the default) disables the delay.
    #[serde(default)]
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // Delay, in milliseconds, slept after a notification has been handled. It is skipped
    // for notifications that are ignored early. Zero (the default) disables the delay.
    #[serde(default)]
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
64
/// Shorthand for the [`ChainClient`] type that belongs to the environment of a given
/// [`ClientContext`] implementation.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
66
67#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
68#[allow(async_fn_in_trait)]
69pub trait ClientContext {
70 type Environment: linera_core::Environment;
71
72 fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
73
74 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
75
76 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
77
78 fn admin_chain_id(&self) -> ChainId {
79 self.client().admin_chain_id()
80 }
81
82 #[cfg(not(web))]
84 fn timing_sender(
85 &self,
86 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
87
88 #[cfg(web)]
89 fn timing_sender(
90 &self,
91 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
92 None
93 }
94
95 fn make_chain_client(
96 &self,
97 chain_id: ChainId,
98 ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
99 async move {
100 let chain = self
101 .wallet()
102 .get(chain_id)
103 .make_sync()
104 .await
105 .map_err(error::Inner::wallet)?
106 .unwrap_or_default();
107 let follow_only = chain.is_follow_only();
108 Ok(self.client().create_chain_client(
109 chain_id,
110 chain.block_hash,
111 chain.next_block_height,
112 chain.pending_proposal,
113 chain.owner,
114 self.timing_sender(),
115 follow_only,
116 ))
117 }
118 }
119
120 async fn update_wallet_for_new_chain(
121 &mut self,
122 chain_id: ChainId,
123 owner: Option<AccountOwner>,
124 timestamp: Timestamp,
125 epoch: Epoch,
126 ) -> Result<(), Error>;
127
128 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
129}
130
131#[allow(async_fn_in_trait)]
132pub trait ClientContextExt: ClientContext {
133 async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
134 use futures::stream::TryStreamExt as _;
135 self.wallet()
136 .chain_ids()
137 .map_err(|e| error::Inner::wallet(e).into())
138 .and_then(|chain_id| self.make_chain_client(chain_id))
139 .try_collect()
140 .await
141 }
142}
143
144impl<T: ClientContext> ClientContextExt for T {}
145
/// Everything the listener keeps for one chain it is listening to.
struct ListeningClient<C: ClientContext> {
    /// The chain client for this chain.
    client: ContextChainClient<C>,
    /// Handle returned by `client.listen()`; it is dropped first when the listener is
    /// stopped. NOTE(review): presumably dropping it aborts the underlying listening
    /// connections — confirm against `AbortOnDrop`.
    abort_handle: AbortOnDrop,
    /// The spawned task that drives the listener future returned by `client.listen()`.
    listener: Task<()>,
    /// The chain's notification stream, shared behind a mutex so it can be polled from a
    /// freshly boxed future on every call to `next_action`.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// When the inbox should be processed again. It holds `u64::MAX` unless the last inbox
    /// processing asked to wait for a round timeout.
    timeout: Timestamp,
    /// The background synchronization task for this chain; an already finished task when
    /// background sync is disabled or not applicable.
    background_sync: Task<()>,
}
165
166impl<C: ClientContext> ListeningClient<C> {
167 fn new(
168 client: ContextChainClient<C>,
169 abort_handle: AbortOnDrop,
170 listener: Task<()>,
171 notification_stream: NotificationStream,
172 background_sync: Task<()>,
173 ) -> Self {
174 Self {
175 client,
176 abort_handle,
177 listener,
178 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
180 timeout: Timestamp::from(u64::MAX),
181 background_sync,
182 }
183 }
184
185 async fn stop(self) {
186 drop(self.abort_handle);
188 futures::future::join(self.listener.cancel(), self.background_sync.cancel()).await;
189 }
190}
191
/// Commands that can be sent to a running [`ChainListener`] through its command channel.
pub enum ListenerCommand {
    /// Start listening to the given chains. A chain given with an owner is recorded in the
    /// wallet and listened to in full mode, but only if the signer holds that owner's key;
    /// a chain given without an owner is only followed.
    Listen(BTreeMap<ChainId, Option<AccountOwner>>),
    /// Stop listening to the given chains and remove them from the wallet.
    StopListening(BTreeSet<ChainId>),
    /// Set the message policy of the chain clients of the given chains.
    SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
}
202
/// Listens to notifications for a set of chains and reacts to them: it processes inboxes,
/// updates the wallet, and starts listening to newly created or newly subscribed chains.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context; locked whenever the wallet or the client is accessed.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks and to access the clock.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration.
    config: Arc<ChainListenerConfig>,
    /// The chains currently being listened to, with their per-chain state.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Cancelling this token makes the listener stop.
    cancellation_token: CancellationToken,
    /// Maps each publisher chain to the chains that subscribe to its event streams.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// The channel on which [`ListenerCommand`]s are received.
    command_receiver: UnboundedReceiver<ListenerCommand>,
    /// Whether a background certificate sync is started for chains in full mode.
    enable_background_sync: bool,
}
219
220impl<C: ClientContext + 'static> ChainListener<C> {
221 pub fn new(
223 config: ChainListenerConfig,
224 context: Arc<Mutex<C>>,
225 storage: <C::Environment as Environment>::Storage,
226 cancellation_token: CancellationToken,
227 command_receiver: UnboundedReceiver<ListenerCommand>,
228 enable_background_sync: bool,
229 ) -> Self {
230 Self {
231 storage,
232 context,
233 config: Arc::new(config),
234 listening: Default::default(),
235 cancellation_token,
236 event_subscribers: Default::default(),
237 command_receiver,
238 enable_background_sync,
239 }
240 }
241
242 #[instrument(skip(self))]
244 pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
245 let chain_ids = {
246 let guard = self.context.lock().await;
247 let admin_chain_id = guard.admin_chain_id();
248 guard
249 .make_chain_client(admin_chain_id)
250 .await?
251 .synchronize_chain_state(admin_chain_id)
252 .await?;
253 let mut chain_ids: BTreeMap<_, _> = guard
254 .wallet()
255 .items()
256 .collect::<Vec<_>>()
257 .await
258 .into_iter()
259 .map(|result| {
260 let (chain_id, chain) = result?;
261 let mode = if chain.is_follow_only() {
262 ListeningMode::FollowChain
263 } else {
264 ListeningMode::FullChain
265 };
266 Ok((chain_id, mode))
267 })
268 .collect::<Result<BTreeMap<_, _>, _>>()
269 .map_err(
270 |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
271 crate::error::Inner::Wallet(Box::new(e) as _)
272 },
273 )?;
274 chain_ids
277 .entry(admin_chain_id)
278 .or_insert(ListeningMode::FollowChain);
279 chain_ids
280 };
281
282 Ok(async move {
283 self.listen_recursively(chain_ids).await?;
284 loop {
285 match self.next_action().await? {
286 Action::Stop => break,
287 Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
288 Action::Notification(notification) => {
289 self.process_notification(notification).await?
290 }
291 }
292 }
293 future::join_all(self.listening.into_values().map(|client| client.stop())).await;
294 Ok(())
295 })
296 }
297
298 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
300 Self::sleep(self.config.delay_before_ms).await;
301 let Some(listening_client) = self.listening.get(¬ification.chain_id) else {
302 warn!(
303 ?notification,
304 "ChainListener::process_notification: got a notification without listening to the chain"
305 );
306 return Ok(());
307 };
308 let Some(listening_mode) = listening_client.client.listening_mode() else {
309 warn!(
310 ?notification,
311 "ChainListener::process_notification: chain has no listening mode"
312 );
313 return Ok(());
314 };
315
316 if !listening_mode.is_relevant(¬ification.reason) {
317 debug!(
318 reason = ?notification.reason,
319 "ChainListener: ignoring notification due to listening mode"
320 );
321 return Ok(());
322 }
323 match ¬ification.reason {
324 Reason::NewIncomingBundle { .. } => {
325 self.maybe_process_inbox(notification.chain_id).await?;
326 }
327 Reason::NewRound { .. } => {
328 self.update_validators(¬ification).await?;
329 }
330 Reason::NewBlock { hash, .. } => {
331 self.update_wallet(notification.chain_id).await?;
332 if listening_mode.is_full() {
333 self.add_new_chains(*hash).await?;
334 let publishers = self
335 .update_event_subscriptions(notification.chain_id)
336 .await?;
337 if !publishers.is_empty() {
338 self.listen_recursively(publishers).await?;
339 self.maybe_process_inbox(notification.chain_id).await?;
340 }
341 self.process_new_events(notification.chain_id).await?;
342 }
343 }
344 Reason::NewEvents { .. } => {
345 self.process_new_events(notification.chain_id).await?;
346 }
347 Reason::BlockExecuted { .. } => {}
348 }
349 Self::sleep(self.config.delay_after_ms).await;
350 Ok(())
351 }
352
353 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
357 let block = self
358 .storage
359 .read_confirmed_block(hash)
360 .await?
361 .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?
362 .into_block();
363 let parent_chain_id = block.header.chain_id;
364 let blobs = block.created_blobs().into_iter();
365 let new_chains = blobs
366 .filter_map(|(blob_id, blob)| {
367 if blob_id.blob_type == BlobType::ChainDescription {
368 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
369 .expect("ChainDescription should deserialize correctly");
370 Some((ChainId(blob_id.hash), chain_desc))
371 } else {
372 None
373 }
374 })
375 .collect::<Vec<_>>();
376 if new_chains.is_empty() {
377 return Ok(());
378 }
379 let mut new_ids = BTreeMap::new();
380 let mut context_guard = self.context.lock().await;
381 for (new_chain_id, chain_desc) in new_chains {
382 for chain_owner in chain_desc.config().ownership.all_owners() {
383 if context_guard.client().has_key_for(chain_owner).await? {
384 context_guard
385 .update_wallet_for_new_chain(
386 new_chain_id,
387 Some(*chain_owner),
388 block.header.timestamp,
389 block.header.epoch,
390 )
391 .await?;
392 context_guard
393 .client()
394 .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
395 new_ids.insert(new_chain_id, ListeningMode::FullChain);
396 }
397 }
398 }
399 if !new_ids.is_empty() {
402 context_guard
403 .client()
404 .local_node
405 .retry_pending_cross_chain_requests(parent_chain_id)
406 .await?;
407 }
408 drop(context_guard);
409 self.listen_recursively(new_ids).await?;
410 Ok(())
411 }
412
413 async fn process_new_events(&mut self, chain_id: ChainId) -> Result<(), Error> {
415 let Some(subscribers) = self.event_subscribers.get(&chain_id).cloned() else {
416 return Ok(());
417 };
418 for subscriber_id in subscribers {
419 self.maybe_process_inbox(subscriber_id).await?;
420 }
421 Ok(())
422 }
423
424 async fn listen_recursively(
427 &mut self,
428 mut chain_ids: BTreeMap<ChainId, ListeningMode>,
429 ) -> Result<(), Error> {
430 while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
431 for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
432 match chain_ids.entry(new_chain_id) {
433 Entry::Vacant(vacant) => {
434 vacant.insert(new_listening_mode);
435 }
436 Entry::Occupied(mut occupied) => {
437 occupied.get_mut().extend(Some(new_listening_mode));
438 }
439 }
440 }
441 }
442
443 Ok(())
444 }
445
446 #[instrument(skip(context))]
449 async fn background_sync_received_certificates(
450 context: Arc<Mutex<C>>,
451 chain_id: ChainId,
452 ) -> Result<(), Error> {
453 info!("Starting background certificate sync for chain {chain_id}");
454 let client = context.lock().await.make_chain_client(chain_id).await?;
455
456 Ok(client.find_received_certificates().await?)
457 }
458
459 async fn listen(
463 &mut self,
464 chain_id: ChainId,
465 listening_mode: ListeningMode,
466 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
467 let context_guard = self.context.lock().await;
468 let existing_mode = context_guard.client().chain_mode(chain_id);
469 if self.listening.contains_key(&chain_id)
471 && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
472 {
473 return Ok(BTreeMap::new());
474 }
475 context_guard
477 .client()
478 .extend_chain_mode(chain_id, listening_mode);
479 drop(context_guard);
480
481 let background_sync_task = self.start_background_sync(chain_id).await;
483 let client = self
484 .context
485 .lock()
486 .await
487 .make_chain_client(chain_id)
488 .await?;
489 let (listener, abort_handle, notification_stream) = client.listen().await?;
490 let listening_client = ListeningClient::new(
491 client,
492 abort_handle,
493 Task::spawn(listener.in_current_span()),
494 notification_stream,
495 background_sync_task,
496 );
497 self.listening.insert(chain_id, listening_client);
498 let publishing_chains = self.update_event_subscriptions(chain_id).await?;
499 self.maybe_process_inbox(chain_id).await?;
500 Ok(publishing_chains)
501 }
502
503 async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
504 if !self.enable_background_sync
505 || !self
506 .context
507 .lock()
508 .await
509 .client()
510 .chain_mode(chain_id)
511 .is_some_and(|m| m.is_full())
512 {
513 return Task::ready(());
514 }
515
516 let context = Arc::clone(&self.context);
517 Task::spawn(async move {
518 if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
519 warn!("Background sync failed for chain {chain_id}: {e}");
520 }
521 })
522 }
523
524 fn remove_event_subscriber(&mut self, chain_id: ChainId) {
525 self.event_subscribers.retain(|_, subscribers| {
526 subscribers.remove(&chain_id);
527 !subscribers.is_empty()
528 });
529 }
530
531 async fn update_event_subscriptions(
533 &mut self,
534 chain_id: ChainId,
535 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
536 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
537 if !listening_client.client.is_tracked() {
538 return Ok(BTreeMap::new());
539 }
540 let publishing_chains: BTreeMap<_, _> = listening_client
541 .client
542 .event_stream_publishers()
543 .await?
544 .into_iter()
545 .map(|(chain_id, streams)| (chain_id, ListeningMode::EventsOnly(streams)))
546 .collect();
547 for publisher_id in publishing_chains.keys() {
548 self.event_subscribers
549 .entry(*publisher_id)
550 .or_default()
551 .insert(chain_id);
552 }
553 Ok(publishing_chains)
554 }
555
556 async fn next_action(&mut self) -> Result<Action, Error> {
558 loop {
559 let (timeout_chain_id, timeout) = self.next_timeout()?;
560 let notification_futures = self
561 .listening
562 .values_mut()
563 .map(|client| {
564 let stream = client.notification_stream.clone();
565 Box::pin(async move { stream.lock().await.next().await })
566 })
567 .collect::<Vec<_>>();
568 futures::select! {
569 () = self.cancellation_token.cancelled().fuse() => {
570 return Ok(Action::Stop);
571 }
572 () = self.storage.clock().sleep_until(timeout).fuse() => {
573 return Ok(Action::ProcessInbox(timeout_chain_id));
574 }
575 command = self.command_receiver.recv().then(async |maybe_command| {
576 if let Some(command) = maybe_command {
577 command
578 } else {
579 std::future::pending().await
580 }
581 }).fuse() => {
582 match command {
583 ListenerCommand::Listen(new_chains) => {
584 debug!(?new_chains, "received command to listen to new chains");
585 let listening_modes = self.update_wallet_for_listening(new_chains).await?;
586 self.listen_recursively(listening_modes).await?;
587 }
588 ListenerCommand::StopListening(chains) => {
589 debug!(?chains, "received command to stop listening to chains");
590 for chain_id in chains {
591 debug!(%chain_id, "stopping the listener for chain");
592 let Some(listening_client) = self.listening.remove(&chain_id) else {
593 error!(%chain_id, "attempted to drop a non-existent listener");
594 continue;
595 };
596 self.remove_event_subscriber(chain_id);
597 listening_client.stop().await;
598 if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
599 error!(%error, %chain_id, "error removing a chain from the wallet");
600 }
601 }
602 }
603 ListenerCommand::SetMessagePolicy(policies) => {
604 debug!(?policies, "received command to set message policies");
605 for (chain_id, policy) in policies {
606 let Some(listening_client) = self.listening.get_mut(&chain_id) else {
607 error!(
608 %chain_id,
609 "attempted to set the message policy of a non-existent \
610 listener"
611 );
612 continue;
613 };
614 listening_client.client.options_mut().message_policy = policy;
615 }
616 }
617 }
618 }
619 (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
620 let Some(notification) = maybe_notification else {
621 let chain_id = *self.listening.keys().nth(index).unwrap();
622 warn!("Notification stream for {chain_id} closed");
623 let Some(listening_client) = self.listening.remove(&chain_id) else {
624 error!(%chain_id, "attempted to drop a non-existent listener");
625 continue;
626 };
627 self.remove_event_subscriber(chain_id);
628 listening_client.stop().await;
629 continue;
630 };
631 return Ok(Action::Notification(notification));
632 }
633 }
634 }
635 }
636
637 fn next_timeout(&self) -> Result<(ChainId, Timestamp), Error> {
639 let (chain_id, client) = self
640 .listening
641 .iter()
642 .min_by_key(|(_, client)| client.timeout)
643 .expect("No chains left to listen to");
644 Ok((*chain_id, client.timeout))
645 }
646
647 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
649 let chain_id = notification.chain_id;
650 let listening_client = self.listening.get(&chain_id).expect("missing client");
651 let latest_block = if let Reason::NewBlock { hash, .. } = ¬ification.reason {
652 listening_client.client.read_certificate(*hash).await.ok()
653 } else {
654 None
655 };
656 if let Err(error) = listening_client
657 .client
658 .update_validators(None, latest_block)
659 .await
660 {
661 warn!(
662 "Failed to update validators about the local chain after \
663 receiving {notification:?} with error: {error:?}"
664 );
665 }
666 Ok(())
667 }
668
669 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
671 let client = &self
672 .listening
673 .get(&chain_id)
674 .expect("missing client")
675 .client;
676 self.context.lock().await.update_wallet(client).await?;
677 Ok(())
678 }
679
680 async fn update_wallet_for_listening(
684 &self,
685 new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
686 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
687 let mut chains = BTreeMap::new();
688 let context_guard = self.context.lock().await;
689 for (chain_id, owner) in new_chains {
690 if let Some(owner) = owner {
691 if context_guard
692 .client()
693 .signer()
694 .contains_key(&owner)
695 .await
696 .map_err(chain_client::Error::signer_failure)?
697 {
698 let modified = context_guard
700 .wallet()
701 .modify(chain_id, |chain| chain.owner = Some(owner))
702 .await
703 .map_err(error::Inner::wallet)?;
704 if modified.is_none() {
706 let chain_description = context_guard
707 .client()
708 .get_chain_description(chain_id)
709 .await?;
710 let timestamp = chain_description.timestamp();
711 let epoch = chain_description.config().epoch;
712 context_guard
713 .wallet()
714 .insert(
715 chain_id,
716 linera_core::wallet::Chain {
717 owner: Some(owner),
718 timestamp,
719 epoch: Some(epoch),
720 ..Default::default()
721 },
722 )
723 .await
724 .map_err(error::Inner::wallet)?;
725 }
726
727 chains.insert(chain_id, ListeningMode::FullChain);
728 }
729 } else {
730 chains.insert(chain_id, ListeningMode::FollowChain);
731 }
732 }
733 Ok(chains)
734 }
735
736 async fn maybe_process_inbox(&mut self, chain_id: ChainId) -> Result<(), Error> {
744 if self.config.skip_process_inbox {
745 debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
746 return Ok(());
747 }
748 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
749 if !listening_client.client.is_tracked() {
750 debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
751 return Ok(());
752 }
753 if listening_client.client.preferred_owner().is_none() {
754 debug!("Not processing inbox for non-owned chain {chain_id:.8}");
755 return Ok(());
756 }
757 debug!("Processing inbox for {chain_id:.8}");
758 listening_client.timeout = Timestamp::from(u64::MAX);
759 match listening_client
760 .client
761 .process_inbox_without_prepare()
762 .await
763 {
764 Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
765 debug!(%chain_id, "Cannot find key for chain");
766 }
767 Err(error) => warn!(%error, "Failed to process inbox."),
768 Ok((certs, None)) if certs.is_empty() => debug!(
769 %chain_id,
770 "done processing inbox: no blocks created",
771 ),
772 Ok((certs, None)) => info!(
773 %chain_id,
774 created_block_count = %certs.len(),
775 "done processing inbox",
776 ),
777 Ok((certs, Some(new_timeout))) => {
778 info!(
779 %chain_id,
780 created_block_count = %certs.len(),
781 timeout = %new_timeout,
782 "waiting for round timeout before continuing to process the inbox",
783 );
784 listening_client.timeout = new_timeout.timestamp;
785 }
786 }
787 let mut context_guard = self.context.lock().await;
788 context_guard
789 .update_wallet(&listening_client.client)
790 .await?;
791 Ok(())
792 }
793
794 async fn sleep(delay_ms: u64) {
796 if delay_ms > 0 {
797 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
798 }
799 }
800}
801
/// What the listener's main loop should do next, as decided by `ChainListener::next_action`.
enum Action {
    /// Process the inbox of the given chain, because its timeout has expired.
    ProcessInbox(ChainId),
    /// Handle a notification received from one of the notification streams.
    Notification(Notification),
    /// Stop the listener, because the cancellation token was cancelled.
    Stop,
}