1use std::{
6 cmp::Ordering,
7 collections::{BTreeMap, BTreeSet, HashSet},
8 slice,
9 sync::{Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::Future,
15 stream::{self, AbortHandle, FuturesUnordered, StreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20 crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
21 data_types::{
22 ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, Round, TimeDelta, Timestamp,
23 },
24 ensure,
25 identifiers::{AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
26 time::Duration,
27};
28#[cfg(not(target_arch = "wasm32"))]
29use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
30use linera_chain::{
31 data_types::{
32 BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote,
33 ProposedBlock,
34 },
35 manager::LockingBlock,
36 types::{
37 Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
38 LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
39 },
40 ChainError,
41};
42use linera_execution::committee::Committee;
43use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
44use rand::seq::SliceRandom;
45use received_log::ReceivedLogs;
46use serde::{Deserialize, Serialize};
47use tokio::sync::mpsc;
48use tracing::{debug, error, info, instrument, trace, warn};
49
50use crate::{
51 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
52 environment::Environment,
53 local_node::{LocalNodeClient, LocalNodeError},
54 node::{CrossChainMessageDelivery, NodeError, ValidatorNode as _, ValidatorNodeProvider as _},
55 notifier::{ChannelNotifier, Notifier as _},
56 remote_node::RemoteNode,
57 updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
58 worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
59 ChainWorkerConfig, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
60};
61
62pub mod chain_client;
63pub use chain_client::ChainClient;
64
65pub use crate::data_types::ClientOutcome;
66
67#[cfg(test)]
68#[path = "../unit_tests/client_tests.rs"]
69mod client_tests;
70pub mod requests_scheduler;
71
72pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
73mod received_log;
74mod validator_trackers;
75
/// Latency histograms for the main client operations.
///
/// Only compiled when the `with_metrics` cfg flag is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the buckets produced by
    /// `exponential_bucket_latencies(10_000.0)`.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(
            name,
            description,
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    }

    /// Histogram registered under the name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
129
/// Default number of certificates per download batch.
/// NOTE(review): presumably the default for `Options::certificate_download_batch_size` — confirm.
pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default number of certificates per upload batch.
/// NOTE(review): the consumer of this value is outside this section — confirm its use.
pub static DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size for downloading sender certificates.
/// NOTE(review): the consumer of this value is outside this section — confirm its use.
pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
/// Default upper bound on event stream queries.
/// NOTE(review): the consumer of this value is outside this section — confirm its use.
pub static DEFAULT_MAX_EVENT_STREAM_QUERIES: usize = 1000;
134
/// The kind of client step a timing measurement refers to.
///
/// Values of this type are sent, paired with a `u64`, over the `timing_sender` channel
/// accepted by `Client::create_chain_client`.
/// NOTE(review): the unit of the `u64` and the measurement sites are outside this section.
///
/// The type is a plain fieldless enum, so it also derives `PartialEq`, `Eq` and `Hash`;
/// this lets receivers compare measurements and aggregate them in hash maps or sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimingType {
    /// Measurement labelled as executing operations.
    ExecuteOperations,
    /// Measurement labelled as executing a block.
    ExecuteBlock,
    /// Measurement labelled as submitting a block proposal.
    SubmitBlockProposal,
    /// Measurement labelled as updating validators.
    UpdateValidators,
}
142
/// How much of a chain's activity the client listens to.
///
/// The variants are listed from the most to the least inclusive mode; the manual
/// `PartialOrd` implementation orders them accordingly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Every notification except `Reason::NewEvents` is relevant (see `is_relevant`).
    /// This is the only mode for which `is_full` returns `true`.
    FullChain,
    /// Only `Reason::NewBlock` notifications are relevant; the chain state is still
    /// synchronized (see `should_sync_chain_state`).
    FollowChain,
    /// Only `Reason::NewEvents` notifications that mention at least one of the listed
    /// streams are relevant; the chain state is not synchronized.
    EventsOnly(BTreeSet<StreamId>),
}
159
160impl PartialOrd for ListeningMode {
161 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
162 match (self, other) {
163 (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
164 (ListeningMode::FullChain, _) => Some(Ordering::Greater),
165 (_, ListeningMode::FullChain) => Some(Ordering::Less),
166 (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
167 (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
168 (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
169 (ListeningMode::EventsOnly(a), ListeningMode::EventsOnly(b)) => {
170 if a == b {
171 Some(Ordering::Equal)
172 } else if a.is_superset(b) {
173 Some(Ordering::Greater)
174 } else if b.is_superset(a) {
175 Some(Ordering::Less)
176 } else {
177 None
178 }
179 }
180 }
181 }
182}
183
184impl ListeningMode {
185 pub fn is_relevant(&self, reason: &Reason) -> bool {
188 match (reason, self) {
189 (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
192 false
193 }
194 (_, ListeningMode::FullChain) => true,
196 (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
199 (_, ListeningMode::FollowChain) => false,
200 (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
202 relevant.intersection(event_streams).next().is_some()
203 }
204 (_, ListeningMode::EventsOnly(_)) => false,
205 }
206 }
207
208 pub fn extend(&mut self, other: Option<ListeningMode>) {
209 match (self, other) {
210 (_, None) => (),
211 (ListeningMode::FullChain, _) => (),
212 (mode, Some(ListeningMode::FullChain)) => {
213 *mode = ListeningMode::FullChain;
214 }
215 (ListeningMode::FollowChain, _) => (),
216 (mode, Some(ListeningMode::FollowChain)) => {
217 *mode = ListeningMode::FollowChain;
218 }
219 (
220 ListeningMode::EventsOnly(self_events),
221 Some(ListeningMode::EventsOnly(other_events)),
222 ) => {
223 self_events.extend(other_events);
224 }
225 }
226 }
227
228 pub fn is_follow_only(&self) -> bool {
231 !matches!(self, ListeningMode::FullChain)
232 }
233
234 pub fn is_full(&self) -> bool {
237 matches!(self, ListeningMode::FullChain)
238 }
239
240 pub fn should_sync_chain_state(&self) -> bool {
241 match self {
242 ListeningMode::FullChain | ListeningMode::FollowChain => true,
243 ListeningMode::EventsOnly(_) => false,
244 }
245 }
246}
247
/// Client state shared by the chain clients: a local node plus everything needed to talk
/// to validators.
pub struct Client<Env: Environment> {
    /// Gives access to the storage, network, signer and wallet used by this client.
    environment: Env,
    /// The local node: certificates are handed to it for processing and chain
    /// information is queried from it.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Used to download certificates and blobs from validators.
    requests_scheduler: RequestsScheduler<Env>,
    /// The ID of the admin chain; its chain info supplies the committees.
    admin_chain_id: ChainId,
    /// The listening mode registered for each chain. The same shared map is handed to the
    /// `WorkerState` when the client is constructed.
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// Notifier through which subscribers receive `Notification`s.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Per-chain client state, inserted on demand by `create_chain_client`.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Client options; every `ChainClient` created from this client receives a copy.
    options: chain_client::Options,
}
268
269impl<Env: Environment> Client<Env> {
270 #[instrument(level = "trace", skip_all)]
272 #[expect(clippy::too_many_arguments)]
273 pub fn new(
274 environment: Env,
275 admin_chain_id: ChainId,
276 long_lived_services: bool,
277 chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
278 name: impl Into<String>,
279 chain_worker_ttl: Option<Duration>,
280 sender_chain_worker_ttl: Option<Duration>,
281 options: chain_client::Options,
282 block_cache_size: usize,
283 execution_state_cache_size: usize,
284 requests_scheduler_config: &requests_scheduler::RequestsSchedulerConfig,
285 ) -> Self {
286 let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
287 let config = ChainWorkerConfig {
288 nickname: name.into(),
289 long_lived_services,
290 allow_inactive_chains: true,
291 allow_messages_from_deprecated_epochs: true,
292 ttl: chain_worker_ttl,
293 sender_chain_ttl: sender_chain_worker_ttl,
294 block_cache_size,
295 execution_state_cache_size,
296 ..ChainWorkerConfig::default()
297 };
298 let state = WorkerState::new(
299 environment.storage().clone(),
300 config,
301 Some(chain_modes.clone()),
302 );
303 let local_node = LocalNodeClient::new(state);
304 let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
305
306 Self {
307 environment,
308 local_node,
309 requests_scheduler,
310 chains: papaya::HashMap::new(),
311 admin_chain_id,
312 chain_modes,
313 notifier: Arc::new(ChannelNotifier::default()),
314 options,
315 }
316 }
317
318 pub fn admin_chain_id(&self) -> ChainId {
320 self.admin_chain_id
321 }
322
323 pub fn subscribe(
325 &self,
326 chain_ids: Vec<ChainId>,
327 ) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
328 self.notifier.subscribe(chain_ids)
329 }
330
331 pub fn subscribe_extra(
333 &self,
334 chain_ids: Vec<ChainId>,
335 sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
336 ) {
337 self.notifier.add_sender(chain_ids, sender);
338 }
339
340 pub fn storage_client(&self) -> &Env::Storage {
342 self.environment.storage()
343 }
344
345 async fn try_read_local_certificate(
348 &self,
349 chain_id: ChainId,
350 height: BlockHeight,
351 hash: Option<CryptoHash>,
352 ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, chain_client::Error> {
353 if let Some(hash) = hash {
354 return Ok(self.storage_client().read_certificate(hash).await?);
355 }
356 let results = self
357 .storage_client()
358 .read_certificates_by_heights(chain_id, &[height])
359 .await?;
360 Ok(results.into_iter().next().flatten())
361 }
362
363 pub fn validator_node_provider(&self) -> &Env::Network {
364 self.environment.network()
365 }
366
367 pub(crate) fn options(&self) -> &chain_client::Options {
368 &self.options
369 }
370
371 pub async fn retry_pending_cross_chain_requests(
373 &self,
374 sender_chain: ChainId,
375 ) -> Result<(), LocalNodeError> {
376 self.local_node
377 .retry_pending_cross_chain_requests(sender_chain, &self.notifier)
378 .await
379 }
380
381 #[instrument(level = "trace", skip(self))]
383 pub fn signer(&self) -> &Env::Signer {
384 self.environment.signer()
385 }
386
387 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
389 self.signer()
390 .contains_key(owner)
391 .await
392 .map_err(chain_client::Error::signer_failure)
393 }
394
395 pub fn wallet(&self) -> &Env::Wallet {
397 self.environment.wallet()
398 }
399
400 #[instrument(level = "trace", skip(self))]
403 pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
404 let mut chain_modes = self
405 .chain_modes
406 .write()
407 .expect("Panics should not happen while holding a lock to `chain_modes`");
408 let entry = chain_modes.entry(chain_id).or_insert_with(|| mode.clone());
409 entry.extend(Some(mode));
410 entry.clone()
411 }
412
413 pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
415 self.chain_modes
416 .read()
417 .expect("Panics should not happen while holding a lock to `chain_modes`")
418 .get(&chain_id)
419 .cloned()
420 }
421
422 pub fn is_tracked(&self, chain_id: ChainId) -> bool {
424 self.chain_modes
425 .read()
426 .expect("Panics should not happen while holding a lock to `chain_modes`")
427 .get(&chain_id)
428 .is_some_and(ListeningMode::is_full)
429 }
430
431 #[expect(clippy::too_many_arguments)]
433 #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
434 pub fn create_chain_client(
435 self: &Arc<Self>,
436 chain_id: ChainId,
437 block_hash: Option<CryptoHash>,
438 next_block_height: BlockHeight,
439 pending_proposal: &Option<PendingProposal>,
440 preferred_owner: Option<AccountOwner>,
441 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
442 follow_only: bool,
443 ) -> ChainClient<Env> {
444 self.chains.pin().get_or_insert_with(chain_id, || {
447 chain_client::State::new(pending_proposal.clone(), follow_only)
448 });
449
450 ChainClient::new(
451 self.clone(),
452 chain_id,
453 self.options.clone(),
454 block_hash,
455 next_block_height,
456 preferred_owner,
457 timing_sender,
458 )
459 }
460
461 fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
463 self.chains
464 .pin()
465 .get(&chain_id)
466 .is_some_and(|state| state.is_follow_only())
467 }
468
469 pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
471 self.chains
472 .pin()
473 .update(chain_id, |state| state.with_follow_only(follow_only));
474 }
475
476 async fn fetch_chain_info(
478 &self,
479 chain_id: ChainId,
480 validators: &[RemoteNode<Env::ValidatorNode>],
481 ) -> Result<Box<ChainInfo>, chain_client::Error> {
482 match self.local_node.chain_info(chain_id).await {
483 Ok(info) => Ok(info),
484 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
485 self.synchronize_chain_state(self.admin_chain_id).await?;
487 self.update_local_node_with_blobs_from(blob_ids, validators)
490 .await?;
491 Ok(self.local_node.chain_info(chain_id).await?)
492 }
493 Err(err) => Err(err.into()),
494 }
495 }
496
497 #[instrument(level = "trace", skip(self))]
499 async fn download_certificates(
500 &self,
501 chain_id: ChainId,
502 target_next_block_height: BlockHeight,
503 ) -> Result<Box<ChainInfo>, chain_client::Error> {
504 let validators = self.validator_nodes().await?;
505 let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
506 if target_next_block_height <= info.next_block_height {
507 return Ok(info);
508 }
509 info = self
510 .load_local_certificates(chain_id, target_next_block_height, None)
511 .await?;
512 let mut next_height = info.next_block_height;
513 while next_height < target_next_block_height {
515 let limit = u64::from(target_next_block_height)
516 .checked_sub(u64::from(next_height))
517 .ok_or(ArithmeticError::Overflow)?
518 .min(self.options.certificate_download_batch_size);
519 let certificates = self
520 .requests_scheduler
521 .download_certificates_from_validators(
522 &validators,
523 chain_id,
524 next_height,
525 limit,
526 self.options.certificate_batch_download_timeout,
527 )
528 .await?;
529 let Some(new_info) = self
530 .process_certificates(&validators, certificates, None)
531 .await?
532 else {
533 break;
534 };
535 assert!(new_info.next_block_height > next_height);
536 next_height = new_info.next_block_height;
537 info = new_info;
538 }
539 ensure!(
540 target_next_block_height <= info.next_block_height,
541 chain_client::Error::CannotDownloadCertificates {
542 chain_id,
543 target_next_block_height,
544 }
545 );
546 Ok(info)
547 }
548
549 async fn load_local_certificates(
554 &self,
555 chain_id: ChainId,
556 end: BlockHeight,
557 until_block_time: Option<Timestamp>,
558 ) -> Result<Box<ChainInfo>, chain_client::Error> {
559 let mut last_info = self.local_node.chain_info(chain_id).await?;
560 let next_height = last_info.next_block_height;
561 let hashes = self
562 .local_node
563 .get_preprocessed_block_hashes(chain_id, next_height, end)
564 .await?;
565 let certificates = self.storage_client().read_certificates(&hashes).await?;
566 let certificates = match ResultReadCertificates::new(certificates, hashes) {
567 ResultReadCertificates::Certificates(certificates) => certificates,
568 ResultReadCertificates::InvalidHashes(hashes) => {
569 return Err(chain_client::Error::ReadCertificatesError(hashes))
570 }
571 };
572 for certificate in certificates {
573 if let Some(until) = until_block_time {
574 if certificate.value().block().header.timestamp >= until {
575 break;
576 }
577 }
578 last_info = self.handle_certificate(certificate).await?.info;
579 }
580 Ok(last_info)
581 }
582
583 #[instrument(level = "trace", skip_all)]
589 async fn download_certificates_from(
590 &self,
591 remote_node: &RemoteNode<Env::ValidatorNode>,
592 chain_id: ChainId,
593 stop: BlockHeight,
594 until_block_time: Option<Timestamp>,
595 ) -> Result<Box<ChainInfo>, chain_client::Error> {
596 let mut last_info = self
597 .load_local_certificates(chain_id, stop, until_block_time)
598 .await?;
599 let mut next_height = last_info.next_block_height;
600 while next_height < stop {
602 let limit = u64::from(stop)
604 .checked_sub(u64::from(next_height))
605 .ok_or(ArithmeticError::Overflow)?
606 .min(self.options.certificate_download_batch_size);
607
608 let certificates = self
609 .requests_scheduler
610 .download_certificates(remote_node, chain_id, next_height, limit)
611 .await?;
612 let Some(info) = self
613 .process_certificates(slice::from_ref(remote_node), certificates, until_block_time)
614 .await?
615 else {
616 break;
617 };
618 assert!(info.next_block_height > next_height);
619 next_height = info.next_block_height;
620 last_info = info;
621 }
622 Ok(last_info)
623 }
624
625 async fn download_blobs(
626 &self,
627 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
628 blob_ids: &[BlobId],
629 ) -> Result<(), chain_client::Error> {
630 let blobs = &self
631 .requests_scheduler
632 .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
633 .await?
634 .ok_or_else(|| {
635 chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
636 })?;
637 self.local_node.store_blobs(blobs).await.map_err(Into::into)
638 }
639
    /// Downloads and processes the confirmed block certificates that contain the given
    /// events.
    ///
    /// In each round the remaining validators are shuffled and queried through
    /// `communicate_concurrently`. A validator is asked for the block height of every
    /// remaining event; the certificates at those heights are downloaded from the same
    /// validator, verified to contain the expected events and checked against the admin
    /// committees. The certificates of the successful validator are then processed
    /// locally, that validator is removed from the candidates, and the events it could
    /// not resolve are retried in the next round.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::EventsNotFound` with the still-missing events if a round
    /// yields no successful validator. Errors from processing a certificate locally are
    /// propagated.
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_for_events(
        &self,
        event_ids: &[EventId],
    ) -> Result<(), chain_client::Error> {
        let mut validators = self.validator_nodes().await?;
        let timeout = self.options.certificate_batch_download_timeout;
        let (max_epoch, committees) = self.admin_committees().await?;
        let committees_ref = &committees;
        let mut remaining_event_ids = event_ids.to_vec();

        while !remaining_event_ids.is_empty() {
            let remaining_ref = &remaining_event_ids;
            // Randomize the order in which validators are handed to the helper.
            validators.shuffle(&mut rand::thread_rng());
            // NOTE(review): `communicate_concurrently` is defined outside this section;
            // presumably it yields the first successful result — confirm.
            let result = communicate_concurrently(
                &validators,
                move |remote_node| {
                    let validator_key = remote_node.public_key;
                    let validator_address = remote_node.address();
                    Box::pin(async move {
                        // One optional height per requested event, in request order.
                        let heights = remote_node
                            .node
                            .event_block_heights(remaining_ref.to_vec())
                            .await?;

                        // Heights to download, grouped by chain.
                        let mut chain_heights = BTreeMap::<_, BTreeSet<_>>::new();
                        // Events each (chain, height) block is claimed to contain.
                        let mut expected_events = BTreeMap::<_, HashSet<EventId>>::new();
                        // Events this validator could not locate.
                        let mut unresolved = Vec::new();
                        for (event_id, maybe_height) in remaining_ref.iter().zip(heights) {
                            if let Some(height) = maybe_height {
                                chain_heights
                                    .entry(event_id.chain_id)
                                    .or_default()
                                    .insert(height);
                                expected_events
                                    .entry((event_id.chain_id, height))
                                    .or_default()
                                    .insert(event_id.clone());
                            } else {
                                unresolved.push(event_id.clone());
                            }
                        }
                        // This validator knows none of the events: treat it as a failure.
                        if chain_heights.is_empty() {
                            return Err(chain_client::Error::from(NodeError::EventsNotFound(remaining_ref.clone())));
                        }

                        let mut checked_certificates = Vec::<ConfirmedBlockCertificate>::new();
                        for (chain_id, heights) in chain_heights {
                            let heights_vec = heights.into_iter().collect::<Vec<_>>();
                            let certificates = self
                                .requests_scheduler
                                .download_certificates_by_heights(
                                    &remote_node,
                                    chain_id,
                                    heights_vec,
                                )
                                .await?;
                            // Every event the validator placed at a height must really be
                            // part of the block at that height.
                            for cert in &certificates {
                                let block = cert.block();
                                let block_event_ids = block.event_ids().collect::<HashSet<_>>();
                                if let Some(expected_event_ids) =
                                    expected_events.get(&(chain_id, block.header.height))
                                {
                                    if !expected_event_ids.is_subset(&block_event_ids) {
                                        tracing::debug!(
                                            %validator_address, ?expected_event_ids, ?block_event_ids,
                                            "validator lied about events in block."
                                        );
                                        return Err(NodeError::UnexpectedCertificateValue.into());
                                    }
                                }
                            }
                            // Check each certificate against the admin committees before
                            // accepting it.
                            for cert in certificates {
                                Self::check_certificate(max_epoch, committees_ref, &cert)
                                    .map_err(|error| {
                                        tracing::debug!(
                                            %validator_address, %error,
                                            "invalid certificate"
                                        );
                                        error
                                    })?
                                    .into_result()
                                    .map_err(|error| {
                                        tracing::debug!(
                                            %validator_address, %error,
                                            "could not check certificate"
                                        );
                                        error
                                    })?;
                                checked_certificates.push(cert);
                            }
                        }
                        Ok((checked_certificates, unresolved, validator_key))
                    })
                },
                // Only the set of failed validators is kept; the errors are dropped.
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                timeout,
            )
            .await;

            match result {
                Ok((certificates, unresolved, validator_key)) => {
                    for certificate in certificates {
                        // The certificates were checked inside the closure above.
                        let mode = ReceiveCertificateMode::AlreadyChecked;
                        self.receive_sender_certificate(
                            self.storage_client().cache_certificate(certificate),
                            mode,
                            None,
                        )
                        .await?;
                    }
                    // Do not ask the same validator again for the events it lacked.
                    validators.retain(|node| node.public_key != validator_key);
                    remaining_event_ids = unresolved;
                }
                Err(_) => {
                    return Err(NodeError::EventsNotFound(remaining_event_ids).into());
                }
            }
        }
        Ok(())
    }
776
777 #[instrument(level = "trace", skip_all)]
782 async fn process_certificates(
783 &self,
784 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
785 certificates: Vec<ConfirmedBlockCertificate>,
786 until_block_time: Option<Timestamp>,
787 ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
788 let mut info = None;
789 let created_blob_ids: BTreeSet<BlobId> = certificates
794 .iter()
795 .flat_map(|certificate| certificate.value().block().created_blob_ids())
796 .collect();
797 let required_blob_ids: Vec<_> = certificates
798 .iter()
799 .flat_map(|certificate| certificate.value().required_blob_ids())
800 .filter(|blob_id| !created_blob_ids.contains(blob_id))
801 .collect();
802
803 match self
804 .local_node
805 .read_blob_states_from_storage(&required_blob_ids)
806 .await
807 {
808 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
809 self.download_blobs(remote_nodes, &blob_ids).await?;
810 }
811 x => {
812 x?;
813 }
814 }
815
816 for certificate in certificates {
817 if let Some(until) = until_block_time {
818 if certificate.value().block().header.timestamp >= until {
819 break;
820 }
821 }
822 info = Some(
823 match self.handle_certificate(certificate.clone()).await {
824 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
825 self.download_blobs(remote_nodes, &blob_ids).await?;
826 self.handle_certificate(certificate).await?
827 }
828 x => x?,
829 }
830 .info,
831 );
832 }
833
834 Ok(info)
835 }
836
837 async fn handle_certificate<T: ProcessableCertificate>(
838 &self,
839 certificate: GenericCertificate<T>,
840 ) -> Result<ChainInfoResponse, LocalNodeError> {
841 self.local_node
842 .handle_certificate(certificate, &self.notifier)
843 .await
844 }
845
846 async fn chain_info_with_committees(
847 &self,
848 chain_id: ChainId,
849 ) -> Result<Box<ChainInfo>, LocalNodeError> {
850 let query = ChainInfoQuery::new(chain_id).with_committees();
851 let info = self.local_node.handle_chain_info_query(query).await?.info;
852 Ok(info)
853 }
854
855 #[instrument(level = "trace", skip_all)]
858 async fn admin_committees(
859 &self,
860 ) -> Result<(Epoch, BTreeMap<Epoch, Arc<Committee>>), LocalNodeError> {
861 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
862 let hashes = info
863 .requested_committees
864 .ok_or(LocalNodeError::InvalidChainInfoResponse)?;
865 let committees =
866 futures::future::try_join_all(hashes.into_iter().map(|(epoch, hash)| async move {
867 let committee = self
868 .storage_client()
869 .get_or_load_committee_by_hash(hash)
870 .await?;
871 Ok::<_, LocalNodeError>((epoch, committee))
872 }))
873 .await?
874 .into_iter()
875 .collect();
876 Ok((info.epoch, committees))
877 }
878
879 pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
881 let info = self.chain_info_with_committees(self.admin_chain_id).await?;
882 let hash = info
883 .requested_committees
884 .as_ref()
885 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
886 .get(&info.epoch)
887 .copied()
888 .ok_or(LocalNodeError::InactiveChain(self.admin_chain_id))?;
889 let committee = self
890 .storage_client()
891 .get_or_load_committee_by_hash(hash)
892 .await?;
893 Ok((info.epoch, committee))
894 }
895
896 async fn validator_nodes(
898 &self,
899 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
900 let (_, committee) = self.admin_committee().await?;
901 Ok(self.make_nodes(&committee)?)
902 }
903
904 fn make_nodes(
906 &self,
907 committee: &Committee,
908 ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
909 Ok(self
910 .validator_node_provider()
911 .make_nodes(committee)?
912 .map(|(public_key, node)| RemoteNode { public_key, node })
913 .collect())
914 }
915
916 pub async fn get_chain_description_blob(
919 &self,
920 chain_id: ChainId,
921 ) -> Result<Arc<Blob>, chain_client::Error> {
922 let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
923 let blob = self
924 .local_node
925 .storage_client()
926 .read_blob(chain_desc_id)
927 .await?;
928 if let Some(blob) = blob {
929 return Ok(blob);
931 }
932 self.synchronize_chain_state(self.admin_chain_id).await?;
934 let nodes = self.validator_nodes().await?;
935 Ok(self
936 .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
937 .await?
938 .pop()
939 .unwrap()) }
941
942 pub async fn get_chain_description(
945 &self,
946 chain_id: ChainId,
947 ) -> Result<ChainDescription, chain_client::Error> {
948 let blob = self.get_chain_description_blob(chain_id).await?;
949 Ok(bcs::from_bytes(blob.bytes())?)
950 }
951
952 #[instrument(level = "trace", skip_all)]
954 pub(crate) async fn finalize_block(
955 self: &Arc<Self>,
956 committee: &Committee,
957 certificate: ValidatedBlockCertificate,
958 ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
959 debug!(round = %certificate.round, "Submitting block for confirmation");
960 let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
961 let finalize_action = CommunicateAction::FinalizeBlock {
962 certificate: Box::new(certificate),
963 delivery: self.options.cross_chain_message_delivery,
964 };
965 let certificate = self
966 .communicate_chain_action(committee, finalize_action, hashed_value)
967 .await?;
968 self.receive_certificate_with_checked_signatures(certificate.clone())
969 .await?;
970 Ok(certificate)
971 }
972
    /// Submits a block proposal to the validators of `committee` and returns the
    /// certificate for `value` built from their votes.
    ///
    /// While the proposal is in flight, a background task adds up the voting weight of
    /// validators that report a positive clock skew and logs a warning once that weight
    /// reaches the committee's validity threshold. The certificate is processed locally
    /// before it is returned.
    #[instrument(level = "trace", skip_all)]
    async fn submit_block_proposal<T: ProcessableCertificate>(
        self: &Arc<Self>,
        committee: Arc<Committee>,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, chain_client::Error> {
        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );

        let block_timestamp = proposal.content.block.timestamp;
        // Compare against the storage clock; a future timestamp is only logged here.
        let local_time = self.local_node.storage_client().clock().current_time();
        if block_timestamp > local_time {
            info!(
                chain_id = %proposal.content.block.chain_id,
                %block_timestamp,
                %local_time,
                "Block timestamp is in the future; waiting until it can be proposed",
            );
        }

        // NOTE(review): the sender is handed to the submit action; presumably the
        // validator updaters report each validator's clock skew through it — confirm.
        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
            clock_skew_sender,
        };

        let validity_threshold = committee.validity_threshold();
        // The spawned task needs its own handle to the committee.
        let committee_clone = committee.clone();
        let clock_skew_check_handle = linera_base::Task::spawn(async move {
            let mut skew_weight = 0u64;
            let mut min_skew = TimeDelta::MAX;
            let mut max_skew = TimeDelta::ZERO;
            // The loop ends when the channel is closed or once the warning was logged.
            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
                if clock_skew.as_micros() > 0 {
                    skew_weight += committee_clone.weight(&public_key);
                    min_skew = min_skew.min(clock_skew);
                    max_skew = max_skew.max(clock_skew);
                    if skew_weight >= validity_threshold {
                        warn!(
                            skew_weight,
                            validity_threshold,
                            min_skew_ms = min_skew.as_micros() / 1000,
                            max_skew_ms = max_skew.as_micros() / 1000,
                            "A validity threshold of validators reported clock skew; \
                             consider checking your system clock",
                        );
                        return;
                    }
                }
            }
        });

        let certificate = self
            .communicate_chain_action(&committee, submit_action, value)
            .await?;

        // Wait for the clock skew task to finish before continuing.
        clock_skew_check_handle.await;

        self.handle_certificate(certificate.clone()).await?;
        Ok(certificate)
    }
1042
1043 #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
1045 async fn communicate_chain_updates(
1046 self: &Arc<Self>,
1047 committee: &Committee,
1048 chain_id: ChainId,
1049 height: BlockHeight,
1050 delivery: CrossChainMessageDelivery,
1051 latest_certificate: Option<Arc<GenericCertificate<ConfirmedBlock>>>,
1052 ) -> Result<(), chain_client::Error> {
1053 let nodes = self.make_nodes(committee)?;
1054 communicate_with_quorum(
1055 &nodes,
1056 committee,
1057 |_: &()| (),
1058 |remote_node| {
1059 let mut updater = ValidatorUpdater {
1060 remote_node,
1061 client: self.clone(),
1062 admin_chain_id: self.admin_chain_id,
1063 };
1064 let certificate = latest_certificate.clone();
1065 Box::pin(async move {
1066 updater
1067 .send_chain_information(chain_id, height, delivery, certificate)
1068 .await
1069 })
1070 },
1071 self.options.quorum_grace_period,
1072 )
1073 .await?;
1074 Ok(())
1075 }
1076
1077 #[instrument(level = "trace", skip_all)]
1083 async fn communicate_chain_action<T: CertificateValue>(
1084 self: &Arc<Self>,
1085 committee: &Committee,
1086 action: CommunicateAction,
1087 value: T,
1088 ) -> Result<GenericCertificate<T>, chain_client::Error> {
1089 let nodes = self.make_nodes(committee)?;
1090 let ((votes_hash, votes_round), votes) = communicate_with_quorum(
1091 &nodes,
1092 committee,
1093 |vote: &LiteVote| (vote.value.value_hash, vote.round),
1094 |remote_node| {
1095 let mut updater = ValidatorUpdater {
1096 remote_node,
1097 client: self.clone(),
1098 admin_chain_id: self.admin_chain_id,
1099 };
1100 let action = action.clone();
1101 Box::pin(async move { updater.send_chain_update(action).await })
1102 },
1103 self.options.quorum_grace_period,
1104 )
1105 .await?;
1106 ensure!(
1107 (votes_hash, votes_round) == (value.hash(), action.round()),
1108 chain_client::Error::UnexpectedQuorum {
1109 hash: votes_hash,
1110 round: votes_round,
1111 expected_hash: value.hash(),
1112 expected_round: action.round(),
1113 }
1114 );
1115 let certificate = LiteCertificate::try_from_votes(votes)
1120 .ok_or_else(|| {
1121 chain_client::Error::InternalError(
1122 "Vote values or rounds don't match; this is a bug",
1123 )
1124 })?
1125 .with_value(value)
1126 .ok_or_else(|| {
1127 chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
1128 })?;
1129 Ok(certificate)
1130 }
1131
1132 #[instrument(level = "trace", skip_all)]
1135 async fn receive_certificate_with_checked_signatures(
1136 &self,
1137 certificate: ConfirmedBlockCertificate,
1138 ) -> Result<(), chain_client::Error> {
1139 let block = certificate.block();
1140 self.download_certificates(block.header.chain_id, block.header.height)
1142 .await?;
1143 if let Err(err) = self.handle_certificate(certificate.clone()).await {
1146 match &err {
1147 LocalNodeError::BlobsNotFound(blob_ids) => {
1148 self.download_blobs(&self.validator_nodes().await?, blob_ids)
1149 .await
1150 .map_err(|_| err)?;
1151 self.handle_certificate(certificate).await?;
1152 }
1153 _ => {
1154 warn!("Failed to process network hashed certificate value");
1156 return Err(err.into());
1157 }
1158 }
1159 }
1160
1161 Ok(())
1162 }
1163
1164 #[instrument(level = "trace", skip_all)]
1166 async fn receive_sender_certificate(
1167 &self,
1168 certificate: Arc<ConfirmedBlockCertificate>,
1169 mode: ReceiveCertificateMode,
1170 nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
1171 ) -> Result<(), chain_client::Error> {
1172 let (max_epoch, committees) = self.admin_committees().await?;
1174 if let ReceiveCertificateMode::NeedsCheck = mode {
1175 Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
1176 }
1177 let nodes = if let Some(nodes) = nodes {
1179 nodes
1180 } else {
1181 self.validator_nodes().await?
1182 };
1183 if let Err(err) = self.handle_certificate((*certificate).clone()).await {
1184 match &err {
1185 LocalNodeError::BlobsNotFound(blob_ids) => {
1186 self.download_blobs(&nodes, blob_ids).await?;
1187 self.handle_certificate(Arc::unwrap_or_clone(certificate))
1188 .await?;
1189 }
1190 _ => {
1191 warn!("Failed to process network hashed certificate value");
1193 return Err(err.into());
1194 }
1195 }
1196 }
1197
1198 Ok(())
1199 }
1200
/// Obtains and processes the certificates of `sender_chain_id` at `remote_heights`.
///
/// Heights whose certificate is already in local storage are reported on `sender`
/// immediately. The remaining heights are requested from `nodes`, checked against the
/// admin committees and passed to `receive_sender_certificate`; each height that is
/// processed successfully is reported on `sender` as well.
///
/// The function returns nothing: failures are only logged. It stops when no heights
/// are left, or when no node is left to download from.
#[instrument(level = "debug", skip_all, fields(chain_id = %sender_chain_id))]
async fn download_and_process_sender_chain(
    &self,
    sender_chain_id: ChainId,
    nodes: &[RemoteNode<Env::ValidatorNode>],
    received_log: &ReceivedLogs,
    mut remote_heights: Vec<BlockHeight>,
    sender: mpsc::UnboundedSender<ChainAndHeight>,
) {
    let (max_epoch, committees) = match self.admin_committees().await {
        Ok(result) => result,
        Err(error) => {
            error!(%error, %sender_chain_id, "could not read admin committees");
            return;
        }
    };
    let committees_ref = &committees;
    // Working copy of the node list; nodes that fail are removed from it below.
    let mut nodes = nodes.to_vec();
    while !remote_heights.is_empty() {
        // Look in local storage first. A storage read error is ignored here: in that
        // case all heights are requested from the network.
        if let Ok(local_certs) = self
            .storage_client()
            .read_certificates_by_heights(sender_chain_id, &remote_heights)
            .await
        {
            let mut still_needed = Vec::new();
            for (height, maybe_cert) in remote_heights.iter().copied().zip(local_certs) {
                if let Some(certificate) = maybe_cert {
                    let chain_id = certificate.block().header.chain_id;
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id, %height, %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                } else {
                    still_needed.push(height);
                }
            }
            remote_heights = still_needed;
            if remote_heights.is_empty() {
                break;
            }
        }

        let remote_heights_ref = &remote_heights;
        let certificates = match communicate_concurrently(
            &nodes,
            async move |remote_node| {
                // Only ask this validator for the heights that `received_log` says it has.
                let mut remote_heights = remote_heights_ref.clone();
                remote_heights.retain(|height| {
                    received_log.validator_has_block(
                        &remote_node.public_key,
                        sender_chain_id,
                        *height,
                    )
                });
                if remote_heights.is_empty() {
                    // Nothing to request from this validator; report an error so that
                    // the result of another validator is used instead.
                    return Err(NodeError::MissingCertificateValue);
                }
                let certificates = self
                    .requests_scheduler
                    .download_certificates_by_heights(
                        &remote_node,
                        sender_chain_id,
                        remote_heights,
                    )
                    .await?;
                let mut certificates_with_check_results = vec![];
                for cert in certificates {
                    // An `Err` from `check_certificate` fails this validator's whole
                    // response. An `Ok` result that is not `New` (old or future epoch)
                    // is recorded as `false` next to the certificate.
                    let check_result =
                        Self::check_certificate(max_epoch, committees_ref, &cert)?;
                    certificates_with_check_results
                        .push((cert, check_result.into_result().is_ok()));
                }
                Ok(certificates_with_check_results)
            },
            // On failure, keep only the public keys of the validators that failed.
            |errors| {
                errors
                    .into_iter()
                    .map(|(validator, _error)| validator)
                    .collect::<BTreeSet<_>>()
            },
            self.options.certificate_batch_download_timeout,
        )
        .await
        {
            Ok(certificates_with_check_results) => certificates_with_check_results,
            Err(faulty_validators) => {
                // Drop the validators that failed and retry with the rest, if any.
                // NOTE(review): `communicate_concurrently` only returns `Err` after every
                // node it was given has failed, so this appears to always empty `nodes`
                // and the `continue` below looks unreachable — confirm.
                nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                if nodes.is_empty() {
                    info!(
                        chain_id = %sender_chain_id,
                        "could not download certificates for chain - no more correct validators left"
                    );
                    return;
                }
                continue;
            }
        };

        trace!(
            num_certificates = %certificates.len(),
            "received certificates",
        );

        let mut to_remove_from_queue = BTreeSet::new();

        for (certificate, check_result) in certificates {
            let hash = certificate.hash();
            let chain_id = certificate.block().header.chain_id;
            let height = certificate.block().header.height;
            if !check_result {
                // The epoch of this certificate is not covered by a known committee:
                // do not process it, and do not request it again either.
                to_remove_from_queue.insert(height);
                continue;
            }
            // The certificate was checked in the download closure above.
            let mode = ReceiveCertificateMode::AlreadyChecked;
            if let Err(error) = self
                .receive_sender_certificate(
                    self.storage_client().cache_certificate(certificate),
                    mode,
                    None,
                )
                .await
            {
                // The height stays in the queue, so it is attempted again on the next
                // iteration. NOTE(review): confirm a persistently failing certificate
                // cannot keep this loop running forever.
                warn!(%error, %hash, "Received invalid certificate");
            } else {
                to_remove_from_queue.insert(height);
                if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                    error!(
                        %chain_id,
                        %height,
                        %error,
                        "failed to send chain and height over the channel",
                    );
                }
            }
        }

        remote_heights.retain(|height| !to_remove_from_queue.contains(height));
    }
    trace!("find_received_certificates: finished processing chain");
}
1355
1356 #[instrument(level = "trace", skip(self))]
1358 async fn get_received_log_from_validator(
1359 &self,
1360 chain_id: ChainId,
1361 remote_node: &RemoteNode<Env::ValidatorNode>,
1362 tracker: u64,
1363 ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1364 let mut offset = tracker;
1365
1366 let mut remote_log = Vec::new();
1368 loop {
1369 trace!("get_received_log_from_validator: looping");
1370 let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1371 let info = remote_node.handle_chain_info_query(query).await?;
1372 let received_entries = info.requested_received_log.len();
1373 offset += received_entries as u64;
1374 remote_log.extend(info.requested_received_log);
1375 trace!(
1376 remote_node = remote_node.address(),
1377 %received_entries,
1378 "get_received_log_from_validator: received log batch",
1379 );
1380 if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1381 break;
1382 }
1383 }
1384
1385 trace!(
1386 remote_node = remote_node.address(),
1387 num_entries = remote_log.len(),
1388 "get_received_log_from_validator: returning downloaded log",
1389 );
1390
1391 Ok(remote_log)
1392 }
1393
1394 async fn download_sender_block_with_sending_ancestors(
1400 &self,
1401 receiver_chain_id: ChainId,
1402 sender_chain_id: ChainId,
1403 height: BlockHeight,
1404 remote_node: &RemoteNode<Env::ValidatorNode>,
1405 ) -> Result<(), chain_client::Error> {
1406 let next_outbox_height = self
1407 .local_node
1408 .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1409 .await?
1410 .get(&sender_chain_id)
1411 .copied()
1412 .unwrap_or(BlockHeight::ZERO);
1413 let (max_epoch, committees) = self.admin_committees().await?;
1414
1415 let mut certificates = BTreeMap::new();
1418 let mut current_height = height;
1419 let mut current_hash: Option<CryptoHash> = None;
1422
1423 while current_height >= next_outbox_height {
1425 let certificate = if let Some(local) = self
1429 .try_read_local_certificate(sender_chain_id, current_height, current_hash)
1430 .await?
1431 {
1432 local
1433 } else {
1434 let downloaded = self
1435 .requests_scheduler
1436 .download_certificates_by_heights(
1437 remote_node,
1438 sender_chain_id,
1439 vec![current_height],
1440 )
1441 .await?;
1442 let Some(certificate) = downloaded.into_iter().next() else {
1443 return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1444 chain_id: sender_chain_id,
1445 height: current_height,
1446 });
1447 };
1448 self.storage_client().cache_certificate(certificate)
1449 };
1450
1451 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1453 .into_result()?;
1454
1455 let block = certificate.block();
1457 let next = block
1458 .body
1459 .previous_message_blocks
1460 .get(&receiver_chain_id)
1461 .map(|(prev_hash, prev_height)| (*prev_hash, *prev_height));
1462
1463 certificates.insert(current_height, certificate);
1465
1466 if let Some((prev_hash, prev_height)) = next {
1467 current_height = prev_height;
1469 current_hash = Some(prev_hash);
1470 } else {
1471 break;
1473 }
1474 }
1475
1476 if certificates.is_empty() {
1477 self.retry_pending_cross_chain_requests(sender_chain_id)
1478 .await?;
1479 }
1480
1481 for certificate in certificates.into_values() {
1483 self.receive_sender_certificate(
1484 certificate,
1485 ReceiveCertificateMode::AlreadyChecked,
1486 Some(vec![remote_node.clone()]),
1487 )
1488 .await?;
1489 }
1490
1491 Ok(())
1492 }
1493
1494 async fn download_event_bearing_blocks(
1498 &self,
1499 publisher_chain_id: ChainId,
1500 initial_blocks: BTreeSet<(BlockHeight, CryptoHash)>,
1501 local_next_block_height: BlockHeight,
1502 subscribed_streams: &BTreeSet<StreamId>,
1503 remote_node: &RemoteNode<Env::ValidatorNode>,
1504 ) -> Result<(), chain_client::Error> {
1505 if initial_blocks.is_empty() {
1506 return Ok(());
1507 }
1508 let (max_epoch, committees) = self.admin_committees().await?;
1509
1510 let mut certificates = BTreeMap::new();
1511 let mut blocks_to_fetch = initial_blocks;
1512 let next_expected_events = self
1513 .local_node
1514 .next_expected_events(
1515 publisher_chain_id,
1516 subscribed_streams.iter().cloned().collect(),
1517 )
1518 .await?;
1519
1520 while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
1521 if current_height < local_next_block_height {
1522 continue; }
1524 if certificates.contains_key(¤t_height) {
1525 continue;
1526 }
1527
1528 let certificate = if let Some(certificate) =
1529 self.storage_client().read_certificate(current_hash).await?
1530 {
1531 certificate
1532 } else {
1533 let downloaded = self
1534 .requests_scheduler
1535 .download_certificates(remote_node, publisher_chain_id, current_height, 1)
1536 .await?;
1537 let Some(certificate) = downloaded.into_iter().next() else {
1538 tracing::debug!(
1539 validator = remote_node.address(),
1540 %publisher_chain_id,
1541 height = %current_height,
1542 "failed to download event publisher block"
1543 );
1544 continue;
1545 };
1546
1547 Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1548 .into_result()?;
1549
1550 self.storage_client().cache_certificate(certificate)
1551 };
1552
1553 let block = certificate.block();
1554 for stream_id in subscribed_streams {
1556 if let Some((prev_hash, prev_height)) =
1557 block.body.previous_event_blocks.get(stream_id)
1558 {
1559 if next_expected_events.get(stream_id).is_some_and(|index| {
1560 block
1561 .body
1562 .events
1563 .iter()
1564 .flatten()
1565 .find(|event| event.stream_id == *stream_id)
1566 .is_some_and(|event| event.index == *index)
1567 }) {
1568 continue;
1569 }
1570 if !certificates.contains_key(prev_height) {
1571 blocks_to_fetch.insert((*prev_height, *prev_hash));
1572 }
1573 }
1574 }
1575
1576 certificates.insert(current_height, certificate);
1577 }
1578
1579 for certificate in certificates.into_values() {
1581 self.receive_sender_certificate(
1582 certificate,
1583 ReceiveCertificateMode::AlreadyChecked,
1584 Some(vec![remote_node.clone()]),
1585 )
1586 .await?;
1587 }
1588
1589 Ok(())
1590 }
1591
1592 async fn sync_events_from_node(
1595 &self,
1596 chain_id: ChainId,
1597 stream_ids: &BTreeSet<StreamId>,
1598 remote_node: &RemoteNode<Env::ValidatorNode>,
1599 ) -> Result<(), chain_client::Error> {
1600 let stream_ids_vec: Vec<_> = stream_ids.iter().cloned().collect();
1601 let mut initial_blocks = BTreeSet::new();
1602 for chunk in stream_ids_vec.chunks(self.options.max_event_stream_queries) {
1603 let query = ChainInfoQuery::new(chain_id).with_previous_event_blocks(chunk.to_vec());
1604 let info = remote_node.handle_chain_info_query(query).await?;
1605 initial_blocks.extend(info.requested_previous_event_blocks.values().copied());
1606 }
1607 let local_height = match self.local_node.chain_info(chain_id).await {
1608 Ok(info) => info.next_block_height,
1609 Err(LocalNodeError::InactiveChain(_) | LocalNodeError::BlobsNotFound(_)) => {
1610 BlockHeight::ZERO
1611 }
1612 Err(error) => return Err(error.into()),
1613 };
1614 self.download_event_bearing_blocks(
1615 chain_id,
1616 initial_blocks,
1617 local_height,
1618 stream_ids,
1619 remote_node,
1620 )
1621 .await
1622 }
1623
1624 #[instrument(
1625 level = "trace", skip_all,
1626 fields(certificate_hash = ?incoming_certificate.hash()),
1627 )]
1628 fn check_certificate(
1629 highest_known_epoch: Epoch,
1630 committees: &BTreeMap<Epoch, Arc<Committee>>,
1631 incoming_certificate: &ConfirmedBlockCertificate,
1632 ) -> Result<CheckCertificateResult, NodeError> {
1633 let block = incoming_certificate.block();
1634 if block.header.epoch > highest_known_epoch {
1636 return Ok(CheckCertificateResult::FutureEpoch);
1637 }
1638 if let Some(known_committee) = committees.get(&block.header.epoch) {
1639 incoming_certificate.check(known_committee)?;
1642 Ok(CheckCertificateResult::New)
1643 } else {
1644 Ok(CheckCertificateResult::OldEpoch)
1646 }
1647 }
1648
1649 #[instrument(level = "trace", skip_all)]
1653 async fn synchronize_chain_state(
1654 &self,
1655 chain_id: ChainId,
1656 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1657 let (_, committee) = self.admin_committee().await?;
1658 self.synchronize_chain_from_committee(chain_id, committee)
1659 .await
1660 }
1661
1662 #[instrument(level = "trace", skip_all)]
1667 pub(crate) async fn synchronize_chain_from_committee(
1668 &self,
1669 chain_id: ChainId,
1670 committee: Arc<Committee>,
1671 ) -> Result<Box<ChainInfo>, chain_client::Error> {
1672 #[cfg(with_metrics)]
1673 let _latency = if !self.is_chain_follow_only(chain_id) {
1674 Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1675 } else {
1676 None
1677 };
1678
1679 let validators = self.make_nodes(&committee)?;
1680 Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1681 communicate_with_quorum(
1682 &validators,
1683 &committee,
1684 |_: &()| (),
1685 |remote_node| async move {
1686 self.synchronize_chain_state_from(&remote_node, chain_id)
1687 .await
1688 },
1689 self.options.quorum_grace_period,
1690 )
1691 .await?;
1692
1693 self.local_node
1694 .chain_info(chain_id)
1695 .await
1696 .map_err(Into::into)
1697 }
1698
/// Synchronizes the local state of `chain_id` from a single validator.
///
/// First downloads certificates from `remote_node` up to the next block height it
/// reports. Unless the chain is follow-only, the chain manager values returned by the
/// validator (timeout certificate, proposals and locking block) are then applied to the
/// local node, provided the local and remote heights agree.
///
/// Proposals and locking blocks that cannot be processed are logged and skipped; they
/// do not fail the call.
///
/// # Errors
///
/// Returns an error if the chain info query, the certificate download, the timeout
/// certificate, or one of the helper downloads propagated with `?` fails.
#[instrument(level = "trace", skip(self, remote_node, chain_id))]
pub(crate) async fn synchronize_chain_state_from(
    &self,
    remote_node: &RemoteNode<Env::ValidatorNode>,
    chain_id: ChainId,
) -> Result<(), chain_client::Error> {
    // Manager values are only requested for chains that are not follow-only.
    let with_manager_values = !self.is_chain_follow_only(chain_id);
    let query = if with_manager_values {
        ChainInfoQuery::new(chain_id).with_manager_values()
    } else {
        ChainInfoQuery::new(chain_id)
    };
    let remote_info = remote_node.handle_chain_info_query(query).await?;
    let local_info = self
        .download_certificates_from(remote_node, chain_id, remote_info.next_block_height, None)
        .await?;

    if !with_manager_values {
        return Ok(());
    }

    let local_height = local_info.next_block_height;
    // The manager values below are only applied when both sides are at the same height.
    if local_height != remote_info.next_block_height {
        debug!(
            remote_node = remote_node.address(),
            remote_height = %remote_info.next_block_height,
            local_height = %local_height,
            "synced from validator, but remote height and local height are different",
        );
        return Ok(());
    };

    if let Some(timeout) = remote_info.manager.timeout {
        self.handle_certificate(*timeout).await?;
    }
    // Collect every proposal the validator returned.
    let mut proposals = Vec::new();
    if let Some(proposal) = remote_info.manager.requested_signed_proposal {
        proposals.push(*proposal);
    }
    if let Some(proposal) = remote_info.manager.requested_proposed {
        proposals.push(*proposal);
    }
    if let Some(locking) = remote_info.manager.requested_locking {
        match *locking {
            // A fast locking block is a proposal and is handled with the others.
            LockingBlock::Fast(proposal) => {
                proposals.push(proposal);
            }
            // A regular locking block is a validated block certificate; a failure to
            // process it is logged and otherwise ignored.
            LockingBlock::Regular(cert) => {
                let hash = cert.hash();
                if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                {
                    debug!(
                        remote_node = remote_node.address(),
                        %hash,
                        height = %local_height,
                        %error,
                        "skipping locked block from validator",
                    );
                }
            }
        }
    }
    'proposal_loop: for proposal in proposals {
        let owner: AccountOwner = proposal.owner();
        // `err` always holds the error of the most recent attempt; each recovery step
        // below retries the proposal and either moves on to the next proposal on
        // success or replaces `err`.
        if let Err(mut err) = self
            .local_node
            .handle_block_proposal(proposal.clone())
            .await
        {
            if let LocalNodeError::BlobsNotFound(_) = &err {
                // Step 1: fetch the blobs the proposal itself requires as pending blobs
                // from this validator.
                let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                if !required_blob_ids.is_empty() {
                    let mut blobs = Vec::new();
                    for blob_id in required_blob_ids {
                        let blob_content = match self
                            .requests_scheduler
                            .download_pending_blob(remote_node, chain_id, blob_id)
                            .await
                        {
                            Ok(content) => content,
                            Err(error) => {
                                info!(
                                    remote_node = remote_node.address(),
                                    height = %local_height,
                                    proposer = %owner,
                                    %blob_id,
                                    %error,
                                    "skipping proposal from validator; failed to download blob",
                                );
                                continue 'proposal_loop;
                            }
                        };
                        blobs.push(Blob::new(blob_content));
                    }
                    self.local_node
                        .handle_pending_blobs(chain_id, blobs)
                        .await?;
                    if let Err(new_err) = self
                        .local_node
                        .handle_block_proposal(proposal.clone())
                        .await
                    {
                        err = new_err;
                    } else {
                        // Proposal accepted: continue with the next proposal.
                        continue;
                    }
                }
                // Step 2: if blobs are still missing, obtain them through
                // `update_local_node_with_blobs_from` from this validator only.
                if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                    self.update_local_node_with_blobs_from(
                        blob_ids.clone(),
                        slice::from_ref(remote_node),
                    )
                    .await?;
                    if let Err(new_err) = self
                        .local_node
                        .handle_block_proposal(proposal.clone())
                        .await
                    {
                        err = new_err;
                    } else {
                        continue;
                    }
                }
            }
            // Step 3: as long as the proposal fails because of a missing cross-chain
            // update, download the corresponding sender blocks and retry.
            // NOTE(review): the loop repeats while the same kind of error comes back —
            // confirm that each round makes progress.
            while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                if let ChainError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                } = &**chain_err
                {
                    self.download_sender_block_with_sending_ancestors(
                        *chain_id,
                        *origin,
                        *height,
                        remote_node,
                    )
                    .await?;
                    if let Err(new_err) = self
                        .local_node
                        .handle_block_proposal(proposal.clone())
                        .await
                    {
                        err = new_err;
                    } else {
                        continue 'proposal_loop;
                    }
                } else {
                    break;
                }
            }

            // No recovery step succeeded: log the last error and skip the proposal.
            debug!(
                remote_node = remote_node.address(),
                proposer = %owner,
                height = %local_height,
                error = %err,
                "skipping proposal from validator",
            );
        }
    }
    Ok(())
}
1871
1872 async fn try_process_locking_block_from(
1873 &self,
1874 remote_node: &RemoteNode<Env::ValidatorNode>,
1875 certificate: GenericCertificate<ValidatedBlock>,
1876 ) -> Result<(), chain_client::Error> {
1877 let chain_id = certificate.inner().chain_id();
1878 match self.handle_certificate(certificate.clone()).await {
1879 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1880 let mut blobs = Vec::new();
1881 for blob_id in blob_ids {
1882 let blob_content = self
1883 .requests_scheduler
1884 .download_pending_blob(remote_node, chain_id, blob_id)
1885 .await?;
1886 blobs.push(Blob::new(blob_content));
1887 }
1888 self.local_node
1889 .handle_pending_blobs(chain_id, blobs)
1890 .await?;
1891 self.handle_certificate(certificate).await?;
1892 Ok(())
1893 }
1894 Err(err) => Err(err.into()),
1895 Ok(_) => Ok(()),
1896 }
1897 }
1898
1899 async fn update_local_node_with_blobs_from(
1902 &self,
1903 blob_ids: Vec<BlobId>,
1904 remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1905 ) -> Result<Vec<Arc<Blob>>, chain_client::Error> {
1906 let timeout = self.options.blob_download_timeout;
1907 let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1909 stream::iter(blob_ids.into_iter().map(|blob_id| {
1910 communicate_concurrently(
1911 remote_nodes,
1912 async move |remote_node| {
1913 let certificate = self
1914 .requests_scheduler
1915 .download_certificate_for_blob(&remote_node, blob_id)
1916 .await?;
1917 self.receive_sender_certificate(
1918 self.storage_client().cache_certificate(certificate),
1919 ReceiveCertificateMode::NeedsCheck,
1920 Some(vec![remote_node.clone()]),
1921 )
1922 .await?;
1923 let blob = self
1924 .local_node
1925 .storage_client()
1926 .read_blob(blob_id)
1927 .await?
1928 .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1929 Result::<_, chain_client::Error>::Ok(blob)
1930 },
1931 move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
1932 timeout,
1933 )
1934 }))
1935 .buffer_unordered(self.options.max_joined_tasks)
1936 .collect::<Vec<_>>()
1937 .await
1938 .into_iter()
1939 .collect()
1940 }
1941
1942 #[instrument(level = "trace", skip(self, block))]
1952 async fn stage_block_execution(
1953 &self,
1954 block: ProposedBlock,
1955 round: Option<u32>,
1956 published_blobs: Vec<Blob>,
1957 policy: BundleExecutionPolicy,
1958 ) -> Result<(Block, ChainInfoResponse, HashSet<ChainId>), chain_client::Error> {
1959 let mut downloaded_events = HashSet::<EventId>::new();
1960 loop {
1961 let result = self
1962 .local_node
1963 .stage_block_execution(
1964 block.clone(),
1965 round,
1966 published_blobs.clone(),
1967 policy.clone(),
1968 )
1969 .await;
1970 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1971 let validators = self.validator_nodes().await?;
1972 self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1973 .await?;
1974 continue; }
1976 if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1977 let new_events = event_ids
1978 .iter()
1979 .filter(|id| !downloaded_events.contains(id))
1980 .cloned()
1981 .collect::<Vec<_>>();
1982 if !new_events.is_empty() {
1983 Box::pin(self.download_certificates_for_events(&new_events)).await?;
1984 downloaded_events.extend(new_events);
1985 continue; }
1987 }
1989 if let Ok((_, executed_block, _, _, _)) = &result {
1990 let hash = CryptoHash::new(executed_block);
1991 let notification = Notification {
1992 chain_id: executed_block.header.chain_id,
1993 reason: Reason::BlockExecuted {
1994 height: executed_block.header.height,
1995 hash,
1996 },
1997 };
1998 self.notifier.notify(&[notification]);
1999 }
2000 let (
2001 _modified_block,
2002 executed_block,
2003 response,
2004 _resource_tracker,
2005 never_reject_origins,
2006 ) = result?;
2007 return Ok((executed_block, response, never_reject_origins));
2008 }
2009 }
2010}
2011
2012async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
2015 nodes: &[RemoteNode<A>],
2016 f: F,
2017 err: G,
2018 timeout: Duration,
2019) -> Result<V, E2>
2020where
2021 F: Clone + FnOnce(RemoteNode<A>) -> R,
2022 RemoteNode<A>: Clone,
2023 G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
2024 R: Future<Output = Result<V, E1>> + 'a,
2025{
2026 let mut nodes = nodes.to_vec();
2027 nodes.shuffle(&mut rand::thread_rng());
2028 let mut stream = nodes
2029 .iter()
2030 .zip(0..)
2031 .map(|(remote_node, i)| {
2032 let fun = f.clone();
2033 let node = remote_node.clone();
2034 async move {
2035 linera_base::time::timer::sleep(timeout * i * i).await;
2036 fun(node).await.map_err(|err| (remote_node.public_key, err))
2037 }
2038 })
2039 .collect::<FuturesUnordered<_>>();
2040 let mut errors = vec![];
2041 while let Some(maybe_result) = stream.next().await {
2042 match maybe_result {
2043 Ok(result) => return Ok(result),
2044 Err(error) => errors.push(error),
2045 };
2046 }
2047 Err(err(errors))
2048}
2049
2050#[must_use]
2052pub struct AbortOnDrop(pub AbortHandle);
2053
2054impl Drop for AbortOnDrop {
2055 #[instrument(level = "trace", skip(self))]
2056 fn drop(&mut self) {
2057 self.0.abort();
2058 }
2059}
2060
/// A proposed block together with the blobs and optional metadata stored alongside it.
///
/// The two optional fields are marked `#[serde(default)]`, so data serialized without
/// them still deserializes, with the fields set to `None`.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block.
    pub block: ProposedBlock,
    /// Blobs kept with the proposal; presumably the ones the block publishes —
    /// TODO confirm against the code that builds this value.
    pub blobs: Vec<Blob>,
    /// Optional block execution outcome. NOTE(review): judging by the name, this is
    /// recorded when an automatic retry took place — confirm.
    #[serde(default)]
    pub auto_retry_outcome: Option<BlockExecutionOutcome>,
    /// Optional round associated with the proposal.
    #[serde(default)]
    pub round: Option<Round>,
}
2074
/// Tells `receive_sender_certificate` whether it still has to check the certificate.
enum ReceiveCertificateMode {
    /// The certificate is checked with `check_certificate` before it is processed.
    NeedsCheck,
    /// The check is skipped; the caller has taken care of it.
    AlreadyChecked,
}
2079
2080enum CheckCertificateResult {
2081 OldEpoch,
2082 New,
2083 FutureEpoch,
2084}
2085
2086impl CheckCertificateResult {
2087 fn into_result(self) -> Result<(), chain_client::Error> {
2088 match self {
2089 Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
2090 Self::New => Ok(()),
2091 Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
2092 }
2093 }
2094}
2095
2096#[cfg(not(target_arch = "wasm32"))]
2098pub async fn create_bytecode_blobs(
2099 contract: Bytecode,
2100 service: Bytecode,
2101 vm_runtime: VmRuntime,
2102) -> (Vec<Blob>, ModuleId) {
2103 match vm_runtime {
2104 VmRuntime::Wasm => {
2105 let (compressed_contract, compressed_service) =
2106 tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
2107 .await
2108 .expect("Compression should not panic");
2109 let contract_blob = Blob::new_contract_bytecode(compressed_contract);
2110 let service_blob = Blob::new_service_bytecode(compressed_service);
2111 let module_id =
2112 ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
2113 (vec![contract_blob, service_blob], module_id)
2114 }
2115 VmRuntime::Evm => {
2116 let compressed_contract = contract.compress();
2117 let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
2118 let module_id = ModuleId::new(
2119 evm_contract_blob.id().hash,
2120 evm_contract_blob.id().hash,
2121 vm_runtime,
2122 );
2123 (vec![evm_contract_blob], module_id)
2124 }
2125 }
2126}