1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{
14 stream::{FuturesUnordered, TryStreamExt},
15 Future, StreamExt,
16};
17use linera_base::{
18 crypto::ValidatorPublicKey,
19 data_types::{BlockHeight, Round},
20 ensure,
21 identifiers::{BlobId, BlobType, ChainId, StreamId},
22 time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25 data_types::{BlockProposal, LiteVote},
26 manager::LockingBlock,
27 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{ResultReadCertificates, Storage};
31use thiserror::Error;
32use tracing::{instrument, Level};
33
34use crate::{
35 client::{chain_client, Client},
36 data_types::{ChainInfo, ChainInfoQuery},
37 environment::Environment,
38 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
39 remote_node::RemoteNode,
40 LocalNodeError,
41};
42
/// Default grace period, expressed as a fraction.
///
/// NOTE(review): presumably meant as the default `grace_period` argument of
/// [`communicate_with_quorum`], where the fraction scales the time that elapsed until a quorum
/// was reached — confirm at the call sites, which are outside this file.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// Upper bound (24 hours) on how long [`communicate_with_quorum`] waits for the next validator
/// response while no grace-period deadline has been set yet.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);
#[derive(Clone)]
/// A request that `ValidatorUpdater::send_chain_update` sends to a validator and for which it
/// expects a vote in the validator's response.
pub enum CommunicateAction {
    /// Send a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs that are consulted if the validator reports missing blobs or an inactive
        /// chain while handling the proposal.
        blob_ids: Vec<BlobId>,
    },
    /// Send a validated block certificate to the validator.
    FinalizeBlock {
        /// The certificate for the validated block.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain message delivery mode passed along with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Query the validator's chain info with a timeout request for the given height and round.
    RequestTimeout {
        /// The chain the request is about.
        chain_id: ChainId,
        /// The block height the timeout is requested for.
        height: BlockHeight,
        /// The round the timeout is requested for.
        round: Round,
    },
}
66
67impl CommunicateAction {
68 pub fn round(&self) -> Round {
70 match self {
71 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
72 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
73 CommunicateAction::RequestTimeout { round, .. } => *round,
74 }
75 }
76}
77
/// Sends chain data (proposals, certificates, blobs, chain information) to one validator,
/// reading whatever has to be sent from the client's local node.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator that all requests of this updater are sent to.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The client whose local node is read for chain state, certificates and blobs.
    pub client: Arc<Client<Env>>,
    /// The ID of the admin chain; it is sent to the validator when the validator reports
    /// missing epoch events of that chain.
    pub admin_id: ChainId,
}
86
87impl<Env: Environment> Clone for ValidatorUpdater<Env> {
88 fn clone(&self) -> Self {
89 ValidatorUpdater {
90 remote_node: self.remote_node.clone(),
91 client: self.client.clone(),
92 admin_id: self.admin_id,
93 }
94 }
95}
96
/// The ways in which [`communicate_with_quorum`] can fail.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No validator returned an error, but no group of equal responses reached the quorum
    /// threshold. Holds the quorum threshold and, for every group of responses, its total
    /// weight and the number of responses in it.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The same error was returned by validators whose combined weight reached the
    /// committee's validity threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No quorum was reached and at least one error occurred. Holds at most four distinct
    /// errors, those with the highest combined weight, each paired with that weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
115
116pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
122 validator_clients: &'a [RemoteNode<A>],
123 committee: &Committee,
124 group_by: G,
125 execute: F,
126 grace_period: f64,
128) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
129where
130 A: ValidatorNode + Clone + 'static,
131 F: Clone + Fn(RemoteNode<A>) -> R,
132 R: Future<Output = Result<V, chain_client::Error>> + 'a,
133 G: Fn(&V) -> K,
134 K: Hash + PartialEq + Eq + Clone + 'static,
135 V: 'static,
136{
137 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
138 .iter()
139 .filter_map(|remote_node| {
140 if committee.weight(&remote_node.public_key) == 0 {
141 return None;
144 }
145 let execute = execute.clone();
146 let remote_node = remote_node.clone();
147 Some(async move { (remote_node.public_key, execute(remote_node).await) })
148 })
149 .collect();
150
151 let start_time = Instant::now();
152 let mut end_time: Option<Instant> = None;
153 let mut remaining_votes = committee.total_votes();
154 let mut highest_key_score = 0;
155 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
156 let mut error_scores = HashMap::new();
157
158 'vote_wait: while let Ok(Some((name, result))) = timeout(
159 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
160 responses.next(),
161 )
162 .await
163 {
164 remaining_votes -= committee.weight(&name);
165 match result {
166 Ok(value) => {
167 let key = group_by(&value);
168 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
169 entry.0 += committee.weight(&name);
170 entry.1.push((name, value));
171 highest_key_score = highest_key_score.max(entry.0);
172 }
173 Err(err) => {
174 let err = match err {
176 chain_client::Error::RemoteNodeError(err) => err,
177 err => NodeError::ResponseHandlingError {
178 error: err.to_string(),
179 },
180 };
181 let entry = error_scores.entry(err.clone()).or_insert(0);
182 *entry += committee.weight(&name);
183 if *entry >= committee.validity_threshold() {
184 return Err(CommunicationError::Trusted(err));
187 }
188 }
189 }
190 if highest_key_score + remaining_votes < committee.quorum_threshold() {
192 break 'vote_wait;
193 }
194
195 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
198 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
199 }
200 }
201
202 let scores = value_scores
203 .values()
204 .map(|(weight, values)| (*weight, values.len()))
205 .collect();
206 if let Some((key, (_, values))) = value_scores
208 .into_iter()
209 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
210 {
211 return Ok((key, values));
212 }
213
214 if error_scores.is_empty() {
215 return Err(CommunicationError::NoConsensus(
216 committee.quorum_threshold(),
217 scores,
218 ));
219 }
220
221 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
223 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
224 sample.truncate(4);
225 Err(CommunicationError::Sample(sample))
226}
227
228impl<Env> ValidatorUpdater<Env>
229where
230 Env: Environment + 'static,
231{
232 #[instrument(
233 level = "trace", skip_all, err(level = Level::WARN),
234 fields(chain_id = %certificate.block().header.chain_id)
235 )]
236 async fn send_confirmed_certificate(
237 &mut self,
238 certificate: GenericCertificate<ConfirmedBlock>,
239 delivery: CrossChainMessageDelivery,
240 ) -> Result<Box<ChainInfo>, chain_client::Error> {
241 let mut result = self
242 .remote_node
243 .handle_optimized_confirmed_certificate(&certificate, delivery)
244 .await;
245
246 let mut sent_admin_chain = false;
247 let mut sent_blobs = false;
248 loop {
249 match result {
250 Err(NodeError::EventsNotFound(event_ids))
251 if !sent_admin_chain
252 && certificate.inner().chain_id() != self.admin_id
253 && event_ids.iter().all(|event_id| {
254 event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
255 && event_id.chain_id == self.admin_id
256 }) =>
257 {
258 self.update_admin_chain().await?;
260 sent_admin_chain = true;
261 }
262 Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
263 self.remote_node
265 .check_blobs_not_found(&certificate, &blob_ids)?;
266 let maybe_blobs = self
268 .client
269 .local_node
270 .read_blobs_from_storage(&blob_ids)
271 .await?;
272 let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
273 self.remote_node.node.upload_blobs(blobs).await?;
274 sent_blobs = true;
275 }
276 result => return Ok(result?),
277 }
278 result = self
279 .remote_node
280 .handle_confirmed_certificate(certificate.clone(), delivery)
281 .await;
282 }
283 }
284
285 async fn send_validated_certificate(
286 &mut self,
287 certificate: GenericCertificate<ValidatedBlock>,
288 delivery: CrossChainMessageDelivery,
289 ) -> Result<Box<ChainInfo>, chain_client::Error> {
290 let result = self
291 .remote_node
292 .handle_optimized_validated_certificate(&certificate, delivery)
293 .await;
294
295 let chain_id = certificate.inner().chain_id();
296 match &result {
297 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
298 self.remote_node
299 .check_blobs_not_found(&certificate, blob_ids)?;
300 let blobs = self
303 .client
304 .local_node
305 .get_locking_blobs(blob_ids, chain_id)
306 .await?
307 .ok_or_else(|| original_err.clone())?;
308 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
309 }
310 Err(error) => {
311 self.sync_if_needed(
312 chain_id,
313 certificate.round,
314 certificate.block().header.height,
315 error,
316 )
317 .await?;
318 }
319 _ => return Ok(result?),
320 }
321 Ok(self
322 .remote_node
323 .handle_validated_certificate(certificate)
324 .await?)
325 }
326
327 async fn request_timeout(
332 &mut self,
333 chain_id: ChainId,
334 round: Round,
335 height: BlockHeight,
336 ) -> Result<Box<ChainInfo>, chain_client::Error> {
337 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
338 let result = self
339 .remote_node
340 .handle_chain_info_query(query.clone())
341 .await;
342 if let Err(err) = &result {
343 self.sync_if_needed(chain_id, round, height, err).await?;
344 }
345 Ok(result?)
346 }
347
/// Reacts to an `error` returned by the validator for a request about `chain_id` at the
/// given `round` and `height`.
///
/// * If the validator is ahead of us (higher round, or it expects a higher block height than
///   the one it found), the local chain state is synchronized from the validator.
/// * If the validator is behind (lower round, or it expects a lower block height), or it
///   reports the chain as inactive, chain information up to `height` is sent to it.
/// * Any other error is ignored.
///
/// Returns an error only if the synchronization or the sending itself fails.
async fn sync_if_needed(
    &mut self,
    chain_id: ChainId,
    round: Round,
    height: BlockHeight,
    error: &NodeError,
) -> Result<(), chain_client::Error> {
    // Only used as a field in the log messages below.
    let address = &self.remote_node.address();
    match error {
        NodeError::WrongRound(validator_round) if *validator_round > round => {
            tracing::debug!(
                address, %chain_id, %validator_round, %round,
                "validator is at a higher round; synchronizing",
            );
            self.client
                .synchronize_chain_state_from(&self.remote_node, chain_id)
                .await?;
        }
        NodeError::UnexpectedBlockHeight {
            expected_block_height,
            found_block_height,
        } if expected_block_height > found_block_height => {
            tracing::debug!(
                address,
                %chain_id,
                %expected_block_height,
                %found_block_height,
                "validator is at a higher height; synchronizing",
            );
            self.client
                .synchronize_chain_state_from(&self.remote_node, chain_id)
                .await?;
        }
        NodeError::WrongRound(validator_round) if *validator_round < round => {
            tracing::debug!(
                address, %chain_id, %validator_round, %round,
                "validator is at a lower round; sending chain info",
            );
            self.send_chain_information(
                chain_id,
                height,
                CrossChainMessageDelivery::NonBlocking,
                None,
            )
            .await?;
        }
        NodeError::UnexpectedBlockHeight {
            expected_block_height,
            found_block_height,
        } if expected_block_height < found_block_height => {
            tracing::debug!(
                address,
                %chain_id,
                %expected_block_height,
                %found_block_height,
                "Validator is at a lower height; sending chain info.",
            );
            self.send_chain_information(
                chain_id,
                height,
                CrossChainMessageDelivery::NonBlocking,
                None,
            )
            .await?;
        }
        // Here `chain_id` is the chain named in the error; it shadows the parameter.
        NodeError::InactiveChain(chain_id) => {
            tracing::debug!(
                address,
                %chain_id,
                "Validator has inactive chain; sending chain info.",
            );
            self.send_chain_information(
                *chain_id,
                height,
                CrossChainMessageDelivery::NonBlocking,
                None,
            )
            .await?;
        }
        // Equal rounds or heights, and all other errors: nothing to reconcile.
        _ => {}
    }
    Ok(())
}
432
/// Sends `proposal` to the validator, supplying whatever the validator reports as missing
/// and retrying until the proposal is accepted or an error occurs that is not handled here.
///
/// `blob_ids` is consulted when the validator reports missing blobs or an inactive chain; it
/// is emptied the first time that happens, so a repeated report of that kind is returned as
/// an error.
///
/// Returns the chain info from the validator's response to the accepted proposal.
async fn send_block_proposal(
    &mut self,
    proposal: Box<BlockProposal>,
    mut blob_ids: Vec<BlobId>,
) -> Result<Box<ChainInfo>, chain_client::Error> {
    let chain_id = proposal.content.block.chain_id;
    // For each origin chain, the height of the last cross-chain update we already sent.
    let mut sent_cross_chain_updates = BTreeMap::new();
    // Publisher chains that were already sent in response to missing events.
    let mut publisher_chain_ids_sent = BTreeSet::new();
    loop {
        match self
            .remote_node
            .handle_block_proposal(proposal.clone())
            .await
        {
            Ok(info) => return Ok(info),
            Err(NodeError::WrongRound(_round)) => {
                tracing::debug!(
                    remote_node = self.remote_node.address(),
                    %chain_id,
                    "wrong round; sending chain to validator",
                );
                self.send_chain_information(
                    chain_id,
                    proposal.content.block.height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            // The validator expects a lower height than the proposal's: send it the chain up
            // to the proposal's height.
            Err(NodeError::UnexpectedBlockHeight {
                expected_block_height,
                found_block_height,
            }) if expected_block_height < found_block_height
                && found_block_height == proposal.content.block.height =>
            {
                tracing::debug!(
                    remote_node = self.remote_node.address(),
                    %chain_id,
                    "wrong height; sending chain to validator",
                );
                self.send_chain_information(
                    chain_id,
                    found_block_height,
                    CrossChainMessageDelivery::NonBlocking,
                    None,
                )
                .await?;
            }
            // Handled only if it concerns the proposal's chain and we have not already sent
            // this origin chain up to (at least) the reported height.
            Err(NodeError::MissingCrossChainUpdate {
                chain_id,
                origin,
                height,
            }) if chain_id == proposal.content.block.chain_id
                && sent_cross_chain_updates
                    .get(&origin)
                    .is_none_or(|h| *h < height) =>
            {
                tracing::debug!(
                    remote_node = %self.remote_node.address(),
                    chain_id = %origin,
                    "Missing cross-chain update; sending chain to validator.",
                );
                sent_cross_chain_updates.insert(origin, height);
                // The target height is exclusive, hence `height + 1`.
                self.send_chain_information(
                    origin,
                    height.try_add_one()?,
                    CrossChainMessageDelivery::Blocking,
                    None,
                )
                .await?;
            }
            Err(NodeError::EventsNotFound(event_ids)) => {
                let mut publisher_heights = BTreeMap::new();
                // Chains that published the missing events and were not sent yet.
                let chain_ids = event_ids
                    .iter()
                    .map(|event_id| event_id.chain_id)
                    .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                    .collect::<BTreeSet<_>>();
                tracing::debug!(
                    remote_node = self.remote_node.address(),
                    ?chain_ids,
                    "missing events; sending chains to validator",
                );
                // All publisher chains were already sent: give up with the validator's error.
                ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                for chain_id in chain_ids {
                    let height = self
                        .client
                        .local_node
                        .chain_state_view(chain_id)
                        .await?
                        .next_height_to_preprocess()
                        .await?;
                    publisher_heights.insert(chain_id, height);
                    publisher_chain_ids_sent.insert(chain_id);
                }
                self.send_chain_info_up_to_heights(
                    publisher_heights,
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await?;
            }
            Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                if !blob_ids.is_empty() =>
            {
                tracing::debug!("Missing blobs");
                // Blobs published by the proposed block are sent as pending blobs, taken from
                // the local chain manager's proposed blobs.
                let published_blob_ids =
                    BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                let mut published_blobs = Vec::new();
                {
                    let chain = self.client.local_node.chain_state_view(chain_id).await?;
                    for blob_id in published_blob_ids {
                        published_blobs
                            .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
                    }
                }
                self.remote_node
                    .send_pending_blobs(chain_id, published_blobs)
                    .await?;
                // `mem::take` leaves `blob_ids` empty, so this arm runs at most once.
                let missing_blob_ids = self
                    .remote_node
                    .node
                    .missing_blob_ids(mem::take(&mut blob_ids))
                    .await?;
                let blob_states = self
                    .client
                    .local_node
                    .read_blob_states_from_storage(&missing_blob_ids)
                    .await?;
                // For every chain named in a blob state, the highest `block_height + 1`.
                let mut chain_heights = BTreeMap::new();
                for blob_state in blob_states {
                    let block_chain_id = blob_state.chain_id;
                    let block_height = blob_state.block_height.try_add_one()?;
                    chain_heights
                        .entry(block_chain_id)
                        .and_modify(|h| *h = block_height.max(*h))
                        .or_insert(block_height);
                }
                tracing::debug!("Sending chains {chain_heights:?}");

                self.send_chain_info_up_to_heights(
                    chain_heights,
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await?;
            }
            // Everything else, including the guarded errors above once their guard fails.
            Err(err) => return Err(err.into()),
        }
    }
}
593
594 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
595 let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
596 Box::pin(self.send_chain_information(
597 self.admin_id,
598 local_admin_info.next_block_height,
599 CrossChainMessageDelivery::NonBlocking,
600 None,
601 ))
602 .await
603 }
604
/// Sends the validator the confirmed blocks of `chain_id` below `target_block_height`, and
/// then, if the local node is in a later round at the same height, the local proposal,
/// locking block or timeout certificate for the current round.
///
/// `latest_certificate`, if given, is used as the certificate for height
/// `target_block_height - 1` instead of reading it from local storage.
///
/// If `target_block_height` is zero, the chain recorded in the blob state of this chain's
/// description blob is sent instead (up to the block after the one recorded there), and the
/// validator is then queried for its chain info.
pub async fn send_chain_information(
    &mut self,
    chain_id: ChainId,
    target_block_height: BlockHeight,
    delivery: CrossChainMessageDelivery,
    latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
) -> Result<(), chain_client::Error> {
    let info = if let Ok(height) = target_block_height.try_sub_one() {
        // Try the last certificate first; if the validator accepts it, nothing else is
        // missing on its side.
        let certificate = if let Some(cert) = latest_certificate {
            cert
        } else {
            let hash = self
                .client
                .local_node
                .chain_state_view(chain_id)
                .await?
                .block_hashes([height])
                .await?
                .into_iter()
                .next()
                .ok_or_else(|| {
                    chain_client::Error::InternalError(
                        "send_chain_information called with invalid target_block_height",
                    )
                })?;
            self.client
                .local_node
                .storage_client()
                .read_certificate(hash)
                .await?
                .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
        };
        let info = match self.send_confirmed_certificate(certificate, delivery).await {
            Ok(info) => info,
            Err(error) => {
                tracing::debug!(
                    address = self.remote_node.address(), %error,
                    "validator failed to handle confirmed certificate; sending whole chain",
                );
                // Ask the validator where it is, so that we know which blocks to send.
                let query = ChainInfoQuery::new(chain_id);
                self.remote_node.handle_chain_info_query(query).await?
            }
        };
        // Heights from the validator's next block height up to the target (exclusive); this
        // range is empty if the validator is already at or beyond the target.
        let heights = (info.next_block_height.0..target_block_height.0).map(BlockHeight);
        let validator_missing_hashes = self
            .client
            .local_node
            .chain_state_view(chain_id)
            .await?
            .block_hashes(heights)
            .await?;
        if !validator_missing_hashes.is_empty() {
            let certificates = self
                .client
                .local_node
                .storage_client()
                .read_certificates(validator_missing_hashes.clone())
                .await?;
            let certificates =
                match ResultReadCertificates::new(certificates, validator_missing_hashes) {
                    ResultReadCertificates::Certificates(certificates) => certificates,
                    ResultReadCertificates::InvalidHashes(hashes) => {
                        return Err(chain_client::Error::ReadCertificatesError(hashes))
                    }
                };
            for certificate in certificates {
                self.send_confirmed_certificate(certificate, delivery)
                    .await?;
            }
        }
        info
    } else {
        // Target height zero: there are no blocks of this chain to send. Send the chain
        // recorded in the blob state of this chain's description blob instead.
        let blob_states = self
            .client
            .local_node
            .read_blob_states_from_storage(&[BlobId::new(
                chain_id.0,
                BlobType::ChainDescription,
            )])
            .await?;
        let mut chain_heights = BTreeMap::new();
        for blob_state in blob_states {
            let block_chain_id = blob_state.chain_id;
            let block_height = blob_state.block_height.try_add_one()?;
            chain_heights
                .entry(block_chain_id)
                .and_modify(|h| *h = block_height.max(*h))
                .or_insert(block_height);
        }
        self.send_chain_info_up_to_heights(
            chain_heights,
            CrossChainMessageDelivery::NonBlocking,
        )
        .await?;
        let query = ChainInfoQuery::new(chain_id);
        self.remote_node.handle_chain_info_query(query).await?
    };
    let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
    let query = ChainInfoQuery::new(chain_id).with_manager_values();
    let local_info = match self.client.local_node.handle_chain_info_query(query).await {
        Ok(response) => response.info,
        // The local node is missing blobs for this query: treated as nothing more to send.
        Err(LocalNodeError::BlobsNotFound(_)) => return Ok(()),
        Err(error) => return Err(error.into()),
    };
    let manager = local_info.manager;
    // Round information is only sent if both sides are at the same height and the local
    // node is in a strictly later round than the validator.
    if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
        return Ok(());
    }
    // Try the proposals for the current round; stop as soon as the validator accepts one.
    for proposal in manager
        .requested_proposed
        .into_iter()
        .chain(manager.requested_signed_proposal)
    {
        if proposal.content.round == manager.current_round {
            if let Err(error) = self.remote_node.handle_block_proposal(proposal).await {
                tracing::info!(%error, "failed to send block proposal");
            } else {
                return Ok(());
            }
        }
    }
    // Next, try a regular locking block from the current round.
    if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.map(|b| *b) {
        if validated.round == manager.current_round {
            if let Err(error) = self
                .remote_node
                .handle_optimized_validated_certificate(
                    &validated,
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await
            {
                tracing::info!(%error, "failed to send locking block");
            } else {
                return Ok(());
            }
        }
    }
    // Finally, send the timeout certificate if it is not older than the validator's round.
    if let Some(cert) = manager.timeout {
        if cert.round >= remote_round {
            tracing::debug!(round = %cert.round, "sending timeout");
            self.remote_node.handle_timeout_certificate(*cert).await?;
        }
    }
    Ok(())
}
758
759 async fn send_chain_info_up_to_heights(
760 &mut self,
761 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
762 delivery: CrossChainMessageDelivery,
763 ) -> Result<(), chain_client::Error> {
764 FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
765 let mut updater = self.clone();
766 async move {
767 updater
768 .send_chain_information(chain_id, height, delivery, None)
769 .await
770 }
771 }))
772 .try_collect::<Vec<_>>()
773 .await?;
774 Ok(())
775 }
776
777 pub async fn send_chain_update(
778 &mut self,
779 action: CommunicateAction,
780 ) -> Result<LiteVote, chain_client::Error> {
781 let chain_id = match &action {
782 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
783 CommunicateAction::FinalizeBlock { certificate, .. } => {
784 certificate.inner().block().header.chain_id
785 }
786 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
787 };
788 let vote = match action {
790 CommunicateAction::SubmitBlock { proposal, blob_ids } => {
791 let info = self.send_block_proposal(proposal, blob_ids).await?;
792 info.manager.pending.ok_or_else(|| {
793 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
794 })?
795 }
796 CommunicateAction::FinalizeBlock {
797 certificate,
798 delivery,
799 } => {
800 let info = self
801 .send_validated_certificate(*certificate, delivery)
802 .await?;
803 info.manager.pending.ok_or_else(|| {
804 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
805 })?
806 }
807 CommunicateAction::RequestTimeout { round, height, .. } => {
808 let info = self.request_timeout(chain_id, round, height).await?;
809 info.manager.timeout_vote.ok_or_else(|| {
810 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
811 })?
812 }
813 };
814 vote.check(self.remote_node.public_key)?;
815 Ok(vote)
816 }
817}