1mod state;
5use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
7 convert::Infallible,
8 iter,
9 sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{self, Either, FusedFuture, Future},
15 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20 abi::Abi,
21 crypto::{signer, CryptoHash, Signer, ValidatorPublicKey},
22 data_types::{
23 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24 ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
25 },
26 ensure,
27 identifiers::{
28 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29 ModuleId, StreamId,
30 },
31 ownership::{ChainOwnership, TimeoutConfig},
32 time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37 data_types::{
38 BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
39 ProposedBlock, Transaction,
40 },
41 manager::LockingBlock,
42 types::{
43 Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
44 ValidatedBlock,
45 },
46 ChainError, ChainExecutionContext,
47};
48use linera_execution::{
49 committee::Committee,
50 system::{
51 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52 REMOVED_EPOCH_STREAM_NAME,
53 },
54 ExecutionError, Operation, Query, QueryOutcome,
55};
56use linera_storage::{Arc as CacheArc, Clock as _, Storage as _};
57use linera_views::ViewError;
58use serde::Serialize;
59pub use state::State;
60use thiserror::Error;
61use tokio::sync::mpsc;
62use tokio_stream::wrappers::UnboundedReceiverStream;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66 received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67 ListeningMode, PendingProposal, TimingType,
68};
69use crate::{
70 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
71 environment::Environment,
72 local_node::{LocalNodeClient, LocalNodeError},
73 node::{
74 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
75 ValidatorNodeProvider as _,
76 },
77 remote_node::RemoteNode,
78 updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
79 worker::{Notification, Reason, WorkerError},
80};
81
/// Configuration knobs for a [`ChainClient`].
///
/// Fields whose use is not visible in this part of the file are only described
/// tentatively and marked for review.
#[derive(Debug, Clone)]
pub struct Options {
    /// Cap used by `pending_message_bundles`: the sorted list of bundles is truncated
    /// to this length.
    pub max_pending_message_bundles: usize,
    /// Forwarded as `max_failures` of the `AutoRetry` failure policy built by
    /// `bundle_execution_policy`.
    pub max_block_limit_errors: u32,
    /// Forwarded as `time_budget` of the policy built by `bundle_execution_policy`.
    // NOTE(review): `None` presumably means "no time limit" — confirm.
    pub staging_bundles_time_budget: Option<Duration>,
    /// Policy applied to incoming message bundles and consulted when selecting which
    /// event streams to process.
    pub message_policy: MessagePolicy,
    /// Origin chains whose bundles are sorted ahead of all other bundles.
    pub priority_bundle_origins: HashSet<ChainId>,
    /// Delivery mode passed along when communicating chain updates to validators.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// Grace period handed to `communicate_with_quorum`.
    // NOTE(review): unit and exact semantics are defined by the updater — confirm.
    pub quorum_grace_period: f64,
    // NOTE(review): presumably the timeout for downloading a blob — not used in this
    // part of the file.
    pub blob_download_timeout: Duration,
    // NOTE(review): presumably the timeout for downloading one batch of certificates —
    // not used in this part of the file.
    pub certificate_batch_download_timeout: Duration,
    // NOTE(review): presumably the number of certificates requested per download
    // batch — not used in this part of the file.
    pub certificate_download_batch_size: u64,
    // NOTE(review): presumably the number of certificates sent per upload batch — not
    // used in this part of the file.
    pub certificate_upload_batch_size: usize,
    /// Batch size used by `find_received_certificates` when splitting the received
    /// logs into batches of sender certificates.
    pub sender_certificate_download_batch_size: usize,
    // NOTE(review): presumably bounds concurrent batch downloads — not used in this
    // part of the file.
    pub max_concurrent_batch_downloads: usize,
    /// Concurrency limit passed to `buffer_unordered` when running per-chain or
    /// per-stream tasks.
    pub max_joined_tasks: usize,
    // NOTE(review): presumably enables proposing in the fast round — confirm.
    pub allow_fast_blocks: bool,
    // NOTE(review): presumably adds jitter before proposing in multi-leader rounds —
    // confirm.
    pub multi_leader_jitter: bool,
    // NOTE(review): presumably the first probe interval of the notification circuit
    // breaker — confirm.
    pub notification_circuit_breaker_initial_probe_interval: Duration,
    // NOTE(review): presumably the upper bound for the circuit breaker's probe
    // interval — confirm.
    pub notification_circuit_breaker_max_probe_interval: Duration,
    // NOTE(review): presumably limits concurrent event stream queries — not used in
    // this part of the file.
    pub max_event_stream_queries: usize,
}
139
/// Bookkeeping for a circuit breaker: a point in time and an interval.
// NOTE(review): not used in the visible part of the file; presumably tied to the
// `notification_circuit_breaker_*` options — confirm.
struct CircuitBreakerState {
    /// Instant of the next probe.
    next_probe_at: Instant,
    /// Interval associated with probing.
    // NOTE(review): presumably grows after failed probes — confirm.
    probe_interval: Duration,
}
144
/// Copyable, comparable summary of the chain's consensus-related state, as produced
/// by `ChainClient::consensus_state_snapshot` from the local chain info.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ConsensusStateSnapshot {
    /// `next_block_height` reported by the chain info.
    next_block_height: BlockHeight,
    /// `current_round` reported by the chain manager.
    current_round: Round,
    /// Round of the manager's `requested_locking` block, if there is one.
    lock_round: Option<Round>,
    /// Round of the manager's timeout certificate, if there is one.
    timeout_round: Option<Round>,
}
154
#[cfg(with_testing)]
impl Options {
    /// Returns the option set used in tests.
    ///
    /// Sizes and limits that have a crate-level `DEFAULT_*` constant use it; the
    /// remaining values are fixed here (one-second download timeouts, fast blocks
    /// and multi-leader jitter disabled, non-blocking cross-chain delivery).
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS, DEFAULT_MAX_EVENT_STREAM_QUERIES,
            DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        Self {
            // Incoming message handling.
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::default(),
            priority_bundle_origins: HashSet::new(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,

            // Validator communication and downloads.
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
            certificate_batch_download_timeout: Duration::from_secs(1),
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_concurrent_batch_downloads: DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS,
            max_joined_tasks: 100,
            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,

            // Block proposal behavior.
            allow_fast_blocks: false,
            multi_leader_jitter: false,

            // Notification circuit breaker.
            notification_circuit_breaker_initial_probe_interval: Duration::from_secs(300),
            notification_circuit_breaker_max_probe_interval: Duration::from_secs(3600),
        }
    }
}
188
189impl Options {
190 pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
192 BundleExecutionPolicy {
193 on_failure: BundleFailurePolicy::AutoRetry {
194 max_failures: self.max_block_limit_errors,
195 never_reject_application_ids: Arc::new(
196 self.message_policy.never_reject_application_ids.clone(),
197 ),
198 },
199 time_budget: self.staging_bundles_time_budget,
200 }
201 }
202}
203
/// Client handle bound to a single chain.
///
/// All handles share the underlying [`Client`] through an `Arc`; cloning a handle
/// also shares the set of skipped origins.
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// Shared client state (local node, notifier, signer, ...). Omitted from `Debug`
    /// output.
    #[debug(skip)]
    pub(crate) client: Arc<Client<Env>>,
    /// The chain this handle operates on.
    chain_id: ChainId,
    /// Options for this handle. Omitted from `Debug` output.
    #[debug(skip)]
    options: Options,
    /// Owner returned by `identity` after validation; when `None`, `identity` fails
    /// with `Error::NoAccountKeyConfigured`.
    preferred_owner: Option<AccountOwner>,
    /// Height up to which `synchronize_to_known_height` downloads certificates.
    initial_next_block_height: BlockHeight,
    /// Block hash the local chain must report when its next block height equals
    /// `initial_next_block_height`.
    initial_block_hash: Option<CryptoHash>,
    /// Optional channel for timing measurements.
    // NOTE(review): the `u64` is presumably a duration in some time unit — confirm.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    /// Origin chains whose bundles `pending_message_bundles` filters out.
    skipped_origins: Arc<papaya::HashSet<ChainId>>,
}
232
233impl<Env: Environment> Clone for ChainClient<Env> {
234 fn clone(&self) -> Self {
235 Self {
236 client: self.client.clone(),
237 chain_id: self.chain_id,
238 options: self.options.clone(),
239 preferred_owner: self.preferred_owner,
240 initial_next_block_height: self.initial_next_block_height,
241 initial_block_hash: self.initial_block_hash,
242 timing_sender: self.timing_sender.clone(),
243 skipped_origins: self.skipped_origins.clone(),
244 }
245 }
246}
247
/// Errors returned by [`ChainClient`] operations.
#[derive(Debug, Error)]
pub enum Error {
    /// An operation on the local node failed.
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    /// An operation on a remote (validator) node failed.
    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    /// An arithmetic error, reported transparently.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// Certificates with the listed hashes are missing.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// The confirmed block with the given hash is missing.
    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    /// JSON serialization or deserialization failed.
    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// A chain-level operation failed.
    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    /// Communication with validators failed, reported transparently.
    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// An internal invariant of the chain client was violated.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    /// A certificate comes from a committee that is not yet known locally.
    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
        Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    /// The local node is behind the state trusted by the wallet.
    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    /// The client state does not allow the proposed block.
    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    /// A certificate comes from a committee that has been retired.
    #[error(
        "Cannot accept a certificate from a committee that was retired. \
        Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    /// A protocol-level error occurred within the chain client.
    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    /// The signer does not hold the key needed for the given chain.
    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    /// No preferred owner is configured for the given chain.
    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    /// The configured owner is not an owner of the given chain.
    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    /// A view error, reported transparently.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Downloading certificates up to the target height failed.
    #[error(
        "Failed to download certificates and update local node to the next height \
        {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    /// A BCS (de)serialization error, reported transparently.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// Validators reached a quorum on a different block hash or round than expected.
    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
        expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// The signer reported an error; see `Error::signer_failure`.
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    /// The given epoch is the current one and cannot be revoked.
    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    /// The epoch was already revoked.
    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    /// A missing sender block could not be downloaded.
    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    /// Another block was already committed at the same height; carries the hash of
    /// the committed certificate.
    #[error(
        "A different block was already committed at this height. \
        The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),

    /// Staged (AutoRetry) execution and committed execution of the same block
    /// disagreed.
    #[error(
        "Execution outcome mismatch: AutoRetry and committed execution produced \
        different outcomes for the same block"
    )]
    ExecutionOutcomeMismatch,
}
361
362impl From<Infallible> for Error {
363 fn from(infallible: Infallible) -> Self {
364 match infallible {}
365 }
366}
367
368impl Error {
369 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
370 Self::Signer(Box::new(err))
371 }
372}
373
374impl<Env: Environment> ChainClient<Env> {
375 pub fn new(
376 client: Arc<Client<Env>>,
377 chain_id: ChainId,
378 options: Options,
379 initial_block_hash: Option<CryptoHash>,
380 initial_next_block_height: BlockHeight,
381 preferred_owner: Option<AccountOwner>,
382 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
383 ) -> Self {
384 ChainClient {
385 client,
386 chain_id,
387 options,
388 preferred_owner,
389 initial_block_hash,
390 initial_next_block_height,
391 timing_sender,
392 skipped_origins: Arc::new(papaya::HashSet::new()),
393 }
394 }
395
396 pub fn is_follow_only(&self) -> bool {
398 self.client.is_chain_follow_only(self.chain_id)
399 }
400
401 #[instrument(level = "trace", skip(self))]
405 fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
406 self.client
407 .chains
408 .pin()
409 .get(&self.chain_id)
410 .expect("Chain client constructed for invalid chain")
411 .proposal_mutex()
412 }
413
414 #[instrument(level = "trace", skip(self))]
416 pub async fn pending_proposal(&self) -> Option<PendingProposal> {
417 self.proposal_mutex().lock().await.clone()
418 }
419
420 #[instrument(level = "trace", skip(self))]
422 pub fn signer(&self) -> &impl Signer {
423 self.client.signer()
424 }
425
426 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
428 self.client.has_key_for(owner).await
429 }
430
431 #[instrument(level = "trace", skip(self))]
433 pub fn options_mut(&mut self) -> &mut Options {
434 &mut self.options
435 }
436
437 #[instrument(level = "trace", skip(self))]
439 pub fn options(&self) -> &Options {
440 &self.options
441 }
442
443 #[instrument(level = "trace", skip(self))]
445 pub fn chain_id(&self) -> ChainId {
446 self.chain_id
447 }
448
449 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
451 self.timing_sender.clone()
452 }
453
454 #[instrument(level = "trace", skip(self))]
456 pub fn admin_chain_id(&self) -> ChainId {
457 self.client.admin_chain_id
458 }
459
460 #[instrument(level = "trace", skip(self))]
462 pub fn preferred_owner(&self) -> Option<AccountOwner> {
463 self.preferred_owner
464 }
465
466 #[instrument(level = "trace", skip(self))]
468 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
469 self.preferred_owner = Some(preferred_owner);
470 }
471
472 #[instrument(level = "trace")]
474 pub async fn chain_state_view(
475 &self,
476 ) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
477 self.client.local_node.chain_state_view(self.chain_id).await
478 }
479
480 #[instrument(level = "trace", skip(self))]
482 pub async fn event_stream_publishers(
483 &self,
484 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
485 let subscriptions = self
486 .client
487 .local_node
488 .get_event_subscriptions(self.chain_id)
489 .await?;
490 let mut publishers = subscriptions.into_iter().fold(
491 BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
492 |mut map, ((chain_id, stream_id), _)| {
493 map.entry(chain_id).or_default().insert(stream_id);
494 map
495 },
496 );
497 if self.chain_id != self.client.admin_chain_id {
498 publishers.insert(
499 self.client.admin_chain_id,
500 vec![
501 StreamId::system(EPOCH_STREAM_NAME),
502 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
503 ]
504 .into_iter()
505 .collect(),
506 );
507 }
508 Ok(publishers)
509 }
510
511 #[instrument(level = "trace")]
513 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
514 self.subscribe_to(self.chain_id)
515 }
516
517 #[instrument(level = "trace")]
519 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
520 Ok(Box::pin(UnboundedReceiverStream::new(
521 self.client.notifier.subscribe(vec![chain_id]),
522 )))
523 }
524
525 #[instrument(level = "trace")]
527 pub fn storage_client(&self) -> &Env::Storage {
528 self.client.storage_client()
529 }
530
531 #[instrument(level = "trace")]
533 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
534 let query = ChainInfoQuery::new(self.chain_id);
535 let response = self
536 .client
537 .local_node
538 .handle_chain_info_query(query)
539 .await?;
540 Ok(response.info)
541 }
542
543 #[instrument(level = "trace")]
545 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
546 let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
547 let response = self
548 .client
549 .local_node
550 .handle_chain_info_query(query)
551 .await?;
552 Ok(response.info)
553 }
554
555 async fn consensus_state_snapshot(&self) -> Result<ConsensusStateSnapshot, Error> {
564 let info = self.chain_info_with_manager_values().await?;
565 let lock_round = info
566 .manager
567 .requested_locking
568 .as_deref()
569 .map(LockingBlock::round);
570 let timeout_round = info.manager.timeout.as_deref().map(|cert| cert.round);
571 Ok(ConsensusStateSnapshot {
572 next_block_height: info.next_block_height,
573 current_round: info.manager.current_round,
574 lock_round,
575 timeout_round,
576 })
577 }
578
579 pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
581 self.client.get_chain_description(self.chain_id).await
582 }
583
584 #[instrument(level = "trace")]
587 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
588 if self.options.message_policy.is_ignore() {
589 return Ok(Vec::new());
591 }
592
593 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
594 let info = self
595 .client
596 .local_node
597 .handle_chain_info_query(query)
598 .await?
599 .info;
600 if self.preferred_owner.is_some_and(|owner| {
601 info.manager
602 .ownership
603 .is_super_owner_no_regular_owners(&owner)
604 }) {
605 ensure!(
607 info.next_block_height >= self.initial_next_block_height,
608 Error::WalletSynchronizationError
609 );
610 }
611
612 let skipped = self.skipped_origins.pin();
613 let mut bundles = info
614 .requested_pending_message_bundles
615 .into_iter()
616 .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
617 .filter(|bundle| !skipped.contains(&bundle.origin))
618 .collect::<Vec<_>>();
619 let priority_origins = &self.options.priority_bundle_origins;
620 bundles.sort_by(|a, b| {
621 let a_priority = priority_origins.contains(&a.origin);
622 let b_priority = priority_origins.contains(&b.origin);
623 b_priority
624 .cmp(&a_priority)
625 .then(a.bundle.timestamp.cmp(&b.bundle.timestamp))
626 });
627 bundles.truncate(self.options.max_pending_message_bundles);
628 Ok(bundles)
629 }
630
631 #[instrument(level = "trace")]
632 async fn collect_stream_updates(&self) -> Result<Vec<Operation>, Error> {
633 let subscription_map = self
634 .client
635 .local_node
636 .get_event_subscriptions(self.chain_id)
637 .await?;
638 let futures = subscription_map
639 .into_iter()
640 .filter(|((chain_id, _), _)| {
641 self.options
642 .message_policy
643 .restrict_chain_ids_to
644 .as_ref()
645 .is_none_or(|chain_set| chain_set.contains(chain_id))
646 })
647 .filter(|((_, stream_id), _)| {
648 self.options
649 .message_policy
650 .process_events_from_application_ids
651 .as_ref()
652 .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
653 })
654 .map(|((chain_id, stream_id), subscriptions)| {
655 let client = self.client.clone();
656 async move {
657 let next_expected_index = client
658 .local_node
659 .get_next_expected_event(chain_id, stream_id.clone())
660 .await?;
661 let Some(next_index) = next_expected_index
662 .filter(|next_index| *next_index > subscriptions.min_next_index)
663 else {
664 return Ok::<_, Error>(Vec::new());
665 };
666 Ok(subscriptions
667 .applications
668 .into_iter()
669 .filter(|(_, app_index)| *app_index < next_index)
670 .map(|(application_id, _)| {
671 SystemOperation::UpdateStream {
672 application_id,
673 chain_id,
674 stream_id: stream_id.clone(),
675 next_index,
676 }
677 .into()
678 })
679 .collect::<Vec<Operation>>())
680 }
681 });
682 Ok(futures::stream::iter(futures)
683 .buffer_unordered(self.options.max_joined_tasks)
684 .try_collect::<Vec<_>>()
685 .await?
686 .into_iter()
687 .flatten()
688 .collect::<Vec<_>>())
689 }
690
691 #[instrument(level = "trace")]
693 pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
694 let info = match self.client.local_node.chain_info(self.chain_id).await {
695 Ok(info) => info,
696 Err(LocalNodeError::BlobsNotFound(_)) => {
697 self.synchronize_chain_state(self.chain_id).await?;
698 self.client.local_node.chain_info(self.chain_id).await?
699 }
700 Err(err) => return Err(err.into()),
701 };
702 let hash = info
703 .committee_hash
704 .ok_or(LocalNodeError::InactiveChain(self.chain_id))?;
705 Ok(self
706 .storage_client()
707 .get_or_load_committee_by_hash(hash)
708 .await
709 .map_err(LocalNodeError::from)?)
710 }
711
712 #[instrument(level = "trace")]
714 pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
715 self.client.admin_committee().await
716 }
717
718 #[instrument(level = "trace")]
722 pub async fn identity(&self) -> Result<AccountOwner, Error> {
723 let Some(preferred_owner) = self.preferred_owner else {
724 return Err(Error::NoAccountKeyConfigured(self.chain_id));
725 };
726 let manager = self.chain_info().await?.manager;
727 ensure!(
728 manager.ownership.is_active(),
729 LocalNodeError::InactiveChain(self.chain_id)
730 );
731 let fallback_owners = if manager.ownership.has_fallback() {
732 self.local_committee()
733 .await?
734 .account_keys_and_weights()
735 .map(|(key, _)| AccountOwner::from(key))
736 .collect()
737 } else {
738 BTreeSet::new()
739 };
740
741 let is_owner = manager
742 .ownership
743 .can_propose_in_multi_leader_round(&preferred_owner)
744 || fallback_owners.contains(&preferred_owner);
745
746 if !is_owner {
747 warn!(
748 chain_id = %self.chain_id,
749 ownership = ?manager.ownership,
750 ?fallback_owners,
751 ?preferred_owner,
752 "The preferred owner is not configured as an owner of this chain",
753 );
754 return Err(Error::NotAnOwner(self.chain_id));
755 }
756
757 let has_signer = self.has_key_for(&preferred_owner).await?;
758
759 if !has_signer {
760 warn!(%self.chain_id, ?preferred_owner,
761 "Chain is one of the owners but its Signer instance doesn't contain the key",
762 );
763 return Err(Error::CannotFindKeyForChain(self.chain_id));
764 }
765
766 Ok(preferred_owner)
767 }
768
769 #[instrument(level = "trace")]
777 pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
778 ensure!(
779 self.has_key_for(&owner).await?,
780 Error::CannotFindKeyForChain(self.chain_id)
781 );
782 self.client
784 .get_chain_description_blob(self.chain_id)
785 .await?;
786
787 let info = self.chain_info().await?;
789
790 ensure!(
792 info.manager
793 .ownership
794 .can_propose_in_multi_leader_round(&owner),
795 Error::NotAnOwner(self.chain_id)
796 );
797
798 Ok(info)
799 }
800
801 #[instrument(level = "trace")]
804 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
805 #[cfg(with_metrics)]
806 let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
807
808 let mut info = self.synchronize_to_known_height().await?;
809
810 if self.preferred_owner.is_none_or(|owner| {
811 !info
812 .manager
813 .ownership
814 .is_super_owner_no_regular_owners(&owner)
815 }) {
816 info = self.client.synchronize_chain_state(self.chain_id).await?;
820 }
821
822 if info.epoch > self.client.admin_committee().await?.0 {
823 self.client
824 .synchronize_chain_state(self.client.admin_chain_id)
825 .await?;
826 }
827
828 Ok(info)
829 }
830
831 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
836 let info = self
837 .client
838 .download_certificates(self.chain_id, self.initial_next_block_height)
839 .await?;
840 if info.next_block_height == self.initial_next_block_height {
841 ensure!(
843 self.initial_block_hash == info.block_hash,
844 Error::InternalError("Invalid chain of blocks in local node")
845 );
846 }
847 Ok(info)
848 }
849
850 #[instrument(level = "trace", skip(old_committee, latest_certificate))]
852 pub async fn update_validators(
853 &self,
854 old_committee: Option<&Committee>,
855 latest_certificate: Option<CacheArc<ConfirmedBlockCertificate>>,
856 ) -> Result<(), Error> {
857 let update_validators_start = linera_base::time::Instant::now();
858 if let Some(old_committee) = old_committee {
860 self.communicate_chain_updates(old_committee, latest_certificate.clone())
861 .await?
862 };
863 if let Ok(new_committee) = self.local_committee().await {
864 if old_committee.is_none_or(|old| *new_committee != *old) {
865 self.communicate_chain_updates(&new_committee, latest_certificate)
868 .await?;
869 }
870 }
871 self.send_timing(update_validators_start, TimingType::UpdateValidators);
872 Ok(())
873 }
874
875 #[instrument(level = "trace", skip(committee))]
877 pub async fn communicate_chain_updates(
878 &self,
879 committee: &Committee,
880 latest_certificate: Option<CacheArc<ConfirmedBlockCertificate>>,
881 ) -> Result<(), Error> {
882 let delivery = self.options.cross_chain_message_delivery;
883 let height = self.chain_info().await?.next_block_height;
884 self.client
885 .communicate_chain_updates(
886 committee,
887 self.chain_id,
888 height,
889 delivery,
890 latest_certificate,
891 )
892 .await
893 }
894
895 async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
899 let subscriptions = self
900 .client
901 .local_node
902 .get_event_subscriptions(self.chain_id)
903 .await?;
904 let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
906 for ((chain_id, stream_id), _) in &subscriptions {
907 if *chain_id != self.chain_id {
908 streams_by_chain
909 .entry(*chain_id)
910 .or_default()
911 .insert(stream_id.clone());
912 }
913 }
914 let admin_chain_id = self.client.admin_chain_id;
916 if admin_chain_id != self.chain_id {
917 self.client.synchronize_chain_state(admin_chain_id).await?;
918 }
919 let (_, committee) = self.admin_committee().await?;
921 let nodes = self.client.make_nodes(&committee)?;
922 let tasks = streams_by_chain
923 .into_iter()
924 .filter(|(chain_id, _)| *chain_id != admin_chain_id)
925 .map(|(chain_id, stream_ids)| {
926 self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
927 })
928 .collect::<Vec<_>>();
929 stream::iter(tasks)
930 .buffer_unordered(self.options.max_joined_tasks)
931 .collect::<Vec<_>>()
932 .await
933 .into_iter()
934 .collect::<Result<Vec<_>, _>>()?;
935 Ok(())
936 }
937
938 async fn sync_publisher_chain_events(
945 &self,
946 publisher_chain_id: ChainId,
947 stream_ids: BTreeSet<StreamId>,
948 nodes: &[RemoteNode<Env::ValidatorNode>],
949 committee: &Committee,
950 ) -> Result<(), Error> {
951 let stream_ids_ref = &stream_ids;
952 communicate_with_quorum(
953 nodes,
954 committee,
955 |_: &()| (),
956 |remote_node| async move {
957 self.client
958 .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
959 .await
960 },
961 self.options.quorum_grace_period,
962 )
963 .await?;
964 Ok(())
965 }
966
967 #[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
976 pub async fn find_received_certificates(&self) -> Result<(), Error> {
977 debug!("starting find_received_certificates");
978 #[cfg(with_metrics)]
979 let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
980 let chain_id = self.chain_id;
982 let (_, committee) = self.admin_committee().await?;
983 let nodes = self.client.make_nodes(&committee)?;
984
985 let trackers = self
986 .client
987 .local_node
988 .get_received_certificate_trackers(chain_id)
989 .await?;
990
991 trace!("find_received_certificates: read trackers");
992
993 let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
994 let result = communicate_with_quorum(
996 &nodes,
997 &committee,
998 |_| (),
999 |remote_node| {
1000 let client = &self.client;
1001 let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
1002 let received_log_batches = Arc::clone(&received_log_batches);
1003 Box::pin(async move {
1004 let batch = client
1005 .get_received_log_from_validator(chain_id, &remote_node, tracker)
1006 .await?;
1007 let mut batches = received_log_batches.lock().unwrap();
1008 batches.push((remote_node.public_key, batch));
1009 Ok(())
1010 })
1011 },
1012 self.options.quorum_grace_period,
1013 )
1014 .await;
1015
1016 if let Err(error) = result {
1017 error!(
1018 %error,
1019 "Failed to synchronize received_logs from at least a quorum of validators",
1020 );
1021 }
1022
1023 let received_logs: Vec<_> = {
1024 let mut received_log_batches = received_log_batches.lock().unwrap();
1025 std::mem::take(received_log_batches.as_mut())
1026 };
1027
1028 debug!(
1029 received_logs_len = %received_logs.len(),
1030 received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
1031 "collected received logs"
1032 );
1033
1034 let (received_logs, mut validator_trackers) = {
1035 (
1036 ReceivedLogs::from_received_result(received_logs.clone()),
1037 ValidatorTrackers::new(received_logs, &trackers),
1038 )
1039 };
1040
1041 debug!(
1042 num_chains = %received_logs.num_chains(),
1043 num_certs = %received_logs.num_certs(),
1044 "find_received_certificates: total number of chains and certificates to sync",
1045 );
1046
1047 let max_blocks_per_chain =
1048 self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
1049 for received_log in received_logs.into_batches(
1050 self.options.sender_certificate_download_batch_size,
1051 max_blocks_per_chain,
1052 ) {
1053 validator_trackers = self
1054 .receive_sender_certificates(received_log, validator_trackers, &nodes)
1055 .await?;
1056
1057 self.update_received_certificate_trackers(&validator_trackers)
1058 .await;
1059 }
1060
1061 info!("find_received_certificates finished");
1062
1063 Ok(())
1064 }
1065
1066 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
1067 let updated_trackers = trackers.to_map();
1068 trace!(?updated_trackers, "updated tracker values");
1069
1070 if let Err(error) = self
1072 .client
1073 .local_node
1074 .update_received_certificate_trackers(self.chain_id, updated_trackers)
1075 .await
1076 {
1077 error!(
1078 chain_id = %self.chain_id,
1079 %error,
1080 "Failed to update the certificate trackers",
1081 );
1082 }
1083 }
1084
/// Downloads and processes the sender certificates listed in `received_logs`, and
/// returns the trackers updated with every certificate that was downloaded.
///
/// Entries already covered by the local next outbox heights are filtered out first.
/// Sender chains left with no remote heights are not downloaded; their pending
/// cross-chain requests are retried at the end instead.
async fn receive_sender_certificates(
    &self,
    mut received_logs: ReceivedLogs,
    mut validator_trackers: ValidatorTrackers,
    nodes: &[RemoteNode<Env::ValidatorNode>],
) -> Result<ValidatorTrackers, Error> {
    debug!(
        num_chains = %received_logs.num_chains(),
        num_certs = %received_logs.num_certs(),
        "receive_sender_certificates: number of chains and certificates to sync",
    );

    // Ask the local node how far each sender chain's outbox is already known.
    let local_next_heights = self
        .client
        .local_node
        .next_outbox_heights(received_logs.chains(), self.chain_id)
        .await?;

    validator_trackers.filter_out_already_known(&mut received_logs, &local_next_heights);

    debug!(
        remaining_total_certificates = %received_logs.num_certs(),
        "receive_sender_certificates: computed remote_heights"
    );

    let mut other_sender_chains = Vec::new();
    // Download tasks report each processed certificate through this channel.
    let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

    let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
        let received_logs = &received_logs;
        let other_sender_chains = &mut other_sender_chains;

        // `move` takes ownership of `sender`: the channel closes once this iterator
        // and all per-task clones are dropped, which ends the receiving loop below.
        move |(sender_chain_id, remote_heights)| {
            if remote_heights.is_empty() {
                // Nothing to download for this chain; handled after the downloads.
                other_sender_chains.push(sender_chain_id);
                return None;
            };
            let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
            let sender = sender.clone();
            let client = self.client.clone();
            let nodes = nodes.to_vec();
            Some(async move {
                client
                    .download_and_process_sender_chain(
                        sender_chain_id,
                        &nodes,
                        received_logs,
                        remote_heights,
                        sender,
                    )
                    .await
            })
        }
    });

    // Run the downloads (bounded by `max_joined_tasks`) while concurrently draining
    // the channel into the trackers.
    future::join(
        stream::iter(cert_futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .collect::<()>(),
        async {
            while let Some(chain_and_height) = receiver.recv().await {
                validator_trackers.downloaded_cert(chain_and_height);
            }
        },
    )
    .await;

    debug!(
        num_other_chains = %other_sender_chains.len(),
        "receive_sender_certificates: processing certificates finished"
    );

    self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
        .await;

    debug!("receive_sender_certificates: finished processing other_sender_chains");

    Ok(validator_trackers)
}
1173
1174 async fn retry_pending_cross_chain_requests_from_sender_chains(
1178 &self,
1179 nodes: &[RemoteNode<Env::ValidatorNode>],
1180 other_sender_chains: Vec<ChainId>,
1181 ) {
1182 let stream = other_sender_chains
1183 .into_iter()
1184 .map(|chain_id| async move {
1185 if let Err(error) = match self
1186 .client
1187 .retry_pending_cross_chain_requests(chain_id)
1188 .await
1189 {
1190 Ok(()) => Ok(()),
1191 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1192 if let Err(error) = self
1193 .client
1194 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1195 .await
1196 {
1197 error!(
1198 ?blob_ids,
1199 %error,
1200 "Error while attempting to download blobs during retrying outgoing \
1201 messages"
1202 );
1203 }
1204 self.client
1205 .retry_pending_cross_chain_requests(chain_id)
1206 .await
1207 }
1208 err => err,
1209 } {
1210 error!(
1211 %chain_id,
1212 %error,
1213 "Failed to retry outgoing messages from chain"
1214 );
1215 }
1216 })
1217 .collect::<FuturesUnordered<_>>();
1218 stream.for_each(future::ready).await;
1219 }
1220
1221 #[instrument(level = "trace")]
1223 pub async fn transfer(
1224 &self,
1225 owner: AccountOwner,
1226 amount: Amount,
1227 recipient: Account,
1228 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1229 self.execute_operation(SystemOperation::Transfer {
1231 owner,
1232 recipient,
1233 amount,
1234 })
1235 .await
1236 }
1237
1238 #[instrument(level = "trace")]
1241 pub async fn read_data_blob(
1242 &self,
1243 hash: CryptoHash,
1244 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1245 let blob_id = BlobId {
1246 hash,
1247 blob_type: BlobType::Data,
1248 };
1249 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1250 .await
1251 }
1252
1253 #[instrument(level = "trace")]
1255 pub async fn claim(
1256 &self,
1257 owner: AccountOwner,
1258 target_id: ChainId,
1259 recipient: Account,
1260 amount: Amount,
1261 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1262 self.execute_operation(SystemOperation::Claim {
1263 owner,
1264 target_id,
1265 recipient,
1266 amount,
1267 })
1268 .await
1269 }
1270
1271 #[instrument(level = "trace")]
1274 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1275 let chain_id = self.chain_id;
1276 let info = self.chain_info().await?;
1277 let committee = self.local_committee().await?;
1278 let height = info.next_block_height;
1279 let round = info.manager.current_round;
1280 let action = CommunicateAction::RequestTimeout {
1281 height,
1282 round,
1283 chain_id,
1284 };
1285 let value = Timeout::new(chain_id, height, info.epoch);
1286 let certificate = Box::new(
1287 self.client
1288 .communicate_chain_action(&committee, action, value)
1289 .await?,
1290 );
1291 self.client.handle_certificate(*certificate.clone()).await?;
1292 self.client
1294 .communicate_chain_updates(
1295 &committee,
1296 chain_id,
1297 height,
1298 CrossChainMessageDelivery::NonBlocking,
1299 None,
1300 )
1301 .await?;
1302 Ok(*certificate)
1303 }
1304
1305 #[instrument(level = "trace", skip_all)]
1307 pub async fn synchronize_chain_state(
1308 &self,
1309 chain_id: ChainId,
1310 ) -> Result<Box<ChainInfo>, Error> {
1311 self.client.synchronize_chain_state(chain_id).await
1312 }
1313
1314 #[instrument(level = "trace", skip_all)]
1317 pub async fn synchronize_chain_state_from_committee(
1318 &self,
1319 committee: Arc<Committee>,
1320 ) -> Result<Box<ChainInfo>, Error> {
1321 self.client
1322 .synchronize_chain_from_committee(self.chain_id, committee)
1323 .await
1324 }
1325
1326 #[instrument(level = "trace", skip(operations, blobs))]
1328 pub async fn execute_operations(
1329 &self,
1330 operations: Vec<Operation>,
1331 blobs: Vec<Blob>,
1332 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1333 let timing_start = linera_base::time::Instant::now();
1334
1335 let result = loop {
1336 let execute_block_start = linera_base::time::Instant::now();
1337 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1339 Ok(ClientOutcome::Committed(certificate)) => {
1340 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1341 break Ok(ClientOutcome::Committed(certificate));
1342 }
1343 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
1344 break Ok(ClientOutcome::WaitForTimeout(timeout));
1345 }
1346 Ok(ClientOutcome::Conflict(certificate)) => {
1347 info!(
1348 height = %certificate.block().header.height,
1349 "Another block was committed."
1350 );
1351 break Ok(ClientOutcome::Conflict(certificate));
1352 }
1353 Err(Error::CommunicationError(CommunicationError::Trusted(
1354 NodeError::UnexpectedBlockHeight {
1355 expected_block_height,
1356 found_block_height,
1357 },
1358 ))) if expected_block_height > found_block_height => {
1359 tracing::info!(
1360 chain_id = %self.chain_id,
1361 "Local state is outdated; synchronizing chain"
1362 );
1363 self.synchronize_chain_state(self.chain_id).await?;
1364 }
1365 Err(err) => return Err(err),
1366 };
1367 };
1368
1369 self.send_timing(timing_start, TimingType::ExecuteOperations);
1370
1371 result
1372 }
1373
1374 pub async fn execute_operation(
1376 &self,
1377 operation: impl Into<Operation>,
1378 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1379 self.execute_operations(vec![operation.into()], vec![])
1380 .await
1381 }
1382
1383 #[instrument(level = "trace", skip(operations, blobs))]
1387 async fn execute_block(
1388 &self,
1389 operations: Vec<Operation>,
1390 blobs: Vec<Blob>,
1391 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1392 #[cfg(with_metrics)]
1393 let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1394
1395 let mutex = self.proposal_mutex();
1396 let lock_start = linera_base::time::Instant::now();
1397 let mut proposal_guard = mutex.lock_owned().await;
1398 tracing::debug!(
1399 chain_id = %self.chain_id,
1400 lock_wait_ms = lock_start.elapsed().as_millis(),
1401 "acquired proposal_mutex in execute_block"
1402 );
1403 match self
1409 .process_pending_block_without_prepare(&mut proposal_guard)
1410 .await?
1411 {
1412 ClientOutcome::Committed(Some(certificate)) => {
1413 return Ok(self.classify_committed(certificate, &operations));
1414 }
1415 ClientOutcome::WaitForTimeout(timeout) => {
1416 return Ok(ClientOutcome::WaitForTimeout(timeout))
1417 }
1418 ClientOutcome::Conflict(certificate) => {
1419 return Ok(ClientOutcome::Conflict(certificate))
1420 }
1421 ClientOutcome::Committed(None) => {}
1422 }
1423
1424 loop {
1425 let transactions = self
1430 .prepend_epochs_messages_and_events(operations.clone())
1431 .await?;
1432
1433 if transactions.is_empty() {
1434 return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1435 WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1436 )));
1437 }
1438
1439 self.new_pending_block(transactions, blobs.clone(), &mut proposal_guard)
1440 .await?;
1441
1442 match self
1443 .process_pending_block_without_prepare(&mut proposal_guard)
1444 .await?
1445 {
1446 ClientOutcome::Committed(Some(certificate)) => {
1447 return Ok(self.classify_committed(certificate, &operations));
1448 }
1449 ClientOutcome::Committed(None) => {
1463 tracing::debug!(
1464 chain_id = %self.chain_id,
1465 "pending proposal cleared without committing ours; re-staging and retrying"
1466 );
1467 continue;
1468 }
1469 ClientOutcome::WaitForTimeout(timeout) => {
1470 return Ok(ClientOutcome::WaitForTimeout(timeout));
1471 }
1472 ClientOutcome::Conflict(certificate) => {
1473 return Ok(ClientOutcome::Conflict(certificate));
1474 }
1475 }
1476 }
1477 }
1478
1479 fn classify_committed(
1483 &self,
1484 certificate: ConfirmedBlockCertificate,
1485 operations: &[Operation],
1486 ) -> ClientOutcome<ConfirmedBlockCertificate> {
1487 let block = certificate.block();
1488 if self.preferred_owner.is_none()
1489 || block.header.authenticated_owner != self.preferred_owner
1490 {
1491 return ClientOutcome::Conflict(Box::new(certificate));
1492 }
1493 let mut operations_iter = operations.iter().peekable();
1494 for tx in &block.body.transactions {
1495 let is_expected = match tx {
1496 Transaction::ReceiveMessages(_) => true,
1497 Transaction::ExecuteOperation(op) if Some(&op) == operations_iter.peek() => {
1498 operations_iter.next();
1499 true
1500 }
1501 Transaction::ExecuteOperation(Operation::System(op)) => matches!(
1502 **op,
1503 SystemOperation::ProcessNewEpoch(_) | SystemOperation::UpdateStream { .. }
1504 ),
1505 Transaction::ExecuteOperation(Operation::User { .. }) => false,
1506 };
1507 if !is_expected {
1508 return ClientOutcome::Conflict(Box::new(certificate));
1509 }
1510 }
1511 if operations_iter.next().is_some() {
1512 ClientOutcome::Conflict(Box::new(certificate))
1513 } else {
1514 ClientOutcome::Committed(certificate)
1515 }
1516 }
1517
1518 #[instrument(level = "trace", skip(operations))]
1524 async fn prepend_epochs_messages_and_events(
1525 &self,
1526 operations: Vec<Operation>,
1527 ) -> Result<Vec<Transaction>, Error> {
1528 let incoming_bundles = self.pending_message_bundles().await?;
1529 let stream_updates = self.collect_stream_updates().await?;
1530 Ok(self
1531 .collect_epoch_changes()
1532 .await?
1533 .into_iter()
1534 .map(Transaction::ExecuteOperation)
1535 .chain(
1536 incoming_bundles
1537 .into_iter()
1538 .map(Transaction::ReceiveMessages),
1539 )
1540 .chain(
1541 stream_updates
1542 .into_iter()
1543 .map(Transaction::ExecuteOperation),
1544 )
1545 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1546 .collect::<Vec<_>>())
1547 }
1548
1549 #[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
1554 async fn new_pending_block(
1555 &self,
1556 transactions: Vec<Transaction>,
1557 blobs: Vec<Blob>,
1558 proposal_guard: &mut Option<PendingProposal>,
1559 ) -> Result<Block, Error> {
1560 let identity = self.identity().await?;
1561
1562 ensure!(
1563 proposal_guard.is_none(),
1564 Error::BlockProposalError(
1565 "Client state already has a pending block; \
1566 use the `linera retry-pending-block` command to commit that first"
1567 )
1568 );
1569 let info = self.chain_info().await?;
1570 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1571 let proposed_block = ProposedBlock {
1572 epoch: info.epoch,
1573 chain_id: self.chain_id,
1574 transactions,
1575 previous_block_hash: info.block_hash,
1576 height: info.next_block_height,
1577 authenticated_owner: Some(identity),
1578 timestamp,
1579 };
1580
1581 let round = self.round_for_oracle(&info, &identity).await?;
1582 let (block, _, never_reject_origins) = self
1585 .client
1586 .stage_block_execution(
1587 proposed_block,
1588 round,
1589 blobs.clone(),
1590 self.options.bundle_execution_policy(),
1591 )
1592 .await?;
1593 if !never_reject_origins.is_empty() {
1596 let skipped = self.skipped_origins.pin();
1597 for origin in never_reject_origins {
1598 skipped.insert(origin);
1599 }
1600 }
1601 let (proposed_block, auto_retry_outcome) = block.clone().into_proposal();
1602 *proposal_guard = Some(PendingProposal {
1603 block: proposed_block,
1604 blobs,
1605 auto_retry_outcome: Some(auto_retry_outcome),
1606 round: None,
1607 });
1608 Ok(block)
1609 }
1610
1611 #[instrument(level = "trace", skip(transactions))]
1616 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1617 let local_time = self.storage_client().clock().current_time();
1618 transactions
1619 .iter()
1620 .filter_map(Transaction::incoming_bundle)
1621 .map(|msg| msg.bundle.timestamp)
1622 .max()
1623 .map_or(local_time, |timestamp| timestamp.max(local_time))
1624 .max(block_time)
1625 }
1626
1627 #[instrument(level = "trace", skip(query))]
1629 pub async fn query_application(
1630 &self,
1631 query: Query,
1632 block_hash: Option<CryptoHash>,
1633 ) -> Result<(QueryOutcome, BlockHeight), Error> {
1634 let mut downloaded_blobs = HashSet::<BlobId>::new();
1635 let mut events = super::EventSetDownloader::new(&self.client);
1636 loop {
1637 let result = self
1638 .client
1639 .local_node
1640 .query_application(self.chain_id, query.clone(), block_hash)
1641 .await;
1642 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1643 let new_blobs = super::filter_new(blob_ids, &downloaded_blobs);
1644 if !new_blobs.is_empty() {
1645 let validators = self.client.validator_nodes().await?;
1646 self.client
1647 .update_local_node_with_blobs_from(new_blobs.clone(), &validators)
1648 .await?;
1649 downloaded_blobs.extend(new_blobs);
1650 continue;
1651 }
1652 }
1653 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1654 if events.download_new(event_ids).await? {
1655 continue;
1656 }
1657 }
1658 return Ok(result?);
1659 }
1660 }
1661
/// Runs a system `query` through `query_application` on the latest state and unwraps the
/// system response.
///
/// Returns `Error::InternalError` if the response is not a system response.
#[cfg(with_testing)]
#[instrument(level = "trace", skip(query))]
pub async fn query_system_application(
    &self,
    query: linera_execution::SystemQuery,
) -> Result<QueryOutcome<linera_execution::SystemResponse>, Error> {
    let (outcome, _) = self.query_application(Query::System(query), None).await?;
    let QueryOutcome {
        response,
        operations,
    } = outcome;
    let linera_execution::QueryResponse::System(response) = response else {
        return Err(Error::InternalError("Unexpected response for system query"));
    };
    Ok(QueryOutcome {
        response,
        operations,
    })
}
1684
/// Runs a typed user-application `query` through `query_application` on the latest state.
///
/// The response bytes are deserialized as JSON into `A::QueryResponse`. Returns
/// `Error::InternalError` if the response is not a user response.
#[instrument(level = "trace", skip(application_id, query))]
#[cfg(with_testing)]
pub async fn query_user_application<A: Abi>(
    &self,
    application_id: ApplicationId<A>,
    query: &A::Query,
) -> Result<QueryOutcome<A::QueryResponse>, Error> {
    let query = Query::user(application_id, query)?;
    let (outcome, _) = self.query_application(query, None).await?;
    let QueryOutcome {
        response,
        operations,
    } = outcome;
    let linera_execution::QueryResponse::User(response_bytes) = response else {
        return Err(Error::InternalError("Unexpected response for user query"));
    };
    let response = serde_json::from_slice(&response_bytes)?;
    Ok(QueryOutcome {
        response,
        operations,
    })
}
1712
1713 #[instrument(level = "trace")]
1720 pub async fn query_balance(&self) -> Result<Amount, Error> {
1721 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1722 Ok(balance)
1723 }
1724
1725 #[instrument(level = "trace", skip(owner))]
1732 pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1733 if owner.is_chain() {
1734 self.query_balance().await
1735 } else {
1736 Ok(self
1737 .query_balances_with_owner(owner)
1738 .await?
1739 .1
1740 .unwrap_or(Amount::ZERO))
1741 }
1742 }
1743
1744 #[instrument(level = "trace", skip(owner))]
1751 pub(crate) async fn query_balances_with_owner(
1752 &self,
1753 owner: AccountOwner,
1754 ) -> Result<(Amount, Option<Amount>), Error> {
1755 let incoming_bundles = self.pending_message_bundles().await?;
1756 if incoming_bundles.is_empty() {
1759 let chain_balance = self.local_balance().await?;
1760 let owner_balance = self.local_owner_balance(owner).await?;
1761 return Ok((chain_balance, Some(owner_balance)));
1762 }
1763 let info = self.chain_info().await?;
1764 let transactions = incoming_bundles
1765 .into_iter()
1766 .map(Transaction::ReceiveMessages)
1767 .collect::<Vec<_>>();
1768 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1769 let block = ProposedBlock {
1770 epoch: info.epoch,
1771 chain_id: self.chain_id,
1772 transactions,
1773 previous_block_hash: info.block_hash,
1774 height: info.next_block_height,
1775 authenticated_owner: if owner == AccountOwner::CHAIN {
1776 None
1777 } else {
1778 Some(owner)
1779 },
1780 timestamp,
1781 };
1782 match self
1783 .client
1784 .stage_block_execution(
1785 block,
1786 None,
1787 Vec::new(),
1788 self.options.bundle_execution_policy(),
1789 )
1790 .await
1791 {
1792 Ok((_, response, _)) => Ok((
1793 response.info.chain_balance,
1794 response.info.requested_owner_balance,
1795 )),
1796 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1797 error,
1798 )))) if matches!(
1799 &*error,
1800 ChainError::ExecutionError(
1801 execution_error,
1802 ChainExecutionContext::Block
1803 ) if matches!(
1804 **execution_error,
1805 ExecutionError::FeesExceedFunding { .. }
1806 )
1807 ) =>
1808 {
1809 Ok((Amount::ZERO, Some(Amount::ZERO)))
1811 }
1812 Err(error) => Err(error),
1813 }
1814 }
1815
1816 #[instrument(level = "trace")]
1820 pub async fn local_balance(&self) -> Result<Amount, Error> {
1821 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1822 Ok(balance)
1823 }
1824
1825 #[instrument(level = "trace", skip(owner))]
1829 pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1830 if owner.is_chain() {
1831 self.local_balance().await
1832 } else {
1833 Ok(self
1834 .local_balances_with_owner(owner)
1835 .await?
1836 .1
1837 .unwrap_or(Amount::ZERO))
1838 }
1839 }
1840
1841 #[instrument(level = "trace", skip(owner))]
1845 pub(crate) async fn local_balances_with_owner(
1846 &self,
1847 owner: AccountOwner,
1848 ) -> Result<(Amount, Option<Amount>), Error> {
1849 ensure!(
1850 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1851 Error::WalletSynchronizationError
1852 );
1853 let mut query = ChainInfoQuery::new(self.chain_id);
1854 query.request_owner_balance = owner;
1855 let response = self
1856 .client
1857 .local_node
1858 .handle_chain_info_query(query)
1859 .await?;
1860 Ok((
1861 response.info.chain_balance,
1862 response.info.requested_owner_balance,
1863 ))
1864 }
1865
1866 #[instrument(level = "trace")]
1868 pub async fn transfer_to_account(
1869 &self,
1870 from: AccountOwner,
1871 amount: Amount,
1872 account: Account,
1873 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1874 self.transfer(from, amount, account).await
1875 }
1876
/// Transfers `amount` from `owner` to this chain's burn address
/// (`Account::burn_address(self.chain_id)`).
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn burn(
    &self,
    owner: AccountOwner,
    amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    self.transfer(owner, amount, Account::burn_address(self.chain_id))
        .await
}
1888
1889 #[instrument(level = "trace")]
1890 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1891 let validators = self.client.validator_nodes().await?;
1892 self.client
1893 .fetch_chain_info(self.chain_id, &validators)
1894 .await
1895 }
1896
1897 #[instrument(level = "trace")]
1912 pub async fn synchronize_up_to(
1913 &self,
1914 next_height: Option<BlockHeight>,
1915 until_block_time: Option<Timestamp>,
1916 ) -> Result<Box<ChainInfo>, Error> {
1917 let (_, committee) = self.client.admin_committee().await?;
1918 let validators = self.client.make_nodes(&committee)?;
1919 Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
1920 communicate_with_quorum(
1921 &validators,
1922 &committee,
1923 |_: &()| (),
1924 |remote_node| async move {
1925 self.client
1926 .download_certificates_from(
1927 &remote_node,
1928 self.chain_id,
1929 next_height.unwrap_or(BlockHeight::MAX),
1930 until_block_time,
1931 )
1932 .await?;
1933 Ok(())
1934 },
1935 self.client.options.quorum_grace_period,
1936 )
1937 .await?;
1938 self.client
1939 .local_node
1940 .chain_info(self.chain_id)
1941 .await
1942 .map_err(Into::into)
1943 }
1944
1945 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1946 if self.is_follow_only() {
1947 return self.client.synchronize_chain_state(self.chain_id).await;
1948 }
1949 let info = self.prepare_chain().await?;
1950 self.synchronize_publisher_chains().await?;
1951 self.find_received_certificates().await?;
1952 Ok(info)
1953 }
1954
1955 #[instrument(level = "trace")]
1957 pub async fn process_pending_block(
1958 &self,
1959 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1960 self.prepare_chain().await?;
1961 let mutex = self.proposal_mutex();
1962 let mut proposal_guard = mutex.lock_owned().await;
1963 self.process_pending_block_without_prepare(&mut proposal_guard)
1964 .await
1965 }
1966
/// Runs `process_pending_block_inner` and retries it when the failure may have been
/// caused by consensus state that changed underneath it.
///
/// On error, the consensus-state snapshot recorded by the inner call is compared with a
/// fresh one:
/// * if they differ, the inner call is simply retried;
/// * if they are equal, one fallback `synchronize_chain_state` is performed (at most once
///   per call of this method); the inner call is retried only if that sync changed the
///   snapshot.
///
/// In every other situation the original error from the inner call is returned.
#[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
async fn process_pending_block_without_prepare(
    &self,
    proposal_guard: &mut Option<PendingProposal>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    // Ensures the fallback synchronization below happens at most once.
    let mut did_fallback_sync = false;
    loop {
        // Filled in by the inner call with the consensus state it acted upon.
        let mut snapshot = None;
        let err = match self
            .process_pending_block_inner(proposal_guard, &mut snapshot)
            .await
        {
            Ok(outcome) => return Ok(outcome),
            Err(err) => err,
        };
        // The inner call failed before recording any snapshot: nothing to compare.
        let Some(snapshot) = snapshot else {
            return Err(err);
        };
        // If the current state cannot be read, report the original error.
        let Ok(current) = self.consensus_state_snapshot().await else {
            return Err(err);
        };
        if current == snapshot {
            // The local state did not change; try to learn about new state via a sync.
            if did_fallback_sync {
                return Err(err);
            }
            did_fallback_sync = true;
            if let Err(error) = self.synchronize_chain_state(self.chain_id).await {
                tracing::error!(%error, "fallback sync failed after rejected proposal");
                return Err(err);
            }
            let Ok(after_sync) = self.consensus_state_snapshot().await else {
                return Err(err);
            };
            // The sync brought nothing new, so retrying would fail the same way.
            if after_sync == snapshot {
                return Err(err);
            }
            tracing::debug!(
                chain_id = %self.chain_id,
                ?snapshot,
                ?after_sync,
                %err,
                "fallback sync absorbed new consensus state after rejected proposal; retrying"
            );
            continue;
        }
        // The state moved on while the inner call was running: loop and try again.
        tracing::debug!(
            chain_id = %self.chain_id,
            ?snapshot,
            ?current,
            %err,
            "local consensus state advanced during process_pending_block; retrying"
        );
    }
}
2051
/// Makes a single attempt at proposing and committing a block.
///
/// The block to propose is, in order of precedence: the locking block requested from the
/// chain manager, or the pending proposal in `proposal_guard`. If there is neither,
/// `ClientOutcome::Committed(None)` is returned.
///
/// `snapshot` is written with the consensus state observed by this call (once after the
/// chain info is loaded and again after the proposal was handled locally) so that the
/// caller can tell whether a failure raced with a state change.
///
/// On success the pending proposal is cleared and the confirmed certificate returned.
/// `ClientOutcome::WaitForTimeout` is returned when `round_for_new_proposal` yields a
/// timeout instead of a round.
async fn process_pending_block_inner(
    &self,
    proposal_guard: &mut Option<PendingProposal>,
    snapshot: &mut Option<ConsensusStateSnapshot>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
    let process_start = linera_base::time::Instant::now();
    tracing::debug!("process_pending_block_without_prepare started");
    let info = self.request_leader_timeout_if_needed().await?;
    *snapshot = Some(self.consensus_state_snapshot().await?);

    // Drop a pending proposal whose height is already below the next block height.
    if let Some(pending) = &*proposal_guard {
        if pending.block.height < info.next_block_height {
            tracing::debug!(
                "Clearing pending proposal: a block was committed at height {}",
                pending.block.height
            );
            *proposal_guard = None;
        }
    }

    // A locking block in the current non-fast round only needs to be finalized.
    if info.manager.has_locking_block_in_current_round()
        && !info.manager.current_round.is_fast()
    {
        return self.finalize_locking_block(info).await;
    }
    let identity = self.identity().await?;

    let local_node = &self.client.local_node;
    // Select the block to propose, its blobs, and the owner that will sign the proposal.
    let (block, blobs, owner) = if let Some(locking) = &info.manager.requested_locking {
        let (block, blobs) = match &**locking {
            LockingBlock::Regular(certificate) => {
                let blob_ids = certificate.block().required_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                debug!("Retrying locking block from round {}", certificate.round);
                (certificate.block().clone(), blobs)
            }
            LockingBlock::Fast(proposal) => {
                let proposed_block = proposal.content.block.clone();
                let blob_ids = proposed_block.published_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                // Only the proposed block is available here, so it is executed locally
                // to obtain the full block.
                let (block, _, _) = self
                    .client
                    .stage_block_execution(
                        proposed_block,
                        None,
                        blobs.clone(),
                        BundleExecutionPolicy::committed(),
                    )
                    .await?;
                debug!("Retrying locking block from fast round.");
                (block, blobs)
            }
        };
        (block, blobs, identity)
    } else if let Some(pending) = proposal_guard.as_ref() {
        // Sign as the owner the block was staged for when that differs from the current
        // identity, provided its key is still available.
        let owner = match pending.block.authenticated_owner {
            Some(staged_owner) if staged_owner != identity => {
                if !self.has_key_for(&staged_owner).await? {
                    // A pending block already assigned a fast round is kept and an error
                    // is returned instead of discarding it.
                    if pending.round.is_some_and(|round| round.is_fast()) {
                        return Err(Error::BlockProposalError(
                            "pending fast block was signed by an owner whose key is no \
                             longer available; recover the key or wait for the round to \
                             time out before retrying",
                        ));
                    }
                    warn!(
                        ?staged_owner, %identity,
                        "Discarding pending block: no signer key for its authenticated owner",
                    );
                    *proposal_guard = None;
                    return Ok(ClientOutcome::Committed(None));
                }
                staged_owner
            }
            _ => identity,
        };
        let proposed_block = pending.block.clone();
        let blobs = pending.blobs.clone();
        let staging_outcome = pending.auto_retry_outcome.as_ref();
        let round = self.round_for_oracle(&info, &owner).await?;
        let (block, _, _) = self
            .client
            .stage_block_execution(
                proposed_block,
                round,
                blobs.clone(),
                BundleExecutionPolicy::committed(),
            )
            .await?;
        // Re-execution must reproduce the outcome recorded when the block was staged.
        if let Some(staging_outcome) = staging_outcome {
            ensure!(
                block.outcome_matches(staging_outcome),
                Error::ExecutionOutcomeMismatch
            );
        }
        debug!("Proposing the local pending block.");
        (block, blobs, owner)
    } else {
        // Neither a locking block nor a pending proposal: nothing to do.
        return Ok(ClientOutcome::Committed(None));
    };

    let has_oracle_responses = block.has_oracle_responses();
    let (proposed_block, outcome) = block.into_proposal();
    let round = match self
        .round_for_new_proposal(&info, &owner, has_oracle_responses)
        .await?
    {
        Either::Left(round) => round,
        Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
    };
    debug!("Proposing block for round {}", round);
    // Record the first round this pending block is proposed in; later rounds do not
    // overwrite it.
    if let Some(pending) = proposal_guard.as_mut() {
        pending.round.get_or_insert(round);
    }

    let already_handled_locally = info
        .manager
        .already_handled_proposal(round, &proposed_block);
    // Build the signed proposal: a retry of the locking block if there is one, otherwise
    // an initial proposal of `proposed_block`.
    let proposal = if let Some(locking) = info.manager.requested_locking {
        Box::new(match *locking {
            LockingBlock::Regular(cert) => {
                BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                    .await
                    .map_err(Error::signer_failure)?
            }
            LockingBlock::Fast(proposal) => {
                BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                    .await
                    .map_err(Error::signer_failure)?
            }
        })
    } else {
        Box::new(
            BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                .await
                .map_err(Error::signer_failure)?,
        )
    };
    if !already_handled_locally {
        // Let the local node handle the proposal before it is sent to the validators.
        if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
            match err {
                // Hand the blobs over as pending blobs, then try once more.
                LocalNodeError::BlobsNotFound(_) => {
                    local_node
                        .handle_pending_blobs(self.chain_id, blobs)
                        .await?;
                    local_node.handle_block_proposal(*proposal.clone()).await?;
                }
                err => return Err(err.into()),
            }
        }
    }
    // Refresh the snapshot now that the proposal has been handled locally.
    *snapshot = Some(self.consensus_state_snapshot().await?);
    let committee = self.local_committee().await?;
    let block = Block::new(proposed_block, outcome);
    let submit_block_proposal_start = linera_base::time::Instant::now();
    // A fast-round proposal is submitted as a confirmed block directly; otherwise the
    // block is submitted as a validated block and then finalized.
    let certificate = if round.is_fast() {
        let hashed_value = ConfirmedBlock::new(block);
        self.client
            .submit_block_proposal(committee.clone(), proposal, hashed_value)
            .await?
    } else {
        let hashed_value = ValidatedBlock::new(block);
        let certificate = self
            .client
            .submit_block_proposal(committee.clone(), proposal, hashed_value.clone())
            .await?;
        self.client.finalize_block(&committee, certificate).await?
    };
    self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
    tracing::debug!(
        total_process_ms = process_start.elapsed().as_millis(),
        "process_pending_block_without_prepare completing"
    );
    debug!(round = %certificate.round, "Sending confirmed block to validators");
    let certificate = self.client.storage_client().cache_certificate(certificate);
    self.update_validators(Some(&committee), Some(certificate.clone()))
        .await?;
    // The block is committed, so no proposal is pending any more.
    *proposal_guard = None;
    Ok(ClientOutcome::Committed(Some(CacheArc::unwrap_or_clone(
        certificate,
    ))))
}
2270
2271 #[expect(
2272 clippy::cast_possible_truncation,
2273 reason = "elapsed millis fits in u64 for any realistic measurement window"
2274 )]
2275 fn send_timing(&self, start: Instant, timing_type: TimingType) {
2276 let Some(sender) = &self.timing_sender else {
2277 return;
2278 };
2279 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2280 tracing::warn!(%err, "Failed to send timing info");
2281 }
2282 }
2283
2284 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
2287 let mut info = self.chain_info_with_manager_values().await?;
2288 if let Some(round_timeout) = info.manager.round_timeout {
2291 if round_timeout <= self.storage_client().clock().current_time() {
2292 if let Err(e) = self.request_leader_timeout().await {
2293 debug!("Failed to obtain a timeout certificate: {}", e);
2294 } else {
2295 info = self.chain_info_with_manager_values().await?;
2296 }
2297 }
2298 }
2299 Ok(info)
2300 }
2301
2302 async fn finalize_locking_block(
2306 &self,
2307 info: Box<ChainInfo>,
2308 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2309 let locking = info
2310 .manager
2311 .requested_locking
2312 .expect("Should have a locking block");
2313 let LockingBlock::Regular(certificate) = *locking else {
2314 panic!("Should have a locking validated block");
2315 };
2316 debug!(
2317 round = %certificate.round,
2318 "Finalizing locking block"
2319 );
2320 let committee = self.local_committee().await?;
2321 let certificate = self
2322 .client
2323 .finalize_block(&committee, certificate.clone())
2324 .await?;
2325 let certificate = self.client.storage_client().cache_certificate(certificate);
2326 self.update_validators(Some(&committee), Some(certificate.clone()))
2327 .await?;
2328 Ok(ClientOutcome::Committed(Some(CacheArc::unwrap_or_clone(
2329 certificate,
2330 ))))
2331 }
2332
2333 async fn round_for_oracle(
2335 &self,
2336 info: &ChainInfo,
2337 identity: &AccountOwner,
2338 ) -> Result<Option<u32>, Error> {
2339 match self.round_for_new_proposal(info, identity, true).await {
2341 Ok(Either::Left(round)) => Ok(round.multi_leader()),
2343 Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
2347 Err(err) => Err(err),
2348 }
2349 }
2350
2351 async fn round_for_new_proposal(
2353 &self,
2354 info: &ChainInfo,
2355 identity: &AccountOwner,
2356 has_oracle_responses: bool,
2357 ) -> Result<Either<Round, RoundTimeout>, Error> {
2358 let manager = &info.manager;
2359 let seed = manager.seed;
2360 let skip_fast = manager.current_round.is_fast()
2365 && (has_oracle_responses || !self.options.allow_fast_blocks);
2366 let conflict = manager
2367 .requested_signed_proposal
2368 .as_ref()
2369 .into_iter()
2370 .chain(&manager.requested_proposed)
2371 .any(|proposal| proposal.content.round == manager.current_round)
2372 || skip_fast;
2373 let round = if !conflict {
2374 manager.current_round
2375 } else if let Some(round) = manager
2376 .ownership
2377 .next_round(manager.current_round)
2378 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2379 {
2380 round
2381 } else if let Some(timeout) = info.round_timeout() {
2382 return Ok(Either::Right(timeout));
2383 } else {
2384 return Err(Error::BlockProposalError(
2385 "Conflicting proposal in the current round",
2386 ));
2387 };
2388 let current_committee = self
2389 .local_committee()
2390 .await?
2391 .validators
2392 .values()
2393 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
2394 .collect();
2395 if manager.should_propose(identity, round, seed, ¤t_committee) {
2396 if let Some(wait_until) = self.multi_leader_jitter_target(info, identity, round) {
2397 return Ok(Either::Right(RoundTimeout {
2398 timestamp: wait_until,
2399 current_round: round,
2400 next_block_height: info.next_block_height,
2401 }));
2402 }
2403 return Ok(Either::Left(round));
2404 }
2405 if let Some(timeout) = info.round_timeout() {
2406 return Ok(Either::Right(timeout));
2407 }
2408 Err(Error::BlockProposalError(
2409 "Not a leader in the current round",
2410 ))
2411 }
2412
2413 fn multi_leader_jitter_target(
2422 &self,
2423 info: &ChainInfo,
2424 owner: &AccountOwner,
2425 round: Round,
2426 ) -> Option<Timestamp> {
2427 if !self.options.multi_leader_jitter {
2428 return None;
2429 }
2430 let ownership = &info.manager.ownership;
2431 let delay = ownership.multi_leader_proposal_delay(owner, round)?;
2432 if delay == TimeDelta::ZERO {
2433 return None;
2434 }
2435 let now = self.storage_client().clock().current_time();
2436 let round_start = if round == info.manager.current_round {
2437 match (info.manager.round_timeout, ownership.round_timeout(round)) {
2438 (Some(end), Some(duration)) => end.saturating_sub(duration),
2439 _ => now,
2440 }
2441 } else {
2442 now
2443 };
2444 let propose_at = round_start.saturating_add(delay);
2445 (propose_at > now).then_some(propose_at)
2446 }
2447
2448 #[instrument(level = "trace")]
2455 pub async fn clear_pending_proposal(&self) {
2456 *self.proposal_mutex().lock().await = None;
2457 }
2458
2459 #[cfg(with_testing)]
2463 #[instrument(level = "trace")]
2464 pub async fn rotate_key_pair(
2465 &self,
2466 public_key: linera_base::crypto::AccountPublicKey,
2467 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2468 self.transfer_ownership(public_key.into()).await
2469 }
2470
2471 #[instrument(level = "trace")]
2473 pub async fn transfer_ownership(
2474 &self,
2475 new_owner: AccountOwner,
2476 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2477 self.execute_operation(SystemOperation::ChangeOwnership {
2478 super_owners: vec![new_owner],
2479 owners: Vec::new(),
2480 first_leader: None,
2481 multi_leader_rounds: 5,
2482 open_multi_leader_rounds: false,
2483 timeout_config: TimeoutConfig::default(),
2484 })
2485 .await
2486 }
2487
2488 #[instrument(level = "trace")]
2490 pub async fn share_ownership(
2491 &self,
2492 new_owner: AccountOwner,
2493 new_weight: u64,
2494 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2495 let ownership = self.prepare_chain().await?.manager.ownership;
2496 ensure!(
2497 ownership.is_active(),
2498 ChainError::InactiveChain(self.chain_id)
2499 );
2500 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2501 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2502 owners.push((new_owner, new_weight));
2503 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2504 super_owners: Vec::new(),
2505 owners,
2506 first_leader: ownership.first_leader,
2507 multi_leader_rounds: ownership.multi_leader_rounds,
2508 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2509 timeout_config: ownership.timeout_config,
2510 })];
2511 self.execute_block(operations, vec![]).await
2512 }
2513
2514 #[instrument(level = "trace")]
2516 pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2517 Ok(self
2518 .client
2519 .local_node
2520 .chain_state_view(self.chain_id)
2521 .await?
2522 .execution_state
2523 .system
2524 .ownership
2525 .get()
2526 .await?
2527 .clone())
2528 }
2529
2530 #[instrument(level = "trace")]
2533 pub async fn change_ownership(
2534 &self,
2535 ownership: ChainOwnership,
2536 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2537 self.execute_operation(SystemOperation::ChangeOwnership {
2538 super_owners: ownership.super_owners.into_iter().collect(),
2539 owners: ownership.owners.into_iter().collect(),
2540 first_leader: ownership.first_leader,
2541 multi_leader_rounds: ownership.multi_leader_rounds,
2542 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2543 timeout_config: ownership.timeout_config.clone(),
2544 })
2545 .await
2546 }
2547
2548 #[instrument(level = "trace")]
2550 pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2551 Ok(self
2552 .client
2553 .local_node
2554 .chain_state_view(self.chain_id)
2555 .await?
2556 .execution_state
2557 .system
2558 .application_permissions
2559 .get()
2560 .await?
2561 .clone())
2562 }
2563
2564 #[instrument(level = "trace", skip(application_permissions))]
2566 pub async fn change_application_permissions(
2567 &self,
2568 application_permissions: ApplicationPermissions,
2569 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2570 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2571 application_permissions,
2572 ))
2573 .await
2574 }
2575
2576 #[instrument(level = "trace", skip(self))]
2578 pub async fn open_chain(
2579 &self,
2580 ownership: ChainOwnership,
2581 application_permissions: ApplicationPermissions,
2582 balance: Amount,
2583 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2584 let mut has_key = false;
2586 for owner in ownership.all_owners() {
2587 if self.has_key_for(owner).await? {
2588 has_key = true;
2589 break;
2590 }
2591 }
2592 let config = OpenChainConfig {
2593 ownership,
2594 balance,
2595 application_permissions,
2596 };
2597 let operation = Operation::system(SystemOperation::OpenChain(config));
2598 let certificate = match self.execute_block(vec![operation], vec![]).await? {
2599 ClientOutcome::Committed(certificate) => certificate,
2600 ClientOutcome::Conflict(certificate) => {
2601 return Ok(ClientOutcome::Conflict(certificate));
2602 }
2603 ClientOutcome::WaitForTimeout(timeout) => {
2604 return Ok(ClientOutcome::WaitForTimeout(timeout));
2605 }
2606 };
2607 let chain_blob = certificate
2609 .block()
2610 .body
2611 .blobs
2612 .last()
2613 .and_then(|blobs| blobs.last())
2614 .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2615 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2616 if has_key {
2618 self.client
2619 .extend_chain_mode(description.id(), ListeningMode::FullChain);
2620 self.client
2621 .retry_pending_cross_chain_requests(self.chain_id)
2622 .await?;
2623 }
2624 Ok(ClientOutcome::Committed((description, certificate)))
2625 }
2626
2627 #[instrument(level = "trace")]
2631 pub async fn checkpoint(&self) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2632 self.execute_operation(SystemOperation::Checkpoint).await
2633 }
2634
2635 #[instrument(level = "trace")]
2638 pub async fn close_chain(
2639 &self,
2640 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2641 match self.execute_operation(SystemOperation::CloseChain).await {
2642 Ok(outcome) => Ok(outcome.map(Some)),
2643 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2644 chain_error,
2645 )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2646 Ok(ClientOutcome::Committed(None)) }
2648 Err(error) => Err(error),
2649 }
2650 }
2651
2652 #[cfg(not(target_arch = "wasm32"))]
2655 #[instrument(level = "trace", skip(contract, service, formats))]
2656 pub async fn publish_module(
2657 &self,
2658 contract: Bytecode,
2659 service: Bytecode,
2660 vm_runtime: VmRuntime,
2661 formats: Option<Vec<u8>>,
2662 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2663 let (blobs, module_id) =
2664 super::create_bytecode_blobs(contract, service, vm_runtime, formats).await;
2665 self.publish_module_blobs(blobs, module_id).await
2666 }
2667
2668 #[cfg(not(target_arch = "wasm32"))]
2670 #[instrument(level = "trace", skip(blobs, module_id))]
2671 pub async fn publish_module_blobs(
2672 &self,
2673 blobs: Vec<Blob>,
2674 module_id: ModuleId,
2675 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2676 self.execute_operations(
2677 vec![Operation::system(SystemOperation::PublishModule {
2678 module_id,
2679 })],
2680 blobs,
2681 )
2682 .await?
2683 .try_map(|certificate| Ok((module_id, certificate)))
2684 }
2685
2686 #[instrument(level = "trace", skip(bytes))]
2688 pub async fn publish_data_blobs(
2689 &self,
2690 bytes: Vec<Vec<u8>>,
2691 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2692 let blobs = bytes.into_iter().map(Blob::new_data);
2693 let publish_blob_operations = blobs
2694 .clone()
2695 .map(|blob| {
2696 Operation::system(SystemOperation::PublishDataBlob {
2697 blob_hash: blob.id().hash,
2698 })
2699 })
2700 .collect();
2701 self.execute_operations(publish_blob_operations, blobs.collect())
2702 .await
2703 }
2704
2705 #[instrument(level = "trace", skip(bytes))]
2707 pub async fn publish_data_blob(
2708 &self,
2709 bytes: Vec<u8>,
2710 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2711 self.publish_data_blobs(vec![bytes]).await
2712 }
2713
2714 #[instrument(
2716 level = "trace",
2717 skip(self, parameters, instantiation_argument, required_application_ids)
2718 )]
2719 pub async fn create_application<
2720 A: Abi,
2721 Parameters: Serialize,
2722 InstantiationArgument: Serialize,
2723 >(
2724 &self,
2725 module_id: ModuleId<A, Parameters, InstantiationArgument>,
2726 parameters: &Parameters,
2727 instantiation_argument: &InstantiationArgument,
2728 required_application_ids: Vec<ApplicationId>,
2729 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2730 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2731 let parameters = serde_json::to_vec(parameters)?;
2732 Ok(self
2733 .create_application_untyped(
2734 module_id.forget_abi(),
2735 parameters,
2736 instantiation_argument,
2737 required_application_ids,
2738 )
2739 .await?
2740 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2741 }
2742
2743 #[instrument(
2745 level = "trace",
2746 skip(
2747 self,
2748 module_id,
2749 parameters,
2750 instantiation_argument,
2751 required_application_ids
2752 )
2753 )]
2754 pub async fn create_application_untyped(
2755 &self,
2756 module_id: ModuleId,
2757 parameters: Vec<u8>,
2758 instantiation_argument: Vec<u8>,
2759 required_application_ids: Vec<ApplicationId>,
2760 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2761 self.execute_operation(SystemOperation::CreateApplication {
2762 module_id,
2763 parameters,
2764 instantiation_argument,
2765 required_application_ids,
2766 })
2767 .await?
2768 .try_map(|certificate| {
2769 let mut creation = certificate
2771 .block()
2772 .created_blob_ids()
2773 .into_iter()
2774 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2775 .collect::<Vec<_>>();
2776 if creation.len() > 1 {
2777 return Err(Error::InternalError(
2778 "Unexpected number of application descriptions published",
2779 ));
2780 }
2781 let blob_id = creation.pop().ok_or(Error::InternalError(
2782 "ApplicationDescription blob not found.",
2783 ))?;
2784 let id = ApplicationId::new(blob_id.hash);
2785 Ok((id, certificate))
2786 })
2787 }
2788
2789 #[instrument(level = "trace", skip(committee))]
2791 pub async fn stage_new_committee(
2792 &self,
2793 committee: Committee,
2794 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2795 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2796 let blob_hash = blob.id().hash;
2797 match self
2798 .execute_operations(
2799 vec![Operation::system(SystemOperation::Admin(
2800 AdminOperation::PublishCommitteeBlob { blob_hash },
2801 ))],
2802 vec![blob],
2803 )
2804 .await?
2805 {
2806 ClientOutcome::Committed(_) => {}
2807 outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2808 return Ok(outcome)
2809 }
2810 }
2811 let epoch = self.chain_info().await?.epoch.try_add_one()?;
2812 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2813 epoch,
2814 blob_hash,
2815 }))
2816 .await
2817 }
2818
2819 #[instrument(level = "trace")]
2825 pub async fn process_inbox(
2826 &self,
2827 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2828 self.prepare_chain().await?;
2829 self.process_inbox_without_prepare().await
2830 }
2831
2832 #[instrument(level = "trace")]
2838 pub async fn process_inbox_without_prepare(
2839 &self,
2840 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2841 #[cfg(with_metrics)]
2842 let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2843
2844 let mut certificates = Vec::new();
2845 loop {
2846 match self.execute_block(vec![], vec![]).await {
2850 Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2851 Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2852 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2853 return Ok((certificates, Some(timeout)));
2854 }
2855 Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2857 WorkerError::ChainError(chain_error),
2858 ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2859 return Ok((certificates, None));
2860 }
2861 Err(error) => return Err(error),
2862 };
2863 }
2864 }
2865
2866 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2868 let mut next_epoch = self.chain_info().await?.epoch.try_add_one()?;
2869 let mut epoch_change_ops = Vec::new();
2870 while self
2871 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2872 .await?
2873 {
2874 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2875 next_epoch,
2876 )));
2877 next_epoch.try_add_assign_one()?;
2878 }
2879 Ok(epoch_change_ops)
2880 }
2881
2882 async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2885 let event_id = EventId {
2886 chain_id: self.client.admin_chain_id,
2887 stream_id: StreamId::system(stream_name),
2888 index,
2889 };
2890 Ok(self
2891 .client
2892 .storage_client()
2893 .read_event(event_id)
2894 .await?
2895 .is_some())
2896 }
2897
2898 pub async fn events_from_index(
2900 &self,
2901 stream_id: StreamId,
2902 start_index: u32,
2903 ) -> Result<Vec<IndexAndEvent>, Error> {
2904 Ok(self
2905 .client
2906 .storage_client()
2907 .read_events_from_index(&self.chain_id, &stream_id, start_index)
2908 .await?)
2909 }
2910
2911 #[instrument(level = "trace")]
2915 pub async fn revoke_epochs(
2916 &self,
2917 revoked_epoch: Epoch,
2918 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2919 self.prepare_chain().await?;
2920 let current_epoch = self.chain_info().await?.epoch;
2921 ensure!(
2922 revoked_epoch < current_epoch,
2923 Error::CannotRevokeCurrentEpoch(current_epoch)
2924 );
2925 let mut operations = Vec::new();
2926 for epoch_index in 0..=revoked_epoch.0 {
2927 let epoch = Epoch(epoch_index);
2928 if self
2929 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, epoch.0)
2930 .await?
2931 {
2932 continue;
2933 }
2934 operations.push(Operation::system(SystemOperation::Admin(
2935 AdminOperation::RemoveCommittee { epoch },
2936 )));
2937 }
2938 ensure!(!operations.is_empty(), Error::EpochAlreadyRevoked);
2939 self.execute_operations(operations, vec![]).await
2940 }
2941
2942 #[cfg(with_testing)]
2946 #[instrument(level = "trace")]
2947 pub async fn transfer_to_account_unsafe_unconfirmed(
2948 &self,
2949 owner: AccountOwner,
2950 amount: Amount,
2951 recipient: Account,
2952 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2953 self.execute_operation(SystemOperation::Transfer {
2954 owner,
2955 recipient,
2956 amount,
2957 })
2958 .await
2959 }
2960
2961 #[instrument(level = "trace", skip(hash))]
2962 pub async fn read_confirmed_block(
2963 &self,
2964 hash: CryptoHash,
2965 ) -> Result<Arc<ConfirmedBlock>, Error> {
2966 self.client
2967 .storage_client()
2968 .read_confirmed_block(hash)
2969 .await?
2970 .ok_or(Error::MissingConfirmedBlock(hash))
2971 .map(|b| b.into_std())
2972 }
2973
2974 #[instrument(level = "trace", skip(hash))]
2975 pub async fn read_certificate(
2976 &self,
2977 hash: CryptoHash,
2978 ) -> Result<CacheArc<ConfirmedBlockCertificate>, Error> {
2979 self.client
2980 .storage_client()
2981 .read_certificate(hash)
2982 .await?
2983 .ok_or(Error::ReadCertificatesError(vec![hash]))
2984 }
2985
2986 #[instrument(level = "trace")]
2988 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2989 self.client
2990 .retry_pending_cross_chain_requests(self.chain_id)
2991 .await?;
2992 Ok(())
2993 }
2994
2995 #[instrument(level = "trace", skip(local_node))]
2996 async fn maybe_local_chain_info(
2997 &self,
2998 chain_id: ChainId,
2999 local_node: &LocalNodeClient<Env::Storage>,
3000 ) -> Result<Option<Box<ChainInfo>>, Error> {
3001 match local_node.chain_info(chain_id).await {
3002 Ok(info) => Ok(Some(info)),
3003 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
3004 Err(err) => Err(err.into()),
3005 }
3006 }
3007
3008 #[instrument(level = "trace", skip(chain_id, local_node))]
3009 async fn local_next_block_height(
3010 &self,
3011 chain_id: ChainId,
3012 local_node: &LocalNodeClient<Env::Storage>,
3013 ) -> Result<BlockHeight, Error> {
3014 Ok(self
3015 .maybe_local_chain_info(chain_id, local_node)
3016 .await?
3017 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
3018 }
3019
3020 #[instrument(level = "trace")]
3023 async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
3024 Ok(self
3025 .client
3026 .local_node
3027 .get_inbox_next_height(self.chain_id, origin)
3028 .await?)
3029 }
3030
3031 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
3032 async fn process_notification(
3033 &self,
3034 remote_node: RemoteNode<Env::ValidatorNode>,
3035 local_node: LocalNodeClient<Env::Storage>,
3036 notification: Notification,
3037 ) -> Result<(), Error> {
3038 let listening_mode = self.client.chain_mode(notification.chain_id);
3039 let relevant = listening_mode
3040 .as_ref()
3041 .is_some_and(|mode| mode.is_relevant(¬ification.reason));
3042 if !relevant {
3043 tracing::trace!(
3044 chain_id = %notification.chain_id,
3045 reason = ?notification.reason,
3046 ?listening_mode,
3047 "Ignoring notification due to listening mode"
3048 );
3049 return Ok(());
3050 }
3051 match notification.reason {
3052 Reason::NewIncomingBundle { origin, height } => {
3053 if self.options.message_policy.ignores_origin(&origin) {
3054 trace!(
3055 chain_id = %self.chain_id,
3056 %origin,
3057 %height,
3058 "Skipping NewIncomingBundle notification: origin filtered by message_policy"
3059 );
3060 return Ok(());
3061 }
3062 if self.local_next_height_to_receive(origin).await? > height {
3063 debug!(
3064 chain_id = %self.chain_id,
3065 "Accepting redundant notification for new message"
3066 );
3067 return Ok(());
3068 }
3069 self.client
3070 .download_sender_block_with_sending_ancestors(
3071 self.chain_id,
3072 origin,
3073 height,
3074 &remote_node,
3075 )
3076 .await?;
3077 if self.local_next_height_to_receive(origin).await? <= height {
3078 info!(
3079 chain_id = %self.chain_id,
3080 "NewIncomingBundle: Fail to synchronize new message after notification"
3081 );
3082 }
3083 }
3084 Reason::NewBlock { height, .. } => {
3085 let chain_id = notification.chain_id;
3086 let local_height = self.local_next_block_height(chain_id, &local_node).await?;
3087 if local_height > height {
3088 debug!(
3089 chain_id = %self.chain_id,
3090 "Accepting redundant notification for new block"
3091 );
3092 return Ok(());
3093 }
3094 self.client
3097 .synchronize_chain_state_from(&remote_node, chain_id)
3098 .await?;
3099 if self.local_next_block_height(chain_id, &local_node).await? <= height {
3100 error!("NewBlock: Fail to synchronize new block after notification");
3101 }
3102 trace!(
3103 chain_id = %self.chain_id,
3104 %height,
3105 "NewBlock: processed notification",
3106 );
3107 }
3108 Reason::NewEvents {
3109 height, block_hash, ..
3110 } => {
3111 let chain_id = notification.chain_id;
3112 let local_height = self.local_next_block_height(chain_id, &local_node).await?;
3113 if local_height > height {
3114 debug!(
3115 chain_id = %self.chain_id,
3116 "Accepting redundant notification for new events"
3117 );
3118 return Ok(());
3119 }
3120 trace!(
3121 chain_id = %self.chain_id,
3122 %height,
3123 "NewEvents: processing notification"
3124 );
3125 let relevant_streams = match self.listening_mode() {
3128 Some(ListeningMode::EventsOnly(subscribed)) => subscribed,
3129 _ => unreachable!(),
3132 };
3133 self.client
3134 .download_event_bearing_blocks(
3135 self.chain_id,
3136 BTreeSet::from([(height, block_hash)]),
3137 local_height,
3138 &relevant_streams,
3139 &remote_node,
3140 )
3141 .await?;
3142 }
3143 Reason::NewRound { height, round } => {
3144 let chain_id = notification.chain_id;
3145 if let Some(info) = self.maybe_local_chain_info(chain_id, &local_node).await? {
3146 if (info.next_block_height, info.manager.current_round) >= (height, round) {
3147 debug!(
3148 chain_id = %self.chain_id,
3149 "Accepting redundant notification for new round"
3150 );
3151 return Ok(());
3152 }
3153 }
3154 self.client
3155 .synchronize_chain_state_from(&remote_node, chain_id)
3156 .await?;
3157 let Some(info) = self.maybe_local_chain_info(chain_id, &local_node).await? else {
3158 error!(
3159 chain_id = %self.chain_id,
3160 "NewRound: Fail to read local chain info for {chain_id}"
3161 );
3162 return Ok(());
3163 };
3164 if (info.next_block_height, info.manager.current_round) < (height, round) {
3165 info!(
3166 chain_id = %self.chain_id,
3167 "NewRound: Fail to synchronize new block after notification"
3168 );
3169 }
3170 }
3171 Reason::BlockExecuted { .. } => {
3172 }
3174 }
3175 Ok(())
3176 }
3177
3178 pub fn is_tracked(&self) -> bool {
3180 self.client.is_tracked(self.chain_id)
3181 }
3182
3183 pub fn listening_mode(&self) -> Option<ListeningMode> {
3185 self.client.chain_mode(self.chain_id)
3186 }
3187
    /// Sets up listening for validator notifications about this chain.
    ///
    /// Returns a tuple of:
    ///
    /// * a future that must be driven by the caller: it processes validator
    ///   notifications and refreshes the per-validator streams whenever a `NewBlock`
    ///   notification arrives on the local subscription;
    /// * an [`AbortOnDrop`] wrapping the abort handle of that local subscription; once
    ///   aborted, the future shuts down the validator streams and finishes the
    ///   remaining work;
    /// * a separate notification stream obtained from `self.subscribe()`.
    ///
    /// # Errors
    ///
    /// Fails if subscribing to local notifications fails. A failure of the initial
    /// stream update is only logged.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
        use future::FutureExt as _;

        // Drives `future` to completion while also polling `background_work`, so that
        // the background stream keeps making progress in the meantime.
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        // Abort handles of the per-validator notification streams.
        let mut senders = HashMap::new();
        let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
        // Two independent subscriptions: one is returned to the caller, the other one
        // (abortable) drives the stream updates below.
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Futures that process the notifications of the validator streams.
        let mut process_notifications = FuturesUnordered::new();

        match self
            .update_notification_streams(&mut senders, &mut circuit_breakers)
            .await
        {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update committee: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            // Wait for the next local notification while processing validator
            // notifications in the background.
            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                // Only new blocks trigger a refresh of the validator streams.
                if let Reason::NewBlock { .. } = notification.reason {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders, &mut circuit_breakers)
                            .fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }

            // The local subscription ended (or was aborted): stop all validator streams.
            for abort in senders.into_values() {
                abort.abort();
            }

            // Let the remaining processing futures run to completion.
            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
3265
    /// Brings the set of per-validator notification streams in line with the committee.
    ///
    /// `senders` maps each validator to the abort handle of its notification stream;
    /// `circuit_breakers` holds probing state for validators whose stream has ended.
    /// Both maps are updated in place:
    ///
    /// 1. Validators whose stream was aborted enter the circuit breaker, or have their
    ///    probe interval doubled (capped at the configured maximum) if already in it.
    ///    Validators with a live stream leave the circuit breaker.
    /// 2. Streams of validators that are no longer in the committee are aborted, and all
    ///    aborted entries are removed from `senders`.
    /// 3. A new stream is started for every committee validator without an entry in
    ///    `senders`, unless its circuit breaker says it is too early to probe again.
    ///
    /// Returns a future that processes the notifications of all newly started streams
    /// and completes once all of them have ended.
    ///
    /// # Errors
    ///
    /// Fails if the committee cannot be obtained or the validator nodes cannot be
    /// created.
    #[instrument(level = "trace", skip(senders, circuit_breakers))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
        circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
    ) -> Result<impl Future<Output = ()>, Error> {
        let initial_probe_interval = self
            .options
            .notification_circuit_breaker_initial_probe_interval;
        let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;
        let (nodes, local_node) = {
            // Use the local committee if the listening mode syncs the chain state;
            // otherwise use the admin committee.
            let committee = if self
                .listening_mode()
                .is_some_and(|m| m.should_sync_chain_state())
            {
                self.local_committee().await?
            } else {
                self.client.admin_committee().await?.1
            };
            let nodes = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect::<HashMap<_, _>>();
            (nodes, self.client.local_node.clone())
        };
        for (validator, abort) in senders.iter() {
            // An aborted handle means the validator's stream has ended.
            if abort.is_aborted() && nodes.contains_key(validator) {
                if let Some(state) = circuit_breakers.get_mut(validator) {
                    // The probe failed again: back off exponentially, up to the maximum.
                    state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
                    state.next_probe_at = Instant::now() + state.probe_interval;
                    warn!(
                        %validator,
                        chain_id = %self.chain_id,
                        next_probe_in = ?state.probe_interval,
                        "Validator still unhealthy after probe; increasing probe interval"
                    );
                } else {
                    // First failure: start probing at the initial interval.
                    circuit_breakers.insert(
                        *validator,
                        CircuitBreakerState {
                            next_probe_at: Instant::now() + initial_probe_interval,
                            probe_interval: initial_probe_interval,
                        },
                    );
                    error!(
                        %validator,
                        chain_id = %self.chain_id,
                        next_probe_in = ?initial_probe_interval,
                        "Validator notification stream ended; entering circuit breaker"
                    );
                }
            } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
                // The stream started by a probe is still alive: the validator recovered.
                info!(
                    %validator,
                    chain_id = %self.chain_id,
                    "Validator recovered from circuit breaker"
                );
                circuit_breakers.remove(validator);
            }
        }

        // Abort streams of validators that left the committee and forget every aborted
        // stream.
        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        circuit_breakers.retain(|validator, _| nodes.contains_key(validator));

        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            // Validators that still have a live stream need no new one.
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };

            if let Some(state) = circuit_breakers.get(&public_key) {
                // Unhealthy validator: only reconnect once the probe time has come.
                if Instant::now() < state.next_probe_at {
                    continue;
                }
                debug!(
                    validator = %public_key,
                    chain_id = %self.chain_id,
                    "Probing unhealthy validator"
                );
            }

            let address = node.address();
            let this = self.clone();
            let listening_mode_for_sync = self.listening_mode();
            // Subscribing and the initial synchronization happen lazily, when the stream
            // is first polled.
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    let remote_node = RemoteNode { public_key, node };
                    // Catch up with the validator right after subscribing.
                    if listening_mode_for_sync
                        .as_ref()
                        .is_some_and(|mode| mode.should_sync_chain_state())
                    {
                        this.client
                            .synchronize_chain_state_from(&remote_node, this.chain_id)
                            .await?;
                    } else {
                        // For `EventsOnly` chains only the subscribed events are synced,
                        // and a failure to do so is merely logged.
                        if let Some(ListeningMode::EventsOnly(subscribed)) =
                            listening_mode_for_sync.as_ref()
                        {
                            if let Err(error) = this
                                .client
                                .sync_events_from_node(this.chain_id, subscribed, &remote_node)
                                .await
                            {
                                debug!(
                                    chain_id = %this.chain_id,
                                    %error,
                                    "Failed initial sparse sync for EventsOnly chain"
                                );
                            }
                        }
                    }
                    Ok::<_, Error>(stream)
                }
            })
            // A failed connection attempt is logged and yields an empty stream.
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            // Aborting on exit is what marks the stream as ended in `senders`.
            let abort_on_exit = abort.clone();
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            validator_tasks.push(async move {
                while let Some(notification) = stream.next().await {
                    // Processing errors are logged; the stream keeps being consumed.
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        tracing::info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
                warn!(
                    chain_id = %this.chain_id,
                    address = remote_node.address(),
                    "Validator notification stream ended"
                );
                abort_on_exit.abort();
            });
            entry.insert(abort);
        }
        Ok(validator_tasks.collect())
    }
3449
3450 #[instrument(level = "trace", skip(remote_node))]
3452 pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
3453 let validator_next_block_height = match remote_node
3454 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3455 .await
3456 {
3457 Ok(info) => info.info.next_block_height,
3458 Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
3460 Err(err) => return Err(err.into()),
3461 };
3462 let local_next_block_height = self.chain_info().await?.next_block_height;
3463
3464 if validator_next_block_height >= local_next_block_height {
3465 debug!("Validator is up-to-date with local state");
3466 return Ok(());
3467 }
3468
3469 let heights = (validator_next_block_height.0..local_next_block_height.0)
3470 .map(BlockHeight)
3471 .collect::<Vec<_>>();
3472
3473 let certificates = self
3474 .client
3475 .storage_client()
3476 .read_certificates_by_heights(self.chain_id, &heights)
3477 .await?
3478 .into_iter()
3479 .flatten();
3480
3481 for certificate in certificates {
3482 let missing_blob_ids = match remote_node
3483 .handle_confirmed_certificate(
3484 certificate.clone(),
3485 CrossChainMessageDelivery::NonBlocking,
3486 )
3487 .await
3488 {
3489 Ok(_) => continue,
3490 Err(NodeError::BlobsNotFound(missing_blob_ids)) => missing_blob_ids,
3491 Err(err) => return Err(err.into()),
3492 };
3493 let missing_blobs = self
3496 .client
3497 .storage_client()
3498 .read_blobs(&missing_blob_ids)
3499 .await?
3500 .into_iter()
3501 .flatten()
3502 .map(|b| b.into_std())
3503 .collect();
3504 remote_node.upload_blobs(missing_blobs).await?;
3505 remote_node
3506 .handle_confirmed_certificate(certificate, CrossChainMessageDelivery::NonBlocking)
3507 .await?;
3508 }
3509
3510 Ok(())
3511 }
3512}
3513
/// Test-only extensions of the chain client.
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Processes `notification` as if it had been received from `validator`, given as a
    /// public key and an address.
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created or if processing the notification
    /// fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let provider = self.client.validator_node_provider();
        let mut nodes = provider.make_nodes_from_list(vec![validator]).unwrap();
        let (public_key, node) = nodes.next().unwrap();
        let local_node = self.client.local_node.clone();
        let outcome = self
            .process_notification(RemoteNode { node, public_key }, local_node, notification)
            .await;
        outcome.unwrap();
    }
}