1use std::{
6 cmp::Ordering,
7 collections::{BTreeMap, BTreeSet, HashSet},
8 sync::{Arc, RwLock},
9};
10
11use custom_debug_derive::Debug;
12use futures::{
13 future::Future,
14 stream::{self, AbortHandle, FuturesUnordered, StreamExt},
15};
16#[cfg(with_metrics)]
17use linera_base::prometheus_util::MeasureLatency as _;
18use linera_base::{
19 crypto::{CryptoHash, Signer, ValidatorPublicKey},
20 data_types::{ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch},
21 ensure,
22 identifiers::{AccountOwner, BlobId, BlobType, ChainId, StreamId},
23 time::Duration,
24};
25#[cfg(not(target_arch = "wasm32"))]
26use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
27use linera_chain::{
28 data_types::{
29 BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
30 Transaction,
31 },
32 manager::LockingBlock,
33 types::{
34 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
35 LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
36 },
37 ChainError, ChainExecutionContext,
38};
39use linera_execution::committee::Committee;
40use linera_storage::{ResultReadCertificates, Storage as _};
41use rand::{
42 distributions::{Distribution, WeightedIndex},
43 seq::SliceRandom,
44};
45use received_log::ReceivedLogs;
46use serde::{Deserialize, Serialize};
47use tokio::sync::mpsc;
48use tracing::{debug, error, info, instrument, trace, warn};
49
50use crate::{
51 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, RoundTimeout},
52 environment::Environment,
53 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
54 node::{CrossChainMessageDelivery, NodeError, ValidatorNode, ValidatorNodeProvider as _},
55 notifier::ChannelNotifier,
56 remote_node::RemoteNode,
57 updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
58 worker::{Notification, ProcessableCertificate, WorkerError, WorkerState},
59 CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
60};
61
62pub mod chain_client;
63pub use chain_client::ChainClient;
64
65pub use crate::data_types::ClientOutcome;
66
67#[cfg(test)]
68#[path = "../unit_tests/client_tests.rs"]
69mod client_tests;
70mod received_log;
71mod validator_trackers;
72
/// Latency histograms recorded by the client; only compiled when metrics are enabled.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the bucket layout produced by
    /// `exponential_bucket_latencies(500.0)`.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Histogram registered under the name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
126
/// Default batch size for certificate downloads.
/// NOTE(review): presumably the default for `Options::certificate_download_batch_size`,
/// which caps each remote certificate query in this file; confirm in `chain_client`.
127pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size for sender certificate downloads.
/// NOTE(review): the consumer of this value is not visible in this part of the file.
128pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
129
/// Policy describing which incoming message bundles the client handles.
///
/// It combines a blanket rule with an optional restriction on the origin chain.
130#[derive(Clone, Debug)]
132pub struct MessagePolicy {
    /// The blanket rule, queried through `is_reject` and `is_ignore`.
133 blanket: BlanketMessagePolicy,
    /// If `Some`, `must_handle` only accepts bundles whose origin is in this set;
    /// if `None`, the origin is not restricted.
135 restrict_chain_ids_to: Option<HashSet<ChainId>>,
139}
140
/// The blanket rule of a [`MessagePolicy`]; usable as a `clap` value enum.
141#[derive(Copy, Clone, Debug, clap::ValueEnum)]
142pub enum BlanketMessagePolicy {
    /// `MessagePolicy::must_handle` neither skips bundles nor changes their action.
143 Accept,
    /// `MessagePolicy::must_handle` skips skippable bundles and marks the remaining
    /// ones as `MessageAction::Reject`, unless they are protected.
145 Reject,
    /// Reported by `MessagePolicy::is_ignore`.
    /// NOTE(review): the effect of this variant is implemented by callers that are
    /// not visible here.
148 Ignore,
151}
152
153impl MessagePolicy {
154 pub fn new(
155 blanket: BlanketMessagePolicy,
156 restrict_chain_ids_to: Option<HashSet<ChainId>>,
157 ) -> Self {
158 Self {
159 blanket,
160 restrict_chain_ids_to,
161 }
162 }
163
164 #[cfg(with_testing)]
165 pub fn new_accept_all() -> Self {
166 Self {
167 blanket: BlanketMessagePolicy::Accept,
168 restrict_chain_ids_to: None,
169 }
170 }
171
172 #[instrument(level = "trace", skip(self))]
173 fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
174 if self.is_reject() {
175 if bundle.bundle.is_skippable() {
176 return false;
177 } else if !bundle.bundle.is_protected() {
178 bundle.action = MessageAction::Reject;
179 }
180 }
181 match &self.restrict_chain_ids_to {
182 None => true,
183 Some(chains) => chains.contains(&bundle.origin),
184 }
185 }
186
187 #[instrument(level = "trace", skip(self))]
188 fn is_ignore(&self) -> bool {
189 matches!(self.blanket, BlanketMessagePolicy::Ignore)
190 }
191
192 #[instrument(level = "trace", skip(self))]
193 fn is_reject(&self) -> bool {
194 matches!(self.blanket, BlanketMessagePolicy::Reject)
195 }
196}
197
/// Label attached to a timing measurement.
///
/// Values of this type are paired with a `u64` and sent over the optional
/// `timing_sender` channel accepted by `Client::create_chain_client`.
/// NOTE(review): the unit of the `u64` is not visible here.
198#[derive(Debug, Clone, Copy)]
199pub enum TimingType {
200 ExecuteOperations,
201 ExecuteBlock,
202 SubmitBlockProposal,
203 UpdateValidators,
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
210pub enum ListeningMode {
211 FullChain,
213 EventsOnly(BTreeSet<StreamId>),
215}
216
217impl PartialOrd for ListeningMode {
218 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
219 match (self, other) {
220 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
221 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
222 (_, ListeningMode::FullChain) => Some(Ordering::Less),
223 (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
224 if events_a.is_superset(events_b) {
225 Some(Ordering::Greater)
226 } else if events_b.is_superset(events_a) {
227 Some(Ordering::Less)
228 } else {
229 None
230 }
231 }
232 }
233 }
234}
235
236impl ListeningMode {
237 pub fn extend(&mut self, other: Option<ListeningMode>) {
238 match (self, other) {
239 (_, None) => (),
240 (ListeningMode::FullChain, _) => (),
241 (mode, Some(ListeningMode::FullChain)) => {
242 *mode = ListeningMode::FullChain;
243 }
244 (
245 ListeningMode::EventsOnly(self_events),
246 Some(ListeningMode::EventsOnly(other_events)),
247 ) => {
248 self_events.extend(other_events);
249 }
250 }
251 }
252}
253
/// A client shared by the `ChainClient`s of individual chains.
254pub struct Client<Env: Environment> {
    /// Provides the storage, the network and the signer used by the client.
256 environment: Env,
    /// The local node through which certificates and queries are processed.
257 local_node: LocalNodeClient<Env::Storage>,
    /// The chain whose committees are used to reach and check validators.
260 admin_id: ChainId,
    /// The tracked chains; this set is shared with the local node's worker state.
262 tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
    /// Notifier handed to the local node whenever a certificate is handled.
265 notifier: Arc<ChannelNotifier<Notification>>,
    /// Per-chain client state, keyed by chain ID.
267 chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Options copied into every `ChainClient` created from this client.
269 options: chain_client::Options,
271}
272
273impl<Env: Environment> Client<Env> {
274 #[instrument(level = "trace", skip_all)]
276 #[allow(clippy::too_many_arguments)]
277 pub fn new(
278 environment: Env,
279 admin_id: ChainId,
280 long_lived_services: bool,
281 tracked_chains: impl IntoIterator<Item = ChainId>,
282 name: impl Into<String>,
283 chain_worker_ttl: Duration,
284 sender_chain_worker_ttl: Duration,
285 options: chain_client::Options,
286 block_cache_size: usize,
287 execution_state_cache_size: usize,
288 ) -> Self {
289 let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
290 let state = WorkerState::new_for_client(
291 name.into(),
292 environment.storage().clone(),
293 tracked_chains.clone(),
294 block_cache_size,
295 execution_state_cache_size,
296 )
297 .with_long_lived_services(long_lived_services)
298 .with_allow_inactive_chains(true)
299 .with_allow_messages_from_deprecated_epochs(true)
300 .with_chain_worker_ttl(chain_worker_ttl)
301 .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
302 let local_node = LocalNodeClient::new(state);
303
304 Self {
305 environment,
306 local_node,
307 chains: papaya::HashMap::new(),
308 admin_id,
309 tracked_chains,
310 notifier: Arc::new(ChannelNotifier::default()),
311 options,
312 }
313 }
314
    /// Returns the storage provided by the environment.
315 pub fn storage_client(&self) -> &Env::Storage {
317 self.environment.storage()
318 }
319
    /// Returns the network provided by the environment; `make_nodes` uses it to
    /// create validator nodes for a committee.
320 pub fn validator_node_provider(&self) -> &Env::Network {
321 self.environment.network()
322 }
323
    /// Returns the signer provided by the environment.
324 #[instrument(level = "trace", skip(self))]
326 pub fn signer(&self) -> &impl Signer {
327 self.environment.signer()
328 }
329
    /// Adds `chain_id` to the set of tracked chains.
    ///
    /// # Panics
    ///
    /// Panics if the lock on `tracked_chains` is poisoned.
330 #[instrument(level = "trace", skip(self))]
332 pub fn track_chain(&self, chain_id: ChainId) {
333 self.tracked_chains
334 .write()
335 .expect("Panics should not happen while holding a lock to `tracked_chains`")
336 .insert(chain_id);
337 }
338
    /// Creates a `ChainClient` for `chain_id` that shares this client.
    ///
    /// The per-chain state is registered in `self.chains`, initialized from
    /// `pending_proposal`. The remaining arguments, together with a clone of the
    /// client's options, are forwarded to `ChainClient::new`.
339 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
341 pub fn create_chain_client(
342 self: &Arc<Self>,
343 chain_id: ChainId,
344 block_hash: Option<CryptoHash>,
345 next_block_height: BlockHeight,
346 pending_proposal: Option<PendingProposal>,
347 preferred_owner: Option<AccountOwner>,
348 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
349 ) -> ChainClient<Env> {
        // NOTE(review): presumably an already registered state is kept as is and
        // the closure only runs for a new chain; confirm with papaya's
        // `get_or_insert_with`.
350 self.chains.pin().get_or_insert_with(chain_id, || {
353 chain_client::State::new(pending_proposal.clone())
354 });
355
356 ChainClient::new(
357 self.clone(),
358 chain_id,
359 self.options.clone(),
360 block_hash,
361 next_block_height,
362 preferred_owner,
363 timing_sender,
364 )
365 }
366
367 async fn fetch_chain_info(
369 &self,
370 chain_id: ChainId,
371 validators: &[RemoteNode<Env::ValidatorNode>],
372 ) -> Result<Box<ChainInfo>, chain_client::Error> {
373 match self.local_node.chain_info(chain_id).await {
374 Ok(info) => Ok(info),
375 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
376 self.synchronize_chain_state(self.admin_id).await?;
378 self.update_local_node_with_blobs_from(blob_ids, validators)
381 .await?;
382 Ok(self.local_node.chain_info(chain_id).await?)
383 }
384 Err(err) => Err(err.into()),
385 }
386 }
387
388 fn weighted_select(
389 remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
390 remaining_weights: &mut Vec<u64>,
391 ) -> Option<RemoteNode<Env::ValidatorNode>> {
392 if remaining_weights.is_empty() {
393 return None;
394 }
395 let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
396 let idx = dist.sample(&mut rand::thread_rng());
397 remaining_weights.remove(idx);
398 Some(remaining_validators.remove(idx))
399 }
400
401 #[instrument(level = "trace", skip(self))]
403 async fn download_certificates(
404 &self,
405 chain_id: ChainId,
406 target_next_block_height: BlockHeight,
407 ) -> Result<Box<ChainInfo>, chain_client::Error> {
408 let (_, committee) = self.admin_committee().await?;
409 let mut remaining_validators = self.make_nodes(&committee)?;
410 let mut info = self
411 .fetch_chain_info(chain_id, &remaining_validators)
412 .await?;
413 let mut remaining_weights = remaining_validators
415 .iter()
416 .map(|validator| {
417 let validator_state = committee.validators.get(&validator.public_key).unwrap();
418 validator_state.votes
419 })
420 .collect::<Vec<_>>();
421
422 while let Some(remote_node) =
423 Self::weighted_select(&mut remaining_validators, &mut remaining_weights)
424 {
425 if target_next_block_height <= info.next_block_height {
426 return Ok(info);
427 }
428 match self
429 .download_certificates_from(&remote_node, chain_id, target_next_block_height)
430 .await
431 {
432 Err(err) => info!(
433 "Failed to download certificates from validator {:?}: {err}",
434 remote_node.public_key
435 ),
436 Ok(Some(new_info)) => info = new_info,
437 Ok(None) => {}
438 }
439 }
440 ensure!(
441 target_next_block_height <= info.next_block_height,
442 chain_client::Error::CannotDownloadCertificates {
443 chain_id,
444 target_next_block_height,
445 }
446 );
447 Ok(info)
448 }
449
450 #[instrument(level = "trace", skip_all)]
453 async fn download_certificates_from(
454 &self,
455 remote_node: &RemoteNode<Env::ValidatorNode>,
456 chain_id: ChainId,
457 stop: BlockHeight,
458 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
459 let mut last_info = None;
460 let chain_info = self.local_node.chain_info(chain_id).await?;
462 let mut next_height = chain_info.next_block_height;
463 let hashes = self
464 .local_node
465 .get_preprocessed_block_hashes(chain_id, next_height, stop)
466 .await?;
467 let certificates = self
468 .storage_client()
469 .read_certificates(hashes.clone())
470 .await?;
471 let certificates = match ResultReadCertificates::new(certificates, hashes) {
472 ResultReadCertificates::Certificates(certificates) => certificates,
473 ResultReadCertificates::InvalidHashes(hashes) => {
474 return Err(chain_client::Error::ReadCertificatesError(hashes))
475 }
476 };
477 for certificate in certificates {
478 last_info = Some(self.handle_certificate(certificate).await?.info);
479 }
480 while next_height < stop {
482 let limit = u64::from(stop)
484 .checked_sub(u64::from(next_height))
485 .ok_or(ArithmeticError::Overflow)?
486 .min(self.options.certificate_download_batch_size);
487 let certificates = remote_node
488 .query_certificates_from(chain_id, next_height, limit)
489 .await?;
490 let Some(info) = self.process_certificates(remote_node, certificates).await? else {
491 break;
492 };
493 assert!(info.next_block_height > next_height);
494 next_height = info.next_block_height;
495 last_info = Some(info);
496 }
497 Ok(last_info)
498 }
499
500 async fn download_blobs(
501 &self,
502 remote_node: &RemoteNode<impl ValidatorNode>,
503 blob_ids: impl IntoIterator<Item = BlobId>,
504 ) -> Result<(), chain_client::Error> {
505 self.local_node
506 .store_blobs(
507 &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
508 remote_node.try_download_blob(blob_id).await.unwrap()
509 }))
510 .buffer_unordered(self.options.max_joined_tasks)
511 .collect::<Vec<_>>()
512 .await,
513 )
514 .await
515 .map_err(Into::into)
516 }
517
518 #[instrument(level = "trace", skip_all)]
521 async fn process_certificates(
522 &self,
523 remote_node: &RemoteNode<impl ValidatorNode>,
524 certificates: Vec<ConfirmedBlockCertificate>,
525 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
526 let mut info = None;
527 let required_blob_ids: Vec<_> = certificates
528 .iter()
529 .flat_map(|certificate| certificate.value().required_blob_ids())
530 .collect();
531
532 match self
533 .local_node
534 .read_blob_states_from_storage(&required_blob_ids)
535 .await
536 {
537 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
538 self.download_blobs(remote_node, blob_ids).await?;
539 }
540 x => {
541 x?;
542 }
543 }
544
545 for certificate in certificates {
546 info = Some(
547 match self.handle_certificate(certificate.clone()).await {
548 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
549 self.download_blobs(remote_node, blob_ids).await?;
550 self.handle_certificate(certificate).await?
551 }
552 x => x?,
553 }
554 .info,
555 );
556 }
557
558 Ok(info)
560 }
561
    /// Passes `certificate` to the local node together with this client's notifier
    /// and returns the local node's response.
562 async fn handle_certificate<T: ProcessableCertificate>(
563 &self,
564 certificate: GenericCertificate<T>,
565 ) -> Result<ChainInfoResponse, LocalNodeError> {
566 self.local_node
567 .handle_certificate(certificate, &self.notifier)
568 .await
569 }
570
571 async fn chain_info_with_committees(
572 &self,
573 chain_id: ChainId,
574 ) -> Result<Box<ChainInfo>, LocalNodeError> {
575 let query = ChainInfoQuery::new(chain_id).with_committees();
576 let info = self.local_node.handle_chain_info_query(query).await?.info;
577 Ok(info)
578 }
579
580 #[instrument(level = "trace", skip_all)]
583 async fn admin_committees(
584 &self,
585 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
586 let info = self.chain_info_with_committees(self.admin_id).await?;
587 Ok((info.epoch, info.into_committees()?))
588 }
589
590 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
592 let info = self.chain_info_with_committees(self.admin_id).await?;
593 Ok((info.epoch, info.into_current_committee()?))
594 }
595
596 async fn validator_nodes(
598 &self,
599 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
600 let (_, committee) = self.admin_committee().await?;
601 Ok(self.make_nodes(&committee)?)
602 }
603
604 fn make_nodes(
606 &self,
607 committee: &Committee,
608 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
609 Ok(self
610 .validator_node_provider()
611 .make_nodes(committee)?
612 .map(|(public_key, node)| RemoteNode { public_key, node })
613 .collect())
614 }
615
616 pub async fn get_chain_description(
619 &self,
620 chain_id: ChainId,
621 ) -> Result<ChainDescription, chain_client::Error> {
622 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
623 let blob = self
624 .local_node
625 .storage_client()
626 .read_blob(chain_desc_id)
627 .await?;
628 if let Some(blob) = blob {
629 return Ok(bcs::from_bytes(blob.bytes())?);
631 };
632 self.synchronize_chain_state(self.admin_id).await?;
634 let nodes = self.validator_nodes().await?;
635 let blob = self
636 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
637 .await?
638 .pop()
639 .unwrap(); Ok(bcs::from_bytes(blob.bytes())?)
641 }
642
    /// Updates the client state registered for `info.chain_id` from `info`.
    ///
    /// The stored state is cloned, the clone is updated, and the clone replaces the
    /// stored state.
    /// NOTE(review): presumably nothing happens if no state is registered for the
    /// chain; confirm with papaya's `update`.
643 #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
645 fn update_from_info(&self, info: &ChainInfo) {
646 self.chains.pin().update(info.chain_id, |state| {
647 let mut state = state.clone_for_update_unchecked();
648 state.update_from_info(info);
649 state
650 });
651 }
652
653 #[instrument(level = "trace", skip_all)]
655 async fn process_certificate<T: ProcessableCertificate>(
656 &self,
657 certificate: Box<GenericCertificate<T>>,
658 ) -> Result<(), LocalNodeError> {
659 let info = self.handle_certificate(*certificate).await?.info;
660 self.update_from_info(&info);
661 Ok(())
662 }
663
664 #[instrument(level = "trace", skip_all)]
666 async fn finalize_block(
667 &self,
668 committee: &Committee,
669 certificate: ValidatedBlockCertificate,
670 ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
671 debug!(round = %certificate.round, "Submitting block for confirmation");
672 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
673 let finalize_action = CommunicateAction::FinalizeBlock {
674 certificate: Box::new(certificate),
675 delivery: self.options.cross_chain_message_delivery,
676 };
677 let certificate = self
678 .communicate_chain_action(committee, finalize_action, hashed_value)
679 .await?;
680 self.receive_certificate_with_checked_signatures(certificate.clone())
681 .await?;
682 Ok(certificate)
683 }
684
685 #[instrument(level = "trace", skip_all)]
687 async fn submit_block_proposal<T: ProcessableCertificate>(
688 &self,
689 committee: &Committee,
690 proposal: Box<BlockProposal>,
691 value: T,
692 ) -> Result<GenericCertificate<T>, chain_client::Error> {
693 debug!(
694 round = %proposal.content.round,
695 "Submitting block proposal to validators"
696 );
697 let submit_action = CommunicateAction::SubmitBlock {
698 proposal,
699 blob_ids: value.required_blob_ids().into_iter().collect(),
700 };
701 let certificate = self
702 .communicate_chain_action(committee, submit_action, value)
703 .await?;
704 self.process_certificate(Box::new(certificate.clone()))
705 .await?;
706 Ok(certificate)
707 }
708
    /// Sends the information of `chain_id` up to `height` to the validators of
    /// `committee`, using `communicate_with_quorum` and the configured grace period.
    ///
    /// Each validator gets its own `ValidatorUpdater`, which shares the local node
    /// and calls `send_chain_information`.
709 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
711 async fn communicate_chain_updates(
712 &self,
713 committee: &Committee,
714 chain_id: ChainId,
715 height: BlockHeight,
716 delivery: CrossChainMessageDelivery,
717 ) -> Result<(), chain_client::Error> {
718 let nodes = self.make_nodes(committee)?;
719 communicate_with_quorum(
720 &nodes,
721 committee,
        // The per-validator results are `()`, so they all share the same key.
722 |_: &()| (),
723 |remote_node| {
724 let mut updater = ValidatorUpdater {
725 remote_node,
726 local_node: self.local_node.clone(),
727 admin_id: self.admin_id,
728 };
729 Box::pin(async move {
730 updater
731 .send_chain_information(chain_id, height, delivery)
732 .await
733 })
734 },
735 self.options.grace_period,
736 )
737 .await?;
738 Ok(())
739 }
740
741 #[instrument(level = "trace", skip_all)]
747 async fn communicate_chain_action<T: CertificateValue>(
748 &self,
749 committee: &Committee,
750 action: CommunicateAction,
751 value: T,
752 ) -> Result<GenericCertificate<T>, chain_client::Error> {
753 let nodes = self.make_nodes(committee)?;
754 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
755 &nodes,
756 committee,
757 |vote: &LiteVote| (vote.value.value_hash, vote.round),
758 |remote_node| {
759 let mut updater = ValidatorUpdater {
760 remote_node,
761 local_node: self.local_node.clone(),
762 admin_id: self.admin_id,
763 };
764 let action = action.clone();
765 Box::pin(async move { updater.send_chain_update(action).await })
766 },
767 self.options.grace_period,
768 )
769 .await?;
770 ensure!(
771 (votes_hash, votes_round) == (value.hash(), action.round()),
772 chain_client::Error::UnexpectedQuorum {
773 hash: votes_hash,
774 round: votes_round,
775 expected_hash: value.hash(),
776 expected_round: action.round(),
777 }
778 );
779 let certificate = LiteCertificate::try_from_votes(votes)
784 .ok_or_else(|| {
785 chain_client::Error::InternalError(
786 "Vote values or rounds don't match; this is a bug",
787 )
788 })?
789 .with_value(value)
790 .ok_or_else(|| {
791 chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
792 })?;
793 Ok(certificate)
794 }
795
796 #[instrument(level = "trace", skip_all)]
799 async fn receive_certificate_with_checked_signatures(
800 &self,
801 certificate: ConfirmedBlockCertificate,
802 ) -> Result<(), chain_client::Error> {
803 let certificate = Box::new(certificate);
804 let block = certificate.block();
805
806 self.download_certificates(block.header.chain_id, block.header.height)
808 .await?;
809 if let Err(err) = self.process_certificate(certificate.clone()).await {
812 match &err {
813 LocalNodeError::BlobsNotFound(blob_ids) => {
814 let blobs = RemoteNode::download_blobs(
815 blob_ids,
816 &self.validator_nodes().await?,
817 self.options.blob_download_timeout,
818 )
819 .await
820 .ok_or(err)?;
821 self.local_node.store_blobs(&blobs).await?;
822 self.process_certificate(certificate).await?;
823 }
824 _ => {
825 warn!("Failed to process network hashed certificate value");
827 return Err(err.into());
828 }
829 }
830 }
831
832 Ok(())
833 }
834
835 #[instrument(level = "trace", skip_all)]
837 #[allow(dead_code)] async fn receive_sender_certificate(
839 &self,
840 certificate: ConfirmedBlockCertificate,
841 mode: ReceiveCertificateMode,
842 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
843 ) -> Result<(), chain_client::Error> {
844 let (max_epoch, committees) = self.admin_committees().await?;
846 if let ReceiveCertificateMode::NeedsCheck = mode {
847 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
848 }
849 let nodes = if let Some(nodes) = nodes {
851 nodes
852 } else {
853 self.validator_nodes().await?
854 };
855 if let Err(err) = self.handle_certificate(certificate.clone()).await {
856 match &err {
857 LocalNodeError::BlobsNotFound(blob_ids) => {
858 let blobs = RemoteNode::download_blobs(
859 blob_ids,
860 &nodes,
861 self.options.blob_download_timeout,
862 )
863 .await
864 .ok_or(err)?;
865 self.local_node.store_blobs(&blobs).await?;
866 self.handle_certificate(certificate.clone()).await?;
867 }
868 _ => {
869 warn!("Failed to process network hashed certificate value");
871 return Err(err.into());
872 }
873 }
874 }
875
876 Ok(())
877 }
878
    /// Downloads the certificates of `sender_chain_id` at the given `remote_heights`
    /// and processes them, reporting every processed block through `sender`.
    ///
    /// Certificates are requested from `nodes` via `communicate_concurrently`; each
    /// node is only asked for the heights that `received_log` attributes to it.
    /// Nodes reported as faulty are dropped, and the function gives up once no node
    /// is left. Errors are logged, not returned.
    ///
    /// NOTE(review): a height whose certificate keeps failing in
    /// `receive_sender_certificate` is never removed from the queue, so the loop
    /// would presumably repeat for as long as downloads succeed; confirm termination.
879 #[instrument(level = "trace", skip_all)]
881 async fn download_and_process_sender_chain(
882 &self,
883 sender_chain_id: ChainId,
884 nodes: &[RemoteNode<Env::ValidatorNode>],
885 received_log: &ReceivedLogs,
886 mut remote_heights: Vec<BlockHeight>,
887 sender: mpsc::UnboundedSender<ChainAndHeight>,
888 ) {
889 let (max_epoch, committees) = match self.admin_committees().await {
890 Ok(result) => result,
891 Err(error) => {
892 error!(%error, %sender_chain_id, "could not read admin committees");
893 return;
894 }
895 };
896 let committees_ref = &committees;
897 let mut nodes = nodes.to_vec();
898 while !remote_heights.is_empty() {
899 let remote_heights_ref = &remote_heights;
900 nodes.shuffle(&mut rand::thread_rng());
901 let certificates = match communicate_concurrently(
902 &nodes,
903 async move |remote_node| {
904 let mut remote_heights = remote_heights_ref.clone();
            // Only ask this validator for heights that the received log
            // attributes to it.
905 remote_heights.retain(|height| {
908 received_log.validator_has_block(
909 &remote_node.public_key,
910 sender_chain_id,
911 *height,
912 )
913 });
914 if remote_heights.is_empty() {
            // This validator has none of the remaining heights.
915 return Err(());
918 }
919 let certificates = remote_node
920 .download_certificates_by_heights(sender_chain_id, remote_heights)
921 .await
922 .map_err(|_| ())?;
923 let mut certificates_with_check_results = vec![];
924 for cert in certificates {
925 if let Ok(check_result) =
926 Self::check_certificate(max_epoch, committees_ref, &cert)
927 {
            // Keep the certificate together with a flag telling whether the
            // check result converted into `Ok`.
928 certificates_with_check_results
929 .push((cert, check_result.into_result().is_ok()));
930 } else {
            // The certificate check itself failed; treat this validator's
            // whole answer as an error.
931 return Err(());
933 }
934 }
935 Ok(certificates_with_check_results)
936 },
            // Collect the public keys of the validators that returned errors.
937 |errors| {
938 errors
939 .into_iter()
940 .map(|(validator, _error)| validator)
941 .collect::<BTreeSet<_>>()
942 },
943 self.options.certificate_batch_download_timeout,
944 )
945 .await
946 {
947 Ok(certificates_with_check_results) => certificates_with_check_results,
948 Err(faulty_validators) => {
            // Drop the failing validators and retry with the remaining ones.
949 nodes.retain(|node| !faulty_validators.contains(&node.public_key));
951 if nodes.is_empty() {
952 info!(
953 chain_id = %sender_chain_id,
954 "could not download certificates for chain - no more correct validators left"
955 );
956 return;
957 }
958 continue;
959 }
960 };
961
962 trace!(
963 chain_id = %sender_chain_id,
964 num_certificates = %certificates.len(),
965 "received certificates",
966 );
967
968 let mut to_remove_from_queue = BTreeSet::new();
969
970 for (certificate, check_result) in certificates {
971 let hash = certificate.hash();
972 let chain_id = certificate.block().header.chain_id;
973 let height = certificate.block().header.height;
974 if !check_result {
            // The check did not pass: drop the height from the queue without
            // processing the certificate.
975 to_remove_from_queue.insert(height);
979 continue;
980 }
981 let mode = ReceiveCertificateMode::AlreadyChecked;
983 if let Err(error) = self
984 .receive_sender_certificate(certificate, mode, None)
985 .await
986 {
            // The height stays in the queue in this case.
987 warn!(%error, %hash, "Received invalid certificate");
988 } else {
989 to_remove_from_queue.insert(height);
990 if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
991 error!(
992 %chain_id,
993 %height,
994 %error,
995 "failed to send chain and height over the channel",
996 );
997 }
998 }
999 }
1000
1001 remote_heights.retain(|height| !to_remove_from_queue.contains(height));
1002 }
1003 trace!(
1004 chain_id = %sender_chain_id,
1005 "find_received_certificates: finished processing chain",
1006 );
1007 }
1008
1009 #[instrument(level = "trace", skip(self))]
1011 async fn get_received_log_from_validator(
1012 &self,
1013 chain_id: ChainId,
1014 remote_node: &RemoteNode<Env::ValidatorNode>,
1015 tracker: u64,
1016 ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1017 let mut offset = tracker;
1018
1019 let mut remote_log = Vec::new();
1021 loop {
1022 trace!("get_received_log_from_validator: looping");
1023 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1024 let info = remote_node.handle_chain_info_query(query).await?;
1025 let received_entries = info.requested_received_log.len();
1026 offset += received_entries as u64;
1027 remote_log.extend(info.requested_received_log);
1028 trace!(
1029 ?remote_node,
1030 %received_entries,
1031 "get_received_log_from_validator: received log batch",
1032 );
1033 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1034 break;
1035 }
1036 }
1037
1038 trace!(
1039 ?remote_node,
1040 num_entries = %remote_log.len(),
1041 "get_received_log_from_validator: returning downloaded log",
1042 );
1043
1044 Ok(remote_log)
1045 }
1046
1047 async fn download_sender_block_with_sending_ancestors(
1053 &self,
1054 receiver_chain_id: ChainId,
1055 sender_chain_id: ChainId,
1056 height: BlockHeight,
1057 remote_node: &RemoteNode<Env::ValidatorNode>,
1058 ) -> Result<(), chain_client::Error> {
1059 let next_outbox_height = self
1060 .local_node
1061 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1062 .await?
1063 .get(&sender_chain_id)
1064 .copied()
1065 .unwrap_or(BlockHeight::ZERO);
1066 let (max_epoch, committees) = self.admin_committees().await?;
1067
1068 let mut certificates = BTreeMap::new();
1071 let mut current_height = height;
1072
1073 while current_height >= next_outbox_height {
1075 let downloaded = remote_node
1077 .download_certificates_by_heights(sender_chain_id, vec![current_height])
1078 .await?;
1079 let Some(certificate) = downloaded.into_iter().next() else {
1080 return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1081 chain_id: sender_chain_id,
1082 height: current_height,
1083 });
1084 };
1085
1086 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1088 .into_result()?;
1089
1090 let block = certificate.block();
1092 let next_height = block
1093 .body
1094 .previous_message_blocks
1095 .get(&receiver_chain_id)
1096 .map(|(_prev_hash, prev_height)| *prev_height);
1097
1098 certificates.insert(current_height, certificate);
1100
1101 if let Some(prev_height) = next_height {
1102 current_height = prev_height;
1104 } else {
1105 break;
1107 }
1108 }
1109
1110 if certificates.is_empty() {
1111 self.local_node
1112 .retry_pending_cross_chain_requests(sender_chain_id)
1113 .await?;
1114 }
1115
1116 for certificate in certificates.into_values() {
1118 self.receive_sender_certificate(
1119 certificate,
1120 ReceiveCertificateMode::AlreadyChecked,
1121 Some(vec![remote_node.clone()]),
1122 )
1123 .await?;
1124 }
1125
1126 Ok(())
1127 }
1128
1129 #[instrument(
1130 level = "trace", skip_all,
1131 fields(certificate_hash = ?incoming_certificate.hash()),
1132 )]
1133 fn check_certificate(
1134 highest_known_epoch: Epoch,
1135 committees: &BTreeMap<Epoch, Committee>,
1136 incoming_certificate: &ConfirmedBlockCertificate,
1137 ) -> Result<CheckCertificateResult, NodeError> {
1138 let block = incoming_certificate.block();
1139 if block.header.epoch > highest_known_epoch {
1141 return Ok(CheckCertificateResult::FutureEpoch);
1142 }
1143 if let Some(known_committee) = committees.get(&block.header.epoch) {
1144 incoming_certificate.check(known_committee)?;
1147 Ok(CheckCertificateResult::New)
1148 } else {
1149 Ok(CheckCertificateResult::OldEpoch)
1151 }
1152 }
1153
1154 #[instrument(level = "trace", skip_all)]
1156 async fn synchronize_chain_state(
1157 &self,
1158 chain_id: ChainId,
1159 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1160 let (_, committee) = self.admin_committee().await?;
1161 self.synchronize_chain_state_from_committee(chain_id, committee)
1162 .await
1163 }
1164
    /// Synchronizes the local state of `chain_id` with the validators of `committee`
    /// and returns the resulting local chain info.
    ///
    /// The chain info is fetched first (downloading missing blobs if needed). Then
    /// `synchronize_chain_state_from` is run for the validators through
    /// `communicate_with_quorum`, using the configured grace period.
1165 #[instrument(level = "trace", skip_all)]
1168 pub async fn synchronize_chain_state_from_committee(
1169 &self,
1170 chain_id: ChainId,
1171 committee: Committee,
1172 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1173 #[cfg(with_metrics)]
1174 let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
1175
1176 let validators = self.make_nodes(&committee)?;
        // `fetch_chain_info` can call back into chain synchronization, so its
        // future is boxed here.
1177 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1178 communicate_with_quorum(
1179 &validators,
1180 &committee,
        // The per-validator results are `()`, so they all share the same key.
1181 |_: &()| (),
1182 |remote_node| async move {
1183 self.synchronize_chain_state_from(&remote_node, chain_id)
1184 .await
1185 },
1186 self.options.grace_period,
1187 )
1188 .await?;
1189
1190 self.local_node
1191 .chain_info(chain_id)
1192 .await
1193 .map_err(Into::into)
1194 }
1195
1196 #[instrument(level = "trace", skip(self, remote_node, chain_id))]
1199 async fn synchronize_chain_state_from(
1200 &self,
1201 remote_node: &RemoteNode<Env::ValidatorNode>,
1202 chain_id: ChainId,
1203 ) -> Result<(), chain_client::Error> {
1204 let mut local_info = self.local_node.chain_info(chain_id).await?;
1205 let query = ChainInfoQuery::new(chain_id).with_manager_values();
1206 let remote_info = remote_node.handle_chain_info_query(query).await?;
1207 if let Some(new_info) = self
1208 .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
1209 .await?
1210 {
1211 local_info = new_info;
1212 };
1213
1214 if local_info.next_block_height != remote_info.next_block_height {
1216 debug!(
1217 "Synced from validator {}; but remote height is {} and local height is {}",
1218 remote_node.public_key, remote_info.next_block_height, local_info.next_block_height
1219 );
1220 return Ok(());
1221 };
1222
1223 if let Some(timeout) = remote_info.manager.timeout {
1224 self.handle_certificate(*timeout).await?;
1225 }
1226 let mut proposals = Vec::new();
1227 if let Some(proposal) = remote_info.manager.requested_signed_proposal {
1228 proposals.push(*proposal);
1229 }
1230 if let Some(proposal) = remote_info.manager.requested_proposed {
1231 proposals.push(*proposal);
1232 }
1233 if let Some(locking) = remote_info.manager.requested_locking {
1234 match *locking {
1235 LockingBlock::Fast(proposal) => {
1236 proposals.push(proposal);
1237 }
1238 LockingBlock::Regular(cert) => {
1239 let hash = cert.hash();
1240 if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
1241 debug!(
1242 "Skipping locked block {hash} from validator {} at height {}: {err}",
1243 remote_node.public_key, local_info.next_block_height,
1244 );
1245 }
1246 }
1247 }
1248 }
1249 'proposal_loop: for proposal in proposals {
1250 let owner: AccountOwner = proposal.owner();
1251 if let Err(mut err) = self
1252 .local_node
1253 .handle_block_proposal(proposal.clone())
1254 .await
1255 {
1256 if let LocalNodeError::BlobsNotFound(_) = &err {
1257 let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
1258 if !required_blob_ids.is_empty() {
1259 let mut blobs = Vec::new();
1260 for blob_id in required_blob_ids {
1261 let blob_content = match remote_node
1262 .node
1263 .download_pending_blob(chain_id, blob_id)
1264 .await
1265 {
1266 Ok(content) => content,
1267 Err(err) => {
1268 info!(
1269 "Skipping proposal from {owner} and validator {} at \
1270 height {}; failed to download {blob_id}: {err}",
1271 remote_node.public_key, local_info.next_block_height
1272 );
1273 continue 'proposal_loop;
1274 }
1275 };
1276 blobs.push(Blob::new(blob_content));
1277 }
1278 self.local_node
1279 .handle_pending_blobs(chain_id, blobs)
1280 .await?;
1281 if let Err(new_err) = self
1283 .local_node
1284 .handle_block_proposal(proposal.clone())
1285 .await
1286 {
1287 err = new_err;
1288 } else {
1289 continue;
1290 }
1291 }
1292 if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
1293 self.update_local_node_with_blobs_from(
1294 blob_ids.clone(),
1295 &[remote_node.clone()],
1296 )
1297 .await?;
1298 if let Err(new_err) = self
1300 .local_node
1301 .handle_block_proposal(proposal.clone())
1302 .await
1303 {
1304 err = new_err;
1305 } else {
1306 continue;
1307 }
1308 }
1309 }
1310 while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
1311 if let ChainError::MissingCrossChainUpdate {
1312 chain_id,
1313 origin,
1314 height,
1315 } = &**chain_err
1316 {
1317 self.download_sender_block_with_sending_ancestors(
1318 *chain_id,
1319 *origin,
1320 *height,
1321 remote_node,
1322 )
1323 .await?;
1324 if let Err(new_err) = self
1326 .local_node
1327 .handle_block_proposal(proposal.clone())
1328 .await
1329 {
1330 err = new_err;
1331 } else {
1332 continue 'proposal_loop;
1333 }
1334 } else {
1335 break;
1336 }
1337 }
1338
1339 debug!(
1340 "Skipping proposal from {owner} and validator {} at height {}: {err}",
1341 remote_node.public_key, local_info.next_block_height
1342 );
1343 }
1344 }
1345 Ok(())
1346 }
1347
1348 async fn try_process_locking_block_from(
1349 &self,
1350 remote_node: &RemoteNode<Env::ValidatorNode>,
1351 certificate: GenericCertificate<ValidatedBlock>,
1352 ) -> Result<(), chain_client::Error> {
1353 let chain_id = certificate.inner().chain_id();
1354 let certificate = Box::new(certificate);
1355 match self.process_certificate(certificate.clone()).await {
1356 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1357 let mut blobs = Vec::new();
1358 for blob_id in blob_ids {
1359 let blob_content = remote_node
1360 .node
1361 .download_pending_blob(chain_id, blob_id)
1362 .await?;
1363 blobs.push(Blob::new(blob_content));
1364 }
1365 self.local_node
1366 .handle_pending_blobs(chain_id, blobs)
1367 .await?;
1368 self.process_certificate(certificate).await?;
1369 Ok(())
1370 }
1371 Err(err) => Err(err.into()),
1372 Ok(()) => Ok(()),
1373 }
1374 }
1375
1376 async fn update_local_node_with_blobs_from(
1379 &self,
1380 blob_ids: Vec<BlobId>,
1381 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1382 ) -> Result<Vec<Blob>, chain_client::Error> {
1383 let timeout = self.options.blob_download_timeout;
1384 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1386 stream::iter(blob_ids.into_iter().map(|blob_id| {
1387 communicate_concurrently(
1388 remote_nodes,
1389 async move |remote_node| {
1390 let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
1391 self.receive_sender_certificate(
1392 certificate,
1393 ReceiveCertificateMode::NeedsCheck,
1394 Some(vec![remote_node.clone()]),
1395 )
1396 .await?;
1397 let blob = self
1398 .local_node
1399 .storage_client()
1400 .read_blob(blob_id)
1401 .await?
1402 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1403 Result::<_, chain_client::Error>::Ok(blob)
1404 },
1405 move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
1406 timeout,
1407 )
1408 }))
1409 .buffer_unordered(self.options.max_joined_tasks)
1410 .collect::<Vec<_>>()
1411 .await
1412 .into_iter()
1413 .collect()
1414 }
1415
1416 #[tracing::instrument(level = "trace", skip(self, block))]
1421 async fn stage_block_execution_and_discard_failing_messages(
1422 &self,
1423 mut block: ProposedBlock,
1424 round: Option<u32>,
1425 published_blobs: Vec<Blob>,
1426 ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1427 loop {
1428 let result = self
1429 .stage_block_execution(block.clone(), round, published_blobs.clone())
1430 .await;
1431 if let Err(chain_client::Error::LocalNodeError(LocalNodeError::WorkerError(
1432 WorkerError::ChainError(chain_error),
1433 ))) = &result
1434 {
1435 if let ChainError::ExecutionError(
1436 error,
1437 ChainExecutionContext::IncomingBundle(index),
1438 ) = &**chain_error
1439 {
1440 let transaction = block
1441 .transactions
1442 .get_mut(*index as usize)
1443 .expect("Transaction at given index should exist");
1444 let Transaction::ReceiveMessages(message) = transaction else {
1445 panic!(
1446 "Expected incoming bundle at transaction index {}, found operation",
1447 index
1448 );
1449 };
1450 ensure!(
1451 !message.bundle.is_protected(),
1452 chain_client::Error::BlockProposalError(
1453 "Protected incoming message failed to execute locally"
1454 )
1455 );
1456 info!(
1460 %error, origin = ?message.origin,
1461 "Message failed to execute locally and will be rejected."
1462 );
1463 message.action = MessageAction::Reject;
1464 continue;
1465 }
1466 }
1467 return result;
1468 }
1469 }
1470
1471 #[instrument(level = "trace", skip(self, block))]
1474 async fn stage_block_execution(
1475 &self,
1476 block: ProposedBlock,
1477 round: Option<u32>,
1478 published_blobs: Vec<Blob>,
1479 ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1480 loop {
1481 let result = self
1482 .local_node
1483 .stage_block_execution(block.clone(), round, published_blobs.clone())
1484 .await;
1485 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1486 let validators = self.validator_nodes().await?;
1487 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1488 .await?;
1489 continue; }
1491 return Ok(result?);
1492 }
1493 }
1494}
1495
1496async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
1499 nodes: &[RemoteNode<A>],
1500 f: F,
1501 err: G,
1502 timeout: Duration,
1503) -> Result<V, E2>
1504where
1505 F: Clone + FnOnce(RemoteNode<A>) -> R,
1506 RemoteNode<A>: Clone,
1507 G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
1508 R: Future<Output = Result<V, E1>> + 'a,
1509{
1510 let mut stream = nodes
1511 .iter()
1512 .zip(0..)
1513 .map(|(remote_node, i)| {
1514 let fun = f.clone();
1515 let node = remote_node.clone();
1516 async move {
1517 linera_base::time::timer::sleep(timeout * i * i).await;
1518 fun(node).await.map_err(|err| (remote_node.public_key, err))
1519 }
1520 })
1521 .collect::<FuturesUnordered<_>>();
1522 let mut errors = vec![];
1523 while let Some(maybe_result) = stream.next().await {
1524 match maybe_result {
1525 Ok(result) => return Ok(result),
1526 Err(error) => errors.push(error),
1527 };
1528 }
1529 Err(err(errors))
1530}
1531
1532#[derive(Debug)]
1534enum ExecuteBlockOutcome {
1535 Executed(ConfirmedBlockCertificate),
1537 Conflict(ConfirmedBlockCertificate),
1540 WaitForTimeout(RoundTimeout),
1543}
1544
1545#[must_use]
1547pub struct AbortOnDrop(pub AbortHandle);
1548
1549impl Drop for AbortOnDrop {
1550 #[instrument(level = "trace", skip(self))]
1551 fn drop(&mut self) {
1552 self.0.abort();
1553 }
1554}
1555
1556#[derive(Clone, Serialize, Deserialize)]
1558pub struct PendingProposal {
1559 pub block: ProposedBlock,
1560 pub blobs: Vec<Blob>,
1561}
1562
/// Mode flag passed along with a certificate to `receive_sender_certificate`.
// NOTE(review): presumably this selects whether the certificate still has to be verified
// before use — confirm in `receive_sender_certificate`.
enum ReceiveCertificateMode {
    /// The certificate has not been checked yet.
    NeedsCheck,
    /// The certificate has already been checked.
    AlreadyChecked,
}
1567
/// Result of classifying a certificate by the epoch of its block, as produced by
/// `check_certificate`.
enum CheckCertificateResult {
    /// No committee is known for the certificate's epoch.
    OldEpoch,
    /// The epoch's committee is known and the certificate passed its check.
    New,
    /// The certificate's epoch is greater than the highest known epoch.
    FutureEpoch,
}
1573
1574impl CheckCertificateResult {
1575 fn into_result(self) -> Result<(), chain_client::Error> {
1576 match self {
1577 Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
1578 Self::New => Ok(()),
1579 Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
1580 }
1581 }
1582}
1583
1584#[cfg(not(target_arch = "wasm32"))]
1586pub async fn create_bytecode_blobs(
1587 contract: Bytecode,
1588 service: Bytecode,
1589 vm_runtime: VmRuntime,
1590) -> (Vec<Blob>, ModuleId) {
1591 match vm_runtime {
1592 VmRuntime::Wasm => {
1593 let (compressed_contract, compressed_service) =
1594 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
1595 .await
1596 .expect("Compression should not panic");
1597 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1598 let service_blob = Blob::new_service_bytecode(compressed_service);
1599 let module_id =
1600 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1601 (vec![contract_blob, service_blob], module_id)
1602 }
1603 VmRuntime::Evm => {
1604 let compressed_contract = contract.compress();
1605 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1606 let module_id = ModuleId::new(
1607 evm_contract_blob.id().hash,
1608 evm_contract_blob.id().hash,
1609 vm_runtime,
1610 );
1611 (vec![evm_contract_blob], module_id)
1612 }
1613 }
1614}