1use std::{
5 collections::{btree_map::Entry, BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12 crypto::{CryptoHash, Signer},
13 data_types::{ChainDescription, Epoch, MessagePolicy, TimeDelta, Timestamp},
14 identifiers::{AccountOwner, BlobType, ChainId},
15 ownership::ChainOwnership,
16 util::future::FutureSyncExt as _,
17 Task,
18};
19use linera_core::{
20 client::{
21 chain_client::{self, ChainClient},
22 AbortOnDrop, ListeningMode,
23 },
24 node::NotificationStream,
25 worker::{Notification, Reason},
26 Environment, Wallet,
27};
28use linera_storage::{Arc as CacheArc, Storage as _};
29use tokio::sync::{mpsc::UnboundedReceiver, Notify};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, instrument, warn, Instrument as _};
32
33use crate::error::{self, Error};
34
35#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
37#[serde(rename_all = "camelCase")]
38pub struct ChainListenerConfig {
39 #[serde(default)]
42 #[arg(
43 long = "listener-skip-process-inbox",
44 env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
45 )]
46 pub skip_process_inbox: bool,
47
48 #[serde(default)]
50 #[arg(
51 long = "listener-delay-before-ms",
52 default_value = "0",
53 env = "LINERA_LISTENER_DELAY_BEFORE"
54 )]
55 pub delay_before_ms: u64,
56
57 #[serde(default)]
59 #[arg(
60 long = "listener-delay-after-ms",
61 default_value = "0",
62 env = "LINERA_LISTENER_DELAY_AFTER"
63 )]
64 pub delay_after_ms: u64,
65}
66
67type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
68
69#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
71#[allow(async_fn_in_trait)]
72pub trait ClientContext {
73 type Environment: linera_core::Environment;
75
76 fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
78
79 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
81
82 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
84
85 fn admin_chain_id(&self) -> ChainId {
87 self.client().admin_chain_id()
88 }
89
90 #[cfg(not(web))]
92 fn timing_sender(
93 &self,
94 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
95
96 #[cfg(web)]
98 fn timing_sender(
99 &self,
100 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
101 None
102 }
103
104 fn make_chain_client(
106 &self,
107 chain_id: ChainId,
108 ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
109 async move {
110 let chain = self
111 .wallet()
112 .get(chain_id)
113 .make_sync()
114 .await
115 .map_err(error::Inner::wallet)?
116 .unwrap_or_default();
117 let follow_only = chain.is_follow_only();
118 Ok(self.client().create_chain_client(
119 chain_id,
120 chain.block_hash,
121 chain.next_block_height,
122 &chain.pending_fast_proposal,
123 chain.owner,
124 self.timing_sender(),
125 follow_only,
126 ))
127 }
128 }
129
130 async fn update_wallet_for_new_chain(
132 &mut self,
133 chain_id: ChainId,
134 owner: Option<AccountOwner>,
135 timestamp: Timestamp,
136 epoch: Epoch,
137 ) -> Result<(), Error>;
138
139 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
141}
142
143#[allow(async_fn_in_trait)]
145pub trait ClientContextExt: ClientContext {
146 async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
148 use futures::stream::TryStreamExt as _;
149 self.wallet()
150 .chain_ids()
151 .map_err(|e| error::Inner::wallet(e).into())
152 .and_then(|chain_id| self.make_chain_client(chain_id))
153 .try_collect()
154 .await
155 }
156
157 async fn owners_with_key(
161 &self,
162 owners: impl IntoIterator<Item = AccountOwner>,
163 ) -> Result<BTreeSet<AccountOwner>, Error> {
164 let mut result = BTreeSet::new();
165 for owner in owners.into_iter().collect::<BTreeSet<_>>() {
166 if self.client().has_key_for(&owner).await? {
167 result.insert(owner);
168 }
169 }
170 Ok(result)
171 }
172
173 async fn unique_owner_with_key(
177 &self,
178 owners: impl IntoIterator<Item = AccountOwner>,
179 ) -> Result<Option<AccountOwner>, Error> {
180 let with_key = self.owners_with_key(owners).await?;
181 Ok(if with_key.len() == 1 {
182 with_key.into_iter().next()
183 } else {
184 None
185 })
186 }
187
188 async fn maybe_auto_assign_preferred_owner(
191 &self,
192 chain_client: &mut ContextChainClient<Self>,
193 ownership: &ChainOwnership,
194 ) -> Result<(), Error> {
195 let chain_id = chain_client.chain_id();
196 let old_owner = chain_client.preferred_owner();
197 if old_owner.is_some_and(|o| ownership.all_owners().any(|n| *n == o)) {
198 return Ok(());
199 }
200 let Some(new_owner) = self
201 .unique_owner_with_key(ownership.all_owners().copied())
202 .await?
203 else {
204 return Ok(());
205 };
206 info!(
207 %chain_id, ?old_owner, %new_owner,
208 "Auto-assigning preferred owner from wallet key pair",
209 );
210 chain_client.set_preferred_owner(new_owner);
211 self.wallet()
212 .modify(chain_id, |chain| chain.owner = Some(new_owner))
213 .await
214 .map_err(error::Inner::wallet)?;
215 Ok(())
216 }
217}
218
219impl<T: ClientContext> ClientContextExt for T {}
220
221struct ListeningClient<C: ClientContext> {
227 client: ContextChainClient<C>,
229 abort_handle: AbortOnDrop,
231 listener: Task<()>,
233 notification_stream: Arc<Mutex<NotificationStream>>,
235 background_sync: Task<()>,
237 inbox_notify: Arc<Notify>,
239 inbox_task: Task<()>,
241 inbox_cancellation: CancellationToken,
243}
244
245impl<C: ClientContext + 'static> ListeningClient<C> {
246 #[expect(clippy::too_many_arguments)]
247 fn new(
248 client: ContextChainClient<C>,
249 abort_handle: AbortOnDrop,
250 listener: Task<()>,
251 notification_stream: NotificationStream,
252 background_sync: Task<()>,
253 context: &Arc<Mutex<C>>,
254 config: &Arc<ChainListenerConfig>,
255 parent_cancellation: &CancellationToken,
256 ) -> Self {
257 let inbox_notify = Arc::new(Notify::new());
258 let inbox_cancellation = parent_cancellation.child_token();
259 let inbox_task =
260 Self::spawn_inbox_task(&client, context, config, &inbox_notify, &inbox_cancellation);
261 Self {
262 client,
263 abort_handle,
264 listener,
265 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
267 background_sync,
268 inbox_notify,
269 inbox_task,
270 inbox_cancellation,
271 }
272 }
273
274 fn respawn_inbox_task(
277 &mut self,
278 parent_cancellation: &CancellationToken,
279 context: &Arc<Mutex<C>>,
280 config: &Arc<ChainListenerConfig>,
281 ) {
282 self.inbox_cancellation.cancel();
283 self.inbox_cancellation = parent_cancellation.child_token();
284 self.inbox_task = Self::spawn_inbox_task(
285 &self.client,
286 context,
287 config,
288 &self.inbox_notify,
289 &self.inbox_cancellation,
290 );
291 }
292
293 fn spawn_inbox_task(
294 client: &ContextChainClient<C>,
295 context: &Arc<Mutex<C>>,
296 config: &Arc<ChainListenerConfig>,
297 inbox_notify: &Arc<Notify>,
298 inbox_cancellation: &CancellationToken,
299 ) -> Task<()> {
300 Task::spawn(inbox_processing_loop(
301 client.clone(),
302 Arc::clone(context),
303 Arc::clone(config),
304 Arc::clone(inbox_notify),
305 inbox_cancellation.clone(),
306 ))
307 }
308
309 async fn stop(self) {
310 drop(self.abort_handle);
312 self.inbox_cancellation.cancel();
313 futures::future::join3(
314 self.listener.cancel(),
315 self.background_sync.cancel(),
316 self.inbox_task.cancel(),
317 )
318 .await;
319 }
320}
321
322pub enum ListenerCommand {
324 Listen(BTreeMap<ChainId, Option<AccountOwner>>),
327 StopListening(BTreeSet<ChainId>),
329 SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
331}
332
333pub struct ChainListener<C: ClientContext> {
336 context: Arc<Mutex<C>>,
337 storage: <C::Environment as Environment>::Storage,
338 config: Arc<ChainListenerConfig>,
339 listening: BTreeMap<ChainId, ListeningClient<C>>,
340 cancellation_token: CancellationToken,
341 event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
344 command_receiver: UnboundedReceiver<ListenerCommand>,
346 enable_background_sync: bool,
348}
349
350impl<C: ClientContext + 'static> ChainListener<C> {
351 pub fn new(
353 config: ChainListenerConfig,
354 context: Arc<Mutex<C>>,
355 storage: <C::Environment as Environment>::Storage,
356 cancellation_token: CancellationToken,
357 command_receiver: UnboundedReceiver<ListenerCommand>,
358 enable_background_sync: bool,
359 ) -> Self {
360 Self {
361 storage,
362 context,
363 config: Arc::new(config),
364 listening: Default::default(),
365 cancellation_token,
366 event_subscribers: Default::default(),
367 command_receiver,
368 enable_background_sync,
369 }
370 }
371
372 #[instrument(skip(self))]
374 pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
375 let chain_ids = {
376 let guard = self.context.lock().await;
377 let admin_chain_id = guard.admin_chain_id();
378 guard
379 .make_chain_client(admin_chain_id)
380 .await?
381 .synchronize_chain_state(admin_chain_id)
382 .await?;
383 let mut chain_ids: BTreeMap<_, _> = guard
384 .wallet()
385 .items()
386 .collect::<Vec<_>>()
387 .await
388 .into_iter()
389 .map(|result| {
390 let (chain_id, chain) = result?;
391 let mode = if chain.is_follow_only() {
392 ListeningMode::FollowChain
393 } else {
394 ListeningMode::FullChain
395 };
396 Ok((chain_id, mode))
397 })
398 .collect::<Result<BTreeMap<_, _>, _>>()
399 .map_err(
400 |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
401 crate::error::Inner::Wallet(Box::new(e) as _)
402 },
403 )?;
404 chain_ids
407 .entry(admin_chain_id)
408 .or_insert(ListeningMode::FollowChain);
409 chain_ids
410 };
411
412 Ok(async move {
413 self.listen_recursively(chain_ids).await?;
414 loop {
415 match self.next_action().await? {
416 Action::Stop => break,
417 Action::Notification(notification) => {
418 self.process_notification(notification).await?
419 }
420 }
421 }
422 future::join_all(self.listening.into_values().map(|client| client.stop())).await;
423 Ok(())
424 })
425 }
426
427 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
429 Self::sleep(self.config.delay_before_ms).await;
430 let Some(listening_client) = self.listening.get(¬ification.chain_id) else {
431 warn!(
432 ?notification,
433 "ChainListener::process_notification: got a notification without listening to the chain"
434 );
435 return Ok(());
436 };
437 let Some(listening_mode) = listening_client.client.listening_mode() else {
438 warn!(
439 ?notification,
440 "ChainListener::process_notification: chain has no listening mode"
441 );
442 return Ok(());
443 };
444
445 if !listening_mode.is_relevant(¬ification.reason) {
446 debug!(
447 reason = ?notification.reason,
448 "ChainListener: ignoring notification due to listening mode"
449 );
450 return Ok(());
451 }
452 match ¬ification.reason {
453 Reason::NewIncomingBundle { .. } => {
454 self.maybe_notify_inbox_processing(notification.chain_id);
455 }
456 Reason::NewRound { .. } => {
457 self.update_validators(¬ification).await?;
458 }
459 Reason::NewBlock { hash, .. } => {
460 self.update_wallet(notification.chain_id).await?;
461 if listening_mode.is_full() {
462 self.add_new_chains(*hash).await?;
463 let publishers = self
464 .update_event_subscriptions(notification.chain_id)
465 .await?;
466 if !publishers.is_empty() {
467 self.listen_recursively(publishers).await?;
468 self.maybe_notify_inbox_processing(notification.chain_id);
469 }
470 }
471 self.process_new_events(notification.chain_id);
472 }
473 Reason::NewEvents { .. } => {
474 self.process_new_events(notification.chain_id);
475 }
476 Reason::BlockExecuted { .. } => {}
477 }
478 Self::sleep(self.config.delay_after_ms).await;
479 Ok(())
480 }
481
482 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
488 let block = CacheArc::unwrap_or_clone(
489 self.storage
490 .read_confirmed_block(hash)
491 .await?
492 .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?,
493 )
494 .into_block();
495 let parent_chain_id = block.header.chain_id;
496 let blobs = block.created_blobs().into_iter();
497 let new_chains = blobs
498 .filter_map(|(blob_id, blob)| {
499 if blob_id.blob_type == BlobType::ChainDescription {
500 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
501 .expect("ChainDescription should deserialize correctly");
502 Some((ChainId(blob_id.hash), chain_desc))
503 } else {
504 None
505 }
506 })
507 .collect::<Vec<_>>();
508 if new_chains.is_empty() {
509 return Ok(());
510 }
511 let mut new_ids = BTreeMap::new();
512 let mut context_guard = self.context.lock().await;
513 for (new_chain_id, chain_desc) in new_chains {
514 let with_key = context_guard
515 .owners_with_key(chain_desc.config().ownership.all_owners().copied())
516 .await?;
517 if with_key.is_empty() {
518 continue;
519 }
520 let owner = if with_key.len() == 1 {
521 with_key.into_iter().next()
522 } else {
523 None
524 };
525 context_guard
526 .update_wallet_for_new_chain(
527 new_chain_id,
528 owner,
529 block.header.timestamp,
530 block.header.epoch,
531 )
532 .await?;
533 context_guard
534 .client()
535 .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
536 new_ids.insert(new_chain_id, ListeningMode::FullChain);
537 }
538 if !new_ids.is_empty() {
541 context_guard
542 .client()
543 .retry_pending_cross_chain_requests(parent_chain_id)
544 .await?;
545 }
546 drop(context_guard);
547 self.listen_recursively(new_ids).await?;
548 Ok(())
549 }
550
551 fn process_new_events(&self, chain_id: ChainId) {
553 let Some(subscribers) = self.event_subscribers.get(&chain_id) else {
554 return;
555 };
556 for subscriber_id in subscribers {
557 self.maybe_notify_inbox_processing(*subscriber_id);
558 }
559 }
560
561 async fn listen_recursively(
564 &mut self,
565 mut chain_ids: BTreeMap<ChainId, ListeningMode>,
566 ) -> Result<(), Error> {
567 while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
568 for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
569 match chain_ids.entry(new_chain_id) {
570 Entry::Vacant(vacant) => {
571 vacant.insert(new_listening_mode);
572 }
573 Entry::Occupied(mut occupied) => {
574 occupied.get_mut().extend(Some(new_listening_mode));
575 }
576 }
577 }
578 }
579
580 Ok(())
581 }
582
583 #[instrument(skip(context))]
586 async fn background_sync_received_certificates(
587 context: Arc<Mutex<C>>,
588 chain_id: ChainId,
589 ) -> Result<(), Error> {
590 info!("Starting background certificate sync");
591 let client = context.lock().await.make_chain_client(chain_id).await?;
592
593 Ok(client.find_received_certificates().await?)
594 }
595
596 async fn listen(
600 &mut self,
601 chain_id: ChainId,
602 listening_mode: ListeningMode,
603 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
604 let context_guard = self.context.lock().await;
605 let existing_mode = context_guard.client().chain_mode(chain_id);
606 if self.listening.contains_key(&chain_id)
608 && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
609 {
610 return Ok(BTreeMap::new());
611 }
612 context_guard
614 .client()
615 .extend_chain_mode(chain_id, listening_mode);
616 drop(context_guard);
617
618 let background_sync_task = self.start_background_sync(chain_id).await;
620 let client = self
621 .context
622 .lock()
623 .await
624 .make_chain_client(chain_id)
625 .await?;
626 let (listener, abort_handle, notification_stream) = client.listen().await?;
627 let listening_client = ListeningClient::new(
628 client,
629 abort_handle,
630 Task::spawn(listener.in_current_span()),
631 notification_stream,
632 background_sync_task,
633 &self.context,
634 &self.config,
635 &self.cancellation_token,
636 );
637 self.listening.insert(chain_id, listening_client);
638 let publishing_chains = self.update_event_subscriptions(chain_id).await?;
639 self.maybe_notify_inbox_processing(chain_id);
640 Ok(publishing_chains)
641 }
642
643 async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
644 if !self.enable_background_sync
645 || !self
646 .context
647 .lock()
648 .await
649 .client()
650 .chain_mode(chain_id)
651 .is_some_and(|m| m.is_full())
652 {
653 return Task::ready(());
654 }
655
656 let context = Arc::clone(&self.context);
657 Task::spawn(async move {
658 if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
659 warn!("Background sync failed for chain {chain_id}: {e}");
660 }
661 })
662 }
663
664 fn remove_event_subscriber(&mut self, chain_id: ChainId) -> Vec<ChainId> {
667 let mut orphaned = Vec::new();
668 self.event_subscribers.retain(|publisher_id, subscribers| {
669 subscribers.remove(&chain_id);
670 if subscribers.is_empty() {
671 orphaned.push(*publisher_id);
672 false
673 } else {
674 true
675 }
676 });
677 orphaned
678 }
679
680 async fn stop_tracking_publisher(&mut self, publisher_id: ChainId) {
684 let mode = self.context.lock().await.client().chain_mode(publisher_id);
685 if !matches!(mode, Some(ListeningMode::EventsOnly(_))) {
686 return;
687 }
688 let Some(listening_client) = self.listening.remove(&publisher_id) else {
689 return;
690 };
691 self.context
692 .lock()
693 .await
694 .client()
695 .remove_chain_mode(publisher_id);
696 listening_client.stop().await;
697 debug!(%publisher_id, "stopped tracking publisher chain after its last subscriber went away");
698 }
699
700 async fn update_event_subscriptions(
702 &mut self,
703 chain_id: ChainId,
704 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
705 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
706 if !listening_client.client.is_tracked() {
707 return Ok(BTreeMap::new());
708 }
709 let app_filter = listening_client
710 .client
711 .options()
712 .message_policy
713 .process_events_from_application_ids
714 .clone();
715 let publishing_chains: BTreeMap<_, _> = listening_client
716 .client
717 .event_stream_publishers()
718 .await?
719 .into_iter()
720 .filter_map(|(chain_id, streams)| {
721 let streams = if let Some(app_set) = &app_filter {
722 streams
723 .into_iter()
724 .filter(|s| app_set.contains(&s.application_id))
725 .collect::<BTreeSet<_>>()
726 } else {
727 streams
728 };
729 if streams.is_empty() {
730 None
731 } else {
732 Some((chain_id, ListeningMode::EventsOnly(streams)))
733 }
734 })
735 .collect();
736 for publisher_id in publishing_chains.keys() {
737 self.event_subscribers
738 .entry(*publisher_id)
739 .or_default()
740 .insert(chain_id);
741 }
742 let removed_publishers = self
745 .event_subscribers
746 .iter()
747 .filter(|(publisher_id, subscribers)| {
748 subscribers.contains(&chain_id) && !publishing_chains.contains_key(publisher_id)
749 })
750 .map(|(publisher_id, _)| *publisher_id)
751 .collect::<Vec<_>>();
752 for publisher_id in removed_publishers {
753 let orphaned = {
754 let Some(subscribers) = self.event_subscribers.get_mut(&publisher_id) else {
755 continue;
756 };
757 subscribers.remove(&chain_id);
758 subscribers.is_empty()
759 };
760 if orphaned {
761 self.event_subscribers.remove(&publisher_id);
762 self.stop_tracking_publisher(publisher_id).await;
763 }
764 }
765 Ok(publishing_chains)
766 }
767
768 async fn next_action(&mut self) -> Result<Action, Error> {
770 loop {
771 let notification_futures = self
772 .listening
773 .values_mut()
774 .map(|client| {
775 let stream = client.notification_stream.clone();
776 Box::pin(async move { stream.lock().await.next().await })
777 })
778 .collect::<Vec<_>>();
779 futures::select! {
780 () = self.cancellation_token.cancelled().fuse() => {
781 return Ok(Action::Stop);
782 }
783 command = self.command_receiver.recv().then(async |maybe_command| {
784 if let Some(command) = maybe_command {
785 command
786 } else {
787 std::future::pending().await
788 }
789 }).fuse() => {
790 match command {
791 ListenerCommand::Listen(new_chains) => {
792 debug!(?new_chains, "received command to listen to new chains");
793 let listening_modes = self.update_wallet_for_listening(new_chains).await?;
794 self.listen_recursively(listening_modes).await?;
795 }
796 ListenerCommand::StopListening(chains) => {
797 debug!(?chains, "received command to stop listening to chains");
798 for chain_id in chains {
799 debug!(%chain_id, "stopping the listener for chain");
800 let Some(listening_client) = self.listening.remove(&chain_id) else {
801 error!(%chain_id, "attempted to drop a non-existent listener");
802 continue;
803 };
804 let orphaned = self.remove_event_subscriber(chain_id);
805 listening_client.stop().await;
806 for publisher_id in orphaned {
807 self.stop_tracking_publisher(publisher_id).await;
808 }
809 if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
810 error!(%error, %chain_id, "error removing a chain from the wallet");
811 }
812 }
813 }
814 ListenerCommand::SetMessagePolicy(policies) => {
815 debug!(?policies, "received command to set message policies");
816 for (chain_id, policy) in policies {
817 let Some(listening_client) = self.listening.get_mut(&chain_id) else {
818 error!(
819 %chain_id,
820 "attempted to set the message policy of a non-existent \
821 listener"
822 );
823 continue;
824 };
825 listening_client.client.options_mut().message_policy = policy;
826 listening_client.respawn_inbox_task(
827 &self.cancellation_token,
828 &self.context,
829 &self.config,
830 );
831 }
832 }
833 }
834 }
835 (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
836 let Some(notification) = maybe_notification else {
837 let chain_id = *self.listening.keys().nth(index).unwrap();
838 warn!("Notification stream for {chain_id} closed");
839 let Some(listening_client) = self.listening.remove(&chain_id) else {
840 error!(%chain_id, "attempted to drop a non-existent listener");
841 continue;
842 };
843 let orphaned = self.remove_event_subscriber(chain_id);
844 listening_client.stop().await;
845 for publisher_id in orphaned {
846 self.stop_tracking_publisher(publisher_id).await;
847 }
848 continue;
849 };
850 return Ok(Action::Notification(notification));
851 }
852 }
853 }
854 }
855
856 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
858 let chain_id = notification.chain_id;
859 let listening_client = self.listening.get(&chain_id).expect("missing client");
860 let latest_block = if let Reason::NewBlock { hash, .. } = ¬ification.reason {
861 listening_client.client.read_certificate(*hash).await.ok()
862 } else {
863 None
864 };
865 if let Err(error) = listening_client
866 .client
867 .update_validators(None, latest_block)
868 .await
869 {
870 warn!(
871 "Failed to update validators about the local chain after \
872 receiving {notification:?} with error: {error:?}"
873 );
874 }
875 Ok(())
876 }
877
878 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
880 let client = &self
881 .listening
882 .get(&chain_id)
883 .expect("missing client")
884 .client;
885 self.context.lock().await.update_wallet(client).await?;
886 Ok(())
887 }
888
889 async fn update_wallet_for_listening(
893 &self,
894 new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
895 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
896 let mut chains = BTreeMap::new();
897 let context_guard = self.context.lock().await;
898 for (chain_id, owner) in new_chains {
899 if let Some(owner) = owner {
900 if context_guard
901 .client()
902 .signer()
903 .contains_key(&owner)
904 .await
905 .map_err(chain_client::Error::signer_failure)?
906 {
907 let modified = context_guard
909 .wallet()
910 .modify(chain_id, |chain| chain.owner = Some(owner))
911 .await
912 .map_err(error::Inner::wallet)?;
913 if modified.is_none() {
915 let chain_description = context_guard
916 .client()
917 .get_chain_description(chain_id)
918 .await?;
919 let timestamp = chain_description.timestamp();
920 let epoch = chain_description.config().epoch;
921 context_guard
922 .wallet()
923 .insert(
924 chain_id,
925 linera_core::wallet::Chain {
926 owner: Some(owner),
927 timestamp,
928 epoch: Some(epoch),
929 ..Default::default()
930 },
931 )
932 .await
933 .map_err(error::Inner::wallet)?;
934 }
935
936 chains.insert(chain_id, ListeningMode::FullChain);
937 }
938 } else {
939 chains.insert(chain_id, ListeningMode::FollowChain);
940 }
941 }
942 Ok(chains)
943 }
944
945 fn maybe_notify_inbox_processing(&self, chain_id: ChainId) {
947 if let Some(listening_client) = self.listening.get(&chain_id) {
948 listening_client.inbox_notify.notify_one();
949 }
950 }
951
952 async fn sleep(delay_ms: u64) {
954 if delay_ms > 0 {
955 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
956 }
957 }
958}
959
960async fn inbox_processing_loop<C: ClientContext>(
964 client: ContextChainClient<C>,
965 context: Arc<Mutex<C>>,
966 config: Arc<ChainListenerConfig>,
967 inbox_notify: Arc<Notify>,
968 cancellation_token: CancellationToken,
969) {
970 let chain_id = client.chain_id();
971 loop {
972 futures::select! {
973 () = cancellation_token.cancelled().fuse() => break,
974 () = inbox_notify.notified().fuse() => {
975 if config.skip_process_inbox {
976 debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
977 continue;
978 }
979 if !client.is_tracked() {
980 debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
981 continue;
982 }
983 if client.preferred_owner().is_none() {
984 debug!("Not processing inbox for follow-only chain {chain_id:.8}");
985 continue;
986 }
987 debug!("Processing inbox for {chain_id:.8}");
988
989 loop {
993 match client.process_inbox_without_prepare().await {
994 Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
995 debug!(%chain_id, "Cannot find key for chain");
996 break;
997 }
998 Err(error) => {
999 warn!(%error, "Failed to process inbox");
1000 break;
1001 }
1002 Ok((certs, None)) => {
1003 if certs.is_empty() {
1004 debug!(%chain_id, "done processing inbox: no blocks created");
1005 } else {
1006 info!(
1007 %chain_id,
1008 created_block_count = %certs.len(),
1009 "done processing inbox",
1010 );
1011 }
1012 break;
1013 }
1014 Ok((certs, Some(new_timeout))) => {
1015 info!(
1016 %chain_id,
1017 created_block_count = %certs.len(),
1018 timeout = %new_timeout,
1019 "waiting for round timeout before continuing to process the inbox",
1020 );
1021 let delta = new_timeout.timestamp.delta_since(Timestamp::now());
1022 if delta > TimeDelta::ZERO {
1023 futures::select! {
1024 () = cancellation_token.cancelled().fuse() => return,
1025 () = linera_base::time::timer::sleep(delta.as_duration()).fuse() => {},
1026 () = inbox_notify.notified().fuse() => {},
1027 }
1028 }
1029 }
1030 }
1031 }
1032
1033 if let Err(error) = context.lock().await.update_wallet(&client).await {
1034 warn!(%error, "Failed to update wallet after inbox processing");
1035 }
1036 }
1037 }
1038 }
1039}
1040
1041enum Action {
1042 Notification(Notification),
1043 Stop,
1044}