1mod state;
5use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
7 convert::Infallible,
8 iter,
9 sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{self, Either, FusedFuture, Future},
15 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20 abi::Abi,
21 crypto::{signer, CryptoHash, Signer, ValidatorPublicKey},
22 data_types::{
23 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24 ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
25 },
26 ensure,
27 identifiers::{
28 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29 ModuleId, StreamId,
30 },
31 ownership::{ChainOwnership, TimeoutConfig},
32 time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37 data_types::{
38 BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
39 ProposedBlock, Transaction,
40 },
41 manager::LockingBlock,
42 types::{
43 Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
44 ValidatedBlock,
45 },
46 ChainError, ChainExecutionContext,
47};
48use linera_execution::{
49 committee::Committee,
50 system::{
51 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52 REMOVED_EPOCH_STREAM_NAME,
53 },
54 ExecutionError, Operation, Query, QueryOutcome,
55};
56use linera_storage::{Clock as _, Storage as _};
57use linera_views::ViewError;
58use serde::Serialize;
59pub use state::State;
60use thiserror::Error;
61use tokio::sync::mpsc;
62use tokio_stream::wrappers::UnboundedReceiverStream;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66 received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67 ListeningMode, PendingProposal, TimingType,
68};
69use crate::{
70 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
71 environment::Environment,
72 local_node::{LocalNodeClient, LocalNodeError},
73 node::{
74 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
75 ValidatorNodeProvider as _,
76 },
77 remote_node::RemoteNode,
78 updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
79 worker::{Notification, Reason, WorkerError},
80};
81
/// Configuration knobs for a [`ChainClient`].
#[derive(Debug, Clone)]
pub struct Options {
    /// Maximum number of pending incoming message bundles kept when bundles are collected;
    /// the sorted list of candidates is truncated to this length.
    pub max_pending_message_bundles: usize,
    /// Forwarded as `max_failures` of the `AutoRetry` bundle failure policy.
    pub max_block_limit_errors: u32,
    /// Forwarded as the `time_budget` of the bundle execution policy.
    pub staging_bundles_time_budget: Option<Duration>,
    /// Policy applied to incoming message bundles and to event-stream updates.
    pub message_policy: MessagePolicy,
    /// Origins whose bundles are sorted ahead of all others when pending bundles are selected.
    pub priority_bundle_origins: HashSet<ChainId>,
    /// Delivery mode passed along when chain updates are communicated to validators.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// Grace period handed to `communicate_with_quorum`.
    // NOTE(review): the unit/meaning of this value is defined by `communicate_with_quorum`.
    pub quorum_grace_period: f64,
    /// Timeout for blob downloads.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub blob_download_timeout: Duration,
    /// Timeout for downloading a batch of certificates.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub certificate_batch_download_timeout: Duration,
    /// Batch size for certificate downloads.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub certificate_download_batch_size: u64,
    /// Batch size for certificate uploads.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub certificate_upload_batch_size: u64,
    /// Batch size used when splitting received logs into batches of sender certificates.
    pub sender_certificate_download_batch_size: usize,
    /// Concurrency limit passed to `buffer_unordered` when running joined tasks.
    pub max_joined_tasks: usize,
    /// Whether fast blocks are allowed.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub allow_fast_blocks: bool,
    /// Initial probe interval of the notification circuit breaker.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub notification_circuit_breaker_initial_probe_interval: Duration,
    /// Maximum probe interval of the notification circuit breaker.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub notification_circuit_breaker_max_probe_interval: Duration,
    /// Limit on event-stream queries.
    // NOTE(review): not read in the visible part of this file — confirm at the use site.
    pub max_event_stream_queries: usize,
}
132
/// Bookkeeping for a circuit breaker: when to probe next and the current probe interval.
// NOTE(review): not used in the visible part of this file; presumably tied to the
// `notification_circuit_breaker_*` options — confirm at the use site.
struct CircuitBreakerState {
    /// Instant at which the next probe is due.
    next_probe_at: Instant,
    /// Current interval between probes.
    probe_interval: Duration,
}
137
#[cfg(with_testing)]
impl Options {
    /// Returns a fixed set of options for use in tests.
    ///
    /// Batch sizes, the quorum grace period and the event-stream query limit come from the
    /// crate's `DEFAULT_*` constants; every other value is hard-coded here.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            DEFAULT_MAX_EVENT_STREAM_QUERIES, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        // Download timeouts: one second each.
        let blob_download_timeout = Duration::from_secs(1);
        let certificate_batch_download_timeout = Duration::from_secs(1);
        // Circuit-breaker probing: start at five minutes, cap at one hour.
        let notification_circuit_breaker_initial_probe_interval = Duration::from_secs(300);
        let notification_circuit_breaker_max_probe_interval = Duration::from_secs(3600);

        Self {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::default(),
            priority_bundle_origins: HashSet::new(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout,
            certificate_batch_download_timeout,
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
            notification_circuit_breaker_initial_probe_interval,
            notification_circuit_breaker_max_probe_interval,
            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
        }
    }
}
168
169impl Options {
170 pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
172 BundleExecutionPolicy {
173 on_failure: BundleFailurePolicy::AutoRetry {
174 max_failures: self.max_block_limit_errors,
175 never_reject_application_ids: Arc::new(
176 self.message_policy.never_reject_application_ids.clone(),
177 ),
178 },
179 time_budget: self.staging_bundles_time_budget,
180 }
181 }
182}
183
/// Client handle for a single chain, layered on top of a shared [`Client`].
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The shared client through which the local node, storage and validators are reached.
    /// Omitted from the `Debug` output.
    #[debug(skip)]
    pub(crate) client: Arc<Client<Env>>,
    /// The chain this handle operates on.
    chain_id: ChainId,
    /// Settings for this handle. Omitted from the `Debug` output.
    #[debug(skip)]
    options: Options,
    /// The owner this handle acts as, if one was configured.
    preferred_owner: Option<AccountOwner>,
    /// Next block height recorded when the handle was created; used as the download target and
    /// as the lower bound in the wallet-synchronization check.
    initial_next_block_height: BlockHeight,
    /// Block hash recorded when the handle was created; compared against the local node's hash
    /// when the local chain is exactly at `initial_next_block_height`.
    initial_block_hash: Option<CryptoHash>,
    /// Optional channel on which `(u64, TimingType)` timing samples are sent.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    /// Origins whose bundles are filtered out when pending message bundles are collected.
    /// Shared between clones of this handle.
    skipped_origins: Arc<papaya::HashSet<ChainId>>,
}
212
213impl<Env: Environment> Clone for ChainClient<Env> {
214 fn clone(&self) -> Self {
215 Self {
216 client: self.client.clone(),
217 chain_id: self.chain_id,
218 options: self.options.clone(),
219 preferred_owner: self.preferred_owner,
220 initial_next_block_height: self.initial_next_block_height,
221 initial_block_hash: self.initial_block_hash,
222 timing_sender: self.timing_sender.clone(),
223 skipped_origins: self.skipped_origins.clone(),
224 }
225 }
226}
227
/// Errors returned by chain-client operations.
#[derive(Debug, Error)]
pub enum Error {
    /// An operation on the local node failed.
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    /// An operation on a remote (validator) node failed.
    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    /// An arithmetic error, displayed transparently.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// Certificates with the listed hashes are missing.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// The confirmed block with the given hash is missing.
    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    /// JSON serialization or deserialization failed.
    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// A chain-level operation failed.
    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    /// Communication with validators failed, displayed transparently.
    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// An internal invariant of the chain client was violated.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    /// A certificate comes from a committee that is not known locally yet.
    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    /// The local node is behind the state recorded in the wallet.
    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    /// The client's state does not allow the block to be proposed.
    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    /// A certificate comes from a committee that has been retired.
    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    /// A protocol-level error inside the chain client.
    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    /// The signer holds no key for the owner used on the given chain.
    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    /// No preferred owner is configured for the given chain.
    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    /// The owner in question is not an owner of the given chain.
    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    /// A view (storage) error, displayed transparently.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// Certificates up to the target height could not be downloaded for the chain.
    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    /// A BCS (de)serialization error, displayed transparently.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// Validators reached a quorum on a different block hash or round than expected.
    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
         expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// The signer reported an error; the boxed error is exposed as the source.
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    /// The given epoch is the current one and cannot be revoked.
    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    /// The epoch has already been revoked.
    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    /// A missing sender block could not be downloaded.
    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    /// A different block is already committed at this height; carries the hash of the
    /// committed certificate.
    #[error(
        "A different block was already committed at this height. \
         The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),

    /// Auto-retry execution and committed execution disagreed on the outcome of a block.
    #[error(
        "Execution outcome mismatch: AutoRetry and committed execution produced \
         different outcomes for the same block"
    )]
    ExecutionOutcomeMismatch,
}
341
/// Lets `?` be used on `Result<_, Infallible>` in functions returning [`Error`].
impl From<Infallible> for Error {
    fn from(infallible: Infallible) -> Self {
        // `Infallible` has no values, so the empty match proves this is never reached.
        match infallible {}
    }
}
347
impl Error {
    /// Wraps a signer error into [`Error::Signer`] by boxing it.
    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
        Self::Signer(Box::new(err))
    }
}
353
354impl<Env: Environment> ChainClient<Env> {
    /// Creates a handle for `chain_id` on top of the shared `client`.
    ///
    /// The given values are stored as-is; `skipped_origins` starts out as a new empty set.
    pub fn new(
        client: Arc<Client<Env>>,
        chain_id: ChainId,
        options: Options,
        initial_block_hash: Option<CryptoHash>,
        initial_next_block_height: BlockHeight,
        preferred_owner: Option<AccountOwner>,
        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    ) -> Self {
        ChainClient {
            client,
            chain_id,
            options,
            preferred_owner,
            initial_block_hash,
            initial_next_block_height,
            timing_sender,
            skipped_origins: Arc::new(papaya::HashSet::new()),
        }
    }
375
    /// Returns what the shared client reports from `is_chain_follow_only` for this chain.
    pub fn is_follow_only(&self) -> bool {
        self.client.is_chain_follow_only(self.chain_id)
    }
380
    /// Returns the mutex guarding this chain's pending proposal.
    ///
    /// # Panics
    ///
    /// Panics if the shared client's `chains` map has no entry for this chain.
    #[instrument(level = "trace", skip(self))]
    fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
        self.client
            .chains
            .pin()
            .get(&self.chain_id)
            .expect("Chain client constructed for invalid chain")
            .proposal_mutex()
    }
393
    /// Returns a clone of the pending proposal, if any, after acquiring the proposal mutex.
    #[instrument(level = "trace", skip(self))]
    pub async fn pending_proposal(&self) -> Option<PendingProposal> {
        self.proposal_mutex().lock().await.clone()
    }
399
    /// Returns the signer held by the shared client.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.client.signer()
    }
405
    /// Asks the shared client whether it has a key for `owner`.
    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
        self.client.has_key_for(owner).await
    }
410
    /// Returns a mutable reference to this handle's options.
    #[instrument(level = "trace", skip(self))]
    pub fn options_mut(&mut self) -> &mut Options {
        &mut self.options
    }
416
    /// Returns this handle's options.
    #[instrument(level = "trace", skip(self))]
    pub fn options(&self) -> &Options {
        &self.options
    }
422
    /// Returns the ID of the chain this handle operates on.
    #[instrument(level = "trace", skip(self))]
    pub fn chain_id(&self) -> ChainId {
        self.chain_id
    }
428
    /// Returns a clone of the timing channel sender, if one is configured.
    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
        self.timing_sender.clone()
    }
433
    /// Returns the admin chain ID stored in the shared client.
    #[instrument(level = "trace", skip(self))]
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id
    }
439
    /// Returns the preferred owner configured for this handle, if any.
    #[instrument(level = "trace", skip(self))]
    pub fn preferred_owner(&self) -> Option<AccountOwner> {
        self.preferred_owner
    }
445
    /// Sets the preferred owner of this handle, replacing any previous value.
    #[instrument(level = "trace", skip(self))]
    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
        self.preferred_owner = Some(preferred_owner);
    }
451
    /// Obtains a read guard on this chain's state view from the local node.
    #[instrument(level = "trace")]
    pub async fn chain_state_view(
        &self,
    ) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
        self.client.local_node.chain_state_view(self.chain_id).await
    }
459
460 #[instrument(level = "trace", skip(self))]
462 pub async fn event_stream_publishers(
463 &self,
464 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
465 let subscriptions = self
466 .client
467 .local_node
468 .get_event_subscriptions(self.chain_id)
469 .await?;
470 let mut publishers = subscriptions.into_iter().fold(
471 BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
472 |mut map, ((chain_id, stream_id), _)| {
473 map.entry(chain_id).or_default().insert(stream_id);
474 map
475 },
476 );
477 if self.chain_id != self.client.admin_chain_id {
478 publishers.insert(
479 self.client.admin_chain_id,
480 vec![
481 StreamId::system(EPOCH_STREAM_NAME),
482 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
483 ]
484 .into_iter()
485 .collect(),
486 );
487 }
488 Ok(publishers)
489 }
490
    /// Subscribes to notifications for this handle's own chain; see `subscribe_to`.
    #[instrument(level = "trace")]
    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
        self.subscribe_to(self.chain_id)
    }
496
    /// Subscribes to notifications for `chain_id`.
    ///
    /// Registers with the shared client's notifier and wraps the resulting unbounded receiver
    /// in a pinned, boxed stream. The visible code path always returns `Ok`.
    #[instrument(level = "trace")]
    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
        Ok(Box::pin(UnboundedReceiverStream::new(
            self.client.notifier.subscribe(vec![chain_id]),
        )))
    }
504
    /// Returns the storage client held by the shared client.
    #[instrument(level = "trace")]
    pub fn storage_client(&self) -> &Env::Storage {
        self.client.storage_client()
    }
510
511 #[instrument(level = "trace")]
513 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
514 let query = ChainInfoQuery::new(self.chain_id);
515 let response = self
516 .client
517 .local_node
518 .handle_chain_info_query(query)
519 .await?;
520 Ok(response.info)
521 }
522
523 #[instrument(level = "trace")]
525 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
526 let query = ChainInfoQuery::new(self.chain_id)
527 .with_manager_values()
528 .with_committees();
529 let response = self
530 .client
531 .local_node
532 .handle_chain_info_query(query)
533 .await?;
534 Ok(response.info)
535 }
536
    /// Asks the shared client for this chain's [`ChainDescription`].
    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
        self.client.get_chain_description(self.chain_id).await
    }
541
542 #[instrument(level = "trace")]
545 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
546 if self.options.message_policy.is_ignore() {
547 return Ok(Vec::new());
549 }
550
551 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
552 let info = self
553 .client
554 .local_node
555 .handle_chain_info_query(query)
556 .await?
557 .info;
558 if self.preferred_owner.is_some_and(|owner| {
559 info.manager
560 .ownership
561 .is_super_owner_no_regular_owners(&owner)
562 }) {
563 ensure!(
565 info.next_block_height >= self.initial_next_block_height,
566 Error::WalletSynchronizationError
567 );
568 }
569
570 let skipped = self.skipped_origins.pin();
571 let mut bundles = info
572 .requested_pending_message_bundles
573 .into_iter()
574 .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
575 .filter(|bundle| !skipped.contains(&bundle.origin))
576 .collect::<Vec<_>>();
577 let priority_origins = &self.options.priority_bundle_origins;
578 bundles.sort_by(|a, b| {
579 let a_priority = priority_origins.contains(&a.origin);
580 let b_priority = priority_origins.contains(&b.origin);
581 b_priority
582 .cmp(&a_priority)
583 .then(a.bundle.timestamp.cmp(&b.bundle.timestamp))
584 });
585 bundles.truncate(self.options.max_pending_message_bundles);
586 Ok(bundles)
587 }
588
589 #[instrument(level = "trace")]
593 async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
594 let subscription_map = self
596 .client
597 .local_node
598 .get_event_subscriptions(self.chain_id)
599 .await?;
600 let futures = subscription_map
602 .into_iter()
603 .filter(|((chain_id, _), _)| {
604 self.options
605 .message_policy
606 .restrict_chain_ids_to
607 .as_ref()
608 .is_none_or(|chain_set| chain_set.contains(chain_id))
609 })
610 .filter(|((_, stream_id), _)| {
611 self.options
612 .message_policy
613 .process_events_from_application_ids
614 .as_ref()
615 .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
616 })
617 .map(|((chain_id, stream_id), subscriptions)| {
618 let client = self.client.clone();
619 async move {
620 let next_expected_index = client
621 .local_node
622 .get_next_expected_event(chain_id, stream_id.clone())
623 .await?;
624 if let Some(next_index) = next_expected_index
625 .filter(|next_index| *next_index > subscriptions.next_index)
626 {
627 Ok(Some((chain_id, stream_id, next_index)))
628 } else {
629 Ok::<_, Error>(None)
630 }
631 }
632 });
633 let updates = futures::stream::iter(futures)
634 .buffer_unordered(self.options.max_joined_tasks)
635 .try_collect::<Vec<_>>()
636 .await?
637 .into_iter()
638 .flatten()
639 .collect::<Vec<_>>();
640 if updates.is_empty() {
641 return Ok(None);
642 }
643 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
644 }
645
    /// Asks the shared client for this chain's info including committees.
    #[instrument(level = "trace")]
    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        self.client.chain_info_with_committees(self.chain_id).await
    }
650
651 #[instrument(level = "trace")]
653 async fn epoch_and_committees(
654 &self,
655 ) -> Result<(Epoch, BTreeMap<Epoch, CryptoHash>), LocalNodeError> {
656 let info = self.chain_info_with_committees().await?;
657 let epoch = info.epoch;
658 let committees = info
659 .requested_committees
660 .ok_or(LocalNodeError::InvalidChainInfoResponse)?;
661 Ok((epoch, committees))
662 }
663
    /// Returns the committee for this chain's current epoch, as known locally.
    ///
    /// If the first chain-info query fails because blobs are missing, this chain is
    /// synchronized and the query is retried once. If it fails because events are missing and
    /// all of them belong to the system `EPOCH_STREAM_NAME` stream, the admin chain is
    /// synchronized instead before the single retry. Any other error is returned as-is.
    ///
    /// # Errors
    ///
    /// Besides propagated errors, returns `LocalNodeError::InvalidChainInfoResponse` when the
    /// info carries no committees, and `LocalNodeError::InactiveChain` when the committees map
    /// has no entry for the chain's epoch.
    #[instrument(level = "trace")]
    pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
        let info = match self.chain_info_with_committees().await {
            Ok(info) => info,
            Err(LocalNodeError::BlobsNotFound(_)) => {
                // Missing blobs: synchronize this chain, then retry once.
                self.synchronize_chain_state(self.chain_id).await?;
                self.chain_info_with_committees().await?
            }
            Err(LocalNodeError::EventsNotFound(event_ids))
                if event_ids
                    .iter()
                    .all(|event_id| event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)) =>
            {
                // Only epoch events are missing: synchronize the admin chain, then retry once.
                self.synchronize_chain_state(self.client.admin_chain_id)
                    .await?;
                self.chain_info_with_committees().await?
            }
            Err(err) => return Err(err.into()),
        };
        // Look up the committee hash recorded for the chain's current epoch.
        let hash = info
            .requested_committees
            .as_ref()
            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
            .get(&info.epoch)
            .copied()
            .ok_or(LocalNodeError::InactiveChain(self.chain_id))?;
        Ok(self
            .storage_client()
            .get_or_load_committee_by_hash(hash)
            .await
            .map_err(LocalNodeError::from)?)
    }
699
    /// Returns the epoch and committee reported by the shared client's `admin_committee`.
    #[instrument(level = "trace")]
    pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
        self.client.admin_committee().await
    }
705
706 #[instrument(level = "trace")]
710 pub async fn identity(&self) -> Result<AccountOwner, Error> {
711 let Some(preferred_owner) = self.preferred_owner else {
712 return Err(Error::NoAccountKeyConfigured(self.chain_id));
713 };
714 let manager = self.chain_info().await?.manager;
715 ensure!(
716 manager.ownership.is_active(),
717 LocalNodeError::InactiveChain(self.chain_id)
718 );
719 let fallback_owners = if manager.ownership.has_fallback() {
720 self.local_committee()
721 .await?
722 .account_keys_and_weights()
723 .map(|(key, _)| AccountOwner::from(key))
724 .collect()
725 } else {
726 BTreeSet::new()
727 };
728
729 let is_owner = manager
730 .ownership
731 .can_propose_in_multi_leader_round(&preferred_owner)
732 || fallback_owners.contains(&preferred_owner);
733
734 if !is_owner {
735 warn!(
736 chain_id = %self.chain_id,
737 ownership = ?manager.ownership,
738 ?fallback_owners,
739 ?preferred_owner,
740 "The preferred owner is not configured as an owner of this chain",
741 );
742 return Err(Error::NotAnOwner(self.chain_id));
743 }
744
745 let has_signer = self.has_key_for(&preferred_owner).await?;
746
747 if !has_signer {
748 warn!(%self.chain_id, ?preferred_owner,
749 "Chain is one of the owners but its Signer instance doesn't contain the key",
750 );
751 return Err(Error::CannotFindKeyForChain(self.chain_id));
752 }
753
754 Ok(preferred_owner)
755 }
756
    /// Checks that `owner` can be used on this chain and returns the chain's info.
    ///
    /// Steps, in order: verify that a key for `owner` is available, fetch the chain
    /// description blob through the shared client, read the chain info, and verify that
    /// `owner` may propose in a multi-leader round.
    ///
    /// # Errors
    ///
    /// * [`Error::CannotFindKeyForChain`] when no key for `owner` is available.
    /// * [`Error::NotAnOwner`] when `owner` may not propose in a multi-leader round.
    /// * Any error propagated from the shared client or the local node.
    #[instrument(level = "trace")]
    pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
        ensure!(
            self.has_key_for(&owner).await?,
            Error::CannotFindKeyForChain(self.chain_id)
        );
        // The returned blob is not used here; only success of the call matters.
        self.client
            .get_chain_description_blob(self.chain_id)
            .await?;

        let info = self.chain_info().await?;

        ensure!(
            info.manager
                .ownership
                .can_propose_in_multi_leader_round(&owner),
            Error::NotAnOwner(self.chain_id)
        );

        Ok(info)
    }
788
    /// Brings the local view of this chain up to date and returns the resulting chain info.
    ///
    /// First the local node is synchronized to the height recorded in this handle. Then,
    /// unless the preferred owner is a super owner of a chain without regular owners, the
    /// chain state is synchronized through the shared client. Finally, if the chain's epoch
    /// is greater than the first element returned by `admin_committees()`, the admin chain is
    /// synchronized as well.
    // NOTE(review): the first element of `admin_committees()` is presumably the current admin
    // epoch — confirm against `Client::admin_committees`.
    #[instrument(level = "trace")]
    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();

        let mut info = self.synchronize_to_known_height().await?;

        // Skipped only when a preferred owner exists and is the sole super owner.
        if self.preferred_owner.is_none_or(|owner| {
            !info
                .manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            info = self.client.synchronize_chain_state(self.chain_id).await?;
        }

        if info.epoch > self.client.admin_committees().await?.0 {
            self.client
                .synchronize_chain_state(self.client.admin_chain_id)
                .await?;
        }

        Ok(info)
    }
818
    /// Downloads certificates for this chain with `initial_next_block_height` as the target
    /// and returns the resulting chain info.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InternalError`] when the local node ends up exactly at
    /// `initial_next_block_height` but its block hash differs from `initial_block_hash`.
    /// Download failures are propagated.
    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
        let info = self
            .client
            .download_certificates(self.chain_id, self.initial_next_block_height)
            .await?;
        // The hash can only be compared when the local chain is exactly at the recorded height.
        if info.next_block_height == self.initial_next_block_height {
            ensure!(
                self.initial_block_hash == info.block_hash,
                Error::InternalError("Invalid chain of blocks in local node")
            );
        }
        Ok(info)
    }
837
    /// Communicates this chain's updates to the validators of the old and current committees.
    ///
    /// If `old_committee` is given, it is updated first. Then, if the local committee can be
    /// determined and either no old committee was given or the local one differs from it, the
    /// local committee is updated too. A failure to determine the local committee is ignored.
    /// On success a timing sample of type `TimingType::UpdateValidators` is sent; on an early
    /// error return no sample is sent.
    ///
    /// # Errors
    ///
    /// Propagates errors from `communicate_chain_updates`.
    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
    pub async fn update_validators(
        &self,
        old_committee: Option<&Committee>,
        latest_certificate: Option<Arc<ConfirmedBlockCertificate>>,
    ) -> Result<(), Error> {
        let update_validators_start = linera_base::time::Instant::now();
        if let Some(old_committee) = old_committee {
            self.communicate_chain_updates(old_committee, latest_certificate.clone())
                .await?
        };
        // Deliberately best-effort: an error from `local_committee` skips the second update.
        if let Ok(new_committee) = self.local_committee().await {
            if old_committee.is_none_or(|old| *new_committee != *old) {
                self.communicate_chain_updates(&new_committee, latest_certificate)
                    .await?;
            }
        }
        self.send_timing(update_validators_start, TimingType::UpdateValidators);
        Ok(())
    }
862
    /// Communicates this chain's updates to the validators of `committee`.
    ///
    /// Uses the local node's current next block height for this chain and the delivery mode
    /// from the options, then delegates to the shared client.
    #[instrument(level = "trace", skip(committee))]
    pub async fn communicate_chain_updates(
        &self,
        committee: &Committee,
        latest_certificate: Option<Arc<ConfirmedBlockCertificate>>,
    ) -> Result<(), Error> {
        let delivery = self.options.cross_chain_message_delivery;
        let height = self.chain_info().await?.next_block_height;
        self.client
            .communicate_chain_updates(
                committee,
                self.chain_id,
                height,
                delivery,
                latest_certificate,
            )
            .await
    }
882
883 async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
887 let subscriptions = self
888 .client
889 .local_node
890 .get_event_subscriptions(self.chain_id)
891 .await?;
892 let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
894 for ((chain_id, stream_id), _) in &subscriptions {
895 if *chain_id != self.chain_id {
896 streams_by_chain
897 .entry(*chain_id)
898 .or_default()
899 .insert(stream_id.clone());
900 }
901 }
902 let admin_chain_id = self.client.admin_chain_id;
904 if admin_chain_id != self.chain_id {
905 self.client.synchronize_chain_state(admin_chain_id).await?;
906 }
907 let (_, committee) = self.admin_committee().await?;
909 let nodes = self.client.make_nodes(&committee)?;
910 let tasks = streams_by_chain
911 .into_iter()
912 .filter(|(chain_id, _)| *chain_id != admin_chain_id)
913 .map(|(chain_id, stream_ids)| {
914 self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
915 })
916 .collect::<Vec<_>>();
917 stream::iter(tasks)
918 .buffer_unordered(self.options.max_joined_tasks)
919 .collect::<Vec<_>>()
920 .await
921 .into_iter()
922 .collect::<Result<Vec<_>, _>>()?;
923 Ok(())
924 }
925
    /// Synchronizes the given streams of one publisher chain from the validators.
    ///
    /// Runs `sync_events_from_node` against the given nodes through `communicate_with_quorum`,
    /// using the configured quorum grace period. All successful responses map to the same
    /// unit key.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `communicate_with_quorum`.
    async fn sync_publisher_chain_events(
        &self,
        publisher_chain_id: ChainId,
        stream_ids: BTreeSet<StreamId>,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        committee: &Committee,
    ) -> Result<(), Error> {
        // Borrowed once so the `async move` closure below copies a reference, not the set.
        let stream_ids_ref = &stream_ids;
        communicate_with_quorum(
            nodes,
            committee,
            |_: &()| (),
            |remote_node| async move {
                self.client
                    .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
                    .await
            },
            self.options.quorum_grace_period,
        )
        .await?;
        Ok(())
    }
954
    /// Asks the validators which certificates sent messages to this chain and processes them.
    ///
    /// The received logs are fetched from the validators of the admin committee, starting at
    /// the tracker value stored locally for each validator (0 if none). A failure to reach a
    /// quorum is logged and the logs collected so far are still processed. The logs are then
    /// split into batches; after each batch the per-validator trackers are written back to
    /// the local node.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the admin committee, building the nodes, reading the
    /// trackers, and from `receive_sender_certificates`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned, and on division by zero if
    /// `max_joined_tasks` is 0.
    #[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
    pub async fn find_received_certificates(&self) -> Result<(), Error> {
        debug!("starting find_received_certificates");
        #[cfg(with_metrics)]
        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
        let chain_id = self.chain_id;
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;

        let trackers = self
            .client
            .local_node
            .get_received_certificate_trackers(chain_id)
            .await?;

        trace!("find_received_certificates: read trackers");

        // Shared accumulator: each validator task pushes its (public key, batch) pair here.
        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
        let result = communicate_with_quorum(
            &nodes,
            &committee,
            |_| (),
            |remote_node| {
                let client = &self.client;
                // Validators without a stored tracker start from 0.
                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
                let received_log_batches = Arc::clone(&received_log_batches);
                Box::pin(async move {
                    let batch = client
                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
                        .await?;
                    // The lock is taken only after the await, so it is never held across one.
                    let mut batches = received_log_batches.lock().unwrap();
                    batches.push((remote_node.public_key, batch));
                    Ok(())
                })
            },
            self.options.quorum_grace_period,
        )
        .await;

        // Best-effort: continue with whatever was collected even without a quorum.
        if let Err(error) = result {
            error!(
                %error,
                "Failed to synchronize received_logs from at least a quorum of validators",
            );
        }

        // Move the collected batches out of the mutex, leaving an empty vector behind.
        let received_logs: Vec<_> = {
            let mut received_log_batches = received_log_batches.lock().unwrap();
            std::mem::take(received_log_batches.as_mut())
        };

        debug!(
            received_logs_len = %received_logs.len(),
            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
            "collected received logs"
        );

        let (received_logs, mut validator_trackers) = {
            (
                ReceivedLogs::from_received_result(received_logs.clone()),
                ValidatorTrackers::new(received_logs, &trackers),
            )
        };

        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "find_received_certificates: total number of chains and certificates to sync",
        );

        // NOTE(review): integer division — this is 0 whenever the batch size is smaller than
        // `max_joined_tasks`, and it panics when `max_joined_tasks` is 0. Confirm that
        // `into_batches` copes with a zero `max_blocks_per_chain`.
        let max_blocks_per_chain =
            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
        for received_log in received_logs.into_batches(
            self.options.sender_certificate_download_batch_size,
            max_blocks_per_chain,
        ) {
            validator_trackers = self
                .receive_sender_certificates(received_log, validator_trackers, &nodes)
                .await?;

            // Persist progress after every batch.
            self.update_received_certificate_trackers(&validator_trackers)
                .await;
        }

        info!("find_received_certificates finished");

        Ok(())
    }
1053
    /// Writes the given tracker values for this chain to the local node.
    ///
    /// A failure is logged and otherwise ignored; this function never returns an error.
    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
        let updated_trackers = trackers.to_map();
        trace!(?updated_trackers, "updated tracker values");

        if let Err(error) = self
            .client
            .local_node
            .update_received_certificate_trackers(self.chain_id, updated_trackers)
            .await
        {
            error!(
                chain_id = %self.chain_id,
                %error,
                "Failed to update the certificate trackers",
            );
        }
    }
1072
    /// Downloads and processes the sender certificates listed in one batch of received logs.
    ///
    /// Entries the local node already knows about are filtered out using its next outbox
    /// heights. For every sender chain that still has heights to fetch, a download task is
    /// started (at most `max_joined_tasks` at a time); each downloaded certificate is reported
    /// over a channel and recorded in `validator_trackers`. Sender chains left with no
    /// heights only get their pending cross-chain requests retried afterwards.
    ///
    /// Returns the updated trackers.
    ///
    /// # Errors
    ///
    /// Propagates a failure to read the next outbox heights. Results of the individual
    /// download tasks are not inspected here.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<ValidatorTrackers, Error> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        validator_trackers.filter_out_already_known(&mut received_logs, &local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        let mut other_sender_chains = Vec::new();
        // Download tasks report each processed certificate over this channel.
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
            let received_logs = &received_logs;
            let other_sender_chains = &mut other_sender_chains;

            move |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Nothing to download for this chain; remember it for the retry pass below.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                let nodes = nodes.to_vec();
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            }
        });

        // Run the downloads and drain the channel concurrently; the receive loop ends once
        // every sender has been dropped.
        future::join(
            stream::iter(cert_futures)
                .buffer_unordered(self.options.max_joined_tasks)
                .collect::<()>(),
            async {
                while let Some(chain_and_height) = receiver.recv().await {
                    validator_trackers.downloaded_cert(chain_and_height);
                }
            },
        )
        .await;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }
1161
    /// Retries pending cross-chain requests for each of the given sender chains, concurrently.
    ///
    /// If a retry fails because blobs are missing, the blobs are downloaded from `nodes` and
    /// the retry is attempted once more, even when the download itself failed (that failure
    /// is only logged). Any remaining error is logged; nothing is returned to the caller.
    async fn retry_pending_cross_chain_requests_from_sender_chains(
        &self,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        other_sender_chains: Vec<ChainId>,
    ) {
        let stream = other_sender_chains
            .into_iter()
            .map(|chain_id| async move {
                if let Err(error) = match self
                    .client
                    .retry_pending_cross_chain_requests(chain_id)
                    .await
                {
                    Ok(()) => Ok(()),
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        if let Err(error) = self
                            .client
                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
                            .await
                        {
                            error!(
                                ?blob_ids,
                                %error,
                                "Error while attempting to download blobs during retrying outgoing \
                                messages"
                            );
                        }
                        // Second and final attempt for this chain.
                        self.client
                            .retry_pending_cross_chain_requests(chain_id)
                            .await
                    }
                    err => err,
                } {
                    error!(
                        %chain_id,
                        %error,
                        "Failed to retry outgoing messages from chain"
                    );
                }
            })
            .collect::<FuturesUnordered<_>>();
        // Drive all futures to completion; each yields `()`.
        stream.for_each(future::ready).await;
    }
1208
    /// Executes a `SystemOperation::Transfer` of `amount` from `owner` to `recipient`.
    #[instrument(level = "trace")]
    pub async fn transfer(
        &self,
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        self.execute_operation(SystemOperation::Transfer {
            owner,
            recipient,
            amount,
        })
        .await
    }
1225
1226 #[instrument(level = "trace")]
1229 pub async fn read_data_blob(
1230 &self,
1231 hash: CryptoHash,
1232 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1233 let blob_id = BlobId {
1234 hash,
1235 blob_type: BlobType::Data,
1236 };
1237 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1238 .await
1239 }
1240
1241 #[instrument(level = "trace")]
1243 pub async fn claim(
1244 &self,
1245 owner: AccountOwner,
1246 target_id: ChainId,
1247 recipient: Account,
1248 amount: Amount,
1249 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1250 self.execute_operation(SystemOperation::Claim {
1251 owner,
1252 target_id,
1253 recipient,
1254 amount,
1255 })
1256 .await
1257 }
1258
1259 #[instrument(level = "trace")]
1262 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1263 let chain_id = self.chain_id;
1264 let info = self.chain_info().await?;
1265 let committee = self.local_committee().await?;
1266 let height = info.next_block_height;
1267 let round = info.manager.current_round;
1268 let action = CommunicateAction::RequestTimeout {
1269 height,
1270 round,
1271 chain_id,
1272 };
1273 let value = Timeout::new(chain_id, height, info.epoch);
1274 let certificate = Box::new(
1275 self.client
1276 .communicate_chain_action(&committee, action, value)
1277 .await?,
1278 );
1279 self.client.handle_certificate(*certificate.clone()).await?;
1280 self.client
1282 .communicate_chain_updates(
1283 &committee,
1284 chain_id,
1285 height,
1286 CrossChainMessageDelivery::NonBlocking,
1287 None,
1288 )
1289 .await?;
1290 Ok(*certificate)
1291 }
1292
1293 #[instrument(level = "trace", skip_all)]
1295 pub async fn synchronize_chain_state(
1296 &self,
1297 chain_id: ChainId,
1298 ) -> Result<Box<ChainInfo>, Error> {
1299 self.client.synchronize_chain_state(chain_id).await
1300 }
1301
1302 #[instrument(level = "trace", skip_all)]
1305 pub async fn synchronize_chain_state_from_committee(
1306 &self,
1307 committee: Arc<Committee>,
1308 ) -> Result<Box<ChainInfo>, Error> {
1309 self.client
1310 .synchronize_chain_from_committee(self.chain_id, committee)
1311 .await
1312 }
1313
1314 #[instrument(level = "trace", skip(operations, blobs))]
1316 pub async fn execute_operations(
1317 &self,
1318 operations: Vec<Operation>,
1319 blobs: Vec<Blob>,
1320 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1321 let timing_start = linera_base::time::Instant::now();
1322
1323 let result = loop {
1324 let execute_block_start = linera_base::time::Instant::now();
1325 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1327 Ok(ClientOutcome::Committed(certificate)) => {
1328 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1329 break Ok(ClientOutcome::Committed(certificate));
1330 }
1331 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
1332 break Ok(ClientOutcome::WaitForTimeout(timeout));
1333 }
1334 Ok(ClientOutcome::Conflict(certificate)) => {
1335 info!(
1336 height = %certificate.block().header.height,
1337 "Another block was committed."
1338 );
1339 break Ok(ClientOutcome::Conflict(certificate));
1340 }
1341 Err(Error::CommunicationError(CommunicationError::Trusted(
1342 NodeError::UnexpectedBlockHeight {
1343 expected_block_height,
1344 found_block_height,
1345 },
1346 ))) if expected_block_height > found_block_height => {
1347 tracing::info!(
1348 chain_id = %self.chain_id,
1349 "Local state is outdated; synchronizing chain"
1350 );
1351 self.synchronize_chain_state(self.chain_id).await?;
1352 }
1353 Err(err) => return Err(err),
1354 };
1355 };
1356
1357 self.send_timing(timing_start, TimingType::ExecuteOperations);
1358
1359 result
1360 }
1361
1362 pub async fn execute_operation(
1364 &self,
1365 operation: impl Into<Operation>,
1366 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1367 self.execute_operations(vec![operation.into()], vec![])
1368 .await
1369 }
1370
    /// Builds, proposes and tries to commit a new block containing `operations`.
    ///
    /// The proposal mutex is held for the whole call. Any already pending block is processed
    /// first; if that commits a block, the result is reported as `ClientOutcome::Conflict`.
    /// Otherwise a new pending block is created from the transactions returned by
    /// `prepend_epochs_messages_and_events` and is processed in turn.
    ///
    /// # Errors
    ///
    /// Returns `ChainError::EmptyBlock` (wrapped) when there are no transactions to include,
    /// and `Error::BlockProposalError` when processing the new block commits nothing.
    #[instrument(level = "trace", skip(operations, blobs))]
    async fn execute_block(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

        let mutex = self.proposal_mutex();
        let lock_start = linera_base::time::Instant::now();
        let mut proposal_guard = mutex.lock_owned().await;
        tracing::debug!(
            chain_id = %self.chain_id,
            lock_wait_ms = lock_start.elapsed().as_millis(),
            "acquired proposal_mutex in execute_block"
        );
        // First deal with whatever is already pending; only `Committed(None)` lets us go on.
        match self
            .process_pending_block_without_prepare(&mut proposal_guard)
            .await?
        {
            ClientOutcome::Committed(Some(certificate)) => {
                return Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout))
            }
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate))
            }
            ClientOutcome::Committed(None) => {}
        }

        let transactions = self.prepend_epochs_messages_and_events(operations).await?;

        if transactions.is_empty() {
            return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
            )));
        }

        let block = self
            .new_pending_block(transactions, blobs, &mut proposal_guard)
            .await?;

        // Propose the block we just staged; a different committed block counts as a conflict.
        match self
            .process_pending_block_without_prepare(&mut proposal_guard)
            .await?
        {
            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
                Ok(ClientOutcome::Committed(certificate))
            }
            ClientOutcome::Committed(Some(certificate)) => {
                Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            ClientOutcome::Committed(None) => {
                Err(Error::BlockProposalError("Unexpected block proposal error"))
            }
            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
            ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
        }
    }
1445
1446 #[instrument(level = "trace", skip(operations))]
1452 async fn prepend_epochs_messages_and_events(
1453 &self,
1454 operations: Vec<Operation>,
1455 ) -> Result<Vec<Transaction>, Error> {
1456 let incoming_bundles = self.pending_message_bundles().await?;
1457 let stream_updates = self.collect_stream_updates().await?;
1458 Ok(self
1459 .collect_epoch_changes()
1460 .await?
1461 .into_iter()
1462 .map(Transaction::ExecuteOperation)
1463 .chain(
1464 incoming_bundles
1465 .into_iter()
1466 .map(Transaction::ReceiveMessages),
1467 )
1468 .chain(
1469 stream_updates
1470 .into_iter()
1471 .map(Transaction::ExecuteOperation),
1472 )
1473 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1474 .collect::<Vec<_>>())
1475 }
1476
    /// Stages a new block with the given transactions and blobs and stores it as the
    /// pending proposal in `proposal_guard`.
    ///
    /// Returns the staged `Block`.
    ///
    /// # Errors
    ///
    /// Returns `Error::BlockProposalError` if `proposal_guard` already holds a pending
    /// proposal, and propagates errors from identity lookup, chain info retrieval and
    /// block staging.
    #[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
    async fn new_pending_block(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
        proposal_guard: &mut Option<PendingProposal>,
    ) -> Result<Block, Error> {
        let identity = self.identity().await?;

        ensure!(
            proposal_guard.is_none(),
            Error::BlockProposalError(
                "Client state already has a pending block; \
                use the `linera retry-pending-block` command to commit that first"
            )
        );
        let info = self.chain_info_with_committees().await?;
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let proposed_block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_owner: Some(identity),
            timestamp,
        };

        let round = self.round_for_oracle(&info, &identity).await?;
        let (block, _, never_reject_origins) = self
            .client
            .stage_block_execution(
                proposed_block,
                round,
                blobs.clone(),
                self.options.bundle_execution_policy(),
            )
            .await?;
        // Record every origin reported by staging in the `skipped_origins` set.
        if !never_reject_origins.is_empty() {
            let skipped = self.skipped_origins.pin();
            for origin in never_reject_origins {
                skipped.insert(origin);
            }
        }
        // Keep the staged outcome so a later re-execution can be compared against it.
        let (proposed_block, auto_retry_outcome) = block.clone().into_proposal();
        *proposal_guard = Some(PendingProposal {
            block: proposed_block,
            blobs,
            auto_retry_outcome: Some(auto_retry_outcome),
            round: None,
        });
        Ok(block)
    }
1538
1539 #[instrument(level = "trace", skip(transactions))]
1544 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1545 let local_time = self.storage_client().clock().current_time();
1546 transactions
1547 .iter()
1548 .filter_map(Transaction::incoming_bundle)
1549 .map(|msg| msg.bundle.timestamp)
1550 .max()
1551 .map_or(local_time, |timestamp| timestamp.max(local_time))
1552 .max(block_time)
1553 }
1554
1555 #[instrument(level = "trace", skip(query))]
1557 pub async fn query_application(
1558 &self,
1559 query: Query,
1560 block_hash: Option<CryptoHash>,
1561 ) -> Result<(QueryOutcome, BlockHeight), Error> {
1562 loop {
1563 let result = self
1564 .client
1565 .local_node
1566 .query_application(self.chain_id, query.clone(), block_hash)
1567 .await;
1568 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1569 let validators = self.client.validator_nodes().await?;
1570 self.client
1571 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1572 .await?;
1573 continue; }
1575 return Ok(result?);
1576 }
1577 }
1578
1579 #[cfg(with_testing)]
1581 #[instrument(level = "trace", skip(query))]
1582 pub async fn query_system_application(
1583 &self,
1584 query: linera_execution::SystemQuery,
1585 ) -> Result<QueryOutcome<linera_execution::SystemResponse>, Error> {
1586 let (
1587 QueryOutcome {
1588 response,
1589 operations,
1590 },
1591 _,
1592 ) = self.query_application(Query::System(query), None).await?;
1593 match response {
1594 linera_execution::QueryResponse::System(response) => Ok(QueryOutcome {
1595 response,
1596 operations,
1597 }),
1598 _ => Err(Error::InternalError("Unexpected response for system query")),
1599 }
1600 }
1601
1602 #[instrument(level = "trace", skip(application_id, query))]
1604 #[cfg(with_testing)]
1605 pub async fn query_user_application<A: Abi>(
1606 &self,
1607 application_id: ApplicationId<A>,
1608 query: &A::Query,
1609 ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1610 let query = Query::user(application_id, query)?;
1611 let (
1612 QueryOutcome {
1613 response,
1614 operations,
1615 },
1616 _,
1617 ) = self.query_application(query, None).await?;
1618 match response {
1619 linera_execution::QueryResponse::User(response_bytes) => {
1620 let response = serde_json::from_slice(&response_bytes)?;
1621 Ok(QueryOutcome {
1622 response,
1623 operations,
1624 })
1625 }
1626 _ => Err(Error::InternalError("Unexpected response for user query")),
1627 }
1628 }
1629
1630 #[instrument(level = "trace")]
1637 pub async fn query_balance(&self) -> Result<Amount, Error> {
1638 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1639 Ok(balance)
1640 }
1641
1642 #[instrument(level = "trace", skip(owner))]
1649 pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1650 if owner.is_chain() {
1651 self.query_balance().await
1652 } else {
1653 Ok(self
1654 .query_balances_with_owner(owner)
1655 .await?
1656 .1
1657 .unwrap_or(Amount::ZERO))
1658 }
1659 }
1660
    /// Returns the chain balance and the balance of `owner` as they would be after
    /// receiving all pending incoming message bundles.
    ///
    /// If there are no pending bundles, the local balances are returned directly.
    /// Otherwise a block containing only the incoming bundles is staged (not proposed) and
    /// the balances are read from the staging response. If staging fails with
    /// `ExecutionError::FeesExceedFunding` in the block context, both balances are
    /// reported as zero.
    #[instrument(level = "trace", skip(owner))]
    pub(crate) async fn query_balances_with_owner(
        &self,
        owner: AccountOwner,
    ) -> Result<(Amount, Option<Amount>), Error> {
        let incoming_bundles = self.pending_message_bundles().await?;
        if incoming_bundles.is_empty() {
            // Nothing to receive: the local state already has the answer.
            let chain_balance = self.local_balance().await?;
            let owner_balance = self.local_owner_balance(owner).await?;
            return Ok((chain_balance, Some(owner_balance)));
        }
        let info = self.chain_info().await?;
        let transactions = incoming_bundles
            .into_iter()
            .map(Transaction::ReceiveMessages)
            .collect::<Vec<_>>();
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            // The chain pseudo-owner is never used as the authenticated owner.
            authenticated_owner: if owner == AccountOwner::CHAIN {
                None
            } else {
                Some(owner)
            },
            timestamp,
        };
        match self
            .client
            .stage_block_execution(
                block,
                None,
                Vec::new(),
                self.options.bundle_execution_policy(),
            )
            .await
        {
            Ok((_, response, _)) => Ok((
                response.info.chain_balance,
                response.info.requested_owner_balance,
            )),
            // Block-level `FeesExceedFunding` is reported as zero balances, not an error.
            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
                error,
            )))) if matches!(
                &*error,
                ChainError::ExecutionError(
                    execution_error,
                    ChainExecutionContext::Block
                ) if matches!(
                    **execution_error,
                    ExecutionError::FeesExceedFunding { .. }
                )
            ) =>
            {
                Ok((Amount::ZERO, Some(Amount::ZERO)))
            }
            Err(error) => Err(error),
        }
    }
1732
1733 #[instrument(level = "trace")]
1737 pub async fn local_balance(&self) -> Result<Amount, Error> {
1738 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1739 Ok(balance)
1740 }
1741
1742 #[instrument(level = "trace", skip(owner))]
1746 pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1747 if owner.is_chain() {
1748 self.local_balance().await
1749 } else {
1750 Ok(self
1751 .local_balances_with_owner(owner)
1752 .await?
1753 .1
1754 .unwrap_or(Amount::ZERO))
1755 }
1756 }
1757
1758 #[instrument(level = "trace", skip(owner))]
1762 pub(crate) async fn local_balances_with_owner(
1763 &self,
1764 owner: AccountOwner,
1765 ) -> Result<(Amount, Option<Amount>), Error> {
1766 ensure!(
1767 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1768 Error::WalletSynchronizationError
1769 );
1770 let mut query = ChainInfoQuery::new(self.chain_id);
1771 query.request_owner_balance = owner;
1772 let response = self
1773 .client
1774 .local_node
1775 .handle_chain_info_query(query)
1776 .await?;
1777 Ok((
1778 response.info.chain_balance,
1779 response.info.requested_owner_balance,
1780 ))
1781 }
1782
1783 #[instrument(level = "trace")]
1785 pub async fn transfer_to_account(
1786 &self,
1787 from: AccountOwner,
1788 amount: Amount,
1789 account: Account,
1790 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1791 self.transfer(from, amount, account).await
1792 }
1793
1794 #[cfg(with_testing)]
1796 #[instrument(level = "trace")]
1797 pub async fn burn(
1798 &self,
1799 owner: AccountOwner,
1800 amount: Amount,
1801 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1802 let recipient = Account::burn_address(self.chain_id);
1803 self.transfer(owner, amount, recipient).await
1804 }
1805
1806 #[instrument(level = "trace")]
1807 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1808 let validators = self.client.validator_nodes().await?;
1809 self.client
1810 .fetch_chain_info(self.chain_id, &validators)
1811 .await
1812 }
1813
1814 #[instrument(level = "trace")]
1829 pub async fn synchronize_up_to(
1830 &self,
1831 next_height: Option<BlockHeight>,
1832 until_block_time: Option<Timestamp>,
1833 ) -> Result<Box<ChainInfo>, Error> {
1834 let (_, committee) = self.client.admin_committee().await?;
1835 let validators = self.client.make_nodes(&committee)?;
1836 Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
1837 communicate_with_quorum(
1838 &validators,
1839 &committee,
1840 |_: &()| (),
1841 |remote_node| async move {
1842 self.client
1843 .download_certificates_from(
1844 &remote_node,
1845 self.chain_id,
1846 next_height.unwrap_or(BlockHeight::MAX),
1847 until_block_time,
1848 )
1849 .await?;
1850 Ok(())
1851 },
1852 self.client.options.quorum_grace_period,
1853 )
1854 .await?;
1855 self.client
1856 .local_node
1857 .chain_info(self.chain_id)
1858 .await
1859 .map_err(Into::into)
1860 }
1861
1862 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1863 if self.is_follow_only() {
1864 return self.client.synchronize_chain_state(self.chain_id).await;
1865 }
1866 let info = self.prepare_chain().await?;
1867 self.synchronize_publisher_chains().await?;
1868 self.find_received_certificates().await?;
1869 Ok(info)
1870 }
1871
1872 #[instrument(level = "trace")]
1874 pub async fn process_pending_block(
1875 &self,
1876 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1877 self.prepare_chain().await?;
1878 let mutex = self.proposal_mutex();
1879 let mut proposal_guard = mutex.lock_owned().await;
1880 self.process_pending_block_without_prepare(&mut proposal_guard)
1881 .await
1882 }
1883
    /// Processes the pending block, if any, without first preparing the chain.
    ///
    /// Steps, in order:
    /// 1. Request a leader timeout if the round has timed out, and load the chain info.
    /// 2. Drop the pending proposal if its height is already below the next block height.
    /// 3. If there is a locking block in the current, non-fast round, finalize it.
    /// 4. Otherwise pick the block to propose: the requested locking block if present,
    ///    else the local pending proposal; with neither, return `Committed(None)`.
    /// 5. Choose a round, sign a proposal, handle it locally and submit it to the
    ///    committee, finalizing the block when the round is not fast.
    /// 6. Send the certificate to the validators and clear the pending proposal.
    ///
    /// Returns `WaitForTimeout` when no round is currently available for proposing.
    #[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
    async fn process_pending_block_without_prepare(
        &self,
        proposal_guard: &mut Option<PendingProposal>,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
        let process_start = linera_base::time::Instant::now();
        tracing::debug!("process_pending_block_without_prepare started");
        let info = self.request_leader_timeout_if_needed().await?;

        if let Some(pending) = &*proposal_guard {
            // A pending block below the next height can no longer be proposed.
            if pending.block.height < info.next_block_height {
                tracing::debug!(
                    "Clearing pending proposal: a block was committed at height {}",
                    pending.block.height
                );
                *proposal_guard = None;
            }
        }

        if info.manager.has_locking_block_in_current_round()
            && !info.manager.current_round.is_fast()
        {
            return self.finalize_locking_block(info).await;
        }
        let owner = self.identity().await?;

        let local_node = &self.client.local_node;
        // The requested locking block takes precedence over the local pending proposal.
        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
            match &**locking {
                LockingBlock::Regular(certificate) => {
                    let blob_ids = certificate.block().required_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                    debug!("Retrying locking block from round {}", certificate.round);
                    (certificate.block().clone(), blobs)
                }
                LockingBlock::Fast(proposal) => {
                    let proposed_block = proposal.content.block.clone();
                    let blob_ids = proposed_block.published_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                    // A fast-round proposal carries no outcome, so the block is re-staged.
                    let (block, _, _) = self
                        .client
                        .stage_block_execution(
                            proposed_block,
                            None,
                            blobs.clone(),
                            BundleExecutionPolicy::committed(),
                        )
                        .await?;
                    debug!("Retrying locking block from fast round.");
                    (block, blobs)
                }
            }
        } else if let Some(pending) = proposal_guard.as_ref() {
            let proposed_block = pending.block.clone();
            let blobs = pending.blobs.clone();
            let staging_outcome = pending.auto_retry_outcome.as_ref();
            let round = self.round_for_oracle(&info, &owner).await?;
            let (block, _, _) = self
                .client
                .stage_block_execution(
                    proposed_block,
                    round,
                    blobs.clone(),
                    BundleExecutionPolicy::committed(),
                )
                .await?;
            if let Some(staging_outcome) = staging_outcome {
                // Re-execution must reproduce the outcome recorded when the block was staged.
                ensure!(
                    block.outcome_matches(staging_outcome),
                    Error::ExecutionOutcomeMismatch
                );
            }
            debug!("Proposing the local pending block.");
            (block, blobs)
        } else {
            return Ok(ClientOutcome::Committed(None));
        };

        let has_oracle_responses = block.has_oracle_responses();
        let (proposed_block, outcome) = block.into_proposal();
        let round = match self
            .round_for_new_proposal(&info, &owner, has_oracle_responses)
            .await?
        {
            Either::Left(round) => round,
            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
        };
        debug!("Proposing block for round {}", round);
        if let Some(pending) = proposal_guard.as_mut() {
            // Only the first chosen round is remembered on the pending proposal.
            pending.round.get_or_insert(round);
        }

        let already_handled_locally = info
            .manager
            .already_handled_proposal(round, &proposed_block);
        let proposal = if let Some(locking) = info.manager.requested_locking {
            Box::new(match *locking {
                LockingBlock::Regular(cert) => {
                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                        .await
                        .map_err(Error::signer_failure)?
                }
                LockingBlock::Fast(proposal) => {
                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                        .await
                        .map_err(Error::signer_failure)?
                }
            })
        } else {
            Box::new(
                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                    .await
                    .map_err(Error::signer_failure)?,
            )
        };
        if !already_handled_locally {
            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
                match err {
                    // Provide the blobs as pending blobs and retry once.
                    LocalNodeError::BlobsNotFound(_) => {
                        local_node
                            .handle_pending_blobs(self.chain_id, blobs)
                            .await?;
                        local_node.handle_block_proposal(*proposal.clone()).await?;
                    }
                    err => return Err(err.into()),
                }
            }
        }
        let committee = self.local_committee().await?;
        let block = Block::new(proposed_block, outcome);
        let submit_block_proposal_start = linera_base::time::Instant::now();
        // Fast rounds yield the certificate directly; other rounds need finalization.
        let certificate = if round.is_fast() {
            let hashed_value = ConfirmedBlock::new(block);
            self.client
                .submit_block_proposal(committee.clone(), proposal, hashed_value)
                .await?
        } else {
            let hashed_value = ValidatedBlock::new(block);
            let certificate = self
                .client
                .submit_block_proposal(committee.clone(), proposal, hashed_value.clone())
                .await?;
            self.client.finalize_block(&committee, certificate).await?
        };
        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
        tracing::debug!(
            total_process_ms = process_start.elapsed().as_millis(),
            "process_pending_block_without_prepare completing"
        );
        debug!(round = %certificate.round, "Sending confirmed block to validators");
        let certificate = self.client.storage_client().cache_certificate(certificate);
        self.update_validators(Some(&committee), Some(certificate.clone()))
            .await?;
        *proposal_guard = None;
        Ok(ClientOutcome::Committed(Some(Arc::unwrap_or_clone(
            certificate,
        ))))
    }
2063
2064 fn send_timing(&self, start: Instant, timing_type: TimingType) {
2065 let Some(sender) = &self.timing_sender else {
2066 return;
2067 };
2068 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2069 tracing::warn!(%err, "Failed to send timing info");
2070 }
2071 }
2072
2073 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
2076 let mut info = self.chain_info_with_manager_values().await?;
2077 if let Some(round_timeout) = info.manager.round_timeout {
2080 if round_timeout <= self.storage_client().clock().current_time() {
2081 if let Err(e) = self.request_leader_timeout().await {
2082 debug!("Failed to obtain a timeout certificate: {}", e);
2083 } else {
2084 info = self.chain_info_with_manager_values().await?;
2085 }
2086 }
2087 }
2088 Ok(info)
2089 }
2090
2091 async fn finalize_locking_block(
2095 &self,
2096 info: Box<ChainInfo>,
2097 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2098 let locking = info
2099 .manager
2100 .requested_locking
2101 .expect("Should have a locking block");
2102 let LockingBlock::Regular(certificate) = *locking else {
2103 panic!("Should have a locking validated block");
2104 };
2105 debug!(
2106 round = %certificate.round,
2107 "Finalizing locking block"
2108 );
2109 let committee = self.local_committee().await?;
2110 let certificate = self
2111 .client
2112 .finalize_block(&committee, certificate.clone())
2113 .await?;
2114 let certificate = self.client.storage_client().cache_certificate(certificate);
2115 self.update_validators(Some(&committee), Some(certificate.clone()))
2116 .await?;
2117 Ok(ClientOutcome::Committed(Some(Arc::unwrap_or_clone(
2118 certificate,
2119 ))))
2120 }
2121
2122 async fn round_for_oracle(
2124 &self,
2125 info: &ChainInfo,
2126 identity: &AccountOwner,
2127 ) -> Result<Option<u32>, Error> {
2128 match self.round_for_new_proposal(info, identity, true).await {
2130 Ok(Either::Left(round)) => Ok(round.multi_leader()),
2132 Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
2136 Err(err) => Err(err),
2137 }
2138 }
2139
2140 async fn round_for_new_proposal(
2142 &self,
2143 info: &ChainInfo,
2144 identity: &AccountOwner,
2145 has_oracle_responses: bool,
2146 ) -> Result<Either<Round, RoundTimeout>, Error> {
2147 let manager = &info.manager;
2148 let seed = manager.seed;
2149 let skip_fast = manager.current_round.is_fast()
2154 && (has_oracle_responses || !self.options.allow_fast_blocks);
2155 let conflict = manager
2156 .requested_signed_proposal
2157 .as_ref()
2158 .into_iter()
2159 .chain(&manager.requested_proposed)
2160 .any(|proposal| proposal.content.round == manager.current_round)
2161 || skip_fast;
2162 let round = if !conflict {
2163 manager.current_round
2164 } else if let Some(round) = manager
2165 .ownership
2166 .next_round(manager.current_round)
2167 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2168 {
2169 round
2170 } else if let Some(timeout) = info.round_timeout() {
2171 return Ok(Either::Right(timeout));
2172 } else {
2173 return Err(Error::BlockProposalError(
2174 "Conflicting proposal in the current round",
2175 ));
2176 };
2177 let current_committee = self
2178 .local_committee()
2179 .await?
2180 .validators
2181 .values()
2182 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
2183 .collect();
2184 if manager.should_propose(identity, round, seed, ¤t_committee) {
2185 return Ok(Either::Left(round));
2186 }
2187 if let Some(timeout) = info.round_timeout() {
2188 return Ok(Either::Right(timeout));
2189 }
2190 Err(Error::BlockProposalError(
2191 "Not a leader in the current round",
2192 ))
2193 }
2194
2195 #[cfg(with_testing)]
2197 #[instrument(level = "trace")]
2198 pub async fn clear_pending_proposal(&self) {
2199 *self.proposal_mutex().lock().await = None;
2200 }
2201
2202 #[cfg(with_testing)]
2206 #[instrument(level = "trace")]
2207 pub async fn rotate_key_pair(
2208 &self,
2209 public_key: linera_base::crypto::AccountPublicKey,
2210 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2211 self.transfer_ownership(public_key.into()).await
2212 }
2213
2214 #[instrument(level = "trace")]
2216 pub async fn transfer_ownership(
2217 &self,
2218 new_owner: AccountOwner,
2219 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2220 self.execute_operation(SystemOperation::ChangeOwnership {
2221 super_owners: vec![new_owner],
2222 owners: Vec::new(),
2223 first_leader: None,
2224 multi_leader_rounds: 2,
2225 open_multi_leader_rounds: false,
2226 timeout_config: TimeoutConfig::default(),
2227 })
2228 .await
2229 }
2230
2231 #[instrument(level = "trace")]
2233 pub async fn share_ownership(
2234 &self,
2235 new_owner: AccountOwner,
2236 new_weight: u64,
2237 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2238 let ownership = self.prepare_chain().await?.manager.ownership;
2239 ensure!(
2240 ownership.is_active(),
2241 ChainError::InactiveChain(self.chain_id)
2242 );
2243 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2244 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2245 owners.push((new_owner, new_weight));
2246 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2247 super_owners: Vec::new(),
2248 owners,
2249 first_leader: ownership.first_leader,
2250 multi_leader_rounds: ownership.multi_leader_rounds,
2251 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2252 timeout_config: ownership.timeout_config,
2253 })];
2254 self.execute_block(operations, vec![]).await
2255 }
2256
2257 #[instrument(level = "trace")]
2259 pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2260 Ok(self
2261 .client
2262 .local_node
2263 .chain_state_view(self.chain_id)
2264 .await?
2265 .execution_state
2266 .system
2267 .ownership
2268 .get()
2269 .await?
2270 .clone())
2271 }
2272
2273 #[instrument(level = "trace")]
2276 pub async fn change_ownership(
2277 &self,
2278 ownership: ChainOwnership,
2279 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2280 self.execute_operation(SystemOperation::ChangeOwnership {
2281 super_owners: ownership.super_owners.into_iter().collect(),
2282 owners: ownership.owners.into_iter().collect(),
2283 first_leader: ownership.first_leader,
2284 multi_leader_rounds: ownership.multi_leader_rounds,
2285 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2286 timeout_config: ownership.timeout_config.clone(),
2287 })
2288 .await
2289 }
2290
2291 #[instrument(level = "trace")]
2293 pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2294 Ok(self
2295 .client
2296 .local_node
2297 .chain_state_view(self.chain_id)
2298 .await?
2299 .execution_state
2300 .system
2301 .application_permissions
2302 .get()
2303 .await?
2304 .clone())
2305 }
2306
2307 #[instrument(level = "trace", skip(application_permissions))]
2309 pub async fn change_application_permissions(
2310 &self,
2311 application_permissions: ApplicationPermissions,
2312 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2313 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2314 application_permissions,
2315 ))
2316 .await
2317 }
2318
2319 #[instrument(level = "trace", skip(self))]
2321 pub async fn open_chain(
2322 &self,
2323 ownership: ChainOwnership,
2324 application_permissions: ApplicationPermissions,
2325 balance: Amount,
2326 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2327 let mut has_key = false;
2329 for owner in ownership.all_owners() {
2330 if self.has_key_for(owner).await? {
2331 has_key = true;
2332 break;
2333 }
2334 }
2335 let config = OpenChainConfig {
2336 ownership,
2337 balance,
2338 application_permissions,
2339 };
2340 let operation = Operation::system(SystemOperation::OpenChain(config));
2341 let certificate = match self.execute_block(vec![operation], vec![]).await? {
2342 ClientOutcome::Committed(certificate) => certificate,
2343 ClientOutcome::Conflict(certificate) => {
2344 return Ok(ClientOutcome::Conflict(certificate));
2345 }
2346 ClientOutcome::WaitForTimeout(timeout) => {
2347 return Ok(ClientOutcome::WaitForTimeout(timeout));
2348 }
2349 };
2350 let chain_blob = certificate
2352 .block()
2353 .body
2354 .blobs
2355 .last()
2356 .and_then(|blobs| blobs.last())
2357 .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2358 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2359 if has_key {
2361 self.client
2362 .extend_chain_mode(description.id(), ListeningMode::FullChain);
2363 self.client
2364 .retry_pending_cross_chain_requests(self.chain_id)
2365 .await?;
2366 }
2367 Ok(ClientOutcome::Committed((description, certificate)))
2368 }
2369
2370 #[instrument(level = "trace")]
2373 pub async fn close_chain(
2374 &self,
2375 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2376 match self.execute_operation(SystemOperation::CloseChain).await {
2377 Ok(outcome) => Ok(outcome.map(Some)),
2378 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2379 chain_error,
2380 )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2381 Ok(ClientOutcome::Committed(None)) }
2383 Err(error) => Err(error),
2384 }
2385 }
2386
2387 #[cfg(not(target_arch = "wasm32"))]
2389 #[instrument(level = "trace", skip(contract, service))]
2390 pub async fn publish_module(
2391 &self,
2392 contract: Bytecode,
2393 service: Bytecode,
2394 vm_runtime: VmRuntime,
2395 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2396 let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2397 self.publish_module_blobs(blobs, module_id).await
2398 }
2399
2400 #[cfg(not(target_arch = "wasm32"))]
2402 #[instrument(level = "trace", skip(blobs, module_id))]
2403 pub async fn publish_module_blobs(
2404 &self,
2405 blobs: Vec<Blob>,
2406 module_id: ModuleId,
2407 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2408 self.execute_operations(
2409 vec![Operation::system(SystemOperation::PublishModule {
2410 module_id,
2411 })],
2412 blobs,
2413 )
2414 .await?
2415 .try_map(|certificate| Ok((module_id, certificate)))
2416 }
2417
2418 #[instrument(level = "trace", skip(bytes))]
2420 pub async fn publish_data_blobs(
2421 &self,
2422 bytes: Vec<Vec<u8>>,
2423 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2424 let blobs = bytes.into_iter().map(Blob::new_data);
2425 let publish_blob_operations = blobs
2426 .clone()
2427 .map(|blob| {
2428 Operation::system(SystemOperation::PublishDataBlob {
2429 blob_hash: blob.id().hash,
2430 })
2431 })
2432 .collect();
2433 self.execute_operations(publish_blob_operations, blobs.collect())
2434 .await
2435 }
2436
2437 #[instrument(level = "trace", skip(bytes))]
2439 pub async fn publish_data_blob(
2440 &self,
2441 bytes: Vec<u8>,
2442 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2443 self.publish_data_blobs(vec![bytes]).await
2444 }
2445
2446 #[instrument(
2448 level = "trace",
2449 skip(self, parameters, instantiation_argument, required_application_ids)
2450 )]
2451 pub async fn create_application<
2452 A: Abi,
2453 Parameters: Serialize,
2454 InstantiationArgument: Serialize,
2455 >(
2456 &self,
2457 module_id: ModuleId<A, Parameters, InstantiationArgument>,
2458 parameters: &Parameters,
2459 instantiation_argument: &InstantiationArgument,
2460 required_application_ids: Vec<ApplicationId>,
2461 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2462 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2463 let parameters = serde_json::to_vec(parameters)?;
2464 Ok(self
2465 .create_application_untyped(
2466 module_id.forget_abi(),
2467 parameters,
2468 instantiation_argument,
2469 required_application_ids,
2470 )
2471 .await?
2472 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2473 }
2474
2475 #[instrument(
2477 level = "trace",
2478 skip(
2479 self,
2480 module_id,
2481 parameters,
2482 instantiation_argument,
2483 required_application_ids
2484 )
2485 )]
2486 pub async fn create_application_untyped(
2487 &self,
2488 module_id: ModuleId,
2489 parameters: Vec<u8>,
2490 instantiation_argument: Vec<u8>,
2491 required_application_ids: Vec<ApplicationId>,
2492 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2493 self.execute_operation(SystemOperation::CreateApplication {
2494 module_id,
2495 parameters,
2496 instantiation_argument,
2497 required_application_ids,
2498 })
2499 .await?
2500 .try_map(|certificate| {
2501 let mut creation: Vec<_> = certificate
2503 .block()
2504 .created_blob_ids()
2505 .into_iter()
2506 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2507 .collect();
2508 if creation.len() > 1 {
2509 return Err(Error::InternalError(
2510 "Unexpected number of application descriptions published",
2511 ));
2512 }
2513 let blob_id = creation.pop().ok_or(Error::InternalError(
2514 "ApplicationDescription blob not found.",
2515 ))?;
2516 let id = ApplicationId::new(blob_id.hash);
2517 Ok((id, certificate))
2518 })
2519 }
2520
2521 #[instrument(level = "trace", skip(committee))]
2523 pub async fn stage_new_committee(
2524 &self,
2525 committee: Committee,
2526 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2527 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2528 let blob_hash = blob.id().hash;
2529 match self
2530 .execute_operations(
2531 vec![Operation::system(SystemOperation::Admin(
2532 AdminOperation::PublishCommitteeBlob { blob_hash },
2533 ))],
2534 vec![blob],
2535 )
2536 .await?
2537 {
2538 ClientOutcome::Committed(_) => {}
2539 outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2540 return Ok(outcome)
2541 }
2542 }
2543 let epoch = self.chain_info().await?.epoch.try_add_one()?;
2544 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2545 epoch,
2546 blob_hash,
2547 }))
2548 .await
2549 }
2550
2551 #[instrument(level = "trace")]
2557 pub async fn process_inbox(
2558 &self,
2559 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2560 self.prepare_chain().await?;
2561 self.process_inbox_without_prepare().await
2562 }
2563
2564 #[instrument(level = "trace")]
2570 pub async fn process_inbox_without_prepare(
2571 &self,
2572 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2573 #[cfg(with_metrics)]
2574 let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2575
2576 let mut certificates = Vec::new();
2577 loop {
2578 match self.execute_block(vec![], vec![]).await {
2582 Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2583 Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2584 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2585 return Ok((certificates, Some(timeout)));
2586 }
2587 Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2589 WorkerError::ChainError(chain_error),
2590 ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2591 return Ok((certificates, None));
2592 }
2593 Err(error) => return Err(error),
2594 };
2595 }
2596 }
2597
2598 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2601 let (mut min_epoch, mut next_epoch) = {
2602 let (epoch, committees) = self.epoch_and_committees().await?;
2603 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2604 (min_epoch, epoch.try_add_one()?)
2605 };
2606 let mut epoch_change_ops = Vec::new();
2607 while self
2608 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2609 .await?
2610 {
2611 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2612 next_epoch,
2613 )));
2614 next_epoch.try_add_assign_one()?;
2615 }
2616 while self
2617 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2618 .await?
2619 {
2620 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2621 min_epoch,
2622 )));
2623 min_epoch.try_add_assign_one()?;
2624 }
2625 Ok(epoch_change_ops)
2626 }
2627
2628 async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2631 let event_id = EventId {
2632 chain_id: self.client.admin_chain_id,
2633 stream_id: StreamId::system(stream_name),
2634 index,
2635 };
2636 Ok(self
2637 .client
2638 .storage_client()
2639 .read_event(event_id)
2640 .await?
2641 .is_some())
2642 }
2643
2644 pub async fn events_from_index(
2646 &self,
2647 stream_id: StreamId,
2648 start_index: u32,
2649 ) -> Result<Vec<IndexAndEvent>, Error> {
2650 Ok(self
2651 .client
2652 .storage_client()
2653 .read_events_from_index(&self.chain_id, &stream_id, start_index)
2654 .await?)
2655 }
2656
2657 #[instrument(level = "trace")]
2662 pub async fn revoke_epochs(
2663 &self,
2664 revoked_epoch: Epoch,
2665 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2666 self.prepare_chain().await?;
2667 let (current_epoch, committees) = self.epoch_and_committees().await?;
2668 ensure!(
2669 revoked_epoch < current_epoch,
2670 Error::CannotRevokeCurrentEpoch(current_epoch)
2671 );
2672 ensure!(
2673 committees.contains_key(&revoked_epoch),
2674 Error::EpochAlreadyRevoked
2675 );
2676 let operations = committees
2677 .keys()
2678 .filter_map(|epoch| {
2679 if *epoch <= revoked_epoch {
2680 Some(Operation::system(SystemOperation::Admin(
2681 AdminOperation::RemoveCommittee { epoch: *epoch },
2682 )))
2683 } else {
2684 None
2685 }
2686 })
2687 .collect();
2688 self.execute_operations(operations, vec![]).await
2689 }
2690
#[cfg(with_testing)]
/// Test-only helper: submits a `Transfer` system operation of `amount` from `owner` to
/// `recipient` directly through `execute_operation`.
#[instrument(level = "trace")]
pub async fn transfer_to_account_unsafe_unconfirmed(
    &self,
    owner: AccountOwner,
    amount: Amount,
    recipient: Account,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
    let transfer = SystemOperation::Transfer {
        owner,
        recipient,
        amount,
    };
    self.execute_operation(transfer).await
}
2709
2710 #[instrument(level = "trace", skip(hash))]
2711 pub async fn read_confirmed_block(
2712 &self,
2713 hash: CryptoHash,
2714 ) -> Result<Arc<ConfirmedBlock>, Error> {
2715 self.client
2716 .storage_client()
2717 .read_confirmed_block(hash)
2718 .await?
2719 .ok_or(Error::MissingConfirmedBlock(hash))
2720 }
2721
2722 #[instrument(level = "trace", skip(hash))]
2723 pub async fn read_certificate(
2724 &self,
2725 hash: CryptoHash,
2726 ) -> Result<Arc<ConfirmedBlockCertificate>, Error> {
2727 self.client
2728 .storage_client()
2729 .read_certificate(hash)
2730 .await?
2731 .ok_or(Error::ReadCertificatesError(vec![hash]))
2732 }
2733
2734 #[instrument(level = "trace")]
2736 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2737 self.client
2738 .retry_pending_cross_chain_requests(self.chain_id)
2739 .await?;
2740 Ok(())
2741 }
2742
2743 #[instrument(level = "trace", skip(local_node))]
2744 async fn local_chain_info(
2745 &self,
2746 chain_id: ChainId,
2747 local_node: &LocalNodeClient<Env::Storage>,
2748 ) -> Result<Option<Box<ChainInfo>>, Error> {
2749 match local_node.chain_info(chain_id).await {
2750 Ok(info) => Ok(Some(info)),
2751 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2752 Err(err) => Err(err.into()),
2753 }
2754 }
2755
2756 #[instrument(level = "trace", skip(chain_id, local_node))]
2757 async fn local_next_block_height(
2758 &self,
2759 chain_id: ChainId,
2760 local_node: &LocalNodeClient<Env::Storage>,
2761 ) -> Result<BlockHeight, Error> {
2762 Ok(self
2763 .local_chain_info(chain_id, local_node)
2764 .await?
2765 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2766 }
2767
2768 #[instrument(level = "trace")]
2771 async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2772 Ok(self
2773 .client
2774 .local_node
2775 .get_inbox_next_height(self.chain_id, origin)
2776 .await?)
2777 }
2778
2779 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2780 async fn process_notification(
2781 &self,
2782 remote_node: RemoteNode<Env::ValidatorNode>,
2783 local_node: LocalNodeClient<Env::Storage>,
2784 notification: Notification,
2785 ) -> Result<(), Error> {
2786 let listening_mode = self.client.chain_mode(notification.chain_id);
2787 let relevant = listening_mode
2788 .as_ref()
2789 .is_some_and(|mode| mode.is_relevant(¬ification.reason));
2790 if !relevant {
2791 tracing::trace!(
2792 chain_id = %notification.chain_id,
2793 reason = ?notification.reason,
2794 ?listening_mode,
2795 "Ignoring notification due to listening mode"
2796 );
2797 return Ok(());
2798 }
2799 match notification.reason {
2800 Reason::NewIncomingBundle { origin, height } => {
2801 if self.options.message_policy.ignores_origin(&origin) {
2802 trace!(
2803 chain_id = %self.chain_id,
2804 %origin,
2805 %height,
2806 "Skipping NewIncomingBundle notification: origin filtered by message_policy"
2807 );
2808 return Ok(());
2809 }
2810 if self.local_next_height_to_receive(origin).await? > height {
2811 debug!(
2812 chain_id = %self.chain_id,
2813 "Accepting redundant notification for new message"
2814 );
2815 return Ok(());
2816 }
2817 self.client
2818 .download_sender_block_with_sending_ancestors(
2819 self.chain_id,
2820 origin,
2821 height,
2822 &remote_node,
2823 )
2824 .await?;
2825 if self.local_next_height_to_receive(origin).await? <= height {
2826 info!(
2827 chain_id = %self.chain_id,
2828 "NewIncomingBundle: Fail to synchronize new message after notification"
2829 );
2830 }
2831 }
2832 Reason::NewBlock { height, .. } => {
2833 let chain_id = notification.chain_id;
2834 let local_height = self.local_next_block_height(chain_id, &local_node).await?;
2835 if local_height > height {
2836 debug!(
2837 chain_id = %self.chain_id,
2838 "Accepting redundant notification for new block"
2839 );
2840 return Ok(());
2841 }
2842 self.client
2845 .synchronize_chain_state_from(&remote_node, chain_id)
2846 .await?;
2847 if self.local_next_block_height(chain_id, &local_node).await? <= height {
2848 error!("NewBlock: Fail to synchronize new block after notification");
2849 }
2850 trace!(
2851 chain_id = %self.chain_id,
2852 %height,
2853 "NewBlock: processed notification",
2854 );
2855 }
2856 Reason::NewEvents {
2857 height, block_hash, ..
2858 } => {
2859 let chain_id = notification.chain_id;
2860 let local_height = self.local_next_block_height(chain_id, &local_node).await?;
2861 if local_height > height {
2862 debug!(
2863 chain_id = %self.chain_id,
2864 "Accepting redundant notification for new events"
2865 );
2866 return Ok(());
2867 }
2868 trace!(
2869 chain_id = %self.chain_id,
2870 %height,
2871 "NewEvents: processing notification"
2872 );
2873 let relevant_streams = match self.listening_mode() {
2876 Some(ListeningMode::EventsOnly(subscribed)) => subscribed,
2877 _ => unreachable!(),
2880 };
2881 self.client
2882 .download_event_bearing_blocks(
2883 self.chain_id,
2884 BTreeSet::from([(height, block_hash)]),
2885 local_height,
2886 &relevant_streams,
2887 &remote_node,
2888 )
2889 .await?;
2890 }
2891 Reason::NewRound { height, round } => {
2892 let chain_id = notification.chain_id;
2893 if let Some(info) = self.local_chain_info(chain_id, &local_node).await? {
2894 if (info.next_block_height, info.manager.current_round) >= (height, round) {
2895 debug!(
2896 chain_id = %self.chain_id,
2897 "Accepting redundant notification for new round"
2898 );
2899 return Ok(());
2900 }
2901 }
2902 self.client
2903 .synchronize_chain_state_from(&remote_node, chain_id)
2904 .await?;
2905 let Some(info) = self.local_chain_info(chain_id, &local_node).await? else {
2906 error!(
2907 chain_id = %self.chain_id,
2908 "NewRound: Fail to read local chain info for {chain_id}"
2909 );
2910 return Ok(());
2911 };
2912 if (info.next_block_height, info.manager.current_round) < (height, round) {
2913 info!(
2914 chain_id = %self.chain_id,
2915 "NewRound: Fail to synchronize new block after notification"
2916 );
2917 }
2918 }
2919 Reason::BlockExecuted { .. } => {
2920 }
2922 }
2923 Ok(())
2924 }
2925
2926 pub fn is_tracked(&self) -> bool {
2928 self.client.is_tracked(self.chain_id)
2929 }
2930
2931 pub fn listening_mode(&self) -> Option<ListeningMode> {
2933 self.client.chain_mode(self.chain_id)
2934 }
2935
#[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
/// Sets up notification listening for this chain.
///
/// Returns a tuple of:
/// * a future that, while polled, keeps the per-validator notification streams up to
///   date and drives the processing of their notifications;
/// * an [`AbortOnDrop`] wrapping the abort handle of the internal subscription that
///   drives that future;
/// * a separate [`NotificationStream`] obtained from `self.subscribe()` for the caller.
///
/// A failure of the initial (or any later) stream update is logged and does not make
/// this function fail; only `self.subscribe()` errors are returned.
pub async fn listen(
    &self,
) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
    use future::FutureExt as _;

    /// Awaits `future`, polling `background_work` in the meantime and discarding the
    /// items it yields.
    async fn await_while_polling<F: FusedFuture>(
        future: F,
        background_work: impl FusedStream<Item = ()>,
    ) -> F::Output {
        tokio::pin!(future);
        tokio::pin!(background_work);
        loop {
            futures::select! {
                _ = background_work.next() => (),
                result = future => return result,
            }
        }
    }

    // Abort handles of the per-validator streams, and circuit-breaker state for
    // validators whose stream ended.
    let mut senders = HashMap::new();
    let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
    // Two independent subscriptions: one handed to the caller, one (abortable) that
    // drives the stream-update loop below.
    let notifications = self.subscribe()?;
    let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

    // Holds the futures returned by `update_notification_streams`; they are polled
    // as background work while waiting for notifications.
    let mut process_notifications = FuturesUnordered::new();

    match self
        .update_notification_streams(&mut senders, &mut circuit_breakers)
        .await
    {
        Ok(handler) => process_notifications.push(handler),
        Err(error) => error!("Failed to update committee: {error}"),
    };

    let this = self.clone();
    let update_streams = async move {
        let mut abortable_notifications = abortable_notifications.fuse();

        while let Some(notification) =
            await_while_polling(abortable_notifications.next(), &mut process_notifications)
                .await
        {
            // Only new blocks trigger a refresh of the validator streams.
            if let Reason::NewBlock { .. } = notification.reason {
                match Box::pin(await_while_polling(
                    this.update_notification_streams(&mut senders, &mut circuit_breakers)
                        .fuse(),
                    &mut process_notifications,
                ))
                .await
                {
                    Ok(handler) => process_notifications.push(handler),
                    Err(error) => error!("Failed to update committee: {error}"),
                }
            }
        }

        // The driving subscription ended: abort every validator stream, then let the
        // remaining handlers run to completion.
        for abort in senders.into_values() {
            abort.abort();
        }

        let () = process_notifications.collect().await;
    }
    .in_current_span();

    Ok((update_streams, AbortOnDrop(abort), notifications))
}
3013
#[instrument(level = "trace", skip(senders, circuit_breakers))]
/// Reconciles the per-validator notification streams with the current committee.
///
/// `senders` maps each validator to the abort handle of its stream; `circuit_breakers`
/// holds probe timing for validators whose stream ended. This method:
///
/// 1. builds the validator nodes from the local committee (when the listening mode
///    says the chain state should be synced) or the admin committee (otherwise);
/// 2. updates the circuit breakers from the state of the existing streams;
/// 3. drops streams that ended or whose validator left the committee;
/// 4. starts a stream for every validator without one, unless its circuit breaker
///    says it is too early to probe again.
///
/// Returns a future that drives all newly started streams and processes their
/// notifications; it completes once all of them have ended.
async fn update_notification_streams(
    &self,
    senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
) -> Result<impl Future<Output = ()>, Error> {
    let initial_probe_interval = self
        .options
        .notification_circuit_breaker_initial_probe_interval;
    let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;
    let (nodes, local_node) = {
        // Pick the committee according to the listening mode.
        let committee = if self
            .listening_mode()
            .is_some_and(|m| m.should_sync_chain_state())
        {
            self.local_committee().await?
        } else {
            self.client.admin_committee().await?.1
        };
        let nodes: HashMap<_, _> = self
            .client
            .validator_node_provider()
            .make_nodes(&committee)?
            .collect();
        (nodes, self.client.local_node.clone())
    };
    for (validator, abort) in senders.iter() {
        // An aborted handle means the validator's stream ended (see `abort_on_exit`).
        if abort.is_aborted() && nodes.contains_key(validator) {
            if let Some(state) = circuit_breakers.get_mut(validator) {
                // The probe failed again: double the interval, capped at the maximum.
                state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
                state.next_probe_at = Instant::now() + state.probe_interval;
                warn!(
                    %validator,
                    chain_id = %self.chain_id,
                    next_probe_in = ?state.probe_interval,
                    "Validator still unhealthy after probe; increasing probe interval"
                );
            } else {
                // First failure: start the circuit breaker with the initial interval.
                circuit_breakers.insert(
                    *validator,
                    CircuitBreakerState {
                        next_probe_at: Instant::now() + initial_probe_interval,
                        probe_interval: initial_probe_interval,
                    },
                );
                error!(
                    %validator,
                    chain_id = %self.chain_id,
                    next_probe_in = ?initial_probe_interval,
                    "Validator notification stream ended; entering circuit breaker"
                );
            }
        } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
            // The stream started by a probe is still alive: the validator recovered.
            info!(
                %validator,
                chain_id = %self.chain_id,
                "Validator recovered from circuit breaker"
            );
            circuit_breakers.remove(validator);
        }
    }

    // Abort streams of validators that left the committee and forget every aborted
    // stream, so that a new one can be started below.
    senders.retain(|validator, abort| {
        if !nodes.contains_key(validator) {
            abort.abort();
        }
        !abort.is_aborted()
    });
    circuit_breakers.retain(|validator, _| nodes.contains_key(validator));

    let validator_tasks = FuturesUnordered::new();
    for (public_key, node) in nodes {
        // Validators that still have a live stream are left alone.
        let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
            continue;
        };

        if let Some(state) = circuit_breakers.get(&public_key) {
            // Still inside the back-off window: do not reconnect yet.
            if Instant::now() < state.next_probe_at {
                continue;
            }
            debug!(
                validator = %public_key,
                chain_id = %self.chain_id,
                "Probing unhealthy validator"
            );
        }

        let address = node.address();
        let this = self.clone();
        let listening_mode_for_sync = self.listening_mode();
        let stream = stream::once({
            let node = node.clone();
            async move {
                let stream = node.subscribe(vec![this.chain_id]).await?;
                let remote_node = RemoteNode { public_key, node };
                // After subscribing, do an initial sync from this validator; how
                // depends on the listening mode.
                if listening_mode_for_sync
                    .as_ref()
                    .is_some_and(|mode| mode.should_sync_chain_state())
                {
                    this.client
                        .synchronize_chain_state_from(&remote_node, this.chain_id)
                        .await?;
                } else {
                    // For `EventsOnly` chains only events are synced, and a failure
                    // is logged rather than treated as a connection error.
                    if let Some(ListeningMode::EventsOnly(subscribed)) =
                        listening_mode_for_sync.as_ref()
                    {
                        if let Err(error) = this
                            .client
                            .sync_events_from_node(this.chain_id, subscribed, &remote_node)
                            .await
                        {
                            debug!(
                                chain_id = %this.chain_id,
                                %error,
                                "Failed initial sparse sync for EventsOnly chain"
                            );
                        }
                    }
                }
                Ok::<_, Error>(stream)
            }
        })
        .filter_map(move |result| {
            let address = address.clone();
            async move {
                if let Err(error) = &result {
                    info!(?error, address, "could not connect to validator");
                } else {
                    debug!(address, "connected to validator");
                }
                // A failed connection yields no inner stream, so the flattened
                // stream ends immediately.
                result.ok()
            }
        })
        .flatten();
        let (stream, abort) = stream::abortable(stream);
        let mut stream = Box::pin(stream);
        let abort_on_exit = abort.clone();
        let this = self.clone();
        let local_node = local_node.clone();
        let remote_node = RemoteNode { public_key, node };
        validator_tasks.push(async move {
            while let Some(notification) = stream.next().await {
                if let Err(error) = this
                    .process_notification(
                        remote_node.clone(),
                        local_node.clone(),
                        notification.clone(),
                    )
                    .await
                {
                    tracing::info!(
                        chain_id = %this.chain_id,
                        address = remote_node.address(),
                        ?notification,
                        %error,
                        "failed to process notification",
                    );
                }
            }
            warn!(
                chain_id = %this.chain_id,
                address = remote_node.address(),
                "Validator notification stream ended"
            );
            // Mark the handle as aborted so the next update sees this stream as ended.
            abort_on_exit.abort();
        });
        entry.insert(abort);
    }
    Ok(validator_tasks.collect())
}
3197
3198 #[instrument(level = "trace", skip(remote_node))]
3200 pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
3201 let validator_next_block_height = match remote_node
3202 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3203 .await
3204 {
3205 Ok(info) => info.info.next_block_height,
3206 Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
3207 Err(err) => return Err(err.into()),
3208 };
3209 let local_next_block_height = self.chain_info().await?.next_block_height;
3210
3211 if validator_next_block_height >= local_next_block_height {
3212 debug!("Validator is up-to-date with local state");
3213 return Ok(());
3214 }
3215
3216 let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
3217 .map(BlockHeight)
3218 .collect();
3219
3220 let certificates = self
3221 .client
3222 .storage_client()
3223 .read_certificates_by_heights(self.chain_id, &heights)
3224 .await?
3225 .into_iter()
3226 .flatten()
3227 .collect::<Vec<_>>();
3228
3229 for certificate in certificates {
3230 match remote_node
3231 .handle_confirmed_certificate(
3232 certificate.clone(),
3233 CrossChainMessageDelivery::NonBlocking,
3234 )
3235 .await
3236 {
3237 Ok(_) => (),
3238 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
3239 let missing_blobs: Vec<_> = self
3241 .client
3242 .storage_client()
3243 .read_blobs(&missing_blob_ids)
3244 .await?
3245 .into_iter()
3246 .flatten()
3247 .collect();
3248 remote_node.upload_blobs(missing_blobs).await?;
3249 remote_node
3250 .handle_confirmed_certificate(
3251 certificate,
3252 CrossChainMessageDelivery::NonBlocking,
3253 )
3254 .await?;
3255 }
3256 Err(err) => return Err(err.into()),
3257 }
3258 }
3259
3260 Ok(())
3261 }
3262}
3263
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test-only helper: processes `notification` as if it had been received from the
    /// given `validator` (public key and address).
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created or if processing the notification
    /// fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        let result = self
            .process_notification(remote_node, local_node, notification)
            .await;
        result.unwrap();
    }
}
3283}