1use std::{
5 collections::{btree_map::Entry, BTreeMap, BTreeSet},
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::{future, lock::Mutex, Future, FutureExt as _, StreamExt};
11use linera_base::{
12 crypto::{CryptoHash, Signer},
13 data_types::{ChainDescription, Epoch, MessagePolicy, TimeDelta, Timestamp},
14 identifiers::{AccountOwner, BlobType, ChainId},
15 util::future::FutureSyncExt as _,
16 Task,
17};
18use linera_core::{
19 client::{
20 chain_client::{self, ChainClient},
21 AbortOnDrop, ListeningMode,
22 },
23 node::NotificationStream,
24 worker::{Notification, Reason},
25 Environment, Wallet,
26};
27use linera_storage::Storage as _;
28use tokio::sync::{mpsc::UnboundedReceiver, Notify};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info, instrument, warn, Instrument as _};
31
32use crate::error::{self, Error};
33
34#[derive(Default, Debug, Clone, clap::Args, serde::Serialize, serde::Deserialize, tsify::Tsify)]
35#[serde(rename_all = "camelCase")]
36pub struct ChainListenerConfig {
37 #[serde(default)]
40 #[arg(
41 long = "listener-skip-process-inbox",
42 env = "LINERA_LISTENER_SKIP_PROCESS_INBOX"
43 )]
44 pub skip_process_inbox: bool,
45
46 #[serde(default)]
48 #[arg(
49 long = "listener-delay-before-ms",
50 default_value = "0",
51 env = "LINERA_LISTENER_DELAY_BEFORE"
52 )]
53 pub delay_before_ms: u64,
54
55 #[serde(default)]
57 #[arg(
58 long = "listener-delay-after-ms",
59 default_value = "0",
60 env = "LINERA_LISTENER_DELAY_AFTER"
61 )]
62 pub delay_after_ms: u64,
63}
64
65type ContextChainClient<C> = ChainClient<<C as ClientContext>::Environment>;
66
67#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
68#[allow(async_fn_in_trait)]
69pub trait ClientContext {
70 type Environment: linera_core::Environment;
71
72 fn wallet(&self) -> &<Self::Environment as linera_core::Environment>::Wallet;
73
74 fn storage(&self) -> &<Self::Environment as linera_core::Environment>::Storage;
75
76 fn client(&self) -> &Arc<linera_core::client::Client<Self::Environment>>;
77
78 fn admin_chain_id(&self) -> ChainId {
79 self.client().admin_chain_id()
80 }
81
82 #[cfg(not(web))]
84 fn timing_sender(
85 &self,
86 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>>;
87
88 #[cfg(web)]
89 fn timing_sender(
90 &self,
91 ) -> Option<tokio::sync::mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
92 None
93 }
94
95 fn make_chain_client(
96 &self,
97 chain_id: ChainId,
98 ) -> impl Future<Output = Result<ChainClient<Self::Environment>, Error>> {
99 async move {
100 let chain = self
101 .wallet()
102 .get(chain_id)
103 .make_sync()
104 .await
105 .map_err(error::Inner::wallet)?
106 .unwrap_or_default();
107 let follow_only = chain.is_follow_only();
108 Ok(self.client().create_chain_client(
109 chain_id,
110 chain.block_hash,
111 chain.next_block_height,
112 &chain.pending_fast_proposal,
113 chain.owner,
114 self.timing_sender(),
115 follow_only,
116 ))
117 }
118 }
119
120 async fn update_wallet_for_new_chain(
121 &mut self,
122 chain_id: ChainId,
123 owner: Option<AccountOwner>,
124 timestamp: Timestamp,
125 epoch: Epoch,
126 ) -> Result<(), Error>;
127
128 async fn update_wallet(&mut self, client: &ContextChainClient<Self>) -> Result<(), Error>;
129}
130
131#[allow(async_fn_in_trait)]
132pub trait ClientContextExt: ClientContext {
133 async fn clients(&self) -> Result<Vec<ContextChainClient<Self>>, Error> {
134 use futures::stream::TryStreamExt as _;
135 self.wallet()
136 .chain_ids()
137 .map_err(|e| error::Inner::wallet(e).into())
138 .and_then(|chain_id| self.make_chain_client(chain_id))
139 .try_collect()
140 .await
141 }
142}
143
144impl<T: ClientContext> ClientContextExt for T {}
145
146struct ListeningClient<C: ClientContext> {
152 client: ContextChainClient<C>,
154 abort_handle: AbortOnDrop,
156 listener: Task<()>,
158 notification_stream: Arc<Mutex<NotificationStream>>,
160 background_sync: Task<()>,
162 inbox_notify: Arc<Notify>,
164 inbox_task: Task<()>,
166 inbox_cancellation: CancellationToken,
168}
169
170impl<C: ClientContext + 'static> ListeningClient<C> {
171 #[expect(clippy::too_many_arguments)]
172 fn new(
173 client: ContextChainClient<C>,
174 abort_handle: AbortOnDrop,
175 listener: Task<()>,
176 notification_stream: NotificationStream,
177 background_sync: Task<()>,
178 context: &Arc<Mutex<C>>,
179 config: &Arc<ChainListenerConfig>,
180 parent_cancellation: &CancellationToken,
181 ) -> Self {
182 let inbox_notify = Arc::new(Notify::new());
183 let inbox_cancellation = parent_cancellation.child_token();
184 let inbox_task =
185 Self::spawn_inbox_task(&client, context, config, &inbox_notify, &inbox_cancellation);
186 Self {
187 client,
188 abort_handle,
189 listener,
190 #[allow(clippy::arc_with_non_send_sync)] notification_stream: Arc::new(Mutex::new(notification_stream)),
192 background_sync,
193 inbox_notify,
194 inbox_task,
195 inbox_cancellation,
196 }
197 }
198
199 fn respawn_inbox_task(
202 &mut self,
203 parent_cancellation: &CancellationToken,
204 context: &Arc<Mutex<C>>,
205 config: &Arc<ChainListenerConfig>,
206 ) {
207 self.inbox_cancellation.cancel();
208 self.inbox_cancellation = parent_cancellation.child_token();
209 self.inbox_task = Self::spawn_inbox_task(
210 &self.client,
211 context,
212 config,
213 &self.inbox_notify,
214 &self.inbox_cancellation,
215 );
216 }
217
218 fn spawn_inbox_task(
219 client: &ContextChainClient<C>,
220 context: &Arc<Mutex<C>>,
221 config: &Arc<ChainListenerConfig>,
222 inbox_notify: &Arc<Notify>,
223 inbox_cancellation: &CancellationToken,
224 ) -> Task<()> {
225 Task::spawn(inbox_processing_loop(
226 client.clone(),
227 Arc::clone(context),
228 Arc::clone(config),
229 Arc::clone(inbox_notify),
230 inbox_cancellation.clone(),
231 ))
232 }
233
234 async fn stop(self) {
235 drop(self.abort_handle);
237 self.inbox_cancellation.cancel();
238 futures::future::join3(
239 self.listener.cancel(),
240 self.background_sync.cancel(),
241 self.inbox_task.cancel(),
242 )
243 .await;
244 }
245}
246
247pub enum ListenerCommand {
249 Listen(BTreeMap<ChainId, Option<AccountOwner>>),
252 StopListening(BTreeSet<ChainId>),
254 SetMessagePolicy(BTreeMap<ChainId, MessagePolicy>),
256}
257
258pub struct ChainListener<C: ClientContext> {
261 context: Arc<Mutex<C>>,
262 storage: <C::Environment as Environment>::Storage,
263 config: Arc<ChainListenerConfig>,
264 listening: BTreeMap<ChainId, ListeningClient<C>>,
265 cancellation_token: CancellationToken,
266 event_subscribers: BTreeMap<ChainId, BTreeSet<ChainId>>,
269 command_receiver: UnboundedReceiver<ListenerCommand>,
271 enable_background_sync: bool,
273}
274
275impl<C: ClientContext + 'static> ChainListener<C> {
276 pub fn new(
278 config: ChainListenerConfig,
279 context: Arc<Mutex<C>>,
280 storage: <C::Environment as Environment>::Storage,
281 cancellation_token: CancellationToken,
282 command_receiver: UnboundedReceiver<ListenerCommand>,
283 enable_background_sync: bool,
284 ) -> Self {
285 Self {
286 storage,
287 context,
288 config: Arc::new(config),
289 listening: Default::default(),
290 cancellation_token,
291 event_subscribers: Default::default(),
292 command_receiver,
293 enable_background_sync,
294 }
295 }
296
297 #[instrument(skip(self))]
299 pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
300 let chain_ids = {
301 let guard = self.context.lock().await;
302 let admin_chain_id = guard.admin_chain_id();
303 guard
304 .make_chain_client(admin_chain_id)
305 .await?
306 .synchronize_chain_state(admin_chain_id)
307 .await?;
308 let mut chain_ids: BTreeMap<_, _> = guard
309 .wallet()
310 .items()
311 .collect::<Vec<_>>()
312 .await
313 .into_iter()
314 .map(|result| {
315 let (chain_id, chain) = result?;
316 let mode = if chain.is_follow_only() {
317 ListeningMode::FollowChain
318 } else {
319 ListeningMode::FullChain
320 };
321 Ok((chain_id, mode))
322 })
323 .collect::<Result<BTreeMap<_, _>, _>>()
324 .map_err(
325 |e: <<C::Environment as Environment>::Wallet as Wallet>::Error| {
326 crate::error::Inner::Wallet(Box::new(e) as _)
327 },
328 )?;
329 chain_ids
332 .entry(admin_chain_id)
333 .or_insert(ListeningMode::FollowChain);
334 chain_ids
335 };
336
337 Ok(async move {
338 self.listen_recursively(chain_ids).await?;
339 loop {
340 match self.next_action().await? {
341 Action::Stop => break,
342 Action::Notification(notification) => {
343 self.process_notification(notification).await?
344 }
345 }
346 }
347 future::join_all(self.listening.into_values().map(|client| client.stop())).await;
348 Ok(())
349 })
350 }
351
352 async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
354 Self::sleep(self.config.delay_before_ms).await;
355 let Some(listening_client) = self.listening.get(¬ification.chain_id) else {
356 warn!(
357 ?notification,
358 "ChainListener::process_notification: got a notification without listening to the chain"
359 );
360 return Ok(());
361 };
362 let Some(listening_mode) = listening_client.client.listening_mode() else {
363 warn!(
364 ?notification,
365 "ChainListener::process_notification: chain has no listening mode"
366 );
367 return Ok(());
368 };
369
370 if !listening_mode.is_relevant(¬ification.reason) {
371 debug!(
372 reason = ?notification.reason,
373 "ChainListener: ignoring notification due to listening mode"
374 );
375 return Ok(());
376 }
377 match ¬ification.reason {
378 Reason::NewIncomingBundle { .. } => {
379 self.maybe_notify_inbox_processing(notification.chain_id);
380 }
381 Reason::NewRound { .. } => {
382 self.update_validators(¬ification).await?;
383 }
384 Reason::NewBlock { hash, .. } => {
385 self.update_wallet(notification.chain_id).await?;
386 if listening_mode.is_full() {
387 self.add_new_chains(*hash).await?;
388 let publishers = self
389 .update_event_subscriptions(notification.chain_id)
390 .await?;
391 if !publishers.is_empty() {
392 self.listen_recursively(publishers).await?;
393 self.maybe_notify_inbox_processing(notification.chain_id);
394 }
395 }
396 self.process_new_events(notification.chain_id);
397 }
398 Reason::NewEvents { .. } => {
399 self.process_new_events(notification.chain_id);
400 }
401 Reason::BlockExecuted { .. } => {}
402 }
403 Self::sleep(self.config.delay_after_ms).await;
404 Ok(())
405 }
406
407 async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
411 let block = Arc::unwrap_or_clone(
412 self.storage
413 .read_confirmed_block(hash)
414 .await?
415 .ok_or(chain_client::Error::MissingConfirmedBlock(hash))?,
416 )
417 .into_block();
418 let parent_chain_id = block.header.chain_id;
419 let blobs = block.created_blobs().into_iter();
420 let new_chains = blobs
421 .filter_map(|(blob_id, blob)| {
422 if blob_id.blob_type == BlobType::ChainDescription {
423 let chain_desc: ChainDescription = bcs::from_bytes(blob.content().bytes())
424 .expect("ChainDescription should deserialize correctly");
425 Some((ChainId(blob_id.hash), chain_desc))
426 } else {
427 None
428 }
429 })
430 .collect::<Vec<_>>();
431 if new_chains.is_empty() {
432 return Ok(());
433 }
434 let mut new_ids = BTreeMap::new();
435 let mut context_guard = self.context.lock().await;
436 for (new_chain_id, chain_desc) in new_chains {
437 for chain_owner in chain_desc.config().ownership.all_owners() {
438 if context_guard.client().has_key_for(chain_owner).await? {
439 context_guard
440 .update_wallet_for_new_chain(
441 new_chain_id,
442 Some(*chain_owner),
443 block.header.timestamp,
444 block.header.epoch,
445 )
446 .await?;
447 context_guard
448 .client()
449 .extend_chain_mode(new_chain_id, ListeningMode::FullChain);
450 new_ids.insert(new_chain_id, ListeningMode::FullChain);
451 }
452 }
453 }
454 if !new_ids.is_empty() {
457 context_guard
458 .client()
459 .retry_pending_cross_chain_requests(parent_chain_id)
460 .await?;
461 }
462 drop(context_guard);
463 self.listen_recursively(new_ids).await?;
464 Ok(())
465 }
466
467 fn process_new_events(&self, chain_id: ChainId) {
469 let Some(subscribers) = self.event_subscribers.get(&chain_id) else {
470 return;
471 };
472 for subscriber_id in subscribers {
473 self.maybe_notify_inbox_processing(*subscriber_id);
474 }
475 }
476
477 async fn listen_recursively(
480 &mut self,
481 mut chain_ids: BTreeMap<ChainId, ListeningMode>,
482 ) -> Result<(), Error> {
483 while let Some((chain_id, listening_mode)) = chain_ids.pop_first() {
484 for (new_chain_id, new_listening_mode) in self.listen(chain_id, listening_mode).await? {
485 match chain_ids.entry(new_chain_id) {
486 Entry::Vacant(vacant) => {
487 vacant.insert(new_listening_mode);
488 }
489 Entry::Occupied(mut occupied) => {
490 occupied.get_mut().extend(Some(new_listening_mode));
491 }
492 }
493 }
494 }
495
496 Ok(())
497 }
498
499 #[instrument(skip(context))]
502 async fn background_sync_received_certificates(
503 context: Arc<Mutex<C>>,
504 chain_id: ChainId,
505 ) -> Result<(), Error> {
506 info!("Starting background certificate sync");
507 let client = context.lock().await.make_chain_client(chain_id).await?;
508
509 Ok(client.find_received_certificates().await?)
510 }
511
512 async fn listen(
516 &mut self,
517 chain_id: ChainId,
518 listening_mode: ListeningMode,
519 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
520 let context_guard = self.context.lock().await;
521 let existing_mode = context_guard.client().chain_mode(chain_id);
522 if self.listening.contains_key(&chain_id)
524 && existing_mode.as_ref().is_some_and(|m| *m >= listening_mode)
525 {
526 return Ok(BTreeMap::new());
527 }
528 context_guard
530 .client()
531 .extend_chain_mode(chain_id, listening_mode);
532 drop(context_guard);
533
534 let background_sync_task = self.start_background_sync(chain_id).await;
536 let client = self
537 .context
538 .lock()
539 .await
540 .make_chain_client(chain_id)
541 .await?;
542 let (listener, abort_handle, notification_stream) = client.listen().await?;
543 let listening_client = ListeningClient::new(
544 client,
545 abort_handle,
546 Task::spawn(listener.in_current_span()),
547 notification_stream,
548 background_sync_task,
549 &self.context,
550 &self.config,
551 &self.cancellation_token,
552 );
553 self.listening.insert(chain_id, listening_client);
554 let publishing_chains = self.update_event_subscriptions(chain_id).await?;
555 self.maybe_notify_inbox_processing(chain_id);
556 Ok(publishing_chains)
557 }
558
559 async fn start_background_sync(&mut self, chain_id: ChainId) -> Task<()> {
560 if !self.enable_background_sync
561 || !self
562 .context
563 .lock()
564 .await
565 .client()
566 .chain_mode(chain_id)
567 .is_some_and(|m| m.is_full())
568 {
569 return Task::ready(());
570 }
571
572 let context = Arc::clone(&self.context);
573 Task::spawn(async move {
574 if let Err(e) = Self::background_sync_received_certificates(context, chain_id).await {
575 warn!("Background sync failed for chain {chain_id}: {e}");
576 }
577 })
578 }
579
580 fn remove_event_subscriber(&mut self, chain_id: ChainId) {
581 self.event_subscribers.retain(|_, subscribers| {
582 subscribers.remove(&chain_id);
583 !subscribers.is_empty()
584 });
585 }
586
587 async fn update_event_subscriptions(
589 &mut self,
590 chain_id: ChainId,
591 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
592 let listening_client = self.listening.get_mut(&chain_id).expect("missing client");
593 if !listening_client.client.is_tracked() {
594 return Ok(BTreeMap::new());
595 }
596 let app_filter = listening_client
597 .client
598 .options()
599 .message_policy
600 .process_events_from_application_ids
601 .clone();
602 let publishing_chains: BTreeMap<_, _> = listening_client
603 .client
604 .event_stream_publishers()
605 .await?
606 .into_iter()
607 .filter_map(|(chain_id, streams)| {
608 let streams = if let Some(app_set) = &app_filter {
609 streams
610 .into_iter()
611 .filter(|s| app_set.contains(&s.application_id))
612 .collect::<BTreeSet<_>>()
613 } else {
614 streams
615 };
616 if streams.is_empty() {
617 None
618 } else {
619 Some((chain_id, ListeningMode::EventsOnly(streams)))
620 }
621 })
622 .collect();
623 for publisher_id in publishing_chains.keys() {
624 self.event_subscribers
625 .entry(*publisher_id)
626 .or_default()
627 .insert(chain_id);
628 }
629 Ok(publishing_chains)
630 }
631
632 async fn next_action(&mut self) -> Result<Action, Error> {
634 loop {
635 let notification_futures = self
636 .listening
637 .values_mut()
638 .map(|client| {
639 let stream = client.notification_stream.clone();
640 Box::pin(async move { stream.lock().await.next().await })
641 })
642 .collect::<Vec<_>>();
643 futures::select! {
644 () = self.cancellation_token.cancelled().fuse() => {
645 return Ok(Action::Stop);
646 }
647 command = self.command_receiver.recv().then(async |maybe_command| {
648 if let Some(command) = maybe_command {
649 command
650 } else {
651 std::future::pending().await
652 }
653 }).fuse() => {
654 match command {
655 ListenerCommand::Listen(new_chains) => {
656 debug!(?new_chains, "received command to listen to new chains");
657 let listening_modes = self.update_wallet_for_listening(new_chains).await?;
658 self.listen_recursively(listening_modes).await?;
659 }
660 ListenerCommand::StopListening(chains) => {
661 debug!(?chains, "received command to stop listening to chains");
662 for chain_id in chains {
663 debug!(%chain_id, "stopping the listener for chain");
664 let Some(listening_client) = self.listening.remove(&chain_id) else {
665 error!(%chain_id, "attempted to drop a non-existent listener");
666 continue;
667 };
668 self.remove_event_subscriber(chain_id);
669 listening_client.stop().await;
670 if let Err(error) = self.context.lock().await.wallet().remove(chain_id).await {
671 error!(%error, %chain_id, "error removing a chain from the wallet");
672 }
673 }
674 }
675 ListenerCommand::SetMessagePolicy(policies) => {
676 debug!(?policies, "received command to set message policies");
677 for (chain_id, policy) in policies {
678 let Some(listening_client) = self.listening.get_mut(&chain_id) else {
679 error!(
680 %chain_id,
681 "attempted to set the message policy of a non-existent \
682 listener"
683 );
684 continue;
685 };
686 listening_client.client.options_mut().message_policy = policy;
687 listening_client.respawn_inbox_task(
688 &self.cancellation_token,
689 &self.context,
690 &self.config,
691 );
692 }
693 }
694 }
695 }
696 (maybe_notification, index, _) = future::select_all(notification_futures).fuse() => {
697 let Some(notification) = maybe_notification else {
698 let chain_id = *self.listening.keys().nth(index).unwrap();
699 warn!("Notification stream for {chain_id} closed");
700 let Some(listening_client) = self.listening.remove(&chain_id) else {
701 error!(%chain_id, "attempted to drop a non-existent listener");
702 continue;
703 };
704 self.remove_event_subscriber(chain_id);
705 listening_client.stop().await;
706 continue;
707 };
708 return Ok(Action::Notification(notification));
709 }
710 }
711 }
712 }
713
714 async fn update_validators(&self, notification: &Notification) -> Result<(), Error> {
716 let chain_id = notification.chain_id;
717 let listening_client = self.listening.get(&chain_id).expect("missing client");
718 let latest_block = if let Reason::NewBlock { hash, .. } = ¬ification.reason {
719 listening_client.client.read_certificate(*hash).await.ok()
720 } else {
721 None
722 };
723 if let Err(error) = listening_client
724 .client
725 .update_validators(None, latest_block)
726 .await
727 {
728 warn!(
729 "Failed to update validators about the local chain after \
730 receiving {notification:?} with error: {error:?}"
731 );
732 }
733 Ok(())
734 }
735
736 async fn update_wallet(&self, chain_id: ChainId) -> Result<(), Error> {
738 let client = &self
739 .listening
740 .get(&chain_id)
741 .expect("missing client")
742 .client;
743 self.context.lock().await.update_wallet(client).await?;
744 Ok(())
745 }
746
747 async fn update_wallet_for_listening(
751 &self,
752 new_chains: BTreeMap<ChainId, Option<AccountOwner>>,
753 ) -> Result<BTreeMap<ChainId, ListeningMode>, Error> {
754 let mut chains = BTreeMap::new();
755 let context_guard = self.context.lock().await;
756 for (chain_id, owner) in new_chains {
757 if let Some(owner) = owner {
758 if context_guard
759 .client()
760 .signer()
761 .contains_key(&owner)
762 .await
763 .map_err(chain_client::Error::signer_failure)?
764 {
765 let modified = context_guard
767 .wallet()
768 .modify(chain_id, |chain| chain.owner = Some(owner))
769 .await
770 .map_err(error::Inner::wallet)?;
771 if modified.is_none() {
773 let chain_description = context_guard
774 .client()
775 .get_chain_description(chain_id)
776 .await?;
777 let timestamp = chain_description.timestamp();
778 let epoch = chain_description.config().epoch;
779 context_guard
780 .wallet()
781 .insert(
782 chain_id,
783 linera_core::wallet::Chain {
784 owner: Some(owner),
785 timestamp,
786 epoch: Some(epoch),
787 ..Default::default()
788 },
789 )
790 .await
791 .map_err(error::Inner::wallet)?;
792 }
793
794 chains.insert(chain_id, ListeningMode::FullChain);
795 }
796 } else {
797 chains.insert(chain_id, ListeningMode::FollowChain);
798 }
799 }
800 Ok(chains)
801 }
802
803 fn maybe_notify_inbox_processing(&self, chain_id: ChainId) {
805 if let Some(listening_client) = self.listening.get(&chain_id) {
806 listening_client.inbox_notify.notify_one();
807 }
808 }
809
810 async fn sleep(delay_ms: u64) {
812 if delay_ms > 0 {
813 linera_base::time::timer::sleep(Duration::from_millis(delay_ms)).await;
814 }
815 }
816}
817
818async fn inbox_processing_loop<C: ClientContext>(
822 client: ContextChainClient<C>,
823 context: Arc<Mutex<C>>,
824 config: Arc<ChainListenerConfig>,
825 inbox_notify: Arc<Notify>,
826 cancellation_token: CancellationToken,
827) {
828 let chain_id = client.chain_id();
829 loop {
830 futures::select! {
831 () = cancellation_token.cancelled().fuse() => break,
832 () = inbox_notify.notified().fuse() => {
833 if config.skip_process_inbox {
834 debug!("Not processing inbox for {chain_id:.8} due to listener configuration");
835 continue;
836 }
837 if !client.is_tracked() {
838 debug!("Not processing inbox for non-tracked chain {chain_id:.8}");
839 continue;
840 }
841 if client.preferred_owner().is_none() {
842 debug!("Not processing inbox for follow-only chain {chain_id:.8}");
843 continue;
844 }
845 debug!("Processing inbox for {chain_id:.8}");
846
847 loop {
851 match client.process_inbox_without_prepare().await {
852 Err(chain_client::Error::CannotFindKeyForChain(chain_id)) => {
853 debug!(%chain_id, "Cannot find key for chain");
854 break;
855 }
856 Err(error) => {
857 warn!(%error, "Failed to process inbox");
858 break;
859 }
860 Ok((certs, None)) => {
861 if certs.is_empty() {
862 debug!(%chain_id, "done processing inbox: no blocks created");
863 } else {
864 info!(
865 %chain_id,
866 created_block_count = %certs.len(),
867 "done processing inbox",
868 );
869 }
870 break;
871 }
872 Ok((certs, Some(new_timeout))) => {
873 info!(
874 %chain_id,
875 created_block_count = %certs.len(),
876 timeout = %new_timeout,
877 "waiting for round timeout before continuing to process the inbox",
878 );
879 let delta = new_timeout.timestamp.delta_since(Timestamp::now());
880 if delta > TimeDelta::ZERO {
881 futures::select! {
882 () = cancellation_token.cancelled().fuse() => return,
883 () = linera_base::time::timer::sleep(delta.as_duration()).fuse() => {},
884 () = inbox_notify.notified().fuse() => {},
885 }
886 }
887 }
888 }
889 }
890
891 if let Err(error) = context.lock().await.update_wallet(&client).await {
892 warn!(%error, "Failed to update wallet after inbox processing");
893 }
894 }
895 }
896 }
897}
898
899enum Action {
900 Notification(Notification),
901 Stop,
902}