1use std::{
6 cmp::{Ordering, PartialOrd},
7 collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
8 convert::Infallible,
9 iter,
10 sync::{Arc, RwLock},
11};
12
13use chain_client_state::ChainClientState;
14use custom_debug_derive::Debug;
15use futures::{
16 future::{self, Either, FusedFuture, Future},
17 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
18};
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{
22 abi::Abi,
23 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
24 data_types::{
25 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
26 ChainDescription, Epoch, Round, Timestamp,
27 },
28 ensure,
29 identifiers::{
30 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
31 ModuleId, StreamId,
32 },
33 ownership::{ChainOwnership, TimeoutConfig},
34 time::{Duration, Instant},
35};
36#[cfg(not(target_arch = "wasm32"))]
37use linera_base::{data_types::Bytecode, vm::VmRuntime};
38use linera_chain::{
39 data_types::{
40 BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
41 Transaction,
42 },
43 manager::LockingBlock,
44 types::{
45 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
46 LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
47 },
48 ChainError, ChainExecutionContext, ChainStateView,
49};
50use linera_execution::{
51 committee::Committee,
52 system::{
53 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
54 REMOVED_EPOCH_STREAM_NAME,
55 },
56 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
57};
58use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
59use linera_views::ViewError;
60use rand::{
61 distributions::{Distribution, WeightedIndex},
62 rngs::StdRng,
63 SeedableRng,
64};
65use serde::{Deserialize, Serialize};
66use thiserror::Error;
67use tokio::sync::{mpsc, OwnedRwLockReadGuard};
68use tokio_stream::wrappers::UnboundedReceiverStream;
69use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
70
71use crate::{
72 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
73 environment::Environment,
74 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
75 node::{
76 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
77 ValidatorNodeProvider as _,
78 },
79 notifier::ChannelNotifier,
80 remote_node::RemoteNode,
81 updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
82 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
83};
84
85mod chain_client_state;
86#[cfg(test)]
87#[path = "../unit_tests/client_tests.rs"]
88mod client_tests;
89
#[cfg(with_metrics)]
mod metrics {
    //! Latency histograms recorded by the client.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels whose buckets come from
    /// `exponential_bucket_latencies(500.0)`.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Histogram registered under the name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
143
144#[derive(Debug, Clone, PartialEq, Eq)]
148pub enum ListeningMode {
149 FullChain,
151 EventsOnly(BTreeSet<StreamId>),
153}
154
155impl PartialOrd for ListeningMode {
156 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
157 match (self, other) {
158 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
159 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
160 (_, ListeningMode::FullChain) => Some(Ordering::Less),
161 (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
162 if events_a.is_superset(events_b) {
163 Some(Ordering::Greater)
164 } else if events_b.is_superset(events_a) {
165 Some(Ordering::Less)
166 } else {
167 None
168 }
169 }
170 }
171 }
172}
173
174impl ListeningMode {
175 pub fn extend(&mut self, other: Option<ListeningMode>) {
176 match (self, other) {
177 (_, None) => (),
178 (ListeningMode::FullChain, _) => (),
179 (mode, Some(ListeningMode::FullChain)) => {
180 *mode = ListeningMode::FullChain;
181 }
182 (
183 ListeningMode::EventsOnly(self_events),
184 Some(ListeningMode::EventsOnly(other_events)),
185 ) => {
186 self_events.extend(other_events);
187 }
188 }
189 }
190}
191
/// Shared client state from which per-chain `ChainClient`s are created.
pub struct Client<Env: Environment> {
    /// Provides the storage, the network (validator node provider) and the signer.
    environment: Env,
    /// Local node wrapping a client `WorkerState`; certificates and chain queries go through it.
    local_node: LocalNodeClient<Env::Storage>,
    /// ID of the admin chain, whose chain info is used to look up committees.
    admin_id: ChainId,
    /// Set of tracked chains; the same `Arc` is handed to the worker state of `local_node`.
    tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
    /// Notifier passed to the local node whenever a certificate is handled.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Per-chain client state, keyed by chain ID.
    chains: papaya::HashMap<ChainId, ChainClientState>,
    /// Options; a clone is stored in every `ChainClient` created by this client.
    options: ChainClientOptions,
}
210
211impl<Env: Environment> Client<Env> {
212 #[instrument(level = "trace", skip_all)]
214 pub fn new(
215 environment: Env,
216 admin_id: ChainId,
217 long_lived_services: bool,
218 tracked_chains: impl IntoIterator<Item = ChainId>,
219 name: impl Into<String>,
220 chain_worker_ttl: Duration,
221 options: ChainClientOptions,
222 ) -> Self {
223 let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
224 let state = WorkerState::new_for_client(
225 name.into(),
226 environment.storage().clone(),
227 tracked_chains.clone(),
228 )
229 .with_long_lived_services(long_lived_services)
230 .with_allow_inactive_chains(true)
231 .with_allow_messages_from_deprecated_epochs(true)
232 .with_chain_worker_ttl(chain_worker_ttl);
233 let local_node = LocalNodeClient::new(state);
234
235 Self {
236 environment,
237 local_node,
238 chains: papaya::HashMap::new(),
239 admin_id,
240 tracked_chains,
241 notifier: Arc::new(ChannelNotifier::default()),
242 options,
243 }
244 }
245
246 pub fn storage_client(&self) -> &Env::Storage {
248 self.environment.storage()
249 }
250
251 pub fn validator_node_provider(&self) -> &Env::Network {
252 self.environment.network()
253 }
254
255 #[instrument(level = "trace", skip(self))]
257 pub fn signer(&self) -> &impl Signer {
258 self.environment.signer()
259 }
260
261 #[instrument(level = "trace", skip(self))]
263 pub fn track_chain(&self, chain_id: ChainId) {
264 self.tracked_chains
265 .write()
266 .expect("Panics should not happen while holding a lock to `tracked_chains`")
267 .insert(chain_id);
268 }
269
270 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
272 pub fn create_chain_client(
273 self: &Arc<Self>,
274 chain_id: ChainId,
275 block_hash: Option<CryptoHash>,
276 next_block_height: BlockHeight,
277 pending_proposal: Option<PendingProposal>,
278 preferred_owner: Option<AccountOwner>,
279 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
280 ) -> ChainClient<Env> {
281 self.chains
284 .pin()
285 .get_or_insert_with(chain_id, || ChainClientState::new(pending_proposal.clone()));
286
287 ChainClient {
288 client: self.clone(),
289 chain_id,
290 options: self.options.clone(),
291 preferred_owner,
292 initial_block_hash: block_hash,
293 initial_next_block_height: next_block_height,
294 timing_sender,
295 }
296 }
297
298 async fn fetch_chain_info(
300 &self,
301 chain_id: ChainId,
302 validators: &[RemoteNode<Env::ValidatorNode>],
303 ) -> Result<Box<ChainInfo>, ChainClientError> {
304 match self.local_node.chain_info(chain_id).await {
305 Ok(info) => Ok(info),
306 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
307 self.synchronize_chain_state(self.admin_id).await?;
309 self.update_local_node_with_blobs_from(blob_ids, validators)
312 .await?;
313 Ok(self.local_node.chain_info(chain_id).await?)
314 }
315 Err(err) => Err(err.into()),
316 }
317 }
318
319 fn weighted_select(
320 remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
321 remaining_weights: &mut Vec<u64>,
322 rng: &mut StdRng,
323 ) -> Option<RemoteNode<Env::ValidatorNode>> {
324 if remaining_weights.is_empty() {
325 return None;
326 }
327 let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
328 let idx = dist.sample(rng);
329 remaining_weights.remove(idx);
330 Some(remaining_validators.remove(idx))
331 }
332
333 #[instrument(level = "trace", skip(self))]
335 async fn download_certificates(
336 &self,
337 chain_id: ChainId,
338 target_next_block_height: BlockHeight,
339 ) -> Result<Box<ChainInfo>, ChainClientError> {
340 let (_, committee) = self.admin_committee().await?;
341 let mut remaining_validators = self.make_nodes(&committee)?;
342 let mut info = self
343 .fetch_chain_info(chain_id, &remaining_validators)
344 .await?;
345 let mut remaining_weights = remaining_validators
347 .iter()
348 .map(|validator| {
349 let validator_state = committee.validators.get(&validator.public_key).unwrap();
350 validator_state.votes
351 })
352 .collect::<Vec<_>>();
353 let mut rng: StdRng = StdRng::from_entropy();
354
355 while let Some(remote_node) =
356 Self::weighted_select(&mut remaining_validators, &mut remaining_weights, &mut rng)
357 {
358 if target_next_block_height <= info.next_block_height {
359 return Ok(info);
360 }
361 match self
362 .download_certificates_from(&remote_node, chain_id, target_next_block_height)
363 .await
364 {
365 Err(err) => warn!(
366 "Failed to download certificates from validator {:?}: {err}",
367 remote_node.public_key
368 ),
369 Ok(Some(new_info)) => info = new_info,
370 Ok(None) => {}
371 }
372 }
373 ensure!(
374 target_next_block_height <= info.next_block_height,
375 ChainClientError::CannotDownloadCertificates {
376 chain_id,
377 target_next_block_height,
378 }
379 );
380 Ok(info)
381 }
382
/// Processes certificates for `chain_id` below height `stop`, using locally preprocessed
/// blocks first and then `remote_node`.
///
/// Returns the chain info produced by the last processed certificate, or `None` if no
/// certificate was processed.
#[instrument(level = "trace", skip_all)]
async fn download_certificates_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    chain_id: ChainId,
    stop: BlockHeight,
) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
    let mut last_info = None;
    // Hashes of consecutive preprocessed blocks, starting at the local next block height.
    let mut hashes = Vec::new();
    let mut next_height = BlockHeight::ZERO;
    {
        // The chain state view is confined to this block, so it is dropped before the
        // certificates are handled below.
        let chain = self.local_node.chain_state_view(chain_id).await?;
        next_height = next_height.max(chain.tip_state.get().next_block_height);
        while next_height < stop {
            let Some(hash) = chain.preprocessed_blocks.get(&next_height).await? else {
                break;
            };
            hashes.push(hash);
            next_height = next_height.try_add_one()?;
        }
    }
    // Read the certificates of the preprocessed blocks from storage and process them.
    let certificates = self
        .storage_client()
        .read_certificates(hashes.clone())
        .await?;
    let certificates = match ResultReadCertificates::new(certificates, hashes) {
        ResultReadCertificates::Certificates(certificates) => certificates,
        ResultReadCertificates::InvalidHashes(hashes) => {
            return Err(ChainClientError::ReadCertificatesError(hashes))
        }
    };
    for certificate in certificates {
        last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info);
    }
    // Download the rest from the remote node, in batches of at most 1000 certificates.
    while next_height < stop {
        let limit = u64::from(stop)
            .checked_sub(u64::from(next_height))
            .ok_or(ArithmeticError::Overflow)?
            .min(1000);
        let certificates = remote_node
            .query_certificates_from(chain_id, next_height, limit)
            .await?;
        // `None` means the batch contained no certificates; stop asking this node.
        let Some(info) = self.process_certificates(remote_node, certificates).await? else {
            break;
        };
        // NOTE(review): this panics if a processed batch does not advance the local height;
        // confirm that `query_certificates_from` validates the heights it returns.
        assert!(info.next_block_height > next_height);
        next_height = info.next_block_height;
        last_info = Some(info);
    }
    Ok(last_info)
}
439
440 #[instrument(level = "trace", skip_all)]
443 async fn process_certificates(
444 &self,
445 remote_node: &RemoteNode<impl ValidatorNode>,
446 certificates: Vec<ConfirmedBlockCertificate>,
447 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
448 let mut info = None;
449 for certificate in certificates {
450 let certificate = Box::new(certificate);
451 let mut result = self.handle_certificate(certificate.clone()).await;
452
453 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
454 let blobs = future::join_all(blob_ids.iter().map(|blob_id| async move {
455 remote_node.try_download_blob(*blob_id).await.unwrap()
456 }))
457 .await;
458 self.local_node.store_blobs(&blobs).await?;
459 result = self.handle_certificate(certificate.clone()).await;
460 }
461
462 info = Some(result?.info);
463 }
464 Ok(info)
466 }
467
468 async fn handle_certificate<T: ProcessableCertificate>(
469 &self,
470 certificate: Box<GenericCertificate<T>>,
471 ) -> Result<ChainInfoResponse, LocalNodeError> {
472 self.local_node
473 .handle_certificate(*certificate, &self.notifier)
474 .await
475 }
476
477 async fn chain_info_with_committees(
478 &self,
479 chain_id: ChainId,
480 ) -> Result<Box<ChainInfo>, LocalNodeError> {
481 let query = ChainInfoQuery::new(chain_id).with_committees();
482 let info = self.local_node.handle_chain_info_query(query).await?.info;
483 Ok(info)
484 }
485
486 #[instrument(level = "trace", skip_all)]
489 async fn admin_committees(
490 &self,
491 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
492 let info = self.chain_info_with_committees(self.admin_id).await?;
493 Ok((info.epoch, info.into_committees()?))
494 }
495
496 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
498 let info = self.chain_info_with_committees(self.admin_id).await?;
499 Ok((info.epoch, info.into_current_committee()?))
500 }
501
502 async fn validator_nodes(
504 &self,
505 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
506 let (_, committee) = self.admin_committee().await?;
507 Ok(self.make_nodes(&committee)?)
508 }
509
510 fn make_nodes(
512 &self,
513 committee: &Committee,
514 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
515 Ok(self
516 .validator_node_provider()
517 .make_nodes(committee)?
518 .map(|(public_key, node)| RemoteNode { public_key, node })
519 .collect())
520 }
521
522 pub async fn get_chain_description(
525 &self,
526 chain_id: ChainId,
527 ) -> Result<ChainDescription, ChainClientError> {
528 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
529 let blob = self
530 .local_node
531 .storage_client()
532 .read_blob(chain_desc_id)
533 .await?;
534 if let Some(blob) = blob {
535 return Ok(bcs::from_bytes(blob.bytes())?);
537 };
538 self.synchronize_chain_state(self.admin_id).await?;
540 let nodes = self.validator_nodes().await?;
541 let blob = self
542 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
543 .await?
544 .pop()
545 .unwrap(); Ok(bcs::from_bytes(blob.bytes())?)
547 }
548
549 #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
551 fn update_from_info(&self, info: &ChainInfo) {
552 self.chains.pin().update(info.chain_id, |state| {
553 let mut state = state.clone_for_update_unchecked();
554 state.update_from_info(info);
555 state
556 });
557 }
558
559 #[instrument(level = "trace", skip_all)]
561 async fn process_certificate<T: ProcessableCertificate>(
562 &self,
563 certificate: Box<GenericCertificate<T>>,
564 ) -> Result<(), LocalNodeError> {
565 let info = self.handle_certificate(certificate).await?.info;
566 self.update_from_info(&info);
567 Ok(())
568 }
569
570 #[instrument(level = "trace", skip_all)]
572 async fn finalize_block(
573 &self,
574 committee: &Committee,
575 certificate: ValidatedBlockCertificate,
576 ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
577 debug!(round = %certificate.round, "Submitting block for confirmation");
578 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
579 let finalize_action = CommunicateAction::FinalizeBlock {
580 certificate: Box::new(certificate),
581 delivery: self.options.cross_chain_message_delivery,
582 };
583 let certificate = self
584 .communicate_chain_action(committee, finalize_action, hashed_value)
585 .await?;
586 self.receive_certificate(certificate.clone(), ReceiveCertificateMode::AlreadyChecked)
587 .await?;
588 Ok(certificate)
589 }
590
591 #[instrument(level = "trace", skip_all)]
593 async fn submit_block_proposal<T: ProcessableCertificate>(
594 &self,
595 committee: &Committee,
596 proposal: Box<BlockProposal>,
597 value: T,
598 ) -> Result<GenericCertificate<T>, ChainClientError> {
599 debug!(
600 round = %proposal.content.round,
601 "Submitting block proposal to validators"
602 );
603 let submit_action = CommunicateAction::SubmitBlock {
604 proposal,
605 blob_ids: value.required_blob_ids().into_iter().collect(),
606 };
607 let certificate = self
608 .communicate_chain_action(committee, submit_action, value)
609 .await?;
610 self.process_certificate(Box::new(certificate.clone()))
611 .await?;
612 Ok(certificate)
613 }
614
615 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
617 async fn communicate_chain_updates(
618 &self,
619 committee: &Committee,
620 chain_id: ChainId,
621 height: BlockHeight,
622 delivery: CrossChainMessageDelivery,
623 ) -> Result<(), ChainClientError> {
624 let nodes = self.make_nodes(committee)?;
625 communicate_with_quorum(
626 &nodes,
627 committee,
628 |_: &()| (),
629 |remote_node| {
630 let mut updater = ValidatorUpdater {
631 remote_node,
632 local_node: self.local_node.clone(),
633 admin_id: self.admin_id,
634 };
635 Box::pin(async move {
636 updater
637 .send_chain_information(chain_id, height, delivery)
638 .await
639 })
640 },
641 self.options.grace_period,
642 )
643 .await?;
644 Ok(())
645 }
646
/// Sends `action` to the validators of `committee` and builds a certificate for `value` from
/// the votes of a quorum.
///
/// # Errors
///
/// Returns `ChainClientError::UnexpectedQuorum` if the quorum voted for a different value hash
/// or round than `value.hash()` and `action.round()`, and an internal or protocol error if the
/// votes cannot be combined into a certificate for `value`.
#[instrument(level = "trace", skip_all)]
async fn communicate_chain_action<T: CertificateValue>(
    &self,
    committee: &Committee,
    action: CommunicateAction,
    value: T,
) -> Result<GenericCertificate<T>, ChainClientError> {
    let nodes = self.make_nodes(committee)?;
    // Votes are grouped by the pair (value hash, round); the quorum's pair is returned
    // together with its votes.
    let ((votes_hash, votes_round), votes) = communicate_with_quorum(
        &nodes,
        committee,
        |vote: &LiteVote| (vote.value.value_hash, vote.round),
        |remote_node| {
            let mut updater = ValidatorUpdater {
                remote_node,
                local_node: self.local_node.clone(),
                admin_id: self.admin_id,
            };
            // Every validator needs its own copy of the action.
            let action = action.clone();
            Box::pin(async move { updater.send_chain_update(action).await })
        },
        self.options.grace_period,
    )
    .await?;
    // The quorum must have voted for exactly the value and round we asked for.
    ensure!(
        (votes_hash, votes_round) == (value.hash(), action.round()),
        ChainClientError::UnexpectedQuorum {
            hash: votes_hash,
            round: votes_round,
            expected_hash: value.hash(),
            expected_round: action.round(),
        }
    );
    // Combine the votes into a lite certificate and attach the full value to it.
    let certificate = LiteCertificate::try_from_votes(votes)
        .ok_or_else(|| {
            ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
        })?
        .with_value(value)
        .ok_or_else(|| {
            ChainClientError::ProtocolError("A quorum voted for an unexpected value")
        })?;
    Ok(certificate)
}
699
700 #[instrument(level = "trace", skip_all)]
703 async fn receive_certificate_and_update_validators(
704 &self,
705 certificate: ConfirmedBlockCertificate,
706 mode: ReceiveCertificateMode,
707 ) -> Result<(), ChainClientError> {
708 let block_chain_id = certificate.block().header.chain_id;
709 let block_height = certificate.block().header.height;
710
711 self.receive_certificate(certificate, mode).await?;
712
713 let local_committee = self
716 .chain_info_with_committees(block_chain_id)
717 .await?
718 .into_current_committee()?;
719 self.communicate_chain_updates(
720 &local_committee,
721 block_chain_id,
722 block_height.try_add_one()?,
723 CrossChainMessageDelivery::Blocking,
724 )
725 .await?;
726 Ok(())
727 }
728
/// Processes a confirmed block certificate on the local node.
///
/// With `ReceiveCertificateMode::NeedsCheck` the certificate is first checked against the
/// known admin committees. Certificates below the block's height are downloaded beforehand,
/// and missing blobs are fetched from the validators before one retry.
#[instrument(level = "trace", skip_all)]
async fn receive_certificate(
    &self,
    certificate: ConfirmedBlockCertificate,
    mode: ReceiveCertificateMode,
) -> Result<(), ChainClientError> {
    let certificate = Box::new(certificate);
    let block = certificate.block();

    let (max_epoch, committees) = self.admin_committees().await?;
    // Verify the certificate unless the caller states it was already checked.
    if let ReceiveCertificateMode::NeedsCheck = mode {
        Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
    }
    // Make sure the local chain has reached the height of this block.
    self.download_certificates(block.header.chain_id, block.header.height)
        .await?;
    if let Err(err) = self.process_certificate(certificate.clone()).await {
        match &err {
            LocalNodeError::BlobsNotFound(blob_ids) => {
                // If the blobs cannot be downloaded, the original error is returned.
                let blobs = RemoteNode::download_blobs(
                    blob_ids,
                    &self.validator_nodes().await?,
                    self.options.blob_download_timeout,
                )
                .await
                .ok_or(err)?;
                self.local_node.store_blobs(&blobs).await?;
                self.process_certificate(certificate).await?;
            }
            _ => {
                warn!("Failed to process network hashed certificate value");
                return Err(err.into());
            }
        }
    }

    Ok(())
}
773
/// Handles a confirmed block certificate on the local node without updating the stored
/// client state.
///
/// With `ReceiveCertificateMode::NeedsCheck` the certificate is first checked against the
/// known admin committees. Missing blobs are downloaded from `nodes`, or from the current
/// validators if `nodes` is `None`, before one retry.
#[instrument(level = "trace", skip_all)]
// NOTE(review): the reason for allowing dead code is not visible here; the method is called
// from `update_local_node_with_blobs_from`.
#[allow(dead_code)]
async fn receive_sender_certificate(
    &self,
    certificate: ConfirmedBlockCertificate,
    mode: ReceiveCertificateMode,
    nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
) -> Result<(), ChainClientError> {
    let certificate = Box::new(certificate);

    let (max_epoch, committees) = self.admin_committees().await?;
    // Verify the certificate unless the caller states it was already checked.
    if let ReceiveCertificateMode::NeedsCheck = mode {
        Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
    }
    // Fall back to the current validators when the caller supplied no nodes.
    let nodes = if let Some(nodes) = nodes {
        nodes
    } else {
        self.validator_nodes().await?
    };
    if let Err(err) = self.handle_certificate(certificate.clone()).await {
        match &err {
            LocalNodeError::BlobsNotFound(blob_ids) => {
                // If the blobs cannot be downloaded, the original error is returned.
                let blobs = RemoteNode::download_blobs(
                    blob_ids,
                    &nodes,
                    self.options.blob_download_timeout,
                )
                .await
                .ok_or(err)?;
                self.local_node.store_blobs(&blobs).await?;
                self.handle_certificate(certificate.clone()).await?;
            }
            _ => {
                warn!("Failed to process network hashed certificate value");
                return Err(err.into());
            }
        }
    }

    Ok(())
}
819
/// Downloads from `remote_node` the sender certificates listed in its received log for
/// `chain_id`, checks them, and returns them together with an updated tracker.
///
/// The tracker is the number of leading entries of the validator's received log that are
/// skipped in the query; it starts at the value stored locally for this validator, or zero.
#[instrument(level = "trace", skip(self))]
async fn synchronize_received_certificates_from_validator(
    &self,
    chain_id: ChainId,
    remote_node: &RemoteNode<Env::ValidatorNode>,
) -> Result<ReceivedCertificatesFromValidator, ChainClientError> {
    let mut tracker = self
        .local_node
        .chain_state_view(chain_id)
        .await?
        .received_certificate_trackers
        .get()
        .get(&remote_node.public_key)
        .copied()
        .unwrap_or(0);
    let (max_epoch, committees) = self.admin_committees().await?;

    // Ask the validator for its received log, excluding the first `tracker` entries.
    let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
    let info = remote_node.handle_chain_info_query(query).await?;
    let remote_log = info.requested_received_log;
    let remote_heights = Self::heights_per_chain(&remote_log);

    // For every sender chain in the log, look up the local next outbox height.
    let local_next_heights = self
        .local_node
        .next_outbox_heights(remote_heights.keys(), chain_id)
        .await?;

    // NOTE(review): `downloaded_heights` is filled below but never read in this function.
    let mut downloaded_heights = BTreeMap::new();
    // Sender chains for which every logged height is below the local next height.
    let mut other_sender_chains = Vec::new();

    let certificates = future::try_join_all(remote_heights.into_iter().filter_map(
        |(sender_chain_id, remote_heights)| {
            // Sender chains without a local next height are skipped.
            let local_next = *local_next_heights.get(&sender_chain_id)?;
            if let Ok(height) = local_next.try_sub_one() {
                downloaded_heights.insert(sender_chain_id, height);
            }
            // Keep only the heights at or above the local next height.
            let remote_heights = remote_heights
                .into_iter()
                .filter(|h| *h >= local_next)
                .collect::<Vec<_>>();
            if remote_heights.is_empty() {
                other_sender_chains.push(sender_chain_id);
                return None;
            };
            Some(async move {
                let certificates = remote_node
                    .download_certificates_by_heights(sender_chain_id, remote_heights)
                    .await?;
                Ok::<Vec<_>, ChainClientError>(certificates)
            })
        },
    ))
    .await?
    .into_iter()
    .flatten()
    .collect::<Vec<_>>();

    let mut certificates_by_height_by_chain = BTreeMap::new();

    // Check every downloaded certificate and keep only those from a known epoch.
    for confirmed_block_certificate in certificates {
        let block_header = &confirmed_block_certificate.inner().block().header;
        let sender_chain_id = block_header.chain_id;
        let height = block_header.height;
        let epoch = block_header.epoch;
        match Self::check_certificate(max_epoch, &committees, &confirmed_block_certificate)? {
            CheckCertificateResult::FutureEpoch => {
                warn!(
                    "Postponing received certificate from {sender_chain_id:.8} at height \
                     {height} from future epoch {epoch}"
                );
            }
            CheckCertificateResult::OldEpoch => {
                warn!("Skipping received certificate from past epoch {epoch:?}");
            }
            CheckCertificateResult::New => {
                certificates_by_height_by_chain
                    .entry(sender_chain_id)
                    .or_insert_with(BTreeMap::new)
                    .insert(height, confirmed_block_certificate);
            }
        }
    }

    // Advance the tracker over the leading log entries whose certificate was kept above,
    // stopping at the first entry without one.
    for entry in remote_log {
        if certificates_by_height_by_chain
            .get(&entry.chain_id)
            .is_some_and(|certs| certs.contains_key(&entry.height))
        {
            tracker += 1;
        } else {
            break;
        }
    }

    // Drop all certificates of a sender chain if any of them has no recipient `chain_id`.
    for (sender_chain_id, certs) in &mut certificates_by_height_by_chain {
        if certs
            .values()
            .any(|cert| !cert.block().recipients().contains(&chain_id))
        {
            // NOTE(review): unlike the message above, this literal has no trailing `\`, so
            // the line break and the following indentation are part of the logged text.
            warn!(
                "Skipping received certificates from chain {sender_chain_id:.8}:
                No messages for {chain_id:.8}."
            );
            certs.clear();
        }
    }

    Ok(ReceivedCertificatesFromValidator {
        public_key: remote_node.public_key,
        tracker,
        certificates: certificates_by_height_by_chain
            .into_values()
            .flat_map(BTreeMap::into_values)
            .collect(),
        other_sender_chains,
    })
}
955
956 #[instrument(
957 level = "trace", skip_all,
958 fields(certificate_hash = ?incoming_certificate.hash()),
959 )]
960 fn check_certificate(
961 highest_known_epoch: Epoch,
962 committees: &BTreeMap<Epoch, Committee>,
963 incoming_certificate: &ConfirmedBlockCertificate,
964 ) -> Result<CheckCertificateResult, NodeError> {
965 let block = incoming_certificate.block();
966 if block.header.epoch > highest_known_epoch {
968 return Ok(CheckCertificateResult::FutureEpoch);
969 }
970 if let Some(known_committee) = committees.get(&block.header.epoch) {
971 incoming_certificate.check(known_committee)?;
974 Ok(CheckCertificateResult::New)
975 } else {
976 Ok(CheckCertificateResult::OldEpoch)
978 }
979 }
980
981 fn heights_per_chain(
984 remote_log: &[ChainAndHeight],
985 ) -> BTreeMap<ChainId, BTreeSet<BlockHeight>> {
986 remote_log.iter().fold(
987 BTreeMap::<ChainId, BTreeSet<_>>::new(),
988 |mut chain_to_info, entry| {
989 chain_to_info
990 .entry(entry.chain_id)
991 .or_default()
992 .insert(entry.height);
993 chain_to_info
994 },
995 )
996 }
997
998 #[instrument(level = "trace", skip_all)]
1000 async fn synchronize_chain_state(
1001 &self,
1002 chain_id: ChainId,
1003 ) -> Result<Box<ChainInfo>, ChainClientError> {
1004 let (_, committee) = self.admin_committee().await?;
1005 self.synchronize_chain_state_from_committee(chain_id, committee)
1006 .await
1007 }
1008
1009 #[instrument(level = "trace", skip_all)]
1012 pub async fn synchronize_chain_state_from_committee(
1013 &self,
1014 chain_id: ChainId,
1015 committee: Committee,
1016 ) -> Result<Box<ChainInfo>, ChainClientError> {
1017 #[cfg(with_metrics)]
1018 let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
1019
1020 let validators = self.make_nodes(&committee)?;
1021 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1022 communicate_with_quorum(
1023 &validators,
1024 &committee,
1025 |_: &()| (),
1026 |remote_node| async move {
1027 self.synchronize_chain_state_from(&remote_node, chain_id)
1028 .await
1029 },
1030 self.options.grace_period,
1031 )
1032 .await?;
1033
1034 self.local_node
1035 .chain_info(chain_id)
1036 .await
1037 .map_err(Into::into)
1038 }
1039
/// Synchronizes the local state of `chain_id` from a single validator.
///
/// Certificates up to the validator's next block height are downloaded first. If the local
/// chain then is at the same height, the validator's timeout certificate, proposals and
/// locking block are handed to the local node as well; failures of individual proposals or of
/// the locking block are logged and skipped.
#[instrument(level = "trace", skip(self, remote_node, chain_id))]
async fn synchronize_chain_state_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    chain_id: ChainId,
) -> Result<(), ChainClientError> {
    let mut local_info = self.local_node.chain_info(chain_id).await?;
    let query = ChainInfoQuery::new(chain_id).with_manager_values();
    let remote_info = remote_node.handle_chain_info_query(query).await?;
    if let Some(new_info) = self
        .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
        .await?
    {
        local_info = new_info;
    };

    // The manager values below are only used when both sides are at the same height.
    if local_info.next_block_height != remote_info.next_block_height {
        debug!(
            "Synced from validator {}; but remote height is {} and local height is {}",
            remote_node.public_key, remote_info.next_block_height, local_info.next_block_height
        );
        return Ok(());
    };

    if let Some(timeout) = remote_info.manager.timeout {
        self.handle_certificate(Box::new(*timeout)).await?;
    }
    // Collect every proposal the validator reported: signed, proposed, and fast locking.
    let mut proposals = Vec::new();
    if let Some(proposal) = remote_info.manager.requested_signed_proposal {
        proposals.push(*proposal);
    }
    if let Some(proposal) = remote_info.manager.requested_proposed {
        proposals.push(*proposal);
    }
    if let Some(locking) = remote_info.manager.requested_locking {
        match *locking {
            LockingBlock::Fast(proposal) => {
                proposals.push(proposal);
            }
            LockingBlock::Regular(cert) => {
                let hash = cert.hash();
                if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
                    debug!(
                        "Skipping locked block {hash} from validator {} at height {}: {err}",
                        remote_node.public_key, local_info.next_block_height,
                    );
                }
            }
        }
    }
    'proposal_loop: for proposal in proposals {
        let owner: AccountOwner = proposal.owner();
        if let Err(mut err) = self
            .local_node
            .handle_block_proposal(proposal.clone())
            .await
        {
            if let LocalNodeError::BlobsNotFound(_) = &err {
                // First attempt: download the proposal's required blobs as pending blobs
                // from this validator and retry.
                let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                if !required_blob_ids.is_empty() {
                    let mut blobs = Vec::new();
                    for blob_id in required_blob_ids {
                        let blob_content = match remote_node
                            .node
                            .download_pending_blob(chain_id, blob_id)
                            .await
                        {
                            Ok(content) => content,
                            Err(err) => {
                                warn!(
                                    "Skipping proposal from {owner} and validator {} at \
                                     height {}; failed to download {blob_id}: {err}",
                                    remote_node.public_key, local_info.next_block_height
                                );
                                continue 'proposal_loop;
                            }
                        };
                        blobs.push(Blob::new(blob_content));
                    }
                    self.local_node
                        .handle_pending_blobs(chain_id, blobs)
                        .await?;
                    if let Err(new_err) = self
                        .local_node
                        .handle_block_proposal(proposal.clone())
                        .await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
                // Second attempt: if blobs are still missing, fetch them through
                // `update_local_node_with_blobs_from` using only this validator, then retry.
                if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                    self.update_local_node_with_blobs_from(
                        blob_ids.clone(),
                        &[remote_node.clone()],
                    )
                    .await?;
                    if let Err(new_err) = self
                        .local_node
                        .handle_block_proposal(proposal.clone())
                        .await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
            }

            // Reached only if the proposal could not be handled; it is skipped.
            debug!(
                "Skipping proposal from {owner} and validator {} at height {}: {err}",
                remote_node.public_key, local_info.next_block_height
            );
        }
    }
    Ok(())
}
1163
1164 async fn try_process_locking_block_from(
1165 &self,
1166 remote_node: &RemoteNode<Env::ValidatorNode>,
1167 certificate: GenericCertificate<ValidatedBlock>,
1168 ) -> Result<(), ChainClientError> {
1169 let chain_id = certificate.inner().chain_id();
1170 let certificate = Box::new(certificate);
1171 match self.process_certificate(certificate.clone()).await {
1172 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1173 let mut blobs = Vec::new();
1174 for blob_id in blob_ids {
1175 let blob_content = remote_node
1176 .node
1177 .download_pending_blob(chain_id, blob_id)
1178 .await?;
1179 blobs.push(Blob::new(blob_content));
1180 }
1181 self.local_node
1182 .handle_pending_blobs(chain_id, blobs)
1183 .await?;
1184 self.process_certificate(certificate).await?;
1185 Ok(())
1186 }
1187 Err(err) => Err(err.into()),
1188 Ok(()) => Ok(()),
1189 }
1190 }
1191
1192 async fn update_local_node_with_blobs_from(
1195 &self,
1196 blob_ids: Vec<BlobId>,
1197 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1198 ) -> Result<Vec<Blob>, ChainClientError> {
1199 let timeout = self.options.blob_download_timeout;
1200 future::try_join_all(blob_ids.into_iter().map(|blob_id| async move {
1201 let mut stream = remote_nodes
1202 .iter()
1203 .zip(0..)
1204 .map(|(remote_node, i)| async move {
1205 linera_base::time::timer::sleep(timeout * i * i).await;
1206 let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
1207 self.receive_sender_certificate(
1209 certificate,
1210 ReceiveCertificateMode::NeedsCheck,
1211 Some(vec![remote_node.clone()]),
1212 )
1213 .await?;
1214 let blob = self
1215 .local_node
1216 .storage_client()
1217 .read_blob(blob_id)
1218 .await?
1219 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1220 Result::<_, ChainClientError>::Ok(blob)
1221 })
1222 .collect::<FuturesUnordered<_>>();
1223 while let Some(maybe_blob) = stream.next().await {
1224 if let Ok(blob) = maybe_blob {
1225 return Ok(blob);
1226 }
1227 }
1228 Err(LocalNodeError::BlobsNotFound(vec![blob_id]).into())
1229 }))
1230 .await
1231 }
1232
1233 async fn receive_certificates_for_blobs(
1236 &self,
1237 blob_ids: Vec<BlobId>,
1238 ) -> Result<(), ChainClientError> {
1239 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1241 let validators = self.validator_nodes().await?;
1242
1243 let mut missing_blobs = Vec::new();
1244 for blob_id in blob_ids {
1245 let mut certificate_stream = validators
1246 .iter()
1247 .map(|remote_node| async move {
1248 let cert = remote_node.download_certificate_for_blob(blob_id).await?;
1249 Ok::<_, NodeError>((remote_node.clone(), cert))
1250 })
1251 .collect::<FuturesUnordered<_>>();
1252 loop {
1253 let Some(result) = certificate_stream.next().await else {
1254 missing_blobs.push(blob_id);
1255 break;
1256 };
1257 if let Ok((remote_node, cert)) = result {
1258 if self
1259 .receive_sender_certificate(
1260 cert,
1261 ReceiveCertificateMode::NeedsCheck,
1262 Some(vec![remote_node]),
1263 )
1264 .await
1265 .is_ok()
1266 {
1267 break;
1268 }
1269 }
1270 }
1271 }
1272
1273 if missing_blobs.is_empty() {
1274 Ok(())
1275 } else {
1276 Err(NodeError::BlobsNotFound(missing_blobs).into())
1277 }
1278 }
1279
1280 #[tracing::instrument(level = "trace", skip(self, block))]
1285 async fn stage_block_execution_and_discard_failing_messages(
1286 &self,
1287 mut block: ProposedBlock,
1288 round: Option<u32>,
1289 published_blobs: Vec<Blob>,
1290 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1291 loop {
1292 let result = self
1293 .stage_block_execution(block.clone(), round, published_blobs.clone())
1294 .await;
1295 if let Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
1296 WorkerError::ChainError(chain_error),
1297 ))) = &result
1298 {
1299 if let ChainError::ExecutionError(
1300 error,
1301 ChainExecutionContext::IncomingBundle(index),
1302 ) = &**chain_error
1303 {
1304 let transaction = block
1305 .transactions
1306 .get_mut(*index as usize)
1307 .expect("Transaction at given index should exist");
1308 let Transaction::ReceiveMessages(message) = transaction else {
1309 panic!(
1310 "Expected incoming bundle at transaction index {}, found operation",
1311 index
1312 );
1313 };
1314 ensure!(
1315 !message.bundle.is_protected(),
1316 ChainClientError::BlockProposalError(
1317 "Protected incoming message failed to execute locally"
1318 )
1319 );
1320 info!(
1324 %error, origin = ?message.origin,
1325 "Message failed to execute locally and will be rejected."
1326 );
1327 message.action = MessageAction::Reject;
1328 continue;
1329 }
1330 }
1331 return result;
1332 }
1333 }
1334
1335 #[instrument(level = "trace", skip(self, block))]
1338 async fn stage_block_execution(
1339 &self,
1340 block: ProposedBlock,
1341 round: Option<u32>,
1342 published_blobs: Vec<Blob>,
1343 ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1344 loop {
1345 let result = self
1346 .local_node
1347 .stage_block_execution(block.clone(), round, published_blobs.clone())
1348 .await;
1349 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1350 self.receive_certificates_for_blobs(blob_ids.clone())
1351 .await?;
1352 continue; }
1354 return Ok(result?);
1355 }
1356 }
1357}
1358
1359#[derive(Clone, Debug)]
1361pub struct MessagePolicy {
1362 blanket: BlanketMessagePolicy,
1364 restrict_chain_ids_to: Option<HashSet<ChainId>>,
1368}
1369
1370#[derive(Copy, Clone, Debug, clap::ValueEnum)]
1371pub enum BlanketMessagePolicy {
1372 Accept,
1374 Reject,
1377 Ignore,
1380}
1381
1382impl MessagePolicy {
1383 pub fn new(
1384 blanket: BlanketMessagePolicy,
1385 restrict_chain_ids_to: Option<HashSet<ChainId>>,
1386 ) -> Self {
1387 Self {
1388 blanket,
1389 restrict_chain_ids_to,
1390 }
1391 }
1392
1393 #[cfg(with_testing)]
1394 pub fn new_accept_all() -> Self {
1395 Self {
1396 blanket: BlanketMessagePolicy::Accept,
1397 restrict_chain_ids_to: None,
1398 }
1399 }
1400
1401 #[instrument(level = "trace", skip(self))]
1402 fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
1403 if self.is_reject() {
1404 if bundle.bundle.is_skippable() {
1405 return false;
1406 } else if !bundle.bundle.is_protected() {
1407 bundle.action = MessageAction::Reject;
1408 }
1409 }
1410 match &self.restrict_chain_ids_to {
1411 None => true,
1412 Some(chains) => chains.contains(&bundle.origin),
1413 }
1414 }
1415
1416 #[instrument(level = "trace", skip(self))]
1417 fn is_ignore(&self) -> bool {
1418 matches!(self.blanket, BlanketMessagePolicy::Ignore)
1419 }
1420
1421 #[instrument(level = "trace", skip(self))]
1422 fn is_reject(&self) -> bool {
1423 matches!(self.blanket, BlanketMessagePolicy::Reject)
1424 }
1425}
1426
/// Label identifying which phase a timing measurement sent over the timing channel
/// belongs to.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// A whole `execute_operations` call.
    ExecuteOperations,
    /// A single block execution attempt that ended in a committed block.
    ExecuteBlock,
    /// Submission of a block proposal (presumably; the sender is outside this section).
    SubmitBlockProposal,
    /// A whole `update_validators` call.
    UpdateValidators,
}
1434
1435#[derive(Debug, Clone)]
1436pub struct ChainClientOptions {
1437 pub max_pending_message_bundles: usize,
1439 pub message_policy: MessagePolicy,
1441 pub cross_chain_message_delivery: CrossChainMessageDelivery,
1443 pub grace_period: f64,
1446 pub blob_download_timeout: Duration,
1448}
1449
#[cfg(with_testing)]
impl ChainClientOptions {
    /// Returns the options used by tests: accept-all message policy, at most 10 pending
    /// bundles, non-blocking cross-chain delivery, the default grace period and a
    /// one-second blob download timeout.
    pub fn test_default() -> Self {
        Self {
            max_pending_message_bundles: 10,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            grace_period: crate::DEFAULT_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
        }
    }
}
1464
1465#[derive(Debug)]
1471pub struct ChainClient<Env: Environment> {
1472 #[debug(skip)]
1474 client: Arc<Client<Env>>,
1475 chain_id: ChainId,
1477 #[debug(skip)]
1479 options: ChainClientOptions,
1480 preferred_owner: Option<AccountOwner>,
1483 initial_next_block_height: BlockHeight,
1485 initial_block_hash: Option<CryptoHash>,
1487 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
1489}
1490
1491impl<Env: Environment> Clone for ChainClient<Env> {
1492 fn clone(&self) -> Self {
1493 Self {
1494 client: self.client.clone(),
1495 chain_id: self.chain_id,
1496 options: self.options.clone(),
1497 preferred_owner: self.preferred_owner,
1498 initial_next_block_height: self.initial_next_block_height,
1499 initial_block_hash: self.initial_block_hash,
1500 timing_sender: self.timing_sender.clone(),
1501 }
1502 }
1503}
1504
1505#[derive(Debug, Error)]
1507pub enum ChainClientError {
1508 #[error("Local node operation failed: {0}")]
1509 LocalNodeError(#[from] LocalNodeError),
1510
1511 #[error("Remote node operation failed: {0}")]
1512 RemoteNodeError(#[from] NodeError),
1513
1514 #[error(transparent)]
1515 ArithmeticError(#[from] ArithmeticError),
1516
1517 #[error("Missing certificates: {0:?}")]
1518 ReadCertificatesError(Vec<CryptoHash>),
1519
1520 #[error("Missing confirmed block: {0:?}")]
1521 MissingConfirmedBlock(CryptoHash),
1522
1523 #[error("JSON (de)serialization error: {0}")]
1524 JsonError(#[from] serde_json::Error),
1525
1526 #[error("Chain operation failed: {0}")]
1527 ChainError(#[from] ChainError),
1528
1529 #[error(transparent)]
1530 CommunicationError(#[from] CommunicationError<NodeError>),
1531
1532 #[error("Internal error within chain client: {0}")]
1533 InternalError(&'static str),
1534
1535 #[error(
1536 "Cannot accept a certificate from an unknown committee in the future. \
1537 Please synchronize the local view of the admin chain"
1538 )]
1539 CommitteeSynchronizationError,
1540
1541 #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
1542 WalletSynchronizationError,
1543
1544 #[error("The state of the client is incompatible with the proposed block: {0}")]
1545 BlockProposalError(&'static str),
1546
1547 #[error(
1548 "Cannot accept a certificate from a committee that was retired. \
1549 Try a newer certificate from the same origin"
1550 )]
1551 CommitteeDeprecationError,
1552
1553 #[error("Protocol error within chain client: {0}")]
1554 ProtocolError(&'static str),
1555
1556 #[error("Signer doesn't have key to sign for chain {0}")]
1557 CannotFindKeyForChain(ChainId),
1558
1559 #[error("client is not configured to propose on chain {0}")]
1560 NoAccountKeyConfigured(ChainId),
1561
1562 #[error("The chain client isn't owner on chain {0}")]
1563 NotAnOwner(ChainId),
1564
1565 #[error(transparent)]
1566 ViewError(#[from] ViewError),
1567
1568 #[error(
1569 "Failed to download certificates and update local node to the next height \
1570 {target_next_block_height} of chain {chain_id:?}"
1571 )]
1572 CannotDownloadCertificates {
1573 chain_id: ChainId,
1574 target_next_block_height: BlockHeight,
1575 },
1576
1577 #[error(transparent)]
1578 BcsError(#[from] bcs::Error),
1579
1580 #[error(
1581 "Unexpected quorum: validators voted for block {hash} in {round}, \
1582 expected block {expected_hash} in {expected_round}"
1583 )]
1584 UnexpectedQuorum {
1585 hash: CryptoHash,
1586 round: Round,
1587 expected_hash: CryptoHash,
1588 expected_round: Round,
1589 },
1590
1591 #[error("signer error: {0:?}")]
1592 Signer(#[source] Box<dyn signer::Error>),
1593
1594 #[error("Cannot revoke the current epoch {0}")]
1595 CannotRevokeCurrentEpoch(Epoch),
1596
1597 #[error("Epoch is already revoked")]
1598 EpochAlreadyRevoked,
1599}
1600
1601impl From<Infallible> for ChainClientError {
1602 fn from(infallible: Infallible) -> Self {
1603 match infallible {}
1604 }
1605}
1606
1607impl ChainClientError {
1608 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
1609 Self::Signer(Box::new(err))
1610 }
1611}
1612
1613impl<Env: Environment> ChainClient<Env> {
1614 #[instrument(level = "trace", skip(self))]
1616 fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
1617 self.client
1618 .chains
1619 .pin()
1620 .get(&self.chain_id)
1621 .expect("Chain client constructed for invalid chain")
1622 .client_mutex()
1623 }
1624
1625 #[instrument(level = "trace", skip(self))]
1627 pub fn pending_proposal(&self) -> Option<PendingProposal> {
1628 self.client
1629 .chains
1630 .pin()
1631 .get(&self.chain_id)
1632 .expect("Chain client constructed for invalid chain")
1633 .pending_proposal()
1634 .clone()
1635 }
1636
1637 #[instrument(level = "trace", skip(self, f))]
1639 fn update_state<F>(&self, f: F)
1640 where
1641 F: Fn(&mut ChainClientState),
1642 {
1643 let chains = self.client.chains.pin();
1644 chains
1645 .update(self.chain_id, |state| {
1646 let mut state = state.clone_for_update_unchecked();
1647 f(&mut state);
1648 state
1649 })
1650 .expect("Chain client constructed for invalid chain");
1651 }
1652
1653 #[instrument(level = "trace", skip(self))]
1655 pub fn signer(&self) -> &impl Signer {
1656 self.client.signer()
1657 }
1658
1659 #[instrument(level = "trace", skip(self))]
1661 pub fn options_mut(&mut self) -> &mut ChainClientOptions {
1662 &mut self.options
1663 }
1664
1665 #[instrument(level = "trace", skip(self))]
1667 pub fn options(&self) -> &ChainClientOptions {
1668 &self.options
1669 }
1670
1671 #[instrument(level = "trace", skip(self))]
1673 pub fn chain_id(&self) -> ChainId {
1674 self.chain_id
1675 }
1676
1677 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
1679 self.timing_sender.clone()
1680 }
1681
1682 #[instrument(level = "trace", skip(self))]
1684 pub fn admin_id(&self) -> ChainId {
1685 self.client.admin_id
1686 }
1687
1688 #[instrument(level = "trace", skip(self))]
1690 pub fn preferred_owner(&self) -> Option<AccountOwner> {
1691 self.preferred_owner
1692 }
1693
1694 #[instrument(level = "trace", skip(self))]
1696 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
1697 self.preferred_owner = Some(preferred_owner);
1698 }
1699
1700 #[instrument(level = "trace", skip(self))]
1702 pub fn unset_preferred_owner(&mut self) {
1703 self.preferred_owner = None;
1704 }
1705
1706 #[instrument(level = "trace")]
1708 pub async fn chain_state_view(
1709 &self,
1710 ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
1711 self.client.local_node.chain_state_view(self.chain_id).await
1712 }
1713
1714 #[instrument(level = "trace", skip(self))]
1716 pub async fn event_stream_publishers(
1717 &self,
1718 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
1719 let mut publishers = self
1720 .chain_state_view()
1721 .await?
1722 .execution_state
1723 .system
1724 .event_subscriptions
1725 .indices()
1726 .await?
1727 .into_iter()
1728 .fold(
1729 BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
1730 |mut map, (chain_id, stream_id)| {
1731 map.entry(chain_id).or_default().insert(stream_id);
1732 map
1733 },
1734 );
1735 if self.chain_id != self.client.admin_id {
1736 publishers.insert(
1737 self.client.admin_id,
1738 vec![
1739 StreamId::system(EPOCH_STREAM_NAME),
1740 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
1741 ]
1742 .into_iter()
1743 .collect(),
1744 );
1745 }
1746 Ok(publishers)
1747 }
1748
1749 #[instrument(level = "trace")]
1751 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
1752 self.subscribe_to(self.chain_id)
1753 }
1754
1755 #[instrument(level = "trace")]
1757 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
1758 Ok(Box::pin(UnboundedReceiverStream::new(
1759 self.client.notifier.subscribe(vec![chain_id]),
1760 )))
1761 }
1762
1763 #[instrument(level = "trace")]
1765 pub fn storage_client(&self) -> &Env::Storage {
1766 self.client.storage_client()
1767 }
1768
1769 #[instrument(level = "trace")]
1771 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1772 let query = ChainInfoQuery::new(self.chain_id);
1773 let response = self
1774 .client
1775 .local_node
1776 .handle_chain_info_query(query)
1777 .await?;
1778 self.client.update_from_info(&response.info);
1779 Ok(response.info)
1780 }
1781
1782 #[instrument(level = "trace")]
1784 async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1785 let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
1786 let response = self
1787 .client
1788 .local_node
1789 .handle_chain_info_query(query)
1790 .await?;
1791 self.client.update_from_info(&response.info);
1792 Ok(response.info)
1793 }
1794
1795 pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
1797 self.client.get_chain_description(self.chain_id).await
1798 }
1799
1800 #[instrument(level = "trace")]
1803 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
1804 if self.options.message_policy.is_ignore() {
1805 return Ok(Vec::new());
1807 }
1808
1809 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
1810 let info = self
1811 .client
1812 .local_node
1813 .handle_chain_info_query(query)
1814 .await?
1815 .info;
1816 {
1817 ensure!(
1818 self.has_other_owners(&info.manager.ownership)
1819 || info.next_block_height >= self.initial_next_block_height,
1820 ChainClientError::WalletSynchronizationError
1821 );
1822 }
1823
1824 Ok(info
1825 .requested_pending_message_bundles
1826 .into_iter()
1827 .filter_map(|mut bundle| {
1828 self.options
1829 .message_policy
1830 .must_handle(&mut bundle)
1831 .then_some(bundle)
1832 })
1833 .take(self.options.max_pending_message_bundles)
1834 .collect())
1835 }
1836
1837 #[instrument(level = "trace")]
1841 async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
1842 let subscription_map = self
1844 .chain_state_view()
1845 .await?
1846 .execution_state
1847 .system
1848 .event_subscriptions
1849 .index_values()
1850 .await?;
1851 let futures = subscription_map
1853 .into_iter()
1854 .map(|((chain_id, stream_id), subscriptions)| {
1855 let client = self.client.clone();
1856 async move {
1857 let chain = client.local_node.chain_state_view(chain_id).await?;
1858 if let Some(next_expected_index) = chain
1859 .next_expected_events
1860 .get(&stream_id)
1861 .await?
1862 .filter(|next_index| *next_index > subscriptions.next_index)
1863 {
1864 Ok(Some((chain_id, stream_id, next_expected_index)))
1865 } else {
1866 Ok::<_, ChainClientError>(None)
1867 }
1868 }
1869 });
1870 let updates = future::try_join_all(futures)
1871 .await?
1872 .into_iter()
1873 .flatten()
1874 .collect::<Vec<_>>();
1875 if updates.is_empty() {
1876 return Ok(None);
1877 }
1878 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
1879 }
1880
1881 #[instrument(level = "trace")]
1882 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1883 self.client.chain_info_with_committees(self.chain_id).await
1884 }
1885
1886 #[instrument(level = "trace")]
1888 async fn epoch_and_committees(
1889 &self,
1890 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
1891 let info = self.chain_info_with_committees().await?;
1892 let epoch = info.epoch;
1893 let committees = info.into_committees()?;
1894 Ok((epoch, committees))
1895 }
1896
1897 #[instrument(level = "trace")]
1899 pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
1900 let info = match self.chain_info_with_committees().await {
1901 Ok(info) => info,
1902 Err(LocalNodeError::BlobsNotFound(_)) => {
1903 self.synchronize_chain_state(self.chain_id).await?;
1904 self.chain_info_with_committees().await?
1905 }
1906 Err(err) => return Err(err.into()),
1907 };
1908 Ok(info.into_current_committee()?)
1909 }
1910
1911 #[instrument(level = "trace")]
1913 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
1914 self.client.admin_committee().await
1915 }
1916
1917 #[instrument(level = "trace")]
1921 pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
1922 let Some(preferred_owner) = self.preferred_owner else {
1923 return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
1924 };
1925 let manager = self.chain_info().await?.manager;
1926 ensure!(
1927 manager.ownership.is_active(),
1928 LocalNodeError::InactiveChain(self.chain_id)
1929 );
1930
1931 let is_owner = manager
1932 .ownership
1933 .all_owners()
1934 .chain(&manager.leader)
1935 .any(|owner| *owner == preferred_owner);
1936
1937 if !is_owner {
1938 let accepted_owners = manager
1939 .ownership
1940 .all_owners()
1941 .chain(&manager.leader)
1942 .collect::<Vec<_>>();
1943 warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
1944 "Chain has multiple owners configured but none is preferred owner",
1945 );
1946 return Err(ChainClientError::NotAnOwner(self.chain_id));
1947 }
1948
1949 let has_signer = self
1950 .signer()
1951 .contains_key(&preferred_owner)
1952 .await
1953 .map_err(ChainClientError::signer_failure)?;
1954
1955 if !has_signer {
1956 warn!(%self.chain_id, ?preferred_owner,
1957 "Chain is one of the owners but its Signer instance doesn't contain the key",
1958 );
1959 return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
1960 }
1961
1962 Ok(preferred_owner)
1963 }
1964
1965 #[instrument(level = "trace")]
1968 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
1969 #[cfg(with_metrics)]
1970 let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
1971
1972 let mut info = self.synchronize_to_known_height().await?;
1973
1974 if self.has_other_owners(&info.manager.ownership) {
1975 info = self.client.synchronize_chain_state(self.chain_id).await?;
1979 }
1980
1981 if info.epoch > self.client.admin_committees().await?.0 {
1982 self.client
1983 .synchronize_chain_state(self.client.admin_id)
1984 .await?;
1985 }
1986
1987 let result = self
1988 .chain_state_view()
1989 .await?
1990 .validate_incoming_bundles()
1991 .await;
1992 if matches!(result, Err(ChainError::MissingCrossChainUpdate { .. })) {
1993 self.find_received_certificates().await?;
1994 }
1995 self.client.update_from_info(&info);
1996 Ok(info)
1997 }
1998
1999 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2004 let info = self
2005 .client
2006 .download_certificates(self.chain_id, self.initial_next_block_height)
2007 .await?;
2008 if info.next_block_height == self.initial_next_block_height {
2009 ensure!(
2011 self.initial_block_hash == info.block_hash,
2012 ChainClientError::InternalError("Invalid chain of blocks in local node")
2013 );
2014 }
2015 Ok(info)
2016 }
2017
2018 #[instrument(level = "trace", skip(committee, operations))]
2022 pub async fn submit_fast_block_proposal(
2023 &self,
2024 committee: &Committee,
2025 operations: &[Operation],
2026 incoming_bundles: &[IncomingBundle],
2027 super_owner: AccountOwner,
2028 ) -> Result<(u64, u64, u64, u64), ChainClientError> {
2029 let creating_proposal_start = Instant::now();
2030 let info = self.chain_info().await?;
2031 let timestamp = self.next_timestamp(incoming_bundles, info.timestamp);
2032 let transactions = incoming_bundles
2033 .iter()
2034 .map(|bundle| Transaction::ReceiveMessages(bundle.clone()))
2035 .chain(
2036 operations
2037 .iter()
2038 .map(|operation| Transaction::ExecuteOperation(operation.clone())),
2039 )
2040 .collect::<Vec<_>>();
2041 let proposed_block = ProposedBlock {
2042 epoch: info.epoch,
2043 chain_id: self.chain_id,
2044 transactions,
2045 previous_block_hash: info.block_hash,
2046 height: info.next_block_height,
2047 authenticated_signer: Some(super_owner),
2048 timestamp,
2049 };
2050 let proposal = Box::new(
2051 BlockProposal::new_initial(
2052 super_owner,
2053 Round::Fast,
2054 proposed_block.clone(),
2055 self.signer(),
2056 )
2057 .await
2058 .map_err(ChainClientError::signer_failure)?,
2059 );
2060 let creating_proposal_ms = creating_proposal_start.elapsed().as_millis() as u64;
2061 let stage_block_execution_start = Instant::now();
2062 let block = self
2063 .client
2064 .local_node
2065 .stage_block_execution(proposed_block, None, Vec::new())
2066 .await?
2067 .0;
2068 let stage_block_execution_ms = stage_block_execution_start.elapsed().as_millis() as u64;
2069 let creating_confirmed_block_start = Instant::now();
2070 let value = ConfirmedBlock::new(block);
2071 let creating_confirmed_block_ms =
2072 creating_confirmed_block_start.elapsed().as_millis() as u64;
2073 let submitting_block_proposal_start = Instant::now();
2074 self.client
2075 .submit_block_proposal(committee, proposal, value)
2076 .await?;
2077 let submitting_block_proposal_ms =
2078 submitting_block_proposal_start.elapsed().as_millis() as u64;
2079 Ok((
2080 creating_proposal_ms,
2081 stage_block_execution_ms,
2082 creating_confirmed_block_ms,
2083 submitting_block_proposal_ms,
2084 ))
2085 }
2086
2087 #[instrument(level = "trace", skip(old_committee))]
2089 pub async fn update_validators(
2090 &self,
2091 old_committee: Option<&Committee>,
2092 ) -> Result<(), ChainClientError> {
2093 let update_validators_start = linera_base::time::Instant::now();
2094 if let Some(old_committee) = old_committee {
2096 self.communicate_chain_updates(old_committee).await?
2097 };
2098 if let Ok(new_committee) = self.local_committee().await {
2099 if Some(&new_committee) != old_committee {
2100 self.communicate_chain_updates(&new_committee).await?;
2103 }
2104 }
2105 self.send_timing(update_validators_start, TimingType::UpdateValidators);
2106 Ok(())
2107 }
2108
2109 #[instrument(level = "trace", skip(committee))]
2111 pub async fn communicate_chain_updates(
2112 &self,
2113 committee: &Committee,
2114 ) -> Result<(), ChainClientError> {
2115 let delivery = self.options.cross_chain_message_delivery;
2116 let height = self.chain_info().await?.next_block_height;
2117 self.client
2118 .communicate_chain_updates(committee, self.chain_id, height, delivery)
2119 .await
2120 }
2121
2122 #[tracing::instrument(level = "trace", skip(received_certificates_batches))]
2125 async fn receive_certificates_from_validators(
2126 &self,
2127 received_certificates_batches: Vec<ReceivedCertificatesFromValidator>,
2128 ) {
2129 let validator_count = received_certificates_batches.len();
2130 let mut other_sender_chains = BTreeSet::new();
2131 let mut certificates =
2132 BTreeMap::<ChainId, BTreeMap<BlockHeight, ConfirmedBlockCertificate>>::new();
2133 let mut new_trackers = BTreeMap::new();
2134 for response in received_certificates_batches {
2135 other_sender_chains.extend(response.other_sender_chains);
2136 new_trackers.insert(response.public_key, response.tracker);
2137 for certificate in response.certificates {
2138 certificates
2139 .entry(certificate.block().header.chain_id)
2140 .or_default()
2141 .insert(certificate.block().header.height, certificate);
2142 }
2143 }
2144 let certificate_count = certificates.values().map(BTreeMap::len).sum::<usize>();
2145
2146 tracing::info!(
2147 "Received {certificate_count} certificates from {validator_count} validator(s)."
2148 );
2149
2150 let stream = FuturesUnordered::from_iter(certificates.into_values().map(|certificates| {
2152 let client = self.client.clone();
2153 async move {
2154 for certificate in certificates.into_values() {
2155 let hash = certificate.hash();
2156 let mode = ReceiveCertificateMode::AlreadyChecked;
2157 if let Err(err) = client
2158 .receive_sender_certificate(certificate, mode, None)
2159 .await
2160 {
2161 error!("Received invalid certificate {hash}: {err}");
2162 }
2163 }
2164 }
2165 }));
2166 stream.for_each(future::ready).await;
2167
2168 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2172 let local_node = self.client.local_node.clone();
2173 async move {
2174 if let Err(error) = local_node
2175 .retry_pending_cross_chain_requests(chain_id)
2176 .await
2177 {
2178 error!("Failed to retry outgoing messages from {chain_id}: {error}");
2179 }
2180 }
2181 }));
2182 stream.for_each(future::ready).await;
2183
2184 if let Err(error) = self
2186 .client
2187 .local_node
2188 .update_received_certificate_trackers(self.chain_id, new_trackers)
2189 .await
2190 {
2191 error!(
2192 "Failed to update the certificate trackers for chain {:.8}: {error}",
2193 self.chain_id
2194 );
2195 }
2196 }
2197
2198 async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2201 let chain_ids = self
2202 .chain_state_view()
2203 .await?
2204 .execution_state
2205 .system
2206 .event_subscriptions
2207 .indices()
2208 .await?
2209 .iter()
2210 .map(|(chain_id, _)| *chain_id)
2211 .chain(iter::once(self.client.admin_id))
2212 .filter(|chain_id| *chain_id != self.chain_id)
2213 .collect::<BTreeSet<_>>();
2214 future::try_join_all(
2215 chain_ids
2216 .into_iter()
2217 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2218 )
2219 .await?;
2220 Ok(())
2221 }
2222
2223 #[instrument(level = "trace")]
2232 async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
2233 #[cfg(with_metrics)]
2234 let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
2235
2236 let chain_id = self.chain_id;
2238 let (_, committee) = self.admin_committee().await?;
2239 let nodes = self.client.make_nodes(&committee)?;
2240 let result = communicate_with_quorum(
2242 &nodes,
2243 &committee,
2244 |_| (),
2245 |remote_node| {
2246 let client = &self.client;
2247 Box::pin(async move {
2248 client
2249 .synchronize_received_certificates_from_validator(chain_id, &remote_node)
2250 .await
2251 })
2252 },
2253 self.options.grace_period,
2254 )
2255 .await;
2256 let received_certificate_batches = match result {
2257 Ok(((), received_certificate_batches)) => received_certificate_batches
2258 .into_iter()
2259 .map(|(_, batch)| batch)
2260 .collect(),
2261 Err(CommunicationError::Trusted(NodeError::InactiveChain(id))) if id == chain_id => {
2262 return Ok(());
2265 }
2266 Err(error) => {
2267 return Err(error.into());
2268 }
2269 };
2270 self.receive_certificates_from_validators(received_certificate_batches)
2271 .await;
2272 Ok(())
2273 }
2274
2275 #[instrument(level = "trace")]
2277 pub async fn transfer(
2278 &self,
2279 owner: AccountOwner,
2280 amount: Amount,
2281 recipient: Account,
2282 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2283 self.execute_operation(SystemOperation::Transfer {
2285 owner,
2286 recipient,
2287 amount,
2288 })
2289 .await
2290 }
2291
2292 #[instrument(level = "trace")]
2295 pub async fn read_data_blob(
2296 &self,
2297 hash: CryptoHash,
2298 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2299 let blob_id = BlobId {
2300 hash,
2301 blob_type: BlobType::Data,
2302 };
2303 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
2304 .await
2305 }
2306
2307 #[instrument(level = "trace")]
2309 pub async fn claim(
2310 &self,
2311 owner: AccountOwner,
2312 target_id: ChainId,
2313 recipient: Account,
2314 amount: Amount,
2315 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2316 self.execute_operation(SystemOperation::Claim {
2317 owner,
2318 target_id,
2319 recipient,
2320 amount,
2321 })
2322 .await
2323 }
2324
2325 #[instrument(level = "trace")]
2328 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
2329 let chain_id = self.chain_id;
2330 let info = self.chain_info_with_committees().await?;
2331 let committee = info.current_committee()?;
2332 let height = info.next_block_height;
2333 let round = info.manager.current_round;
2334 let action = CommunicateAction::RequestTimeout {
2335 height,
2336 round,
2337 chain_id,
2338 };
2339 let value = Timeout::new(chain_id, height, info.epoch);
2340 let certificate = Box::new(
2341 self.client
2342 .communicate_chain_action(committee, action, value)
2343 .await?,
2344 );
2345 self.client.process_certificate(certificate.clone()).await?;
2346 self.client
2348 .communicate_chain_updates(
2349 committee,
2350 chain_id,
2351 height,
2352 CrossChainMessageDelivery::NonBlocking,
2353 )
2354 .await?;
2355 Ok(*certificate)
2356 }
2357
2358 #[instrument(level = "trace", skip_all)]
2360 pub async fn synchronize_chain_state(
2361 &self,
2362 chain_id: ChainId,
2363 ) -> Result<Box<ChainInfo>, ChainClientError> {
2364 self.client.synchronize_chain_state(chain_id).await
2365 }
2366
2367 #[instrument(level = "trace", skip_all)]
2370 pub async fn synchronize_chain_state_from_committee(
2371 &self,
2372 committee: Committee,
2373 ) -> Result<Box<ChainInfo>, ChainClientError> {
2374 self.client
2375 .synchronize_chain_state_from_committee(self.chain_id, committee)
2376 .await
2377 }
2378
2379 #[instrument(level = "trace", skip(operations, blobs))]
2381 pub async fn execute_operations(
2382 &self,
2383 operations: Vec<Operation>,
2384 blobs: Vec<Blob>,
2385 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2386 let timing_start = linera_base::time::Instant::now();
2387
2388 let result = loop {
2389 let execute_block_start = linera_base::time::Instant::now();
2390 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
2392 Ok(ExecuteBlockOutcome::Executed(certificate)) => {
2393 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
2394 break Ok(ClientOutcome::Committed(certificate));
2395 }
2396 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2397 break Ok(ClientOutcome::WaitForTimeout(timeout));
2398 }
2399 Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
2400 info!(
2401 height = %certificate.block().header.height,
2402 "Another block was committed; retrying."
2403 );
2404 }
2405 Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
2406 NodeError::UnexpectedBlockHeight {
2407 expected_block_height,
2408 found_block_height,
2409 },
2410 ))) if expected_block_height > found_block_height => {
2411 tracing::info!(
2412 "Local state is outdated; synchronizing chain {:.8}",
2413 self.chain_id
2414 );
2415 self.synchronize_chain_state(self.chain_id).await?;
2416 }
2417 Err(err) => return Err(err),
2418 };
2419 };
2420
2421 self.send_timing(timing_start, TimingType::ExecuteOperations);
2422
2423 result
2424 }
2425
2426 pub async fn execute_operation(
2428 &self,
2429 operation: impl Into<Operation>,
2430 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2431 self.execute_operations(vec![operation.into()], vec![])
2432 .await
2433 }
2434
2435 #[instrument(level = "trace", skip(operations, blobs))]
2439 async fn execute_block(
2440 &self,
2441 operations: Vec<Operation>,
2442 blobs: Vec<Blob>,
2443 ) -> Result<ExecuteBlockOutcome, ChainClientError> {
2444 #[cfg(with_metrics)]
2445 let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
2446
2447 let mutex = self.client_mutex();
2448 let _guard = mutex.lock_owned().await;
2449 match self.process_pending_block_without_prepare().await? {
2451 ClientOutcome::Committed(Some(certificate)) => {
2452 return Ok(ExecuteBlockOutcome::Conflict(certificate))
2453 }
2454 ClientOutcome::WaitForTimeout(timeout) => {
2455 return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2456 }
2457 ClientOutcome::Committed(None) => {}
2458 }
2459
2460 let incoming_bundles = self.pending_message_bundles().await?;
2461 let identity = self.identity().await?;
2462 let confirmed_value = self
2463 .new_pending_block(incoming_bundles, operations, blobs, identity)
2464 .await?;
2465
2466 match self.process_pending_block_without_prepare().await? {
2467 ClientOutcome::Committed(Some(certificate))
2468 if certificate.block() == confirmed_value.block() =>
2469 {
2470 Ok(ExecuteBlockOutcome::Executed(certificate))
2471 }
2472 ClientOutcome::Committed(Some(certificate)) => {
2473 Ok(ExecuteBlockOutcome::Conflict(certificate))
2474 }
2475 ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
2477 "Unexpected block proposal error",
2478 )),
2479 ClientOutcome::WaitForTimeout(timeout) => {
2480 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2481 }
2482 }
2483 }
2484
2485 #[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
2489 async fn new_pending_block(
2490 &self,
2491 incoming_bundles: Vec<IncomingBundle>,
2492 operations: Vec<Operation>,
2493 blobs: Vec<Blob>,
2494 identity: AccountOwner,
2495 ) -> Result<ConfirmedBlock, ChainClientError> {
2496 ensure!(
2497 self.pending_proposal().is_none(),
2498 ChainClientError::BlockProposalError(
2499 "Client state already has a pending block; \
2500 use the `linera retry-pending-block` command to commit that first"
2501 )
2502 );
2503 let info = self.chain_info().await?;
2504 let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2505 let transactions = incoming_bundles
2506 .into_iter()
2507 .map(Transaction::ReceiveMessages)
2508 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
2509 .collect::<Vec<_>>();
2510 let proposed_block = ProposedBlock {
2511 epoch: info.epoch,
2512 chain_id: self.chain_id,
2513 transactions,
2514 previous_block_hash: info.block_hash,
2515 height: info.next_block_height,
2516 authenticated_signer: Some(identity),
2517 timestamp,
2518 };
2519
2520 let round = match Self::round_for_new_proposal(&info, &identity, true)? {
2525 Either::Left(round) => round.multi_leader(),
2526 Either::Right(_) => None,
2527 };
2528 let (block, _) = self
2531 .client
2532 .stage_block_execution_and_discard_failing_messages(
2533 proposed_block,
2534 round,
2535 blobs.clone(),
2536 )
2537 .await?;
2538 let (proposed_block, _) = block.clone().into_proposal();
2539 self.update_state(|state| {
2540 state.set_pending_proposal(proposed_block.clone(), blobs.clone())
2541 });
2542 Ok(ConfirmedBlock::new(block))
2543 }
2544
2545 #[instrument(level = "trace", skip(incoming_bundles))]
2550 fn next_timestamp(
2551 &self,
2552 incoming_bundles: &[IncomingBundle],
2553 block_time: Timestamp,
2554 ) -> Timestamp {
2555 let local_time = self.storage_client().clock().current_time();
2556 incoming_bundles
2557 .iter()
2558 .map(|msg| msg.bundle.timestamp)
2559 .max()
2560 .map_or(local_time, |timestamp| timestamp.max(local_time))
2561 .max(block_time)
2562 }
2563
2564 #[instrument(level = "trace", skip(query))]
2566 pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, ChainClientError> {
2567 loop {
2568 let result = self
2569 .client
2570 .local_node
2571 .query_application(self.chain_id, query.clone())
2572 .await;
2573 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2574 self.client
2575 .receive_certificates_for_blobs(blob_ids.clone())
2576 .await?;
2577 continue; }
2579 return Ok(result?);
2580 }
2581 }
2582
2583 #[instrument(level = "trace", skip(query))]
2585 pub async fn query_system_application(
2586 &self,
2587 query: SystemQuery,
2588 ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
2589 let QueryOutcome {
2590 response,
2591 operations,
2592 } = self.query_application(Query::System(query)).await?;
2593 match response {
2594 QueryResponse::System(response) => Ok(QueryOutcome {
2595 response,
2596 operations,
2597 }),
2598 _ => Err(ChainClientError::InternalError(
2599 "Unexpected response for system query",
2600 )),
2601 }
2602 }
2603
2604 #[instrument(level = "trace", skip(application_id, query))]
2606 pub async fn query_user_application<A: Abi>(
2607 &self,
2608 application_id: ApplicationId<A>,
2609 query: &A::Query,
2610 ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
2611 let query = Query::user(application_id, query)?;
2612 let QueryOutcome {
2613 response,
2614 operations,
2615 } = self.query_application(query).await?;
2616 match response {
2617 QueryResponse::User(response_bytes) => {
2618 let response = serde_json::from_slice(&response_bytes)?;
2619 Ok(QueryOutcome {
2620 response,
2621 operations,
2622 })
2623 }
2624 _ => Err(ChainClientError::InternalError(
2625 "Unexpected response for user query",
2626 )),
2627 }
2628 }
2629
2630 #[instrument(level = "trace")]
2637 pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
2638 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
2639 Ok(balance)
2640 }
2641
2642 #[instrument(level = "trace", skip(owner))]
2649 pub async fn query_owner_balance(
2650 &self,
2651 owner: AccountOwner,
2652 ) -> Result<Amount, ChainClientError> {
2653 if owner.is_chain() {
2654 self.query_balance().await
2655 } else {
2656 Ok(self
2657 .query_balances_with_owner(owner)
2658 .await?
2659 .1
2660 .unwrap_or(Amount::ZERO))
2661 }
2662 }
2663
2664 #[instrument(level = "trace", skip(owner))]
2671 async fn query_balances_with_owner(
2672 &self,
2673 owner: AccountOwner,
2674 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2675 let incoming_bundles = self.pending_message_bundles().await?;
2676 if incoming_bundles.is_empty() {
2679 let chain_balance = self.local_balance().await?;
2680 let owner_balance = self.local_owner_balance(owner).await?;
2681 return Ok((chain_balance, Some(owner_balance)));
2682 }
2683 let info = self.chain_info().await?;
2684 let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2685 let transactions = incoming_bundles
2686 .into_iter()
2687 .map(Transaction::ReceiveMessages)
2688 .collect::<Vec<_>>();
2689 let block = ProposedBlock {
2690 epoch: info.epoch,
2691 chain_id: self.chain_id,
2692 transactions,
2693 previous_block_hash: info.block_hash,
2694 height: info.next_block_height,
2695 authenticated_signer: if owner == AccountOwner::CHAIN {
2696 None
2697 } else {
2698 Some(owner)
2699 },
2700 timestamp,
2701 };
2702 match self
2703 .client
2704 .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
2705 .await
2706 {
2707 Ok((_, response)) => Ok((
2708 response.info.chain_balance,
2709 response.info.requested_owner_balance,
2710 )),
2711 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
2712 WorkerError::ChainError(error),
2713 ))) if matches!(
2714 &*error,
2715 ChainError::ExecutionError(
2716 execution_error,
2717 ChainExecutionContext::Block
2718 ) if matches!(
2719 **execution_error,
2720 ExecutionError::FeesExceedFunding { .. }
2721 )
2722 ) =>
2723 {
2724 Ok((Amount::ZERO, Some(Amount::ZERO)))
2726 }
2727 Err(error) => Err(error),
2728 }
2729 }
2730
2731 #[instrument(level = "trace")]
2735 pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
2736 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
2737 Ok(balance)
2738 }
2739
2740 #[instrument(level = "trace", skip(owner))]
2744 pub async fn local_owner_balance(
2745 &self,
2746 owner: AccountOwner,
2747 ) -> Result<Amount, ChainClientError> {
2748 if owner.is_chain() {
2749 self.local_balance().await
2750 } else {
2751 Ok(self
2752 .local_balances_with_owner(owner)
2753 .await?
2754 .1
2755 .unwrap_or(Amount::ZERO))
2756 }
2757 }
2758
2759 #[instrument(level = "trace", skip(owner))]
2763 async fn local_balances_with_owner(
2764 &self,
2765 owner: AccountOwner,
2766 ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2767 ensure!(
2768 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
2769 ChainClientError::WalletSynchronizationError
2770 );
2771 let mut query = ChainInfoQuery::new(self.chain_id);
2772 query.request_owner_balance = owner;
2773 let response = self
2774 .client
2775 .local_node
2776 .handle_chain_info_query(query)
2777 .await?;
2778 Ok((
2779 response.info.chain_balance,
2780 response.info.requested_owner_balance,
2781 ))
2782 }
2783
2784 #[instrument(level = "trace")]
2786 pub async fn transfer_to_account(
2787 &self,
2788 from: AccountOwner,
2789 amount: Amount,
2790 account: Account,
2791 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2792 self.transfer(from, amount, account).await
2793 }
2794
/// Transfers `amount` from `owner` to the burn address of this chain.
///
/// Only available when compiled with testing support.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub async fn burn(
    &self,
    owner: AccountOwner,
    amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
    self.transfer(owner, amount, Account::burn_address(self.chain_id))
        .await
}
2806
2807 #[instrument(level = "trace")]
2813 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2814 let info = self.prepare_chain().await?;
2815 self.synchronize_publisher_chains().await?;
2816 self.find_received_certificates().await?;
2817 Ok(info)
2818 }
2819
2820 #[instrument(level = "trace")]
2822 pub async fn process_pending_block(
2823 &self,
2824 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2825 self.synchronize_from_validators().await?;
2826 self.process_pending_block_without_prepare().await
2827 }
2828
/// Tries to get the chain's outstanding block committed, without synchronizing first.
///
/// The block to propose is chosen in this order:
/// 1. the locking block reported by the chain manager (`requested_locking`), if any;
/// 2. otherwise the client's local pending proposal, if any;
/// 3. otherwise nothing is proposed and `ClientOutcome::Committed(None)` is returned.
///
/// If the manager has a locking block in the current round and that round is not the
/// fast round, the locking block is finalized directly instead.
///
/// Returns `ClientOutcome::WaitForTimeout` when `round_for_new_proposal` reports that
/// a round timeout has to be awaited, and `ClientOutcome::Committed(Some(_))` with the
/// confirmed block certificate on success.
#[instrument(level = "trace")]
async fn process_pending_block_without_prepare(
    &self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
    // Requests a leader timeout first if the current round's timeout has elapsed.
    let info = self.request_leader_timeout_if_needed().await?;

    // A locking block from the current (non-fast) round only needs to be finalized.
    if info.manager.has_locking_block_in_current_round()
        && !info.manager.current_round.is_fast()
    {
        return self.finalize_locking_block(info).await;
    }
    let owner = self.identity().await?;

    let local_node = &self.client.local_node;
    let pending_proposal = self.pending_proposal();
    // Pick the block to propose, together with the blobs it needs.
    let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
        match &**locking {
            LockingBlock::Regular(certificate) => {
                // The certificate already contains the executed block; only the
                // blobs have to be loaded from the local node.
                let blob_ids = certificate.block().required_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| {
                        ChainClientError::InternalError("Missing local locking blobs")
                    })?;
                debug!("Retrying locking block from round {}", certificate.round);
                (certificate.block().clone(), blobs)
            }
            LockingBlock::Fast(proposal) => {
                // A fast-round proposal only carries the proposed block, so it is
                // staged locally to obtain the executed block.
                let proposed_block = proposal.content.block.clone();
                let blob_ids = proposed_block.published_blob_ids();
                let blobs = local_node
                    .get_locking_blobs(&blob_ids, self.chain_id)
                    .await?
                    .ok_or_else(|| {
                        ChainClientError::InternalError("Missing local locking blobs")
                    })?;
                let block = self
                    .client
                    .stage_block_execution(proposed_block, None, blobs.clone())
                    .await?
                    .0;
                debug!("Retrying locking block from fast round.");
                (block, blobs)
            }
        }
    } else if let Some(pending_proposal) = pending_proposal {
        let proposed_block = pending_proposal.block;
        // The round is computed as if the block had oracle responses (`true`); only
        // its multi-leader number, if any, is passed on to staging.
        let round = match Self::round_for_new_proposal(&info, &owner, true)? {
            Either::Left(round) => round.multi_leader(),
            Either::Right(_) => None,
        };
        let (block, _) = self
            .client
            .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
            .await?;
        debug!("Proposing the local pending block.");
        (block, pending_proposal.blobs)
    } else {
        // Neither a locking block nor a pending proposal: nothing to do.
        return Ok(ClientOutcome::Committed(None));
    };

    // Now that the block is known, determine the actual round to propose in.
    let has_oracle_responses = block.has_oracle_responses();
    let (proposed_block, outcome) = block.into_proposal();
    let round = match Self::round_for_new_proposal(&info, &owner, has_oracle_responses)? {
        Either::Left(round) => round,
        Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
    };
    debug!("Proposing block for round {}", round);

    // Evaluated before `requested_locking` is moved out of `info` below.
    let already_handled_locally = info
        .manager
        .already_handled_proposal(round, &proposed_block);
    // Build and sign the proposal: a retry of the locking block, or an initial one.
    let proposal = if let Some(locking) = info.manager.requested_locking {
        Box::new(match *locking {
            LockingBlock::Regular(cert) => {
                BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                    .await
                    .map_err(ChainClientError::signer_failure)?
            }
            LockingBlock::Fast(proposal) => {
                BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                    .await
                    .map_err(ChainClientError::signer_failure)?
            }
        })
    } else {
        Box::new(
            BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                .await
                .map_err(ChainClientError::signer_failure)?,
        )
    };
    // Let the local node handle the proposal first, unless it already has.
    if !already_handled_locally {
        if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
            match err {
                LocalNodeError::BlobsNotFound(_) => {
                    // Hand the blobs to the local node and try once more.
                    local_node
                        .handle_pending_blobs(self.chain_id, blobs)
                        .await?;
                    local_node.handle_block_proposal(*proposal.clone()).await?;
                }
                err => return Err(err.into()),
            }
        }
    }
    let committee = self.local_committee().await?;
    let block = Block::new(proposed_block, outcome);
    let submit_block_proposal_start = linera_base::time::Instant::now();
    // In the fast round the proposal is submitted as a confirmed block; in any other
    // round it is submitted as a validated block and then finalized.
    let certificate = if round.is_fast() {
        let hashed_value = ConfirmedBlock::new(block);
        self.client
            .submit_block_proposal(&committee, proposal, hashed_value)
            .await?
    } else {
        let hashed_value = ValidatedBlock::new(block);
        let certificate = self
            .client
            .submit_block_proposal(&committee, proposal, hashed_value.clone())
            .await?;
        self.client.finalize_block(&committee, certificate).await?
    };
    self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
    debug!(round = %certificate.round, "Sending confirmed block to validators");
    self.update_validators(Some(&committee)).await?;
    Ok(ClientOutcome::Committed(Some(certificate)))
}
2965
2966 fn send_timing(&self, start: Instant, timing_type: TimingType) {
2967 let Some(sender) = &self.timing_sender else {
2968 return;
2969 };
2970 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2971 tracing::warn!(%err, "Failed to send timing info");
2972 }
2973 }
2974
2975 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2978 let mut info = self.chain_info_with_manager_values().await?;
2979 if let Some(round_timeout) = info.manager.round_timeout {
2982 if round_timeout <= self.storage_client().clock().current_time() {
2983 self.request_leader_timeout().await?;
2984 info = self.chain_info_with_manager_values().await?;
2985 }
2986 }
2987 Ok(info)
2988 }
2989
2990 async fn finalize_locking_block(
2994 &self,
2995 info: Box<ChainInfo>,
2996 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2997 let locking = info
2998 .manager
2999 .requested_locking
3000 .expect("Should have a locking block");
3001 let LockingBlock::Regular(certificate) = *locking else {
3002 panic!("Should have a locking validated block");
3003 };
3004 debug!(
3005 round = %certificate.round,
3006 "Finalizing locking block"
3007 );
3008 let committee = self.local_committee().await?;
3009 match self
3010 .client
3011 .finalize_block(&committee, certificate.clone())
3012 .await
3013 {
3014 Ok(certificate) => {
3015 self.update_validators(Some(&committee)).await?;
3016 Ok(ClientOutcome::Committed(Some(certificate)))
3017 }
3018 Err(ChainClientError::CommunicationError(error)) => {
3019 let timestamp = info.manager.round_timeout.ok_or(error)?;
3022 Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
3023 timestamp,
3024 current_round: info.manager.current_round,
3025 next_block_height: info.next_block_height,
3026 }))
3027 }
3028 Err(error) => Err(error),
3029 }
3030 }
3031
3032 fn round_for_new_proposal(
3034 info: &ChainInfo,
3035 identity: &AccountOwner,
3036 has_oracle_responses: bool,
3037 ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
3038 let manager = &info.manager;
3039 let conflict = manager
3043 .requested_signed_proposal
3044 .as_ref()
3045 .into_iter()
3046 .chain(&manager.requested_proposed)
3047 .any(|proposal| proposal.content.round == manager.current_round)
3048 || (manager.current_round.is_fast() && has_oracle_responses);
3049 let round = if !conflict {
3050 manager.current_round
3051 } else if let Some(round) = manager
3052 .ownership
3053 .next_round(manager.current_round)
3054 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
3055 {
3056 round
3057 } else if let Some(timeout) = info.round_timeout() {
3058 return Ok(Either::Right(timeout));
3059 } else {
3060 return Err(ChainClientError::BlockProposalError(
3061 "Conflicting proposal in the current round",
3062 ));
3063 };
3064 if manager.can_propose(identity, round) {
3065 return Ok(Either::Left(round));
3066 }
3067 if let Some(timeout) = info.round_timeout() {
3068 return Ok(Either::Right(timeout));
3069 }
3070 Err(ChainClientError::BlockProposalError(
3071 "Not a leader in the current round",
3072 ))
3073 }
3074
/// Clears the pending proposal stored in the client state.
///
/// Only available when compiled with testing support.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub fn clear_pending_proposal(&self) {
    self.update_state(|client_state| {
        client_state.clear_pending_proposal()
    });
}
3081
3082 #[instrument(
3084 level = "trace",
3085 skip(certificate),
3086 fields(certificate_hash = ?certificate.hash()),
3087 )]
3088 pub async fn receive_certificate_and_update_validators(
3089 &self,
3090 certificate: ConfirmedBlockCertificate,
3091 ) -> Result<(), ChainClientError> {
3092 self.client
3093 .receive_certificate_and_update_validators(
3094 certificate,
3095 ReceiveCertificateMode::NeedsCheck,
3096 )
3097 .await
3098 }
3099
3100 #[instrument(level = "trace")]
3104 pub async fn rotate_key_pair(
3105 &self,
3106 public_key: AccountPublicKey,
3107 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3108 self.transfer_ownership(public_key.into()).await
3109 }
3110
3111 #[instrument(level = "trace")]
3113 pub async fn transfer_ownership(
3114 &self,
3115 new_owner: AccountOwner,
3116 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3117 self.execute_operation(SystemOperation::ChangeOwnership {
3118 super_owners: vec![new_owner],
3119 owners: Vec::new(),
3120 multi_leader_rounds: 2,
3121 open_multi_leader_rounds: false,
3122 timeout_config: TimeoutConfig::default(),
3123 })
3124 .await
3125 }
3126
3127 #[instrument(level = "trace")]
3129 pub async fn share_ownership(
3130 &self,
3131 new_owner: AccountOwner,
3132 new_weight: u64,
3133 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3134 loop {
3135 let ownership = self.prepare_chain().await?.manager.ownership;
3136 ensure!(
3137 ownership.is_active(),
3138 ChainError::InactiveChain(self.chain_id)
3139 );
3140 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3141 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3142 owners.push((new_owner, new_weight));
3143 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3144 super_owners: Vec::new(),
3145 owners,
3146 multi_leader_rounds: ownership.multi_leader_rounds,
3147 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3148 timeout_config: ownership.timeout_config,
3149 })];
3150 match self.execute_block(operations, vec![]).await? {
3151 ExecuteBlockOutcome::Executed(certificate) => {
3152 return Ok(ClientOutcome::Committed(certificate));
3153 }
3154 ExecuteBlockOutcome::Conflict(certificate) => {
3155 info!(
3156 height = %certificate.block().header.height,
3157 "Another block was committed; retrying."
3158 );
3159 }
3160 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3161 return Ok(ClientOutcome::WaitForTimeout(timeout));
3162 }
3163 };
3164 }
3165 }
3166
3167 #[instrument(level = "trace")]
3170 pub async fn change_ownership(
3171 &self,
3172 ownership: ChainOwnership,
3173 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3174 self.execute_operation(SystemOperation::ChangeOwnership {
3175 super_owners: ownership.super_owners.into_iter().collect(),
3176 owners: ownership.owners.into_iter().collect(),
3177 multi_leader_rounds: ownership.multi_leader_rounds,
3178 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3179 timeout_config: ownership.timeout_config.clone(),
3180 })
3181 .await
3182 }
3183
3184 #[instrument(level = "trace", skip(application_permissions))]
3186 pub async fn change_application_permissions(
3187 &self,
3188 application_permissions: ApplicationPermissions,
3189 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3190 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3191 application_permissions,
3192 ))
3193 .await
3194 }
3195
3196 #[instrument(level = "trace", skip(self))]
3198 pub async fn open_chain(
3199 &self,
3200 ownership: ChainOwnership,
3201 application_permissions: ApplicationPermissions,
3202 balance: Amount,
3203 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
3204 {
3205 loop {
3206 let config = OpenChainConfig {
3207 ownership: ownership.clone(),
3208 balance,
3209 application_permissions: application_permissions.clone(),
3210 };
3211 let operation = Operation::system(SystemOperation::OpenChain(config));
3212 let certificate = match self.execute_block(vec![operation], vec![]).await? {
3213 ExecuteBlockOutcome::Executed(certificate) => certificate,
3214 ExecuteBlockOutcome::Conflict(_) => continue,
3215 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3216 return Ok(ClientOutcome::WaitForTimeout(timeout));
3217 }
3218 };
3219 let chain_blob = certificate
3221 .block()
3222 .body
3223 .blobs
3224 .last()
3225 .and_then(|blobs| blobs.last())
3226 .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
3227 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
3228 self.client.track_chain(description.id());
3230 self.client
3231 .local_node
3232 .retry_pending_cross_chain_requests(self.chain_id)
3233 .await?;
3234 return Ok(ClientOutcome::Committed((description, certificate)));
3235 }
3236 }
3237
3238 #[instrument(level = "trace")]
3241 pub async fn close_chain(
3242 &self,
3243 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3244 match self.execute_operation(SystemOperation::CloseChain).await {
3245 Ok(outcome) => Ok(outcome.map(Some)),
3246 Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3247 WorkerError::ChainError(chain_error),
3248 ))) if matches!(*chain_error, ChainError::ClosedChain) => {
3249 Ok(ClientOutcome::Committed(None)) }
3251 Err(error) => Err(error),
3252 }
3253 }
3254
3255 #[cfg(not(target_arch = "wasm32"))]
3257 #[instrument(level = "trace", skip(contract, service))]
3258 pub async fn publish_module(
3259 &self,
3260 contract: Bytecode,
3261 service: Bytecode,
3262 vm_runtime: VmRuntime,
3263 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3264 let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
3265 self.publish_module_blobs(blobs, module_id).await
3266 }
3267
3268 #[cfg(not(target_arch = "wasm32"))]
3270 #[instrument(level = "trace", skip(blobs, module_id))]
3271 pub async fn publish_module_blobs(
3272 &self,
3273 blobs: Vec<Blob>,
3274 module_id: ModuleId,
3275 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3276 self.execute_operations(
3277 vec![Operation::system(SystemOperation::PublishModule {
3278 module_id,
3279 })],
3280 blobs,
3281 )
3282 .await?
3283 .try_map(|certificate| Ok((module_id, certificate)))
3284 }
3285
3286 #[instrument(level = "trace", skip(bytes))]
3288 pub async fn publish_data_blobs(
3289 &self,
3290 bytes: Vec<Vec<u8>>,
3291 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3292 let blobs = bytes.into_iter().map(Blob::new_data);
3293 let publish_blob_operations = blobs
3294 .clone()
3295 .map(|blob| {
3296 Operation::system(SystemOperation::PublishDataBlob {
3297 blob_hash: blob.id().hash,
3298 })
3299 })
3300 .collect();
3301 self.execute_operations(publish_blob_operations, blobs.collect())
3302 .await
3303 }
3304
3305 #[instrument(level = "trace", skip(bytes))]
3307 pub async fn publish_data_blob(
3308 &self,
3309 bytes: Vec<u8>,
3310 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3311 self.publish_data_blobs(vec![bytes]).await
3312 }
3313
3314 #[instrument(
3316 level = "trace",
3317 skip(self, parameters, instantiation_argument, required_application_ids)
3318 )]
3319 pub async fn create_application<
3320 A: Abi,
3321 Parameters: Serialize,
3322 InstantiationArgument: Serialize,
3323 >(
3324 &self,
3325 module_id: ModuleId<A, Parameters, InstantiationArgument>,
3326 parameters: &Parameters,
3327 instantiation_argument: &InstantiationArgument,
3328 required_application_ids: Vec<ApplicationId>,
3329 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
3330 {
3331 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
3332 let parameters = serde_json::to_vec(parameters)?;
3333 Ok(self
3334 .create_application_untyped(
3335 module_id.forget_abi(),
3336 parameters,
3337 instantiation_argument,
3338 required_application_ids,
3339 )
3340 .await?
3341 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
3342 }
3343
3344 #[instrument(
3346 level = "trace",
3347 skip(
3348 self,
3349 module_id,
3350 parameters,
3351 instantiation_argument,
3352 required_application_ids
3353 )
3354 )]
3355 pub async fn create_application_untyped(
3356 &self,
3357 module_id: ModuleId,
3358 parameters: Vec<u8>,
3359 instantiation_argument: Vec<u8>,
3360 required_application_ids: Vec<ApplicationId>,
3361 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
3362 self.execute_operation(SystemOperation::CreateApplication {
3363 module_id,
3364 parameters,
3365 instantiation_argument,
3366 required_application_ids,
3367 })
3368 .await?
3369 .try_map(|certificate| {
3370 let mut creation: Vec<_> = certificate
3372 .block()
3373 .created_blob_ids()
3374 .into_iter()
3375 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
3376 .collect();
3377 if creation.len() > 1 {
3378 return Err(ChainClientError::InternalError(
3379 "Unexpected number of application descriptions published",
3380 ));
3381 }
3382 let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
3383 "ApplicationDescription blob not found.",
3384 ))?;
3385 let id = ApplicationId::new(blob_id.hash);
3386 Ok((id, certificate))
3387 })
3388 }
3389
3390 #[instrument(level = "trace", skip(committee))]
3392 pub async fn stage_new_committee(
3393 &self,
3394 committee: Committee,
3395 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3396 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
3397 let blob_hash = blob.id().hash;
3398 match self
3399 .execute_operations(
3400 vec![Operation::system(SystemOperation::Admin(
3401 AdminOperation::PublishCommitteeBlob { blob_hash },
3402 ))],
3403 vec![blob],
3404 )
3405 .await?
3406 {
3407 ClientOutcome::Committed(_) => {}
3408 outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
3409 }
3410 let epoch = self.chain_info().await?.epoch.try_add_one()?;
3411 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
3412 epoch,
3413 blob_hash,
3414 }))
3415 .await
3416 }
3417
3418 #[instrument(level = "trace")]
3424 pub async fn process_inbox(
3425 &self,
3426 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3427 self.prepare_chain().await?;
3428 self.process_inbox_without_prepare().await
3429 }
3430
3431 #[instrument(level = "trace")]
3437 pub async fn process_inbox_without_prepare(
3438 &self,
3439 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3440 #[cfg(with_metrics)]
3441 let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
3442
3443 let mut epoch_change_ops = self.collect_epoch_changes().await?.into_iter();
3444
3445 let mut certificates = Vec::new();
3446 loop {
3447 let incoming_bundles = self.pending_message_bundles().await?;
3448 let stream_updates = self.collect_stream_updates().await?;
3449 let block_operations = stream_updates
3450 .into_iter()
3451 .chain(epoch_change_ops.next())
3452 .collect::<Vec<_>>();
3453 if incoming_bundles.is_empty() && block_operations.is_empty() {
3454 return Ok((certificates, None));
3455 }
3456 match self.execute_block(block_operations, vec![]).await {
3457 Ok(ExecuteBlockOutcome::Executed(certificate))
3458 | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
3459 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
3460 return Ok((certificates, Some(timeout)));
3461 }
3462 Err(error) => return Err(error),
3463 };
3464 }
3465 }
3466
3467 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
3470 let (mut min_epoch, mut next_epoch) = {
3471 let (epoch, committees) = self.epoch_and_committees().await?;
3472 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
3473 (min_epoch, epoch.try_add_one()?)
3474 };
3475 let mut epoch_change_ops = Vec::new();
3476 while self
3477 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3478 .await?
3479 {
3480 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
3481 next_epoch,
3482 )));
3483 next_epoch.try_add_assign_one()?;
3484 }
3485 while self
3486 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
3487 .await?
3488 {
3489 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
3490 min_epoch,
3491 )));
3492 min_epoch.try_add_assign_one()?;
3493 }
3494 Ok(epoch_change_ops)
3495 }
3496
3497 async fn has_admin_event(
3500 &self,
3501 stream_name: &[u8],
3502 index: u32,
3503 ) -> Result<bool, ChainClientError> {
3504 let event_id = EventId {
3505 chain_id: self.client.admin_id,
3506 stream_id: StreamId::system(stream_name),
3507 index,
3508 };
3509 Ok(self
3510 .client
3511 .storage_client()
3512 .read_event(event_id)
3513 .await?
3514 .is_some())
3515 }
3516
3517 pub async fn events_from_index(
3519 &self,
3520 stream_id: StreamId,
3521 start_index: u32,
3522 ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
3523 Ok(self
3524 .client
3525 .storage_client()
3526 .read_events_from_index(&self.chain_id, &stream_id, start_index)
3527 .await?)
3528 }
3529
3530 #[instrument(level = "trace")]
3535 pub async fn revoke_epochs(
3536 &self,
3537 revoked_epoch: Epoch,
3538 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3539 self.prepare_chain().await?;
3540 let (current_epoch, committees) = self.epoch_and_committees().await?;
3541 ensure!(
3542 revoked_epoch < current_epoch,
3543 ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
3544 );
3545 ensure!(
3546 committees.contains_key(&revoked_epoch),
3547 ChainClientError::EpochAlreadyRevoked
3548 );
3549 let operations = committees
3550 .keys()
3551 .filter_map(|epoch| {
3552 if *epoch <= revoked_epoch {
3553 Some(Operation::system(SystemOperation::Admin(
3554 AdminOperation::RemoveCommittee { epoch: *epoch },
3555 )))
3556 } else {
3557 None
3558 }
3559 })
3560 .collect();
3561 self.execute_operations(operations, vec![]).await
3562 }
3563
3564 #[instrument(level = "trace")]
3568 pub async fn transfer_to_account_unsafe_unconfirmed(
3569 &self,
3570 owner: AccountOwner,
3571 amount: Amount,
3572 recipient: Account,
3573 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3574 self.execute_operation(SystemOperation::Transfer {
3575 owner,
3576 recipient,
3577 amount,
3578 })
3579 .await
3580 }
3581
3582 #[instrument(level = "trace", skip(hash))]
3583 pub async fn read_confirmed_block(
3584 &self,
3585 hash: CryptoHash,
3586 ) -> Result<ConfirmedBlock, ChainClientError> {
3587 let block = self
3588 .client
3589 .storage_client()
3590 .read_confirmed_block(hash)
3591 .await?;
3592 block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
3593 }
3594
3595 #[instrument(level = "trace")]
3597 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
3598 self.client
3599 .local_node
3600 .retry_pending_cross_chain_requests(self.chain_id)
3601 .await?;
3602 Ok(())
3603 }
3604
3605 #[instrument(level = "trace", skip(local_node))]
3606 async fn local_chain_info(
3607 &self,
3608 chain_id: ChainId,
3609 local_node: &mut LocalNodeClient<Env::Storage>,
3610 ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
3611 match local_node.chain_info(chain_id).await {
3612 Ok(info) => {
3613 self.client.update_from_info(&info);
3615 Ok(Some(info))
3616 }
3617 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
3618 Err(err) => Err(err.into()),
3619 }
3620 }
3621
3622 #[instrument(level = "trace", skip(chain_id, local_node))]
3623 async fn local_next_block_height(
3624 &self,
3625 chain_id: ChainId,
3626 local_node: &mut LocalNodeClient<Env::Storage>,
3627 ) -> Result<BlockHeight, ChainClientError> {
3628 Ok(self
3629 .local_chain_info(chain_id, local_node)
3630 .await?
3631 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
3632 }
3633
3634 #[instrument(level = "trace")]
3637 async fn local_next_height_to_receive(
3638 &self,
3639 origin: ChainId,
3640 ) -> Result<BlockHeight, ChainClientError> {
3641 let chain = self.chain_state_view().await?;
3642 Ok(match chain.inboxes.try_load_entry(&origin).await? {
3643 Some(inbox) => inbox.next_block_height_to_receive()?,
3644 None => BlockHeight::ZERO,
3645 })
3646 }
3647
3648 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
3649 async fn process_notification(
3650 &self,
3651 remote_node: RemoteNode<Env::ValidatorNode>,
3652 mut local_node: LocalNodeClient<Env::Storage>,
3653 notification: Notification,
3654 listening_mode: &ListeningMode,
3655 ) -> Result<(), ChainClientError> {
3656 match notification.reason {
3657 Reason::NewIncomingBundle { origin, height } => {
3658 if self.local_next_height_to_receive(origin).await? > height {
3659 debug!(
3660 chain_id = %self.chain_id,
3661 "Accepting redundant notification for new message"
3662 );
3663 return Ok(());
3664 }
3665 self.find_received_certificates_from_validator(remote_node)
3666 .await?;
3667 if self.local_next_height_to_receive(origin).await? <= height {
3668 warn!(
3669 chain_id = %self.chain_id,
3670 "NewIncomingBundle: Fail to synchronize new message after notification"
3671 );
3672 }
3673 }
3674 Reason::NewBlock { height, .. } => {
3675 let chain_id = notification.chain_id;
3676 if self
3677 .local_next_block_height(chain_id, &mut local_node)
3678 .await?
3679 > height
3680 {
3681 debug!(
3682 chain_id = %self.chain_id,
3683 "Accepting redundant notification for new block"
3684 );
3685 return Ok(());
3686 }
3687 match listening_mode {
3688 ListeningMode::FullChain => {
3689 self.client
3690 .synchronize_chain_state_from(&remote_node, chain_id)
3691 .await?;
3692 if self
3693 .local_next_block_height(chain_id, &mut local_node)
3694 .await?
3695 <= height
3696 {
3697 error!("NewBlock: Fail to synchronize new block after notification");
3698 }
3699 trace!(
3700 chain_id = %self.chain_id,
3701 %height,
3702 "NewBlock: processed notification",
3703 );
3704 }
3705 ListeningMode::EventsOnly(_) => {
3706 debug!(
3707 chain_id = %self.chain_id,
3708 %height,
3709 "NewBlock: ignoring notification due to listening in EventsOnly mode"
3710 );
3711 }
3712 }
3713 }
3714 Reason::NewEvents {
3715 height,
3716 hash,
3717 event_streams,
3718 } => {
3719 if self
3720 .local_next_block_height(notification.chain_id, &mut local_node)
3721 .await?
3722 > height
3723 {
3724 debug!(
3725 chain_id = %self.chain_id,
3726 "Accepting redundant notification for new block"
3727 );
3728 return Ok(());
3729 }
3730 let should_process = match listening_mode {
3731 ListeningMode::FullChain => true,
3732 ListeningMode::EventsOnly(relevant_events) => relevant_events
3733 .intersection(&event_streams)
3734 .next()
3735 .is_some(),
3736 };
3737 if !should_process {
3738 debug!(
3739 chain_id = %self.chain_id,
3740 %height,
3741 "NewEvents: got a notification, but no relevant event streams in it"
3742 );
3743 return Ok(());
3744 }
3745 trace!(
3746 chain_id = %self.chain_id,
3747 %height,
3748 "NewEvents: processing notification"
3749 );
3750 let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
3751 let certificate = certificates
3754 .pop()
3755 .expect("download_certificates should have returned one certificate");
3756 self.client
3757 .receive_sender_certificate(
3758 certificate,
3759 ReceiveCertificateMode::NeedsCheck,
3760 None,
3761 )
3762 .await?;
3763 }
3764 Reason::NewRound { height, round } => {
3765 if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
3766 debug!("NewRound: ignoring a notification due to listening mode");
3767 return Ok(());
3768 }
3769 let chain_id = notification.chain_id;
3770 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
3771 if (info.next_block_height, info.manager.current_round) >= (height, round) {
3772 debug!(
3773 chain_id = %self.chain_id,
3774 "Accepting redundant notification for new round"
3775 );
3776 return Ok(());
3777 }
3778 }
3779 self.client
3780 .synchronize_chain_state_from(&remote_node, chain_id)
3781 .await?;
3782 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
3783 error!(
3784 chain_id = %self.chain_id,
3785 "NewRound: Fail to read local chain info for {chain_id}"
3786 );
3787 return Ok(());
3788 };
3789 if (info.next_block_height, info.manager.current_round) < (height, round) {
3790 error!(
3791 chain_id = %self.chain_id,
3792 "NewRound: Fail to synchronize new block after notification"
3793 );
3794 }
3795 }
3796 }
3797 Ok(())
3798 }
3799
3800 pub fn is_tracked(&self) -> bool {
3802 self.client
3803 .tracked_chains
3804 .read()
3805 .unwrap()
3806 .contains(&self.chain_id)
3807 }
3808
3809 #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
3812 pub async fn listen(
3813 &self,
3814 listening_mode: ListeningMode,
3815 ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
3816 use future::FutureExt as _;
3817
3818 async fn await_while_polling<F: FusedFuture>(
3819 future: F,
3820 background_work: impl FusedStream<Item = ()>,
3821 ) -> F::Output {
3822 tokio::pin!(future);
3823 tokio::pin!(background_work);
3824 loop {
3825 futures::select! {
3826 _ = background_work.next() => (),
3827 result = future => return result,
3828 }
3829 }
3830 }
3831
3832 let mut senders = HashMap::new(); let notifications = self.subscribe()?;
3834 let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
3835
3836 let mut process_notifications = FuturesUnordered::new();
3843
3844 match self
3845 .update_notification_streams(&mut senders, &listening_mode)
3846 .await
3847 {
3848 Ok(handler) => process_notifications.push(handler),
3849 Err(error) => error!("Failed to update committee: {error}"),
3850 };
3851
3852 let this = self.clone();
3853 let update_streams = async move {
3854 let mut abortable_notifications = abortable_notifications.fuse();
3855
3856 while let Some(notification) =
3857 await_while_polling(abortable_notifications.next(), &mut process_notifications)
3858 .await
3859 {
3860 if let Reason::NewBlock { .. } = notification.reason {
3861 match Box::pin(await_while_polling(
3862 this.update_notification_streams(&mut senders, &listening_mode)
3863 .fuse(),
3864 &mut process_notifications,
3865 ))
3866 .await
3867 {
3868 Ok(handler) => process_notifications.push(handler),
3869 Err(error) => error!("Failed to update committee: {error}"),
3870 }
3871 }
3872 }
3873
3874 for abort in senders.into_values() {
3875 abort.abort();
3876 }
3877
3878 let () = process_notifications.collect().await;
3879 }
3880 .in_current_span();
3881
3882 Ok((update_streams, AbortOnDrop(abort), notifications))
3883 }
3884
3885 #[instrument(level = "trace", skip(senders))]
3886 async fn update_notification_streams(
3887 &self,
3888 senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
3889 listening_mode: &ListeningMode,
3890 ) -> Result<impl Future<Output = ()>, ChainClientError> {
3891 let (nodes, local_node) = {
3892 let committee = self.local_committee().await?;
3893 let nodes: HashMap<_, _> = self
3894 .client
3895 .validator_node_provider()
3896 .make_nodes(&committee)?
3897 .collect();
3898 (nodes, self.client.local_node.clone())
3899 };
3900 senders.retain(|validator, abort| {
3902 if !nodes.contains_key(validator) {
3903 abort.abort();
3904 }
3905 !abort.is_aborted()
3906 });
3907 let validator_tasks = FuturesUnordered::new();
3909 for (public_key, node) in nodes {
3910 let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
3911 continue;
3912 };
3913 let this = self.clone();
3914 let stream = stream::once({
3915 let node = node.clone();
3916 async move {
3917 let stream = node.subscribe(vec![this.chain_id]).await?;
3918 let remote_node = RemoteNode { public_key, node };
3921 this.client
3922 .synchronize_chain_state_from(&remote_node, this.chain_id)
3923 .await?;
3924 Ok::<_, ChainClientError>(stream)
3925 }
3926 })
3927 .filter_map(move |result| async move {
3928 if let Err(error) = &result {
3929 warn!(?error, "Could not connect to validator {public_key}");
3930 } else {
3931 info!("Connected to validator {public_key}");
3932 }
3933 result.ok()
3934 })
3935 .flatten();
3936 let (stream, abort) = stream::abortable(stream);
3937 let mut stream = Box::pin(stream);
3938 let this = self.clone();
3939 let local_node = local_node.clone();
3940 let remote_node = RemoteNode { public_key, node };
3941 let listening_mode_cloned = listening_mode.clone();
3942 validator_tasks.push(async move {
3943 while let Some(notification) = stream.next().await {
3944 if let Err(err) = this
3945 .process_notification(
3946 remote_node.clone(),
3947 local_node.clone(),
3948 notification.clone(),
3949 &listening_mode_cloned,
3950 )
3951 .await
3952 {
3953 tracing::warn!(
3954 chain_id = %this.chain_id,
3955 validator_public_key = ?remote_node.public_key,
3956 ?notification,
3957 "Failed to process notification: {err}",
3958 );
3959 }
3960 }
3961 });
3962 entry.insert(abort);
3963 }
3964 Ok(validator_tasks.collect())
3965 }
3966
3967 #[instrument(level = "trace")]
3972 async fn find_received_certificates_from_validator(
3973 &self,
3974 remote_node: RemoteNode<Env::ValidatorNode>,
3975 ) -> Result<(), ChainClientError> {
3976 let chain_id = self.chain_id;
3977 let received_certificates = self
3979 .client
3980 .synchronize_received_certificates_from_validator(chain_id, &remote_node)
3981 .await?;
3982 self.receive_certificates_from_validators(vec![received_certificates])
3985 .await;
3986 Ok(())
3987 }
3988
3989 #[instrument(level = "trace", skip(remote_node))]
3991 pub async fn sync_validator(
3992 &self,
3993 remote_node: Env::ValidatorNode,
3994 ) -> Result<(), ChainClientError> {
3995 let validator_next_block_height = match remote_node
3996 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3997 .await
3998 {
3999 Ok(info) => info.info.next_block_height.0,
4000 Err(NodeError::BlobsNotFound(_)) => 0,
4001 Err(err) => return Err(err.into()),
4002 };
4003 let local_chain_state = self.chain_info().await?;
4004
4005 let Some(missing_certificate_count) = local_chain_state
4006 .next_block_height
4007 .0
4008 .checked_sub(validator_next_block_height)
4009 .filter(|count| *count > 0)
4010 else {
4011 debug!("Validator is up-to-date with local state");
4012 return Ok(());
4013 };
4014
4015 let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
4016 .expect("`usize` should be at least `u64`");
4017 let missing_certificates_start = missing_certificates_end
4018 - usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");
4019
4020 let missing_certificate_hashes = self
4021 .chain_state_view()
4022 .await?
4023 .confirmed_log
4024 .read(missing_certificates_start..missing_certificates_end)
4025 .await?;
4026
4027 let certificates = self
4028 .client
4029 .storage_client()
4030 .read_certificates(missing_certificate_hashes.clone())
4031 .await?;
4032 let certificates =
4033 match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
4034 ResultReadCertificates::Certificates(certificates) => certificates,
4035 ResultReadCertificates::InvalidHashes(hashes) => {
4036 return Err(ChainClientError::ReadCertificatesError(hashes))
4037 }
4038 };
4039 for certificate in certificates {
4040 match remote_node
4041 .handle_confirmed_certificate(
4042 certificate.clone(),
4043 CrossChainMessageDelivery::NonBlocking,
4044 )
4045 .await
4046 {
4047 Ok(_) => (),
4048 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
4049 let missing_blobs: Vec<_> = self
4051 .client
4052 .storage_client()
4053 .read_blobs(&missing_blob_ids)
4054 .await?
4055 .into_iter()
4056 .flatten()
4057 .collect();
4058 remote_node.upload_blobs(missing_blobs).await?;
4059 remote_node
4060 .handle_confirmed_certificate(
4061 certificate,
4062 CrossChainMessageDelivery::NonBlocking,
4063 )
4064 .await?;
4065 }
4066 Err(err) => return Err(err.into()),
4067 }
4068 }
4069
4070 Ok(())
4071 }
4072
4073 fn has_other_owners(&self, ownership: &ChainOwnership) -> bool {
4075 ownership
4076 .all_owners()
4077 .any(|owner| Some(owner) != self.preferred_owner.as_ref())
4078 }
4079}
4080
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Testing helper: processes `notification` as if it came from `validator`, using
    /// `ListeningMode::FullChain`.
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created or if processing the notification
    /// fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        let listening_mode = ListeningMode::FullChain;
        self.process_notification(remote_node, local_node, notification, &listening_mode)
            .await
            .unwrap();
    }
}
4106
4107#[derive(Debug)]
4109enum ExecuteBlockOutcome {
4110 Executed(ConfirmedBlockCertificate),
4112 Conflict(ConfirmedBlockCertificate),
4115 WaitForTimeout(RoundTimeout),
4118}
4119
4120#[must_use]
4122pub struct AbortOnDrop(pub AbortHandle);
4123
4124impl Drop for AbortOnDrop {
4125 #[instrument(level = "trace", skip(self))]
4126 fn drop(&mut self) {
4127 self.0.abort();
4128 }
4129}
4130
4131struct ReceivedCertificatesFromValidator {
4133 public_key: ValidatorPublicKey,
4135 tracker: u64,
4137 certificates: Vec<ConfirmedBlockCertificate>,
4140 other_sender_chains: Vec<ChainId>,
4143}
4144
4145#[derive(Clone, Serialize, Deserialize)]
4147pub struct PendingProposal {
4148 pub block: ProposedBlock,
4149 pub blobs: Vec<Blob>,
4150}
4151
/// Tells the certificate-receiving code whether a certificate still has to be checked.
enum ReceiveCertificateMode {
    /// The certificate has not been checked yet.
    NeedsCheck,
    /// The certificate was checked before and need not be checked again.
    AlreadyChecked,
}
4156
4157enum CheckCertificateResult {
4158 OldEpoch,
4159 New,
4160 FutureEpoch,
4161}
4162
4163impl CheckCertificateResult {
4164 fn into_result(self) -> Result<(), ChainClientError> {
4165 match self {
4166 Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4167 Self::New => Ok(()),
4168 Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4169 }
4170 }
4171}
4172
4173#[cfg(not(target_arch = "wasm32"))]
4175pub async fn create_bytecode_blobs(
4176 contract: Bytecode,
4177 service: Bytecode,
4178 vm_runtime: VmRuntime,
4179) -> (Vec<Blob>, ModuleId) {
4180 match vm_runtime {
4181 VmRuntime::Wasm => {
4182 let (compressed_contract, compressed_service) =
4183 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4184 .await
4185 .expect("Compression should not panic");
4186 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4187 let service_blob = Blob::new_service_bytecode(compressed_service);
4188 let module_id =
4189 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
4190 (vec![contract_blob, service_blob], module_id)
4191 }
4192 VmRuntime::Evm => {
4193 let compressed_contract = contract.compress();
4194 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
4195 let module_id = ModuleId::new(
4196 evm_contract_blob.id().hash,
4197 evm_contract_blob.id().hash,
4198 vm_runtime,
4199 );
4200 (vec![evm_contract_blob], module_id)
4201 }
4202 }
4203}