1use std::{
5 collections::{btree_map::Entry, BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12 crypto::{CryptoHash, Signer},
13 data_types::{ChainDescription, Epoch, MessagePolicy, TimeDelta, Timestamp},
14 identifiers::{AccountOwner, BlobType, ChainId},
15 ownership::ChainOwnership,
16 util::future::FutureSyncExt as _,
17 Task,
18};
19use linera_core::{
20 client::{
21 chain_client::{self, ChainClient},
22 AbortOnDrop, ListeningMode,
23 },
24 node::NotificationStream,
25 worker::{Notification, Reason},
26 Environment, Wallet,
27};
28use linera_storage::{Arc as CacheArc, Storage as _};
29use tokio::sync::{mpsc::UnboundedReceiver, Notify};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, instrument, warn, Instrument as _};
32
33use crate::error::{self, Error};
34
// Command-line / serialized configuration of the chain listener.
//
// NOTE(review): plain `//` comments are used here on purpose: `clap` turns `///` doc
// comments into help text, so adding them would change the CLI output.
#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
#[serde(rename_all = "camelCase")]
pub struct ChainListenerConfig {
    // When set, the inbox processing loop only logs that it is skipping the inbox
    // instead of processing it.
    #[serde(default)]
    #[arg(
        long = "listener-skip-process-inbox",
        env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
    )]
    pub skip_process_inbox: bool,

    // Time to sleep, in milliseconds, before a notification is processed (0 = no delay).
    #[serde(default)]
    #[arg(
        long = "listener-delay-before-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_BEFORE"
    )]
    pub delay_before_ms: u64,

    // Time to sleep, in milliseconds, after a relevant notification has been processed
    // (0 = no delay).
    #[serde(default)]
    #[arg(
        long = "listener-delay-after-ms",
        default_value = "0",
        env = "LINERA_LISTENER_DELAY_AFTER"
    )]
    pub delay_after_ms: u64,
}
65
/// Shorthand for the [`ChainClient`] type that belongs to the environment of the client
/// context `C`.
type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
67
68#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
69#[allow(async_fn_in_trait)]
70pub trait ClientContext {
71 type Environment: linera_core::Environment;
72
73 fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
74
75 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
76
77 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
78
79 fn admin_chain_id(&self) -> ChainId {
80 self.client().admin_chain_id()
81 }
82
83 #[cfg(not(web))]
85 fn timing_sender(
86 &self,
87 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
88
89 #[cfg(web)]
90 fn timing_sender(
91 &self,
92 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
93 None
94 }
95
96 fn make_chain_client(
97 &self,
98 chain_id: ChainId,
99 ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
100 async move {
101 let chain = self
102 .wallet()
103 .get(chain_id)
104 .make_sync()
105 .await
106 .map_err(error::Inner::wallet)?
107 .unwrap_or_default();
108 let follow_only = chain.is_follow_only();
109 Ok(self.client().create_chain_client(
110 chain_id,
111 chain.block_hash,
112 chain.next_block_height,
113 &chain.pending_fast_proposal,
114 chain.owner,
115 self.timing_sender(),
116 follow_only,
117 ))
118 }
119 }
120
121 async fn update_wallet_for_new_chain(
122 &mut self,
123 chain_id: ChainId,
124 owner: Option<AccountOwner>,
125 timestamp: Timestamp,
126 epoch: Epoch,
127 ) -> Result<(), Error>;
128
129 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
130}
131
132#[allow(async_fn_in_trait)]
133pub trait ClientContextExt: ClientContext {
134 async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
135 use futures::stream::TryStreamExt as _;
136 self.wallet()
137 .chain_ids()
138 .map_err(|e| error::Inner::wallet(e).into())
139 .and_then(|chain_id| self.make_chain_client(chain_id))
140 .try_collect()
141 .await
142 }
143
144 async fn owners_with_key(
148 &self,
149 owners: impl IntoIterator<Item = AccountOwner>,
150 ) -> Result<BTreeSet<AccountOwner>, Error> {
151 let mut result = BTreeSet::new();
152 for owner in owners.into_iter().collect::<BTreeSet<_>>() {
153 if self.client().has_key_for(&owner).await? {
154 result.insert(owner);
155 }
156 }
157 Ok(result)
158 }
159
160 async fn unique_owner_with_key(
164 &self,
165 owners: impl IntoIterator<Item = AccountOwner>,
166 ) -> Result<Option<AccountOwner>, Error> {
167 let with_key = self.owners_with_key(owners).await?;
168 Ok(if with_key.len() == 1 {
169 with_key.into_iter().next()
170 } else {
171 None
172 })
173 }
174
175 async fn maybe_auto_assign_preferred_owner(
178 &self,
179 chain_client: &mut ContextChainClient<Self>,
180 ownership: &ChainOwnership,
181 ) -> Result<(), Error> {
182 let chain_id = chain_client.chain_id();
183 let old_owner = chain_client.preferred_owner();
184 if old_owner.is_some_and(|o| ownership.all_owners().any(|n| *n == o)) {
185 return Ok(());
186 }
187 let Some(new_owner) = self
188 .unique_owner_with_key(ownership.all_owners().copied())
189 .await?
190 else {
191 return Ok(());
192 };
193 info!(
194 %chain_id, ?old_owner, %new_owner,
195 "Auto-assigning preferred owner from wallet key pair",
196 );
197 chain_client.set_preferred_owner(new_owner);
198 self.wallet()
199 .modify(chain_id, |chain| chain.owner = Some(new_owner))
200 .await
201 .map_err(error::Inner::wallet)?;
202 Ok(())
203 }
204}
205
// Blanket implementation: every `ClientContext` gets the `ClientContextExt` helpers.
impl<T: ClientContext> ClientContextExt for T {}
207
/// Everything the listener keeps for one chain it listens to.
struct ListeningClient<C: ClientContext> {
    /// The chain client for the chain.
    client: ContextChainClient<C>,
    /// The abort handle returned by `ChainClient::listen`; it is dropped first when stopping.
    abort_handle: AbortOnDrop,
    /// The task running the listener future returned by `ChainClient::listen`.
    listener: Task<()>,
    /// The stream of notifications for the chain, shared so that polling futures can own
    /// a handle to it.
    notification_stream: Arc<Mutex<NotificationStream>>,
    /// The background synchronization task for the chain (a ready task if there is none).
    background_sync: Task<()>,
    /// Used to wake up the inbox processing task.
    inbox_notify: Arc<Notify>,
    /// The task running the inbox processing loop for the chain.
    inbox_task: Task<()>,
    /// Cancellation token of the inbox processing task; a child of the listener's token.
    inbox_cancellation: CancellationToken,
}
231
232impl<C: ClientContext + 'static> ListeningClient<C> {
233 #[expect(clippy::too_many_arguments)]
234 fn new(
235 client: ContextChainClient<C>,
236 abort_handle: AbortOnDrop,
237 listener: Task<()>,
238 notification_stream: NotificationStream,
239 background_sync: Task<()>,
240 context: &Arc<Mutex<C>>,
241 config: &Arc<ChainListenerConfig>,
242 parent_cancellation: &CancellationToken,
243 ) -> Self {
244 let inbox_notify = Arc::new(Notify::new());
245 let inbox_cancellation = parent_cancellation.child_token();
246 let inbox_task =
247 Self::spawn_inbox_task(&client, context, config, &inbox_notify, &inbox_cancellation);
248 Self {
249 client,
250 abort_handle,
251 listener,
252 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
254 background_sync,
255 inbox_notify,
256 inbox_task,
257 inbox_cancellation,
258 }
259 }
260
261 fn respawn_inbox_task(
264 &mut self,
265 parent_cancellation: &CancellationToken,
266 context: &Arc<Mutex<C>>,
267 config: &Arc<ChainListenerConfig>,
268 ) {
269 self.inbox_cancellation.cancel();
270 self.inbox_cancellation = parent_cancellation.child_token();
271 self.inbox_task = Self::spawn_inbox_task(
272 &self.client,
273 context,
274 config,
275 &self.inbox_notify,
276 &self.inbox_cancellation,
277 );
278 }
279
280 fn spawn_inbox_task(
281 client: &ContextChainClient<C>,
282 context: &Arc<Mutex<C>>,
283 config: &Arc<ChainListenerConfig>,
284 inbox_notify: &Arc<Notify>,
285 inbox_cancellation: &CancellationToken,
286 ) -> Task<()> {
287 Task::spawn(inbox_processing_loop(
288 client.clone(),
289 Arc::clone(context),
290 Arc::clone(config),
291 Arc::clone(inbox_notify),
292 inbox_cancellation.clone(),
293 ))
294 }
295
296 async fn stop(self) {
297 drop(self.abort_handle);
299 self.inbox_cancellation.cancel();
300 futures::future::join3(
301 self.listener.cancel(),
302 self.background_sync.cancel(),
303 self.inbox_task.cancel(),
304 )
305 .await;
306 }
307}
308
/// Commands that can be sent to a running [`ChainListener`].
pub enum ListenerCommand {
    /// Start listening to the given chains. A chain given with an owner is listened to in
    /// full mode if the signer has a key for that owner (otherwise the chain is skipped);
    /// a chain without an owner is only followed.
    Listen(BTreeMap<ChainId, Option<AccountOwner>>),
    /// Stop listening to the given chains and remove them from the wallet.
    StopListening(BTreeSet<ChainId>),
    /// Replace the message policy of the given chains' clients and restart their inbox
    /// processing tasks.
    SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
}
319
/// Listens to notifications for a set of chains and reacts to them: updates the wallet,
/// starts listening to new chains and wakes up the per-chain inbox processing tasks.
pub struct ChainListener<C: ClientContext> {
    /// The shared client context.
    context: Arc<Mutex<C>>,
    /// The storage, used to read confirmed blocks.
    storage: <C::Environment as Environment>::Storage,
    /// The listener configuration, shared with the inbox processing tasks.
    config: Arc<ChainListenerConfig>,
    /// The per-chain state of all chains currently listened to.
    listening: BTreeMap<ChainId, ListeningClient<C>>,
    /// Cancelling this token stops the listener.
    cancellation_token: CancellationToken,
    /// Maps each publisher chain to the listened chains subscribed to its event streams.
    event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
    /// Receives commands for the listener.
    command_receiver: UnboundedReceiver<ListenerCommand>,
    /// Whether to start a background certificate sync for chains listened to in full mode.
    enable_background_sync: bool,
}
336
337impl<C: ClientContext + 'static> ChainListener<C> {
338 pub fn new(
340 config: ChainListenerConfig,
341 context: Arc<Mutex<C>>,
342 storage: <C::Environment as Environment>::Storage,
343 cancellation_token: CancellationToken,
344 command_receiver: UnboundedReceiver<ListenerCommand>,
345 enable_background_sync: bool,
346 ) -> Self {
347 Self {
348 storage,
349 context,
350 config: Arc::new(config),
351 listening: Default::default(),
352 cancellation_token,
353 event_subscribers: Default::default(),
354 command_receiver,
355 enable_background_sync,
356 }
357 }
358
359 #[instrument(skip(self))]
361 pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
362 let chain_ids = {
363 let guard = self.context.lock().await;
364 let admin_chain_id = guard.admin_chain_id();
365 guard
366 .make_chain_client(admin_chain_id)
367 .await?
368 .synchronize_chain_state(admin_chain_id)
369 .await?;
370 let mut chain_ids: BTreeMap<_, _> = guard
371 .wallet()
372 .items()
373 .collect::<Vec<_>>()
374 .await
375 .into_iter()
376 .map(|result| {
377 let (chain_id, chain) = result?;
378 let mode = if chain.is_follow_only() {
379 ListeningMode::FollowChain
380 } else {
381 ListeningMode::FullChain
382 };
383 Ok((chain_id, mode))
384 })
385 .collect::<Result<BTreeMap<_, _>, _>>()
386 .map_err(
387 |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
388 crate::error::Inner::Wallet(Box::new(e) as _)
389 },
390 )?;
391 chain_ids
394 .entry(admin_chain_id)
395 .or_insert(ListeningMode::FollowChain);
396 chain_ids
397 };
398
399 Ok(async move {
400 self.listen_recursively(chain_ids).await?;
401 loop {
402 match self.next_action().await? {
403 Action::Stop => break,
404 Action::Notification(notification) => {
405 self.process_notification(notification).await?
406 }
407 }
408 }
409 future::join_all(self.listening.into_values().map(|client| client.stop())).await;
410 Ok(())
411 })
412 }
413
414 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
416 Self::sleep(self.config.delay_before_ms).await;
417 let Some(listening_client) = self.listening.get(¬ification.chain_id) else {
418 warn!(
419 ?notification,
420 "ChainListener::process_notification: got a notification without listening to the chain"
421 );
422 return Ok(());
423 };
424 let Some(listening_mode) = listening_client.client.listening_mode() else {
425 warn!(
426 ?notification,
427 "ChainListener::process_notification: chain has no listening mode"
428 );
429 return Ok(());
430 };
431
432 if !listening_mode.is_relevant(¬ification.reason) {
433 debug!(
434 reason = ?notification.reason,
435 "ChainListener: ignoring notification due to listening mode"
436 );
437 return Ok(());
438 }
439 match ¬ification.reason {
440 Reason::NewIncomingBundle { .. } => {
441 self.maybe_notify_inbox_processing(notification.chain_id);
442 }
443 Reason::NewRound { .. } => {
444 self.update_validators(¬ification).await?;
445 }
446 Reason::NewBlock { hash, .. } => {
447 self.update_wallet(notification.chain_id).await?;
448 if listening_mode.is_full() {
449 self.add_new_chains(*hash).await?;
450 let publishers = self
451 .update_event_subscriptions(notification.chain_id)
452 .await?;
453 if !publishers.is_empty() {
454 self.listen_recursively(publishers).await?;
455 self.maybe_notify_inbox_processing(notification.chain_id);
456 }
457 }
458 self.process_new_events(notification.chain_id);
459 }
460 Reason::NewEvents { .. } => {
461 self.process_new_events(notification.chain_id);
462 }
463 Reason::BlockExecuted { .. } => {}
464 }
465 Self::sleep(self.config.delay_after_ms).await;
466 Ok(())
467 }
468
469 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
475 let block = CacheArc::unwrap_or_clone(
476 self.storage
477 .read_confirmed_block(hash)
478 .await?
479 .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?,
480 )
481 .into_block();
482 let parent_chain_id = block.header.chain_id;
483 let blobs = block.created_blobs().into_iter();
484 let new_chains = blobs
485 .filter_map(|(blob_id, blob)| {
486 if blob_id.blob_type == BlobType::ChainDescription {
487 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
488 .expect("ChainDescription should deserialize correctly");
489 Some((ChainId(blob_id.hash), chain_desc))
490 } else {
491 None
492 }
493 })
494 .collect::<Vec<_>>();
495 if new_chains.is_empty() {
496 return Ok(());
497 }
498 let mut new_ids = BTreeMap::new();
499 let mut context_guard = self.context.lock().await;
500 for (new_chain_id, chain_desc) in new_chains {
501 let with_key = context_guard
502 .owners_with_key(chain_desc.config().ownership.all_owners().copied())
503 .await?;
504 if with_key.is_empty() {
505 continue;
506 }
507 let owner = if with_key.len() == 1 {
508 with_key.into_iter().next()
509 } else {
510 None
511 };
512 context_guard
513 .update_wallet_for_new_chain(
514 new_chain_id,
515 owner,
516 block.header.timestamp,
517 block.header.epoch,
518 )
519 .await?;
520 context_guard
521 .client()
522 .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
523 new_ids.insert(new_chain_id, ListeningMode::FullChain);
524 }
525 if !new_ids.is_empty() {
528 context_guard
529 .client()
530 .retry_pending_cross_chain_requests(parent_chain_id)
531 .await?;
532 }
533 drop(context_guard);
534 self.listen_recursively(new_ids).await?;
535 Ok(())
536 }
537
538 fn process_new_events(&self, chain_id: ChainId) {
540 let Some(subscribers) = self.event_subscribers.get(&chain_id) else {
541 return;
542 };
543 for subscriber_id in subscribers {
544 self.maybe_notify_inbox_processing(*subscriber_id);
545 }
546 }
547
548 async fn listen_recursively(
551 &mut self,
552 mut chain_ids: BTreeMap<ChainId, ListeningMode>,
553 ) -> Result<(), Error> {
554 while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
555 for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
556 match chain_ids.entry(new_chain_id) {
557 Entry::Vacant(vacant) => {
558 vacant.insert(new_listening_mode);
559 }
560 Entry::Occupied(mut occupied) => {
561 occupied.get_mut().extend(Some(new_listening_mode));
562 }
563 }
564 }
565 }
566
567 Ok(())
568 }
569
570 #[instrument(skip(context))]
573 async fn background_sync_received_certificates(
574 context: Arc<Mutex<C>>,
575 chain_id: ChainId,
576 ) -> Result<(), Error> {
577 info!("Starting background certificate sync");
578 let client = context.lock().await.make_chain_client(chain_id).await?;
579
580 Ok(client.find_received_certificates().await?)
581 }
582
583 async fn listen(
587 &mut self,
588 chain_id: ChainId,
589 listening_mode: ListeningMode,
590 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
591 let context_guard = self.context.lock().await;
592 let existing_mode = context_guard.client().chain_mode(chain_id);
593 if self.listening.contains_key(&chain_id)
595 && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
596 {
597 return Ok(BTreeMap::new());
598 }
599 context_guard
601 .client()
602 .extend_chain_mode(chain_id, listening_mode);
603 drop(context_guard);
604
605 let background_sync_task = self.start_background_sync(chain_id).await;
607 let client = self
608 .context
609 .lock()
610 .await
611 .make_chain_client(chain_id)
612 .await?;
613 let (listener, abort_handle, notification_stream) = client.listen().await?;
614 let listening_client = ListeningClient::new(
615 client,
616 abort_handle,
617 Task::spawn(listener.in_current_span()),
618 notification_stream,
619 background_sync_task,
620 &self.context,
621 &self.config,
622 &self.cancellation_token,
623 );
624 self.listening.insert(chain_id, listening_client);
625 let publishing_chains = self.update_event_subscriptions(chain_id).await?;
626 self.maybe_notify_inbox_processing(chain_id);
627 Ok(publishing_chains)
628 }
629
630 async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
631 if !self.enable_background_sync
632 || !self
633 .context
634 .lock()
635 .await
636 .client()
637 .chain_mode(chain_id)
638 .is_some_and(|m| m.is_full())
639 {
640 return Task::ready(());
641 }
642
643 let context = Arc::clone(&self.context);
644 Task::spawn(async move {
645 if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
646 warn!("Background sync failed for chain {chain_id}: {e}");
647 }
648 })
649 }
650
651 fn remove_event_subscriber(&mut self, chain_id: ChainId) {
652 self.event_subscribers.retain(|_, subscribers| {
653 subscribers.remove(&chain_id);
654 !subscribers.is_empty()
655 });
656 }
657
658 async fn update_event_subscriptions(
660 &mut self,
661 chain_id: ChainId,
662 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
663 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
664 if !listening_client.client.is_tracked() {
665 return Ok(BTreeMap::new());
666 }
667 let app_filter = listening_client
668 .client
669 .options()
670 .message_policy
671 .process_events_from_application_ids
672 .clone();
673 let publishing_chains: BTreeMap<_, _> = listening_client
674 .client
675 .event_stream_publishers()
676 .await?
677 .into_iter()
678 .filter_map(|(chain_id, streams)| {
679 let streams = if let Some(app_set) = &app_filter {
680 streams
681 .into_iter()
682 .filter(|s| app_set.contains(&s.application_id))
683 .collect::<BTreeSet<_>>()
684 } else {
685 streams
686 };
687 if streams.is_empty() {
688 None
689 } else {
690 Some((chain_id, ListeningMode::EventsOnly(streams)))
691 }
692 })
693 .collect();
694 for publisher_id in publishing_chains.keys() {
695 self.event_subscribers
696 .entry(*publisher_id)
697 .or_default()
698 .insert(chain_id);
699 }
700 Ok(publishing_chains)
701 }
702
703 async fn next_action(&mut self) -> Result<Action, Error> {
705 loop {
706 let notification_futures = self
707 .listening
708 .values_mut()
709 .map(|client| {
710 let stream = client.notification_stream.clone();
711 Box::pin(async move { stream.lock().await.next().await })
712 })
713 .collect::<Vec<_>>();
714 futures::select! {
715 () = self.cancellation_token.cancelled().fuse() => {
716 return Ok(Action::Stop);
717 }
718 command = self.command_receiver.recv().then(async |maybe_command| {
719 if let Some(command) = maybe_command {
720 command
721 } else {
722 std::future::pending().await
723 }
724 }).fuse() => {
725 match command {
726 ListenerCommand::Listen(new_chains) => {
727 debug!(?new_chains, "received command to listen to new chains");
728 let listening_modes = self.update_wallet_for_listening(new_chains).await?;
729 self.listen_recursively(listening_modes).await?;
730 }
731 ListenerCommand::StopListening(chains) => {
732 debug!(?chains, "received command to stop listening to chains");
733 for chain_id in chains {
734 debug!(%chain_id, "stopping the listener for chain");
735 let Some(listening_client) = self.listening.remove(&chain_id) else {
736 error!(%chain_id, "attempted to drop a non-existent listener");
737 continue;
738 };
739 self.remove_event_subscriber(chain_id);
740 listening_client.stop().await;
741 if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
742 error!(%error, %chain_id, "error removing a chain from the wallet");
743 }
744 }
745 }
746 ListenerCommand::SetMessagePolicy(policies) => {
747 debug!(?policies, "received command to set message policies");
748 for (chain_id, policy) in policies {
749 let Some(listening_client) = self.listening.get_mut(&chain_id) else {
750 error!(
751 %chain_id,
752 "attempted to set the message policy of a non-existent \
753 listener"
754 );
755 continue;
756 };
757 listening_client.client.options_mut().message_policy = policy;
758 listening_client.respawn_inbox_task(
759 &self.cancellation_token,
760 &self.context,
761 &self.config,
762 );
763 }
764 }
765 }
766 }
767 (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
768 let Some(notification) = maybe_notification else {
769 let chain_id = *self.listening.keys().nth(index).unwrap();
770 warn!("Notification stream for {chain_id} closed");
771 let Some(listening_client) = self.listening.remove(&chain_id) else {
772 error!(%chain_id, "attempted to drop a non-existent listener");
773 continue;
774 };
775 self.remove_event_subscriber(chain_id);
776 listening_client.stop().await;
777 continue;
778 };
779 return Ok(Action::Notification(notification));
780 }
781 }
782 }
783 }
784
785 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
787 let chain_id = notification.chain_id;
788 let listening_client = self.listening.get(&chain_id).expect("missing client");
789 let latest_block = if let Reason::NewBlock { hash, .. } = ¬ification.reason {
790 listening_client.client.read_certificate(*hash).await.ok()
791 } else {
792 None
793 };
794 if let Err(error) = listening_client
795 .client
796 .update_validators(None, latest_block)
797 .await
798 {
799 warn!(
800 "Failed to update validators about the local chain after \
801 receiving {notification:?} with error: {error:?}"
802 );
803 }
804 Ok(())
805 }
806
807 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
809 let client = &self
810 .listening
811 .get(&chain_id)
812 .expect("missing client")
813 .client;
814 self.context.lock().await.update_wallet(client).await?;
815 Ok(())
816 }
817
818 async fn update_wallet_for_listening(
822 &self,
823 new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
824 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
825 let mut chains = BTreeMap::new();
826 let context_guard = self.context.lock().await;
827 for (chain_id, owner) in new_chains {
828 if let Some(owner) = owner {
829 if context_guard
830 .client()
831 .signer()
832 .contains_key(&owner)
833 .await
834 .map_err(chain_client::Error::signer_failure)?
835 {
836 let modified = context_guard
838 .wallet()
839 .modify(chain_id, |chain| chain.owner = Some(owner))
840 .await
841 .map_err(error::Inner::wallet)?;
842 if modified.is_none() {
844 let chain_description = context_guard
845 .client()
846 .get_chain_description(chain_id)
847 .await?;
848 let timestamp = chain_description.timestamp();
849 let epoch = chain_description.config().epoch;
850 context_guard
851 .wallet()
852 .insert(
853 chain_id,
854 linera_core::wallet::Chain {
855 owner: Some(owner),
856 timestamp,
857 epoch: Some(epoch),
858 ..Default::default()
859 },
860 )
861 .await
862 .map_err(error::Inner::wallet)?;
863 }
864
865 chains.insert(chain_id, ListeningMode::FullChain);
866 }
867 } else {
868 chains.insert(chain_id, ListeningMode::FollowChain);
869 }
870 }
871 Ok(chains)
872 }
873
874 fn maybe_notify_inbox_processing(&self, chain_id: ChainId) {
876 if let Some(listening_client) = self.listening.get(&chain_id) {
877 listening_client.inbox_notify.notify_one();
878 }
879 }
880
881 async fn sleep(delay_ms: u64) {
883 if delay_ms > 0 {
884 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
885 }
886 }
887}
888
/// Processes the inbox of the client's chain every time `inbox_notify` is notified, until
/// `cancellation_token` is cancelled.
///
/// Nothing is processed if the configuration says to skip inbox processing, if the chain
/// is not tracked, or if the client has no preferred owner. After each round of inbox
/// processing, successful or not, the wallet is updated from the client; a failure to do
/// so is only logged.
async fn inbox_processing_loop<C: ClientContext>(
    client: ContextChainClient<C>,
    context: Arc<Mutex<C>>,
    config: Arc<ChainListenerConfig>,
    inbox_notify: Arc<Notify>,
    cancellation_token: CancellationToken,
) {
    let chain_id = client.chain_id();
    loop {
        futures::select! {
            () = cancellation_token.cancelled().fuse() => break,
            () = inbox_notify.notified().fuse() => {
                if config.skip_process_inbox {
                    debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
                    continue;
                }
                if !client.is_tracked() {
                    debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
                    continue;
                }
                if client.preferred_owner().is_none() {
                    debug!("Not processing inbox for follow-only chain {chain_id:.8}");
                    continue;
                }
                debug!("Processing inbox for {chain_id:.8}");

                // Keep processing until the client reports no timeout to wait for, or an
                // error occurs.
                loop {
                    match client.process_inbox_without_prepare().await {
                        Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
                            debug!(%chain_id, "Cannot find key for chain");
                            break;
                        }
                        Err(error) => {
                            warn!(%error, "Failed to process inbox");
                            break;
                        }
                        Ok((certs, None)) => {
                            if certs.is_empty() {
                                debug!(%chain_id, "done processing inbox: no blocks created");
                            } else {
                                info!(
                                    %chain_id,
                                    created_block_count = %certs.len(),
                                    "done processing inbox",
                                );
                            }
                            break;
                        }
                        Ok((certs, Some(new_timeout))) => {
                            info!(
                                %chain_id,
                                created_block_count = %certs.len(),
                                timeout = %new_timeout,
                                "waiting for round timeout before continuing to process the inbox",
                            );
                            let delta = new_timeout.timestamp.delta_since(Timestamp::now());
                            if delta > TimeDelta::ZERO {
                                // Wait for the timeout, but retry early on a new
                                // notification and stop right away on cancellation.
                                futures::select! {
                                    () = cancellation_token.cancelled().fuse() => return,
                                    () = linera_base::time::timer::sleep(delta.as_duration()).fuse() => {},
                                    () = inbox_notify.notified().fuse() => {},
                                }
                            }
                        }
                    }
                }

                if let Err(error) = context.lock().await.update_wallet(&client).await {
                    warn!(%error, "Failed to update wallet after inbox processing");
                }
            }
        }
    }
}
969
/// What the listener's main loop has to do next.
enum Action {
    /// Process the given notification.
    Notification(Notification),
    /// Stop listening and shut down.
    Stop,
}