1use std::{
6 cmp::Ordering,
7 collections::{BTreeMap, BTreeSet, HashSet},
8 slice,
9 sync::{Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{Future, TryFutureExt as _},
15 stream::{self, AbortHandle, FuturesOrdered, FuturesUnordered, StreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20 crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
21 data_types::{
22 ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, Round, TimeDelta, Timestamp,
23 },
24 ensure,
25 hashed::Hashed,
26 identifiers::{AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
27 time::Duration,
28};
29#[cfg(not(target_arch = "wasm32"))]
30use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
31use linera_chain::{
32 data_types::{
33 BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote,
34 ProposedBlock,
35 },
36 manager::LockingBlock,
37 types::{
38 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
39 LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
40 },
41 ChainError, ChainIdSet,
42};
43use linera_execution::{committee::Committee, ExecutionError};
44use linera_storage::{Arc as CacheArc, Clock as _, ResultReadCertificates, Storage as _};
45use rand::seq::SliceRandom;
46use received_log::ReceivedLogs;
47use serde::{Deserialize, Serialize};
48use tokio::sync::mpsc;
49use tracing::{debug, error, info, instrument, trace, warn};
50
51use crate::{
52 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
53 environment::Environment,
54 local_node::{LocalNodeClient, LocalNodeError},
55 node::{CrossChainMessageDelivery, NodeError, ValidatorNode as _, ValidatorNodeProvider as _},
56 notifier::{ChannelNotifier, Notifier as _},
57 remote_node::RemoteNode,
58 updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
59 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
60 ChainWorkerConfig, ProcessConfirmedBlockMode, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
61};
62
63pub mod chain_client;
64pub use chain_client::ChainClient;
65
66pub use crate::data_types::ClientOutcome;
67
68#[cfg(test)]
69#[path = "../unit_tests/client_tests.rs"]
70mod client_tests;
71pub mod requests_scheduler;
72
73pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
74mod received_log;
75mod validator_trackers;
76
77#[cfg(with_metrics)]
78mod metrics {
79 use std::sync::LazyLock;
80
81 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
82 use prometheus::HistogramVec;
83
84 pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
85 LazyLock::new(|| {
86 register_histogram_vec(
87 "process_inbox_latency",
88 "process_inbox latency",
89 &[],
90 exponential_bucket_latencies(10_000.0),
91 )
92 });
93
94 pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
95 register_histogram_vec(
96 "prepare_chain_latency",
97 "prepare_chain latency",
98 &[],
99 exponential_bucket_latencies(10_000.0),
100 )
101 });
102
103 pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
104 register_histogram_vec(
105 "synchronize_chain_state_latency",
106 "synchronize_chain_state latency",
107 &[],
108 exponential_bucket_latencies(10_000.0),
109 )
110 });
111
112 pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
113 register_histogram_vec(
114 "execute_block_latency",
115 "execute_block latency",
116 &[],
117 exponential_bucket_latencies(10_000.0),
118 )
119 });
120
121 pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
122 register_histogram_vec(
123 "find_received_certificates_latency",
124 "find_received_certificates latency",
125 &[],
126 exponential_bucket_latencies(10_000.0),
127 )
128 });
129}
130
131pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
132pub static DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE: usize = 500;
133pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
134pub static DEFAULT_MAX_EVENT_STREAM_QUERIES: usize = 1000;
135pub static DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS: usize = 1;
136
137#[derive(Debug, Clone, Copy)]
138pub enum TimingType {
139 ExecuteOperations,
140 ExecuteBlock,
141 SubmitBlockProposal,
142 UpdateValidators,
143}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
150pub enum ListeningMode {
151 FullChain,
154 FollowChain,
158 EventsOnly(BTreeSet<StreamId>),
160}
161
162impl PartialOrd for ListeningMode {
163 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
164 match (self, other) {
165 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
166 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
167 (_, ListeningMode::FullChain) => Some(Ordering::Less),
168 (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
169 (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
170 (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
171 (ListeningMode::EventsOnly(a), ListeningMode::EventsOnly(b)) => {
172 if a == b {
173 Some(Ordering::Equal)
174 } else if a.is_superset(b) {
175 Some(Ordering::Greater)
176 } else if b.is_superset(a) {
177 Some(Ordering::Less)
178 } else {
179 None
180 }
181 }
182 }
183 }
184}
185
186impl ListeningMode {
187 pub fn is_relevant(&self, reason: &Reason) -> bool {
190 match (reason, self) {
191 (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
194 false
195 }
196 (_, ListeningMode::FullChain) => true,
198 (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
201 (_, ListeningMode::FollowChain) => false,
202 (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
204 relevant.intersection(event_streams).next().is_some()
205 }
206 (_, ListeningMode::EventsOnly(_)) => false,
207 }
208 }
209
210 pub fn extend(&mut self, other: Option<ListeningMode>) {
211 match (self, other) {
212 (_, None) => (),
213 (ListeningMode::FullChain, _) => (),
214 (mode, Some(ListeningMode::FullChain)) => {
215 *mode = ListeningMode::FullChain;
216 }
217 (ListeningMode::FollowChain, _) => (),
218 (mode, Some(ListeningMode::FollowChain)) => {
219 *mode = ListeningMode::FollowChain;
220 }
221 (
222 ListeningMode::EventsOnly(self_events),
223 Some(ListeningMode::EventsOnly(other_events)),
224 ) => {
225 self_events.extend(other_events);
226 }
227 }
228 }
229
230 pub fn is_follow_only(&self) -> bool {
233 !matches!(self, ListeningMode::FullChain)
234 }
235
236 pub fn is_full(&self) -> bool {
239 matches!(self, ListeningMode::FullChain)
240 }
241
242 pub fn should_sync_chain_state(&self) -> bool {
243 match self {
244 ListeningMode::FullChain | ListeningMode::FollowChain => true,
245 ListeningMode::EventsOnly(_) => false,
246 }
247 }
248}
249
250#[derive(Debug)]
260pub struct ChainModes {
261 modes: BTreeMap<ChainId, ListeningMode>,
262 full: Arc<Hashed<ChainIdSet>>,
263}
264
265impl Default for ChainModes {
266 fn default() -> Self {
267 Self::new(BTreeMap::new())
268 }
269}
270
271impl ChainModes {
272 pub fn new(modes: BTreeMap<ChainId, ListeningMode>) -> Self {
274 let full = Self::compute_full(&modes);
275 Self { modes, full }
276 }
277
278 fn compute_full(modes: &BTreeMap<ChainId, ListeningMode>) -> Arc<Hashed<ChainIdSet>> {
279 Arc::new(Hashed::new(ChainIdSet(
280 modes
281 .iter()
282 .filter(|(_, mode)| mode.is_full())
283 .map(|(id, _)| *id)
284 .collect(),
285 )))
286 }
287
288 pub fn full(&self) -> Arc<Hashed<ChainIdSet>> {
290 self.full.clone()
291 }
292
293 pub fn get(&self, chain_id: &ChainId) -> Option<&ListeningMode> {
295 self.modes.get(chain_id)
296 }
297
298 pub fn extend_mode(&mut self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
302 let entry = self
303 .modes
304 .entry(chain_id)
305 .or_insert_with(|| ListeningMode::EventsOnly(BTreeSet::new()));
306 let was_full = entry.is_full();
307 entry.extend(Some(mode));
308 let result = entry.clone();
309 if !was_full && result.is_full() {
310 self.full = Self::compute_full(&self.modes);
311 }
312 result
313 }
314}
315
316pub struct Client<Env: Environment> {
318 environment: Env,
319 pub local_node: LocalNodeClient<Env::Storage>,
322 requests_scheduler: Arc<RequestsScheduler<Env>>,
324 admin_chain_id: ChainId,
326 chain_modes: Arc<RwLock<ChainModes>>,
329 notifier: Arc<ChannelNotifier<Notification>>,
331 chains: papaya::HashMap<ChainId, chain_client::State>,
333 options: chain_client::Options,
335}
336
337impl<Env: Environment> Client<Env> {
338 #[instrument(level = "trace", skip_all)]
340 #[expect(clippy::too_many_arguments)]
341 pub fn new(
342 environment: Env,
343 admin_chain_id: ChainId,
344 long_lived_services: bool,
345 chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
346 name: impl Into<String>,
347 chain_worker_ttl: Option<Duration>,
348 sender_chain_worker_ttl: Option<Duration>,
349 cross_chain_batch_size_limit: usize,
350 options: chain_client::Options,
351 block_cache_size: usize,
352 execution_state_cache_size: usize,
353 requests_scheduler_config: &requests_scheduler::RequestsSchedulerConfig,
354 ) -> Self {
355 let mut modes = chain_modes.into_iter().collect::<BTreeMap<_, _>>();
356 modes
360 .entry(admin_chain_id)
361 .or_insert(ListeningMode::FullChain)
362 .extend(Some(ListeningMode::FullChain));
363 let chain_modes = Arc::new(RwLock::new(ChainModes::new(modes)));
364 let config = ChainWorkerConfig {
365 nickname: name.into(),
366 long_lived_services,
367 allow_inactive_chains: true,
368 ttl: chain_worker_ttl,
369 sender_chain_ttl: sender_chain_worker_ttl,
370 block_cache_size,
371 execution_state_cache_size,
372 cross_chain_batch_size_limit,
373 ..ChainWorkerConfig::default()
374 };
375 let state = WorkerState::new(
376 environment.storage().clone(),
377 config,
378 Some(chain_modes.clone()),
379 );
380 let local_node = LocalNodeClient::new(state);
381 let requests_scheduler =
382 Arc::new(RequestsScheduler::new(vec![], requests_scheduler_config));
383
384 Self {
385 environment,
386 local_node,
387 requests_scheduler,
388 chains: papaya::HashMap::new(),
389 admin_chain_id,
390 chain_modes,
391 notifier: Arc::new(ChannelNotifier::default()),
392 options,
393 }
394 }
395
396 pub fn admin_chain_id(&self) -> ChainId {
398 self.admin_chain_id
399 }
400
401 pub fn subscribe(
403 &self,
404 chain_ids: Vec<ChainId>,
405 ) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
406 self.notifier.subscribe(chain_ids)
407 }
408
409 pub fn subscribe_extra(
411 &self,
412 chain_ids: Vec<ChainId>,
413 sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
414 ) {
415 self.notifier.add_sender(chain_ids, sender);
416 }
417
418 pub fn storage_client(&self) -> &Env::Storage {
420 self.environment.storage()
421 }
422
423 async fn try_read_local_certificate(
426 &self,
427 chain_id: ChainId,
428 height: BlockHeight,
429 hash: Option<CryptoHash>,
430 ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, chain_client::Error> {
431 if let Some(hash) = hash {
432 return Ok(self.storage_client().read_certificate(hash).await?);
433 }
434 let results = self
435 .storage_client()
436 .read_certificates_by_heights(chain_id, &[height])
437 .await?;
438 Ok(results.into_iter().next().flatten())
439 }
440
441 pub fn validator_node_provider(&self) -> &Env::Network {
442 self.environment.network()
443 }
444
445 pub(crate) fn options(&self) -> &chain_client::Options {
446 &self.options
447 }
448
449 pub async fn retry_pending_cross_chain_requests(
451 &self,
452 sender_chain: ChainId,
453 ) -> Result<(), LocalNodeError> {
454 self.local_node
455 .retry_pending_cross_chain_requests(sender_chain, &self.notifier)
456 .await
457 }
458
459 #[instrument(level = "trace", skip(self))]
461 pub fn signer(&self) -> &Env::Signer {
462 self.environment.signer()
463 }
464
465 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
467 self.signer()
468 .contains_key(owner)
469 .await
470 .map_err(chain_client::Error::signer_failure)
471 }
472
473 pub fn wallet(&self) -> &Env::Wallet {
475 self.environment.wallet()
476 }
477
478 #[instrument(level = "trace", skip(self))]
481 pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
482 self.chain_modes
483 .write()
484 .expect("Panics should not happen while holding a lock to `chain_modes`")
485 .extend_mode(chain_id, mode)
486 }
487
488 pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
490 self.chain_modes
491 .read()
492 .expect("Panics should not happen while holding a lock to `chain_modes`")
493 .get(&chain_id)
494 .cloned()
495 }
496
497 pub fn is_tracked(&self, chain_id: ChainId) -> bool {
499 self.chain_modes
500 .read()
501 .expect("Panics should not happen while holding a lock to `chain_modes`")
502 .get(&chain_id)
503 .is_some_and(ListeningMode::is_full)
504 }
505
506 #[expect(clippy::too_many_arguments)]
508 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
509 pub fn create_chain_client(
510 self: &Arc<Self>,
511 chain_id: ChainId,
512 block_hash: Option<CryptoHash>,
513 next_block_height: BlockHeight,
514 pending_proposal: &Option<PendingProposal>,
515 preferred_owner: Option<AccountOwner>,
516 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
517 follow_only: bool,
518 ) -> ChainClient<Env> {
519 self.chains.pin().get_or_insert_with(chain_id, || {
522 chain_client::State::new(pending_proposal.clone(), follow_only)
523 });
524
525 ChainClient::new(
526 self.clone(),
527 chain_id,
528 self.options.clone(),
529 block_hash,
530 next_block_height,
531 preferred_owner,
532 timing_sender,
533 )
534 }
535
536 fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
538 self.chains
539 .pin()
540 .get(&chain_id)
541 .is_some_and(|state| state.is_follow_only())
542 }
543
544 pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
546 self.chains
547 .pin()
548 .update(chain_id, |state| state.with_follow_only(follow_only));
549 }
550
551 async fn fetch_chain_info(
553 &self,
554 chain_id: ChainId,
555 validators: &[RemoteNode<Env::ValidatorNode>],
556 ) -> Result<Box<ChainInfo>, chain_client::Error> {
557 match self.local_node.chain_info(chain_id).await {
558 Ok(info) => Ok(info),
559 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
560 self.synchronize_chain_state(self.admin_chain_id).await?;
563 self.update_local_node_with_blobs_from(blob_ids, validators)
564 .await?;
565 Ok(self.local_node.chain_info(chain_id).await?)
566 }
567 Err(err) => Err(err.into()),
568 }
569 }
570
571 #[instrument(level = "trace", skip(self))]
573 async fn download_certificates(
574 &self,
575 chain_id: ChainId,
576 target_next_block_height: BlockHeight,
577 ) -> Result<Box<ChainInfo>, chain_client::Error> {
578 let validators = self.validator_nodes().await?;
579 let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
580 if target_next_block_height <= info.next_block_height {
581 return Ok(info);
582 }
583 info = self
584 .load_local_certificates(chain_id, target_next_block_height, None)
585 .await?;
586 let mut next_height = info.next_block_height;
587 while next_height < target_next_block_height {
589 let limit = u64::from(target_next_block_height)
590 .checked_sub(u64::from(next_height))
591 .ok_or(ArithmeticError::Overflow)?
592 .min(self.options.certificate_download_batch_size);
593 let certificates = self
594 .requests_scheduler
595 .download_certificates_from_validators(
596 &validators,
597 chain_id,
598 next_height,
599 limit,
600 self.options.certificate_batch_download_timeout,
601 )
602 .await?;
603 let Some(new_info) = self
604 .process_certificates(
605 &validators,
606 certificates,
607 None,
608 ProcessConfirmedBlockMode::Execute,
609 )
610 .await?
611 else {
612 break;
613 };
614 assert!(new_info.next_block_height > next_height);
615 next_height = new_info.next_block_height;
616 info = new_info;
617 }
618 ensure!(
619 target_next_block_height <= info.next_block_height,
620 chain_client::Error::CannotDownloadCertificates {
621 chain_id,
622 target_next_block_height,
623 }
624 );
625 Ok(info)
626 }
627
628 async fn load_local_certificates(
633 &self,
634 chain_id: ChainId,
635 end: BlockHeight,
636 until_block_time: Option<Timestamp>,
637 ) -> Result<Box<ChainInfo>, chain_client::Error> {
638 let mut last_info = self.local_node.chain_info(chain_id).await?;
639 let next_height = last_info.next_block_height;
640 let hashes = self
641 .local_node
642 .get_preprocessed_block_hashes(chain_id, next_height, end)
643 .await?;
644 let certificates = self.storage_client().read_certificates(&hashes).await?;
645 let certificates = match ResultReadCertificates::new(certificates, hashes) {
646 ResultReadCertificates::Certificates(certificates) => certificates,
647 ResultReadCertificates::InvalidHashes(hashes) => {
648 return Err(chain_client::Error::ReadCertificatesError(hashes))
649 }
650 };
651 for certificate in certificates {
652 if let Some(until) = until_block_time {
653 if certificate.value().block().header.timestamp >= until {
654 break;
655 }
656 }
657 last_info = self.handle_certificate(certificate).await?.info;
658 }
659 Ok(last_info)
660 }
661
662 #[instrument(level = "trace", skip_all)]
668 async fn download_certificates_from(
669 &self,
670 remote_node: &RemoteNode<Env::ValidatorNode>,
671 chain_id: ChainId,
672 stop: BlockHeight,
673 until_block_time: Option<Timestamp>,
674 ) -> Result<Box<ChainInfo>, chain_client::Error> {
675 let mut last_info = self
676 .load_local_certificates(chain_id, stop, until_block_time)
677 .await?;
678 let mut next_height = last_info.next_block_height;
679
680 if next_height >= stop {
681 return Ok(last_info);
682 }
683
684 #[cfg(not(web))]
688 type CertificateBatchFuture = std::pin::Pin<
689 Box<dyn Future<Output = Result<Vec<ConfirmedBlockCertificate>, NodeError>> + Send>,
690 >;
691 #[cfg(web)]
692 type CertificateBatchFuture = std::pin::Pin<
693 Box<dyn Future<Output = Result<Vec<ConfirmedBlockCertificate>, NodeError>>>,
694 >;
695
696 let max_concurrent = self.options.max_concurrent_batch_downloads;
697 let batch_size = self.options.certificate_download_batch_size;
698 let (sender, mut receiver) = tokio::sync::mpsc::channel(max_concurrent);
699 let scheduler = self.requests_scheduler.clone();
700 let remote = remote_node.clone();
701
702 let download_task = linera_base::Task::spawn(async move {
703 let mut download_height = next_height;
704 let mut in_flight = FuturesOrdered::<CertificateBatchFuture>::new();
705
706 let try_enqueue = |in_flight: &mut FuturesOrdered<CertificateBatchFuture>,
707 download_height: &mut BlockHeight| {
708 if *download_height >= stop {
709 return;
710 }
711 let limit = u64::from(stop)
712 .saturating_sub(u64::from(*download_height))
713 .min(batch_size);
714 let height = *download_height;
715 let scheduler = scheduler.clone();
716 let remote = remote.clone();
717 in_flight.push_back(Box::pin(async move {
718 scheduler
719 .download_certificates(&remote, chain_id, height, limit)
720 .await
721 }));
722 *download_height = BlockHeight(u64::from(*download_height) + limit);
723 };
724
725 while in_flight.len() < max_concurrent && download_height < stop {
726 try_enqueue(&mut in_flight, &mut download_height);
727 }
728
729 while let Some(result) = in_flight.next().await {
730 if sender.send(result).await.is_err() {
731 break;
732 }
733 try_enqueue(&mut in_flight, &mut download_height);
734 }
735 });
736
737 while let Some(result) = receiver.recv().await {
739 let certificates = result?;
740 let Some(info) = self
741 .process_certificates(
742 slice::from_ref(remote_node),
743 certificates,
744 until_block_time,
745 ProcessConfirmedBlockMode::Execute,
746 )
747 .await?
748 else {
749 break;
750 };
751 assert!(info.next_block_height >= next_height);
752 next_height = info.next_block_height;
753 last_info = info;
754 }
755 download_task.await;
758 Ok(last_info)
759 }
760
761 async fn download_blobs(
762 &self,
763 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
764 blob_ids: &[BlobId],
765 ) -> Result<(), chain_client::Error> {
766 let blobs = &self
767 .requests_scheduler
768 .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
769 .await?
770 .ok_or_else(|| {
771 chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
772 })?;
773 self.local_node.store_blobs(blobs).await.map_err(Into::into)
774 }
775
776 #[instrument(level = "trace", skip_all)]
781 pub(crate) async fn download_certificates_for_events(
782 &self,
783 event_ids: &[EventId],
784 ) -> Result<(), chain_client::Error> {
785 let mut validators = self.validator_nodes().await?;
786 let timeout = self.options.certificate_batch_download_timeout;
787 let mut remaining_event_ids = event_ids.to_vec();
788
789 while !remaining_event_ids.is_empty() {
790 let remaining_ref = &remaining_event_ids;
791 validators.shuffle(&mut rand::thread_rng());
792 let result = communicate_concurrently(
793 &validators,
794 move |remote_node| {
795 let validator_key = remote_node.public_key;
796 let validator_address = remote_node.address();
797 Box::pin(async move {
798 let heights = remote_node
800 .node
801 .event_block_heights(remaining_ref.to_vec())
802 .await?;
803
804 let mut chain_heights = BTreeMap::<_, BTreeSet<_>>::new();
806 let mut expected_events = BTreeMap::<_, HashSet<EventId>>::new();
807 let mut unresolved = Vec::new();
808 for (event_id, maybe_height) in remaining_ref.iter().zip(heights) {
809 if let Some(height) = maybe_height {
810 chain_heights
811 .entry(event_id.chain_id)
812 .or_default()
813 .insert(height);
814 expected_events
815 .entry((event_id.chain_id, height))
816 .or_default()
817 .insert(event_id.clone());
818 } else {
819 unresolved.push(event_id.clone());
820 }
821 }
822 if chain_heights.is_empty() {
823 return Err(chain_client::Error::from(NodeError::EventsNotFound(remaining_ref.clone())));
825 }
826
827 let mut checked_certificates = Vec::<ConfirmedBlockCertificate>::new();
829 for (chain_id, heights) in chain_heights {
830 let heights_vec = heights.into_iter().collect::<Vec<_>>();
831 let certificates = self
832 .requests_scheduler
833 .download_certificates_by_heights(
834 &remote_node,
835 chain_id,
836 heights_vec,
837 )
838 .await?;
839 for cert in &certificates {
840 let block = cert.block();
842 let block_event_ids = block.event_ids().collect::<HashSet<_>>();
843 if let Some(expected_event_ids) =
844 expected_events.get(&(chain_id, block.header.height))
845 {
846 if !expected_event_ids.is_subset(&block_event_ids) {
847 tracing::debug!(
848 %validator_address, ?expected_event_ids, ?block_event_ids,
849 "validator lied about events in block."
850 );
851 return Err(NodeError::UnexpectedCertificateValue.into());
852 }
853 }
854 }
855 for cert in certificates {
856 self.check_certificate(&cert)
857 .await
858 .map_err(|error| {
859 tracing::debug!(
860 %validator_address, %error,
861 "invalid certificate"
862 );
863 error
864 })?
865 .into_result()
866 .map_err(|error| {
867 tracing::debug!(
868 %validator_address, %error,
869 "could not check certificate"
870 );
871 error
872 })?;
873 checked_certificates.push(cert);
874 }
875 }
876 Ok((checked_certificates, unresolved, validator_key))
877 })
878 },
879 timeout,
880 )
881 .await;
882
883 match result {
884 Ok((certificates, unresolved, validator_key)) => {
885 for certificate in certificates {
886 let mode = ReceiveCertificateMode::AlreadyChecked;
887 self.receive_sender_certificate(
888 self.storage_client().cache_certificate(certificate),
889 mode,
890 None,
891 )
892 .await?;
893 }
894 validators.retain(|node| node.public_key != validator_key);
895 remaining_event_ids = unresolved;
896 }
897 Err(errors) => {
898 for (validator, error) in &errors {
899 warn!(
900 %validator,
901 %error,
902 "failed to download event certificates from validator",
903 );
904 }
905 return Err(NodeError::EventsNotFound(remaining_event_ids).into());
907 }
908 }
909 }
910 Ok(())
911 }
912
913 #[instrument(level = "trace", skip_all)]
923 async fn bootstrap_chain_from_checkpoint(
924 &self,
925 remote_node: &RemoteNode<Env::ValidatorNode>,
926 chain_id: ChainId,
927 checkpoint_height: BlockHeight,
928 ) -> Result<(), chain_client::Error> {
929 let local_next = match self.local_node.chain_info(chain_id).await {
930 Ok(info) => info.next_block_height,
931 Err(LocalNodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
936 Err(err) => return Err(err.into()),
937 };
938 if local_next > checkpoint_height {
939 return Ok(());
940 }
941 let certificates = remote_node
942 .download_certificates_by_heights(chain_id, vec![checkpoint_height])
943 .await?;
944 if certificates.is_empty() {
945 return Ok(());
948 }
949 self.process_certificates(
955 slice::from_ref(remote_node),
956 certificates,
957 None,
958 ProcessConfirmedBlockMode::Execute,
959 )
960 .await?;
961 Ok(())
962 }
963
964 #[instrument(level = "trace", skip_all)]
969 async fn process_certificates(
970 &self,
971 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
972 certificates: Vec<ConfirmedBlockCertificate>,
973 until_block_time: Option<Timestamp>,
974 mode: ProcessConfirmedBlockMode,
975 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
976 let mut info = None;
977 let created_blob_ids = certificates
982 .iter()
983 .flat_map(|certificate| certificate.value().block().created_blob_ids())
984 .collect::<BTreeSet<BlobId>>();
985 let required_blob_ids = certificates
986 .iter()
987 .flat_map(|certificate| certificate.value().required_blob_ids())
988 .filter(|blob_id| !created_blob_ids.contains(blob_id))
989 .collect::<Vec<_>>();
990
991 match self
992 .local_node
993 .read_blob_states_from_storage(&required_blob_ids)
994 .await
995 {
996 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
997 self.download_blobs(remote_nodes, &blob_ids).await?;
998 }
999 x => {
1000 x?;
1001 }
1002 }
1003
1004 for certificate in certificates {
1005 if let Some(until) = until_block_time {
1006 if certificate.value().block().header.timestamp >= until {
1007 break;
1008 }
1009 }
1010 let response = self
1011 .handle_certificate_with_retry(&certificate, remote_nodes, mode)
1012 .await?;
1013 info = Some(response.info);
1014 }
1015
1016 Ok(info)
1017 }
1018
1019 async fn handle_certificate_with_retry(
1023 &self,
1024 certificate: &ConfirmedBlockCertificate,
1025 nodes: &[RemoteNode<Env::ValidatorNode>],
1026 mode: ProcessConfirmedBlockMode,
1027 ) -> Result<ChainInfoResponse, chain_client::Error> {
1028 let mut downloaded_blobs = HashSet::<BlobId>::new();
1029 let mut downloaded_blocks = HashSet::<CryptoHash>::new();
1030 let mut events = EventSetDownloader::new(self);
1031 loop {
1032 let result = self
1033 .handle_confirmed_certificate(certificate.clone(), mode)
1034 .await;
1035 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1036 let new_blobs = filter_new(blob_ids, &downloaded_blobs);
1037 if !new_blobs.is_empty() {
1038 self.download_blobs(nodes, &new_blobs).await?;
1039 downloaded_blobs.extend(new_blobs);
1040 continue;
1041 }
1042 }
1043 if let Err(LocalNodeError::BlocksNotFound(hashes)) = &result {
1044 let new_blocks = filter_new(hashes, &downloaded_blocks);
1045 if !new_blocks.is_empty() {
1046 self.download_pre_checkpoint_blocks(nodes, &new_blocks)
1047 .await?;
1048 downloaded_blocks.extend(new_blocks);
1049 continue;
1050 }
1051 }
1052 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1053 if events.download_new(event_ids).await? {
1054 continue;
1055 }
1056 }
1057 return Ok(result?);
1058 }
1059 }
1060
1061 async fn download_pre_checkpoint_blocks(
1068 &self,
1069 nodes: &[RemoteNode<Env::ValidatorNode>],
1070 hashes: &[CryptoHash],
1071 ) -> Result<(), chain_client::Error> {
1072 for hash in hashes {
1073 let mut last_error = None;
1074 for node in nodes {
1075 match node.node.download_certificate(*hash).await {
1076 Ok(certificate) => {
1077 Box::pin(self.handle_certificate_with_retry(
1078 &certificate,
1079 nodes,
1080 ProcessConfirmedBlockMode::Auto,
1081 ))
1082 .await?;
1083 last_error = None;
1084 break;
1085 }
1086 Err(error) => last_error = Some(error),
1087 }
1088 }
1089 if let Some(error) = last_error {
1090 return Err(error.into());
1091 }
1092 }
1093 Ok(())
1094 }
1095
1096 async fn handle_certificate<T: ProcessableCertificate>(
1097 &self,
1098 certificate: GenericCertificate<T>,
1099 ) -> Result<ChainInfoResponse, LocalNodeError> {
1100 self.local_node
1101 .handle_certificate(certificate, &self.notifier)
1102 .await
1103 }
1104
1105 async fn handle_confirmed_certificate(
1106 &self,
1107 certificate: ConfirmedBlockCertificate,
1108 mode: ProcessConfirmedBlockMode,
1109 ) -> Result<ChainInfoResponse, LocalNodeError> {
1110 self.local_node
1111 .handle_confirmed_certificate(certificate, mode, &self.notifier)
1112 .await
1113 }
1114
1115 pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
1117 let info = self.local_node.chain_info(self.admin_chain_id).await?;
1118 let hash = info
1119 .committee_hash
1120 .ok_or(LocalNodeError::InactiveChain(self.admin_chain_id))?;
1121 let committee = self
1122 .storage_client()
1123 .get_or_load_committee_by_hash(hash)
1124 .await?;
1125 Ok((info.epoch, committee))
1126 }
1127
1128 async fn validator_nodes(
1130 &self,
1131 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
1132 let (_, committee) = self.admin_committee().await?;
1133 Ok(self.make_nodes(&committee)?)
1134 }
1135
1136 fn make_nodes(
1138 &self,
1139 committee: &Committee,
1140 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
1141 Ok(self
1142 .validator_node_provider()
1143 .make_nodes(committee)?
1144 .map(|(public_key, node)| RemoteNode { public_key, node })
1145 .collect())
1146 }
1147
1148 pub async fn get_chain_description_blob(
1151 &self,
1152 chain_id: ChainId,
1153 ) -> Result<Arc<Blob>, chain_client::Error> {
1154 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
1155 let blob = self
1156 .local_node
1157 .storage_client()
1158 .read_blob(chain_desc_id)
1159 .await?;
1160 if let Some(blob) = blob {
1161 return Ok(blob.into_std());
1163 }
1164 self.synchronize_chain_state(self.admin_chain_id).await?;
1166 let nodes = self.validator_nodes().await?;
1167 Ok(self
1168 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
1169 .await?
1170 .pop()
1171 .unwrap() .into_std())
1173 }
1174
1175 pub async fn get_chain_description(
1178 &self,
1179 chain_id: ChainId,
1180 ) -> Result<ChainDescription, chain_client::Error> {
1181 let blob = self.get_chain_description_blob(chain_id).await?;
1182 Ok(bcs::from_bytes(blob.bytes())?)
1183 }
1184
1185 #[instrument(level = "trace", skip_all)]
1187 pub(crate) async fn finalize_block(
1188 self: &Arc<Self>,
1189 committee: &Committee,
1190 certificate: ValidatedBlockCertificate,
1191 ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
1192 debug!(round = %certificate.round, "Submitting block for confirmation");
1193 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
1194 let finalize_action = CommunicateAction::FinalizeBlock {
1195 certificate: Box::new(certificate),
1196 delivery: self.options.cross_chain_message_delivery,
1197 };
1198 let certificate = self
1199 .communicate_chain_action(committee, finalize_action, hashed_value)
1200 .await?;
1201 self.receive_certificate_with_checked_signatures(
1202 certificate.clone(),
1203 ProcessConfirmedBlockMode::Execute,
1204 )
1205 .await?;
1206 Ok(certificate)
1207 }
1208
1209 #[instrument(level = "trace", skip_all)]
1211 async fn submit_block_proposal<T: ProcessableCertificate>(
1212 self: &Arc<Self>,
1213 committee: Arc<Committee>,
1214 proposal: Box<BlockProposal>,
1215 value: T,
1216 ) -> Result<GenericCertificate<T>, chain_client::Error> {
1217 debug!(
1218 round = %proposal.content.round,
1219 "Submitting block proposal to validators"
1220 );
1221
1222 let block_timestamp = proposal.content.block.timestamp;
1224 let local_time = self.local_node.storage_client().clock().current_time();
1225 if block_timestamp > local_time {
1226 info!(
1227 chain_id = %proposal.content.block.chain_id,
1228 %block_timestamp,
1229 %local_time,
1230 "Block timestamp is in the future; waiting until it can be proposed",
1231 );
1232 }
1233
1234 let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
1236 let submit_action = CommunicateAction::SubmitBlock {
1237 proposal,
1238 blob_ids: value.required_blob_ids().into_iter().collect(),
1239 clock_skew_sender,
1240 };
1241
1242 let validity_threshold = committee.validity_threshold();
1244 let committee_clone = committee.clone();
1245 let clock_skew_check_handle = linera_base::Task::spawn(async move {
1246 let mut skew_weight = 0u64;
1247 let mut min_skew = TimeDelta::MAX;
1248 let mut max_skew = TimeDelta::ZERO;
1249 while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
1250 if clock_skew.as_micros() > 0 {
1251 skew_weight += committee_clone.weight(&public_key);
1252 min_skew = min_skew.min(clock_skew);
1253 max_skew = max_skew.max(clock_skew);
1254 if skew_weight >= validity_threshold {
1255 warn!(
1256 skew_weight,
1257 validity_threshold,
1258 min_skew_ms = min_skew.as_micros() / 1000,
1259 max_skew_ms = max_skew.as_micros() / 1000,
1260 "A validity threshold of validators reported clock skew; \
1261 consider checking your system clock",
1262 );
1263 return;
1264 }
1265 }
1266 }
1267 });
1268
1269 let certificate = self
1270 .communicate_chain_action(&committee, submit_action, value)
1271 .await?;
1272
1273 clock_skew_check_handle.await;
1274
1275 self.handle_certificate(certificate.clone()).await?;
1276 Ok(certificate)
1277 }
1278
1279 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
1281 async fn communicate_chain_updates(
1282 self: &Arc<Self>,
1283 committee: &Committee,
1284 chain_id: ChainId,
1285 height: BlockHeight,
1286 delivery: CrossChainMessageDelivery,
1287 latest_certificate: Option<CacheArc<GenericCertificate<ConfirmedBlock>>>,
1288 ) -> Result<(), chain_client::Error> {
1289 let nodes = self.make_nodes(committee)?;
1290 communicate_with_quorum(
1291 &nodes,
1292 committee,
1293 |_: &()| (),
1294 |remote_node| {
1295 let mut updater = ValidatorUpdater {
1296 remote_node,
1297 client: self.clone(),
1298 admin_chain_id: self.admin_chain_id,
1299 };
1300 let certificate = latest_certificate.clone();
1301 Box::pin(async move {
1302 updater
1303 .send_chain_information(chain_id, height, delivery, certificate)
1304 .await
1305 })
1306 },
1307 self.options.quorum_grace_period,
1308 )
1309 .await?;
1310 Ok(())
1311 }
1312
1313 #[instrument(level = "trace", skip_all)]
1319 async fn communicate_chain_action<T: CertificateValue>(
1320 self: &Arc<Self>,
1321 committee: &Committee,
1322 action: CommunicateAction,
1323 value: T,
1324 ) -> Result<GenericCertificate<T>, chain_client::Error> {
1325 let nodes = self.make_nodes(committee)?;
1326 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
1327 &nodes,
1328 committee,
1329 |vote: &LiteVote| (vote.value.value_hash, vote.round),
1330 |remote_node| {
1331 let mut updater = ValidatorUpdater {
1332 remote_node,
1333 client: self.clone(),
1334 admin_chain_id: self.admin_chain_id,
1335 };
1336 let action = action.clone();
1337 Box::pin(async move { updater.send_chain_update(action).await })
1338 },
1339 self.options.quorum_grace_period,
1340 )
1341 .await?;
1342 ensure!(
1343 (votes_hash, votes_round) == (value.hash(), action.round()),
1344 chain_client::Error::UnexpectedQuorum {
1345 hash: votes_hash,
1346 round: votes_round,
1347 expected_hash: value.hash(),
1348 expected_round: action.round(),
1349 }
1350 );
1351 let certificate = LiteCertificate::try_from_votes(votes)
1356 .ok_or_else(|| {
1357 chain_client::Error::InternalError(
1358 "Vote values or rounds don't match; this is a bug",
1359 )
1360 })?
1361 .with_value(value)
1362 .ok_or_else(|| {
1363 chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
1364 })?;
1365 Ok(certificate)
1366 }
1367
1368 #[instrument(level = "trace", skip_all)]
1371 async fn receive_certificate_with_checked_signatures(
1372 &self,
1373 certificate: ConfirmedBlockCertificate,
1374 mode: ProcessConfirmedBlockMode,
1375 ) -> Result<(), chain_client::Error> {
1376 let block = certificate.block();
1377 self.download_certificates(block.header.chain_id, block.header.height)
1379 .await?;
1380 let nodes = self.validator_nodes().await?;
1383 self.handle_certificate_with_retry(&certificate, &nodes, mode)
1384 .await?;
1385 Ok(())
1386 }
1387
1388 #[instrument(level = "trace", skip_all)]
1393 async fn receive_sender_certificate(
1394 &self,
1395 certificate: CacheArc<ConfirmedBlockCertificate>,
1396 mode: ReceiveCertificateMode,
1397 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
1398 ) -> Result<(), chain_client::Error> {
1399 if let ReceiveCertificateMode::NeedsCheck = mode {
1401 self.check_certificate(&certificate).await?.into_result()?;
1402 }
1403 let nodes = if let Some(nodes) = nodes {
1405 nodes
1406 } else {
1407 self.validator_nodes().await?
1408 };
1409 let processing_mode = if self
1410 .chain_mode(certificate.value().chain_id())
1411 .is_some_and(|m| m.should_sync_chain_state())
1412 {
1413 ProcessConfirmedBlockMode::Auto
1414 } else {
1415 ProcessConfirmedBlockMode::Preprocess
1416 };
1417 self.handle_certificate_with_retry(&certificate, &nodes, processing_mode)
1418 .await?;
1419
1420 Ok(())
1421 }
1422
1423 #[instrument(level = "debug", skip_all, fields(chain_id = %sender_chain_id))]
1425 async fn download_and_process_sender_chain(
1426 &self,
1427 sender_chain_id: ChainId,
1428 nodes: &[RemoteNode<Env::ValidatorNode>],
1429 received_log: &ReceivedLogs,
1430 mut remote_heights: Vec<BlockHeight>,
1431 sender: mpsc::UnboundedSender<ChainAndHeight>,
1432 ) {
1433 let mut nodes = nodes.to_vec();
1434 while !remote_heights.is_empty() {
1435 if let Ok(local_certs) = self
1438 .storage_client()
1439 .read_certificates_by_heights(sender_chain_id, &remote_heights)
1440 .await
1441 {
1442 let mut still_needed = Vec::new();
1443 for (height, maybe_cert) in remote_heights.iter().copied().zip(local_certs) {
1444 if let Some(certificate) = maybe_cert {
1445 let chain_id = certificate.block().header.chain_id;
1446 if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
1447 error!(
1448 %chain_id, %height, %error,
1449 "failed to send chain and height over the channel",
1450 );
1451 }
1452 } else {
1453 still_needed.push(height);
1454 }
1455 }
1456 remote_heights = still_needed;
1457 if remote_heights.is_empty() {
1458 break;
1459 }
1460 }
1461
1462 let remote_heights_ref = &remote_heights;
1463 let certificates = match communicate_concurrently(
1464 &nodes,
1465 async move |remote_node| {
1466 let mut remote_heights = remote_heights_ref.clone();
1467 remote_heights.retain(|height| {
1470 received_log.validator_has_block(
1471 &remote_node.public_key,
1472 sender_chain_id,
1473 *height,
1474 )
1475 });
1476 if remote_heights.is_empty() {
1477 return Err(NodeError::MissingCertificateValue);
1480 }
1481 let certificates = self
1482 .requests_scheduler
1483 .download_certificates_by_heights(
1484 &remote_node,
1485 sender_chain_id,
1486 remote_heights,
1487 )
1488 .await?;
1489 let mut certificates_with_check_results = vec![];
1490 for cert in certificates {
1491 let check_result = self.check_certificate(&cert).await?;
1492 certificates_with_check_results
1493 .push((cert, check_result.into_result().is_ok()));
1494 }
1495 Ok(certificates_with_check_results)
1496 },
1497 self.options.certificate_batch_download_timeout,
1498 )
1499 .await
1500 {
1501 Ok(certificates_with_check_results) => certificates_with_check_results,
1502 Err(errors) => {
1503 let faulty_validators = errors
1504 .into_iter()
1505 .map(|(validator, error)| {
1506 warn!(
1507 %validator,
1508 %sender_chain_id,
1509 %error,
1510 "failed to download certificates from validator",
1511 );
1512 validator
1513 })
1514 .collect::<BTreeSet<_>>();
1515 nodes.retain(|node| !faulty_validators.contains(&node.public_key));
1517 if nodes.is_empty() {
1518 info!(
1519 chain_id = %sender_chain_id,
1520 "could not download certificates for chain - no more correct validators left"
1521 );
1522 return;
1523 }
1524 continue;
1525 }
1526 };
1527
1528 trace!(
1529 num_certificates = %certificates.len(),
1530 "received certificates",
1531 );
1532
1533 let mut to_remove_from_queue = BTreeSet::new();
1534
1535 for (certificate, check_result) in certificates {
1536 let hash = certificate.hash();
1537 let chain_id = certificate.block().header.chain_id;
1538 let height = certificate.block().header.height;
1539 if !check_result {
1540 to_remove_from_queue.insert(height);
1544 continue;
1545 }
1546 let mode = ReceiveCertificateMode::AlreadyChecked;
1548 if let Err(error) = self
1549 .receive_sender_certificate(
1550 self.storage_client().cache_certificate(certificate),
1551 mode,
1552 None,
1553 )
1554 .await
1555 {
1556 warn!(%error, %hash, "Received invalid certificate");
1557 } else {
1558 to_remove_from_queue.insert(height);
1559 if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
1560 error!(
1561 %chain_id,
1562 %height,
1563 %error,
1564 "failed to send chain and height over the channel",
1565 );
1566 }
1567 }
1568 }
1569
1570 remote_heights.retain(|height| !to_remove_from_queue.contains(height));
1571 }
1572 trace!("find_received_certificates: finished processing chain");
1573 }
1574
1575 #[instrument(level = "trace", skip(self))]
1577 async fn get_received_log_from_validator(
1578 &self,
1579 chain_id: ChainId,
1580 remote_node: &RemoteNode<Env::ValidatorNode>,
1581 tracker: u64,
1582 ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1583 let mut offset = tracker;
1584
1585 let mut remote_log = Vec::new();
1587 loop {
1588 trace!("get_received_log_from_validator: looping");
1589 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1590 let info = remote_node.handle_chain_info_query(query).await?;
1591 let received_entries = info.requested_received_log.len();
1592 offset += received_entries as u64;
1593 remote_log.extend(info.requested_received_log);
1594 trace!(
1595 remote_node = remote_node.address(),
1596 %received_entries,
1597 "get_received_log_from_validator: received log batch",
1598 );
1599 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1600 break;
1601 }
1602 }
1603
1604 trace!(
1605 remote_node = remote_node.address(),
1606 num_entries = remote_log.len(),
1607 "get_received_log_from_validator: returning downloaded log",
1608 );
1609
1610 Ok(remote_log)
1611 }
1612
1613 async fn download_sender_block_with_sending_ancestors(
1619 &self,
1620 receiver_chain_id: ChainId,
1621 sender_chain_id: ChainId,
1622 height: BlockHeight,
1623 remote_node: &RemoteNode<Env::ValidatorNode>,
1624 ) -> Result<(), chain_client::Error> {
1625 let next_outbox_height = self
1626 .local_node
1627 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1628 .await?
1629 .get(&sender_chain_id)
1630 .copied()
1631 .unwrap_or(BlockHeight::ZERO);
1632
1633 let mut certificates = BTreeMap::new();
1636 let mut current_height = height;
1637 let mut current_hash: Option<CryptoHash> = None;
1640
1641 while current_height >= next_outbox_height {
1643 let certificate = if let Some(local) = self
1647 .try_read_local_certificate(sender_chain_id, current_height, current_hash)
1648 .await?
1649 {
1650 local
1651 } else {
1652 let downloaded = self
1653 .requests_scheduler
1654 .download_certificates_by_heights(
1655 remote_node,
1656 sender_chain_id,
1657 vec![current_height],
1658 )
1659 .await?;
1660 let Some(certificate) = downloaded.into_iter().next() else {
1661 return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1662 chain_id: sender_chain_id,
1663 height: current_height,
1664 });
1665 };
1666 self.storage_client().cache_certificate(certificate)
1667 };
1668
1669 self.check_certificate(&certificate).await?.into_result()?;
1671
1672 let block = certificate.block();
1674 let next = block
1675 .body
1676 .previous_message_blocks
1677 .get(&receiver_chain_id)
1678 .map(|(prev_hash, prev_height)| (*prev_hash, *prev_height));
1679
1680 certificates.insert(current_height, certificate);
1682
1683 if let Some((prev_hash, prev_height)) = next {
1684 current_height = prev_height;
1686 current_hash = Some(prev_hash);
1687 } else {
1688 break;
1690 }
1691 }
1692
1693 if certificates.is_empty() {
1694 self.retry_pending_cross_chain_requests(sender_chain_id)
1695 .await?;
1696 }
1697
1698 for certificate in certificates.into_values() {
1700 self.receive_sender_certificate(
1701 certificate,
1702 ReceiveCertificateMode::AlreadyChecked,
1703 Some(vec![remote_node.clone()]),
1704 )
1705 .await?;
1706 }
1707
1708 Ok(())
1709 }
1710
1711 async fn download_event_bearing_blocks(
1715 &self,
1716 publisher_chain_id: ChainId,
1717 initial_blocks: BTreeSet<(BlockHeight, CryptoHash)>,
1718 local_next_block_height: BlockHeight,
1719 subscribed_streams: &BTreeSet<StreamId>,
1720 remote_node: &RemoteNode<Env::ValidatorNode>,
1721 ) -> Result<(), chain_client::Error> {
1722 if initial_blocks.is_empty() {
1723 return Ok(());
1724 }
1725
1726 let mut certificates = BTreeMap::new();
1727 let mut blocks_to_fetch = initial_blocks;
1728 let next_expected_events = self
1729 .local_node
1730 .next_expected_events(
1731 publisher_chain_id,
1732 subscribed_streams.iter().cloned().collect(),
1733 )
1734 .await?;
1735
1736 while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
1737 if current_height < local_next_block_height {
1738 continue; }
1740 if certificates.contains_key(¤t_height) {
1741 continue;
1742 }
1743
1744 let certificate = if let Some(certificate) =
1745 self.storage_client().read_certificate(current_hash).await?
1746 {
1747 certificate
1748 } else {
1749 let downloaded = self
1750 .requests_scheduler
1751 .download_certificates(remote_node, publisher_chain_id, current_height, 1)
1752 .await?;
1753 let Some(certificate) = downloaded.into_iter().next() else {
1754 tracing::debug!(
1755 validator = remote_node.address(),
1756 %publisher_chain_id,
1757 height = %current_height,
1758 "failed to download event publisher block"
1759 );
1760 continue;
1761 };
1762
1763 self.check_certificate(&certificate).await?.into_result()?;
1764
1765 self.storage_client().cache_certificate(certificate)
1766 };
1767
1768 let block = certificate.block();
1769 for stream_id in subscribed_streams {
1771 if let Some((prev_hash, prev_height)) =
1772 block.body.previous_event_blocks.get(stream_id)
1773 {
1774 if next_expected_events.get(stream_id).is_some_and(|index| {
1775 block
1776 .body
1777 .events
1778 .iter()
1779 .flatten()
1780 .find(|event| event.stream_id == *stream_id)
1781 .is_some_and(|event| event.index == *index)
1782 }) {
1783 continue;
1784 }
1785 if !certificates.contains_key(prev_height) {
1786 blocks_to_fetch.insert((*prev_height, *prev_hash));
1787 }
1788 }
1789 }
1790
1791 certificates.insert(current_height, certificate);
1792 }
1793
1794 for certificate in certificates.into_values() {
1796 self.receive_sender_certificate(
1797 certificate,
1798 ReceiveCertificateMode::AlreadyChecked,
1799 Some(vec![remote_node.clone()]),
1800 )
1801 .await?;
1802 }
1803
1804 Ok(())
1805 }
1806
1807 async fn sync_events_from_node(
1810 &self,
1811 chain_id: ChainId,
1812 stream_ids: &BTreeSet<StreamId>,
1813 remote_node: &RemoteNode<Env::ValidatorNode>,
1814 ) -> Result<(), chain_client::Error> {
1815 let stream_ids_vec = stream_ids.iter().cloned().collect::<Vec<_>>();
1816 let mut initial_blocks = BTreeSet::new();
1817 for chunk in stream_ids_vec.chunks(self.options.max_event_stream_queries) {
1818 let query = ChainInfoQuery::new(chain_id).with_previous_event_blocks(chunk.to_vec());
1819 let info = remote_node.handle_chain_info_query(query).await?;
1820 initial_blocks.extend(info.requested_previous_event_blocks.values().copied());
1821 }
1822 let local_height = match self.local_node.chain_info(chain_id).await {
1823 Ok(info) => info.next_block_height,
1824 Err(LocalNodeError::InactiveChain(_) | LocalNodeError::BlobsNotFound(_)) => {
1825 BlockHeight::ZERO
1826 }
1827 Err(error) => return Err(error.into()),
1828 };
1829 self.download_event_bearing_blocks(
1830 chain_id,
1831 initial_blocks,
1832 local_height,
1833 stream_ids,
1834 remote_node,
1835 )
1836 .await
1837 }
1838
1839 #[instrument(
1840 level = "trace", skip_all,
1841 fields(certificate_hash = ?incoming_certificate.hash()),
1842 )]
1843 async fn check_certificate(
1844 &self,
1845 incoming_certificate: &ConfirmedBlockCertificate,
1846 ) -> Result<CheckCertificateResult, NodeError> {
1847 let epoch = incoming_certificate.block().header.epoch;
1848 let storage = self.storage_client();
1849 let view_err = |error: ExecutionError| NodeError::ViewError {
1850 error: error.to_string(),
1851 };
1852 if storage.is_epoch_revoked(epoch).await.map_err(view_err)? {
1853 return Ok(CheckCertificateResult::OldEpoch);
1854 }
1855 let Some(committee) = storage.committee_for_epoch(epoch).await.map_err(view_err)? else {
1856 return Ok(CheckCertificateResult::FutureEpoch);
1857 };
1858 incoming_certificate.check(&committee)?;
1859 Ok(CheckCertificateResult::New)
1860 }
1861
1862 #[instrument(level = "trace", skip_all)]
1866 async fn synchronize_chain_state(
1867 &self,
1868 chain_id: ChainId,
1869 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1870 let (_, committee) = self.admin_committee().await?;
1871 self.synchronize_chain_from_committee(chain_id, committee)
1872 .await
1873 }
1874
1875 #[instrument(level = "trace", skip_all)]
1880 pub(crate) async fn synchronize_chain_from_committee(
1881 &self,
1882 chain_id: ChainId,
1883 committee: Arc<Committee>,
1884 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1885 #[cfg(with_metrics)]
1886 let _latency = if !self.is_chain_follow_only(chain_id) {
1887 Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1888 } else {
1889 None
1890 };
1891
1892 let validators = self.make_nodes(&committee)?;
1893 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1894 communicate_with_quorum(
1895 &validators,
1896 &committee,
1897 |_: &()| (),
1898 |remote_node| async move {
1899 self.synchronize_chain_state_from(&remote_node, chain_id)
1900 .await
1901 },
1902 self.options.quorum_grace_period,
1903 )
1904 .await?;
1905
1906 self.local_node
1907 .chain_info(chain_id)
1908 .await
1909 .map_err(Into::into)
1910 }
1911
1912 #[instrument(level = "trace", skip(self, remote_node, chain_id))]
1918 pub(crate) async fn synchronize_chain_state_from(
1919 &self,
1920 remote_node: &RemoteNode<Env::ValidatorNode>,
1921 chain_id: ChainId,
1922 ) -> Result<(), chain_client::Error> {
1923 let with_manager_values = !self.is_chain_follow_only(chain_id);
1924 let query = if with_manager_values {
1925 ChainInfoQuery::new(chain_id).with_manager_values()
1926 } else {
1927 ChainInfoQuery::new(chain_id)
1928 };
1929 let query = query.with_latest_checkpoint_height();
1930 let remote_info = remote_node.handle_chain_info_query(query).await?;
1931
1932 if let Some(checkpoint_height) = remote_info.requested_latest_checkpoint_height {
1939 self.bootstrap_chain_from_checkpoint(remote_node, chain_id, checkpoint_height)
1940 .await?;
1941 }
1942
1943 let local_info = self
1944 .download_certificates_from(remote_node, chain_id, remote_info.next_block_height, None)
1945 .await?;
1946
1947 if !with_manager_values {
1948 return Ok(());
1949 }
1950
1951 let local_height = local_info.next_block_height;
1953 if local_height != remote_info.next_block_height {
1954 debug!(
1955 remote_node = remote_node.address(),
1956 remote_height = %remote_info.next_block_height,
1957 local_height = %local_height,
1958 "synced from validator, but remote height and local height are different",
1959 );
1960 return Ok(());
1961 };
1962
1963 if let Some(timeout) = remote_info.manager.timeout {
1964 self.handle_certificate(*timeout).await?;
1965 }
1966 let mut proposals = Vec::new();
1967 if let Some(proposal) = remote_info.manager.requested_signed_proposal {
1968 proposals.push(*proposal);
1969 }
1970 if let Some(proposal) = remote_info.manager.requested_proposed {
1971 proposals.push(*proposal);
1972 }
1973 if let Some(locking) = remote_info.manager.requested_locking {
1974 match *locking {
1975 LockingBlock::Fast(proposal) => {
1976 proposals.push(proposal);
1977 }
1978 LockingBlock::Regular(cert) => {
1979 let hash = cert.hash();
1980 if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
1981 {
1982 debug!(
1983 remote_node = remote_node.address(),
1984 %hash,
1985 height = %local_height,
1986 %error,
1987 "skipping locked block from validator",
1988 );
1989 }
1990 }
1991 }
1992 }
1993 'proposal_loop: for proposal in proposals {
1994 let owner: AccountOwner = proposal.owner();
1995 if let Err(mut err) = self
1996 .local_node
1997 .handle_block_proposal(proposal.clone())
1998 .await
1999 {
2000 if let LocalNodeError::BlobsNotFound(_) = &err {
2001 let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
2002 if !required_blob_ids.is_empty() {
2003 let mut blobs = Vec::new();
2004 for blob_id in required_blob_ids {
2005 let blob_content = match self
2006 .requests_scheduler
2007 .download_pending_blob(remote_node, chain_id, blob_id)
2008 .await
2009 {
2010 Ok(content) => content,
2011 Err(error) => {
2012 info!(
2013 remote_node = remote_node.address(),
2014 height = %local_height,
2015 proposer = %owner,
2016 %blob_id,
2017 %error,
2018 "skipping proposal from validator; failed to download blob",
2019 );
2020 continue 'proposal_loop;
2021 }
2022 };
2023 blobs.push(Blob::new(blob_content));
2024 }
2025 self.local_node
2026 .handle_pending_blobs(chain_id, blobs)
2027 .await?;
2028 if let Err(new_err) = self
2030 .local_node
2031 .handle_block_proposal(proposal.clone())
2032 .await
2033 {
2034 err = new_err;
2035 } else {
2036 continue;
2037 }
2038 }
2039 if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
2040 self.update_local_node_with_blobs_from(
2041 blob_ids.clone(),
2042 slice::from_ref(remote_node),
2043 )
2044 .await?;
2045 if let Err(new_err) = self
2047 .local_node
2048 .handle_block_proposal(proposal.clone())
2049 .await
2050 {
2051 err = new_err;
2052 } else {
2053 continue;
2054 }
2055 }
2056 }
2057 if let LocalNodeError::EventsNotFound(event_ids) = &err {
2058 if let Err(error) =
2059 Box::pin(self.download_certificates_for_events(event_ids)).await
2060 {
2061 info!(
2062 remote_node = remote_node.address(),
2063 height = %local_height,
2064 proposer = %owner,
2065 %error,
2066 "skipping proposal from validator; failed to download events",
2067 );
2068 continue 'proposal_loop;
2069 }
2070 if let Err(new_err) = self
2072 .local_node
2073 .handle_block_proposal(proposal.clone())
2074 .await
2075 {
2076 err = new_err;
2077 } else {
2078 continue;
2079 }
2080 }
2081 while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
2082 if let ChainError::MissingCrossChainUpdate {
2083 chain_id,
2084 origin,
2085 height,
2086 } = &**chain_err
2087 {
2088 self.download_sender_block_with_sending_ancestors(
2089 *chain_id,
2090 *origin,
2091 *height,
2092 remote_node,
2093 )
2094 .await?;
2095 if let Err(new_err) = self
2097 .local_node
2098 .handle_block_proposal(proposal.clone())
2099 .await
2100 {
2101 err = new_err;
2102 } else {
2103 continue 'proposal_loop;
2104 }
2105 } else {
2106 break;
2107 }
2108 }
2109
2110 debug!(
2111 remote_node = remote_node.address(),
2112 proposer = %owner,
2113 height = %local_height,
2114 error = %err,
2115 "skipping proposal from validator",
2116 );
2117 }
2118 }
2119 Ok(())
2120 }
2121
2122 async fn try_process_locking_block_from(
2123 &self,
2124 remote_node: &RemoteNode<Env::ValidatorNode>,
2125 certificate: GenericCertificate<ValidatedBlock>,
2126 ) -> Result<(), chain_client::Error> {
2127 let chain_id = certificate.inner().chain_id();
2128 let mut downloaded_blobs = HashSet::<BlobId>::new();
2129 let mut events = EventSetDownloader::new(self);
2130 loop {
2131 let result = self.handle_certificate(certificate.clone()).await;
2132 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2133 let new_blobs = filter_new(blob_ids, &downloaded_blobs);
2134 if !new_blobs.is_empty() {
2135 let mut blobs = Vec::new();
2136 for blob_id in &new_blobs {
2137 let blob_content = self
2138 .requests_scheduler
2139 .download_pending_blob(remote_node, chain_id, *blob_id)
2140 .await?;
2141 blobs.push(Blob::new(blob_content));
2142 }
2143 self.local_node
2144 .handle_pending_blobs(chain_id, blobs)
2145 .await?;
2146 downloaded_blobs.extend(new_blobs);
2147 continue;
2148 }
2149 }
2150 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
2151 if events.download_new(event_ids).await? {
2152 continue;
2153 }
2154 }
2155 result?;
2156 return Ok(());
2157 }
2158 }
2159
2160 async fn update_local_node_with_blobs_from(
2163 &self,
2164 blob_ids: Vec<BlobId>,
2165 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
2166 ) -> Result<Vec<CacheArc<Blob>>, chain_client::Error> {
2167 let timeout = self.options.blob_download_timeout;
2168 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
2170 stream::iter(blob_ids.into_iter().map(|blob_id| {
2171 communicate_concurrently(
2172 remote_nodes,
2173 async move |remote_node| {
2174 let certificate = self
2175 .requests_scheduler
2176 .download_certificate_for_blob(&remote_node, blob_id)
2177 .await?;
2178 self.receive_sender_certificate(
2179 self.storage_client().cache_certificate(certificate),
2180 ReceiveCertificateMode::NeedsCheck,
2181 Some(vec![remote_node.clone()]),
2182 )
2183 .await?;
2184 let blob = self
2185 .local_node
2186 .storage_client()
2187 .read_blob(blob_id)
2188 .await?
2189 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
2190 Result::<_, chain_client::Error>::Ok(blob)
2191 },
2192 timeout,
2193 )
2194 .map_err(move |errors| {
2195 for (validator, error) in &errors {
2196 warn!(
2197 %validator,
2198 %blob_id,
2199 %error,
2200 "failed to download certificate-for-blob from validator",
2201 );
2202 }
2203 chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id]))
2204 })
2205 }))
2206 .buffer_unordered(self.options.max_joined_tasks)
2207 .collect::<Vec<_>>()
2208 .await
2209 .into_iter()
2210 .collect()
2211 }
2212
2213 #[instrument(level = "trace", skip(self, block))]
2223 async fn stage_block_execution(
2224 &self,
2225 block: ProposedBlock,
2226 round: Option<u32>,
2227 published_blobs: Vec<Blob>,
2228 policy: BundleExecutionPolicy,
2229 ) -> Result<(Block, ChainInfoResponse, HashSet<ChainId>), chain_client::Error> {
2230 let mut events = EventSetDownloader::new(self);
2231 loop {
2232 let result = self
2233 .local_node
2234 .stage_block_execution(
2235 block.clone(),
2236 round,
2237 published_blobs.clone(),
2238 policy.clone(),
2239 )
2240 .await;
2241 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2242 let validators = self.validator_nodes().await?;
2243 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
2244 .await?;
2245 continue; }
2247 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
2248 if events.download_new(event_ids).await? {
2249 continue; }
2251 }
2253 if let Ok((_, executed_block, _, _, _)) = &result {
2254 let hash = CryptoHash::new(executed_block);
2255 let notification = Notification {
2256 chain_id: executed_block.header.chain_id,
2257 reason: Reason::BlockExecuted {
2258 height: executed_block.header.height,
2259 hash,
2260 },
2261 };
2262 self.notifier.notify(&[notification]);
2263 }
2264 let (
2265 _modified_block,
2266 executed_block,
2267 response,
2268 _resource_tracker,
2269 never_reject_origins,
2270 ) = result?;
2271 return Ok((executed_block, response, never_reject_origins));
2272 }
2273 }
2274}
2275
2276fn filter_new<T: Clone + Eq + std::hash::Hash>(
2278 ids: &[T],
2279 already_downloaded: &HashSet<T>,
2280) -> Vec<T> {
2281 ids.iter()
2282 .filter(|id| !already_downloaded.contains(*id))
2283 .cloned()
2284 .collect()
2285}
2286
2287pub(crate) struct EventSetDownloader<'a, Env: Environment> {
2292 client: &'a Client<Env>,
2293 downloaded: HashSet<EventId>,
2294}
2295
2296impl<'a, Env: Environment> EventSetDownloader<'a, Env> {
2297 pub(crate) fn new(client: &'a Client<Env>) -> Self {
2298 Self {
2299 client,
2300 downloaded: HashSet::new(),
2301 }
2302 }
2303
2304 pub(crate) async fn download_new(
2310 &mut self,
2311 event_ids: &[EventId],
2312 ) -> Result<bool, chain_client::Error> {
2313 let new_events = filter_new(event_ids, &self.downloaded);
2314 if new_events.is_empty() {
2315 return Ok(false);
2316 }
2317 Box::pin(self.client.download_certificates_for_events(&new_events)).await?;
2318 self.downloaded.extend(new_events);
2319 Ok(true)
2320 }
2321}
2322
2323async fn communicate_concurrently<'a, A, E, F, R, V>(
2327 nodes: &[RemoteNode<A>],
2328 f: F,
2329 timeout: Duration,
2330) -> Result<V, Vec<(ValidatorPublicKey, E)>>
2331where
2332 F: Clone + FnOnce(RemoteNode<A>) -> R,
2333 RemoteNode<A>: Clone,
2334 R: Future<Output = Result<V, E>> + 'a,
2335{
2336 let mut nodes = nodes.to_vec();
2337 nodes.shuffle(&mut rand::thread_rng());
2338 let mut stream = nodes
2339 .iter()
2340 .zip(0..)
2341 .map(|(remote_node, i)| {
2342 let fun = f.clone();
2343 let node = remote_node.clone();
2344 async move {
2345 linera_base::time::timer::sleep(timeout * i * i).await;
2346 fun(node).await.map_err(|err| (remote_node.public_key, err))
2347 }
2348 })
2349 .collect::<FuturesUnordered<_>>();
2350 let mut errors = vec![];
2351 while let Some(maybe_result) = stream.next().await {
2352 match maybe_result {
2353 Ok(result) => return Ok(result),
2354 Err(error) => errors.push(error),
2355 };
2356 }
2357 Err(errors)
2358}
2359
2360#[must_use]
2362pub struct AbortOnDrop(pub AbortHandle);
2363
2364impl Drop for AbortOnDrop {
2365 #[instrument(level = "trace", skip(self))]
2366 fn drop(&mut self) {
2367 self.0.abort();
2368 }
2369}
2370
2371#[derive(Clone, Serialize, Deserialize)]
2373pub struct PendingProposal {
2374 pub block: ProposedBlock,
2375 pub blobs: Vec<Blob>,
2376 #[serde(default)]
2379 pub auto_retry_outcome: Option<BlockExecutionOutcome>,
2380 #[serde(default)]
2382 pub round: Option<Round>,
2383}
2384
2385enum ReceiveCertificateMode {
2386 NeedsCheck,
2387 AlreadyChecked,
2388}
2389
2390enum CheckCertificateResult {
2391 OldEpoch,
2393 FutureEpoch,
2396 New,
2397}
2398
2399impl CheckCertificateResult {
2400 fn into_result(self) -> Result<(), chain_client::Error> {
2401 match self {
2402 Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
2403 Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
2404 Self::New => Ok(()),
2405 }
2406 }
2407}
2408
2409#[cfg(not(target_arch = "wasm32"))]
2413pub async fn create_bytecode_blobs(
2414 contract: Bytecode,
2415 service: Bytecode,
2416 vm_runtime: VmRuntime,
2417 formats: Option<Vec<u8>>,
2418) -> (Vec<Blob>, ModuleId) {
2419 let formats_blob = formats.map(Blob::new_application_formats);
2420 let formats_blob_hash = formats_blob.as_ref().map(|blob| blob.id().hash);
2421 let (mut blobs, module_id) = match vm_runtime {
2422 VmRuntime::Wasm => {
2423 let (compressed_contract, compressed_service) =
2424 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
2425 .await
2426 .expect("Compression should not panic");
2427 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
2428 let service_blob = Blob::new_service_bytecode(compressed_service);
2429 let module_id = ModuleId::new_with_formats(
2430 contract_blob.id().hash,
2431 service_blob.id().hash,
2432 vm_runtime,
2433 formats_blob_hash,
2434 );
2435 (vec![contract_blob, service_blob], module_id)
2436 }
2437 VmRuntime::Evm => {
2438 let compressed_contract = contract.compress();
2439 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
2440 let module_id = ModuleId::new_with_formats(
2441 evm_contract_blob.id().hash,
2442 evm_contract_blob.id().hash,
2443 vm_runtime,
2444 formats_blob_hash,
2445 );
2446 (vec![evm_contract_blob], module_id)
2447 }
2448 };
2449 if let Some(blob) = formats_blob {
2450 blobs.push(blob);
2451 }
2452 (blobs, module_id)
2453}