1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10 sync::Arc,
11};
12
13use futures::{
14 stream::{FuturesUnordered, TryStreamExt},
15 Future, StreamExt,
16};
17use linera_base::{
18 crypto::ValidatorPublicKey,
19 data_types::{BlockHeight, Round},
20 ensure,
21 identifiers::{BlobId, BlobType, ChainId, StreamId},
22 time::{timer::timeout, Duration, Instant},
23};
24use linera_chain::{
25 data_types::{BlockProposal, LiteVote},
26 manager::LockingBlock,
27 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
28};
29use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
30use linera_storage::{ResultReadCertificates, Storage};
31use thiserror::Error;
32use tracing::{instrument, Level};
33
34use crate::{
35 client::{chain_client, Client},
36 data_types::{ChainInfo, ChainInfoQuery},
37 environment::Environment,
38 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
39 remote_node::RemoteNode,
40 LocalNodeError,
41};
42
/// Default value for a quorum grace period, expressed as a fraction.
///
/// NOTE(review): presumably passed as the `quorum_grace_period` argument of
/// [`communicate_with_quorum`], where the fraction is multiplied by the time it took to
/// reach a quorum — confirm against the callers.
pub const DEFAULT_QUORUM_GRACE_PERIOD: f64 = 0.2;
/// Longest time [`communicate_with_quorum`] waits for the next validator response while no
/// quorum has been reached yet (24 hours).
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);

/// A request that [`ValidatorUpdater::send_chain_update`] sends to a validator in order to
/// obtain its vote.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blobs that are consulted when the validator rejects the proposal because of
        /// missing blobs or an inactive chain.
        blob_ids: Vec<BlobId>,
    },
    /// Send a validated block certificate so that the validator can vote to confirm it.
    FinalizeBlock {
        /// The certificate for the validated block.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain delivery mode used for the first attempt to send the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote.
    RequestTimeout {
        /// The chain whose round should time out.
        chain_id: ChainId,
        /// The block height the timeout refers to.
        height: BlockHeight,
        /// The round the timeout refers to.
        round: Round,
    },
}
66
67impl CommunicateAction {
68 pub fn round(&self) -> Round {
70 match self {
71 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
72 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
73 CommunicateAction::RequestTimeout { round, .. } => *round,
74 }
75 }
76}
77
/// Sends chain data and chain updates to a single validator, using the local node of
/// `client` as the source of certificates, blobs and chain information.
pub struct ValidatorUpdater<Env>
where
    Env: Environment,
{
    /// The validator this updater communicates with.
    pub remote_node: RemoteNode<Env::ValidatorNode>,
    /// The client whose local node provides the data that is sent to the validator.
    pub client: Arc<Client<Env>>,
    /// The ID of the admin chain; it is sent to the validator when the validator is missing
    /// epoch events published on that chain.
    pub admin_id: ChainId,
}
86
87impl<Env: Environment> Clone for ValidatorUpdater<Env> {
88 fn clone(&self) -> Self {
89 ValidatorUpdater {
90 remote_node: self.remote_node.clone(),
91 client: self.client.clone(),
92 admin_id: self.admin_id,
93 }
94 }
95}
96
/// The ways in which [`communicate_with_quorum`] can fail.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No validator reported an error, but no group of identical responses reached the
    /// quorum threshold. Holds the quorum threshold and, for every group of responses, its
    /// total weight and its number of responses.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The same error was reported by validators whose combined weight reached the
    /// committee's validity threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No quorum was reached and no single error reached the validity threshold. Holds at
    /// most four errors, each with its accumulated weight, heaviest first.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
115
116pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
123 validator_clients: &'a [RemoteNode<A>],
124 committee: &Committee,
125 group_by: G,
126 execute: F,
127 quorum_grace_period: f64,
129) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
130where
131 A: ValidatorNode + Clone + 'static,
132 F: Clone + Fn(RemoteNode<A>) -> R,
133 R: Future<Output = Result<V, chain_client::Error>> + 'a,
134 G: Fn(&V) -> K,
135 K: Hash + PartialEq + Eq + Clone + 'static,
136 V: 'static,
137{
138 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
139 .iter()
140 .filter_map(|remote_node| {
141 if committee.weight(&remote_node.public_key) == 0 {
142 return None;
145 }
146 let execute = execute.clone();
147 let remote_node = remote_node.clone();
148 Some(async move { (remote_node.public_key, execute(remote_node).await) })
149 })
150 .collect();
151
152 let start_time = Instant::now();
153 let mut end_time: Option<Instant> = None;
154 let mut remaining_votes = committee.total_votes();
155 let mut highest_key_score = 0;
156 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
157 let mut error_scores = HashMap::new();
158
159 'vote_wait: while let Ok(Some((name, result))) = timeout(
160 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
161 responses.next(),
162 )
163 .await
164 {
165 remaining_votes -= committee.weight(&name);
166 match result {
167 Ok(value) => {
168 let key = group_by(&value);
169 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
170 entry.0 += committee.weight(&name);
171 entry.1.push((name, value));
172 highest_key_score = highest_key_score.max(entry.0);
173 }
174 Err(err) => {
175 let err = match err {
177 chain_client::Error::RemoteNodeError(err) => err,
178 err => NodeError::ResponseHandlingError {
179 error: err.to_string(),
180 },
181 };
182 let entry = error_scores.entry(err.clone()).or_insert(0);
183 *entry += committee.weight(&name);
184 if *entry >= committee.validity_threshold() {
185 return Err(CommunicationError::Trusted(err));
188 }
189 }
190 }
191 if highest_key_score + remaining_votes < committee.quorum_threshold() {
193 break 'vote_wait;
194 }
195
196 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
199 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(quorum_grace_period));
200 }
201 }
202
203 let scores = value_scores
204 .values()
205 .map(|(weight, values)| (*weight, values.len()))
206 .collect();
207 if let Some((key, (_, values))) = value_scores
209 .into_iter()
210 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
211 {
212 return Ok((key, values));
213 }
214
215 if error_scores.is_empty() {
216 return Err(CommunicationError::NoConsensus(
217 committee.quorum_threshold(),
218 scores,
219 ));
220 }
221
222 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
224 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
225 sample.truncate(4);
226 Err(CommunicationError::Sample(sample))
227}
228
229impl<Env> ValidatorUpdater<Env>
230where
231 Env: Environment + 'static,
232{
233 #[instrument(
234 level = "trace", skip_all, err(level = Level::WARN),
235 fields(chain_id = %certificate.block().header.chain_id)
236 )]
237 async fn send_confirmed_certificate(
238 &mut self,
239 certificate: GenericCertificate<ConfirmedBlock>,
240 delivery: CrossChainMessageDelivery,
241 ) -> Result<Box<ChainInfo>, chain_client::Error> {
242 let mut result = self
243 .remote_node
244 .handle_optimized_confirmed_certificate(&certificate, delivery)
245 .await;
246
247 let mut sent_admin_chain = false;
248 let mut sent_blobs = false;
249 loop {
250 match result {
251 Err(NodeError::EventsNotFound(event_ids))
252 if !sent_admin_chain
253 && certificate.inner().chain_id() != self.admin_id
254 && event_ids.iter().all(|event_id| {
255 event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
256 && event_id.chain_id == self.admin_id
257 }) =>
258 {
259 self.update_admin_chain().await?;
261 sent_admin_chain = true;
262 }
263 Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
264 self.remote_node
266 .check_blobs_not_found(&certificate, &blob_ids)?;
267 let maybe_blobs = self
269 .client
270 .local_node
271 .read_blobs_from_storage(&blob_ids)
272 .await?;
273 let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
274 self.remote_node.node.upload_blobs(blobs).await?;
275 sent_blobs = true;
276 }
277 result => return Ok(result?),
278 }
279 result = self
280 .remote_node
281 .handle_confirmed_certificate(certificate.clone(), delivery)
282 .await;
283 }
284 }
285
286 async fn send_validated_certificate(
287 &mut self,
288 certificate: GenericCertificate<ValidatedBlock>,
289 delivery: CrossChainMessageDelivery,
290 ) -> Result<Box<ChainInfo>, chain_client::Error> {
291 let result = self
292 .remote_node
293 .handle_optimized_validated_certificate(&certificate, delivery)
294 .await;
295
296 let chain_id = certificate.inner().chain_id();
297 match &result {
298 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
299 self.remote_node
300 .check_blobs_not_found(&certificate, blob_ids)?;
301 let blobs = self
304 .client
305 .local_node
306 .get_locking_blobs(blob_ids, chain_id)
307 .await?
308 .ok_or_else(|| original_err.clone())?;
309 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
310 }
311 Err(error) => {
312 self.sync_if_needed(
313 chain_id,
314 certificate.round,
315 certificate.block().header.height,
316 error,
317 )
318 .await?;
319 }
320 _ => return Ok(result?),
321 }
322 Ok(self
323 .remote_node
324 .handle_validated_certificate(certificate)
325 .await?)
326 }
327
328 async fn request_timeout(
333 &mut self,
334 chain_id: ChainId,
335 round: Round,
336 height: BlockHeight,
337 ) -> Result<Box<ChainInfo>, chain_client::Error> {
338 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
339 let result = self
340 .remote_node
341 .handle_chain_info_query(query.clone())
342 .await;
343 if let Err(err) = &result {
344 self.sync_if_needed(chain_id, round, height, err).await?;
345 }
346 Ok(result?)
347 }
348
349 async fn sync_if_needed(
351 &mut self,
352 chain_id: ChainId,
353 round: Round,
354 height: BlockHeight,
355 error: &NodeError,
356 ) -> Result<(), chain_client::Error> {
357 let address = &self.remote_node.address();
358 match error {
359 NodeError::WrongRound(validator_round) if *validator_round > round => {
360 tracing::debug!(
361 address, %chain_id, %validator_round, %round,
362 "validator is at a higher round; synchronizing",
363 );
364 self.client
365 .synchronize_chain_state_from(&self.remote_node, chain_id)
366 .await?;
367 }
368 NodeError::UnexpectedBlockHeight {
369 expected_block_height,
370 found_block_height,
371 } if expected_block_height > found_block_height => {
372 tracing::debug!(
373 address,
374 %chain_id,
375 %expected_block_height,
376 %found_block_height,
377 "validator is at a higher height; synchronizing",
378 );
379 self.client
380 .synchronize_chain_state_from(&self.remote_node, chain_id)
381 .await?;
382 }
383 NodeError::WrongRound(validator_round) if *validator_round < round => {
384 tracing::debug!(
385 address, %chain_id, %validator_round, %round,
386 "validator is at a lower round; sending chain info",
387 );
388 self.send_chain_information(
389 chain_id,
390 height,
391 CrossChainMessageDelivery::NonBlocking,
392 None,
393 )
394 .await?;
395 }
396 NodeError::UnexpectedBlockHeight {
397 expected_block_height,
398 found_block_height,
399 } if expected_block_height < found_block_height => {
400 tracing::debug!(
401 address,
402 %chain_id,
403 %expected_block_height,
404 %found_block_height,
405 "Validator is at a lower height; sending chain info.",
406 );
407 self.send_chain_information(
408 chain_id,
409 height,
410 CrossChainMessageDelivery::NonBlocking,
411 None,
412 )
413 .await?;
414 }
415 NodeError::InactiveChain(chain_id) => {
416 tracing::debug!(
417 address,
418 %chain_id,
419 "Validator has inactive chain; sending chain info.",
420 );
421 self.send_chain_information(
422 *chain_id,
423 height,
424 CrossChainMessageDelivery::NonBlocking,
425 None,
426 )
427 .await?;
428 }
429 _ => {}
430 }
431 Ok(())
432 }
433
    /// Sends a block proposal to the validator, repairing recoverable rejections and
    /// retrying until the proposal is accepted or an unrecoverable error occurs.
    ///
    /// After each rejection that can be repaired, the missing data (chain information,
    /// cross-chain updates, event publisher chains or blobs) is sent to the validator and
    /// the proposal is submitted again. `blob_ids` lists the blobs that are considered when
    /// the validator reports missing blobs or an inactive chain.
    ///
    /// Returns the validator's chain info once the proposal is accepted. Any error that is
    /// not handled by one of the arms below, and any error raised while repairing, is
    /// returned to the caller.
    async fn send_block_proposal(
        &mut self,
        proposal: Box<BlockProposal>,
        mut blob_ids: Vec<BlobId>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let chain_id = proposal.content.block.chain_id;
        // For every origin chain, the height of the last cross-chain update already sent;
        // used to avoid resending the same update over and over.
        let mut sent_cross_chain_updates = BTreeMap::new();
        // Publisher chains that were already sent in response to missing events.
        let mut publisher_chain_ids_sent = BTreeSet::new();
        loop {
            match self
                .remote_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                Ok(info) => return Ok(info),
                // NOTE(review): there is no retry bound on this arm; the loop continues for
                // as long as the validator keeps answering with `WrongRound`.
                Err(NodeError::WrongRound(_round)) => {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong round; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        proposal.content.block.height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // Only handled if the height reported as found is the proposal's own height
                // and the validator expects a lower one.
                Err(NodeError::UnexpectedBlockHeight {
                    expected_block_height,
                    found_block_height,
                }) if expected_block_height < found_block_height
                    && found_block_height == proposal.content.block.height =>
                {
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        %chain_id,
                        "wrong height; sending chain to validator",
                    );
                    self.send_chain_information(
                        chain_id,
                        found_block_height,
                        CrossChainMessageDelivery::NonBlocking,
                        None,
                    )
                    .await?;
                }
                // Only handled if the update is for the proposal's chain, and nothing has
                // been sent for this origin yet or only up to a lower height. Otherwise the
                // error falls through to the final arm and is returned.
                Err(NodeError::MissingCrossChainUpdate {
                    chain_id,
                    origin,
                    height,
                }) if chain_id == proposal.content.block.chain_id
                    && sent_cross_chain_updates
                        .get(&origin)
                        .is_none_or(|h| *h < height) =>
                {
                    tracing::debug!(
                        remote_node = %self.remote_node.address(),
                        chain_id = %origin,
                        "Missing cross-chain update; sending chain to validator.",
                    );
                    sent_cross_chain_updates.insert(origin, height);
                    // The target height is exclusive, hence `height + 1`. Unlike the other
                    // arms, this one uses blocking cross-chain delivery.
                    self.send_chain_information(
                        origin,
                        height.try_add_one()?,
                        CrossChainMessageDelivery::Blocking,
                        None,
                    )
                    .await?;
                }
                Err(NodeError::EventsNotFound(event_ids)) => {
                    let mut publisher_heights = BTreeMap::new();
                    // Publisher chains of the missing events that have not been sent yet.
                    let chain_ids = event_ids
                        .iter()
                        .map(|event_id| event_id.chain_id)
                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
                        .collect::<BTreeSet<_>>();
                    tracing::debug!(
                        remote_node = self.remote_node.address(),
                        ?chain_ids,
                        "missing events; sending chains to validator",
                    );
                    // If every publisher chain was already sent, retrying cannot help:
                    // give up with the validator's error.
                    ensure!(!chain_ids.is_empty(), NodeError::EventsNotFound(event_ids));
                    for chain_id in chain_ids {
                        let height = self
                            .client
                            .local_node
                            .get_next_height_to_preprocess(chain_id)
                            .await?;
                        publisher_heights.insert(chain_id, height);
                        publisher_chain_ids_sent.insert(chain_id);
                    }
                    self.send_chain_info_up_to_heights(
                        publisher_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                // `blob_ids` is emptied by `mem::take` below and never refilled, so this arm
                // runs at most once; a repeated error is returned by the final arm.
                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
                    if !blob_ids.is_empty() =>
                {
                    tracing::debug!("Missing blobs");
                    // Blobs published by the proposed block itself are sent directly as
                    // pending blobs and removed from `blob_ids`.
                    let published_blob_ids =
                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
                    let published_blobs = self
                        .client
                        .local_node
                        .get_proposed_blobs(chain_id, published_blob_ids.into_iter().collect())
                        .await?;
                    self.remote_node
                        .send_pending_blobs(chain_id, published_blobs)
                        .await?;
                    // Ask the validator which of the remaining blobs it is missing.
                    let missing_blob_ids = self
                        .remote_node
                        .node
                        .missing_blob_ids(mem::take(&mut blob_ids))
                        .await?;
                    let blob_states = self
                        .client
                        .local_node
                        .read_blob_states_from_storage(&missing_blob_ids)
                        .await?;
                    // For every chain recorded in a blob state, the highest (exclusive)
                    // height that has to be sent to the validator.
                    let mut chain_heights = BTreeMap::new();
                    for blob_state in blob_states {
                        let block_chain_id = blob_state.chain_id;
                        let block_height = blob_state.block_height.try_add_one()?;
                        chain_heights
                            .entry(block_chain_id)
                            .and_modify(|h| *h = block_height.max(*h))
                            .or_insert(block_height);
                    }
                    tracing::debug!("Sending chains {chain_heights:?}");

                    self.send_chain_info_up_to_heights(
                        chain_heights,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await?;
                }
                Err(err) => return Err(err.into()),
            }
        }
    }
589
590 async fn update_admin_chain(&mut self) -> Result<(), chain_client::Error> {
591 let local_admin_info = self.client.local_node.chain_info(self.admin_id).await?;
592 Box::pin(self.send_chain_information(
593 self.admin_id,
594 local_admin_info.next_block_height,
595 CrossChainMessageDelivery::NonBlocking,
596 None,
597 ))
598 .await
599 }
600
    /// Brings the validator's copy of chain `chain_id` up to (but not including)
    /// `target_block_height`, then forwards the local chain manager state for the current
    /// round if the validator is at the same height but in an earlier round.
    ///
    /// * `delivery` is the cross-chain delivery mode used when sending confirmed
    ///   certificates of this chain.
    /// * `latest_certificate`, if given, is used as the certificate at height
    ///   `target_block_height - 1` instead of reading it from local storage.
    ///
    /// # Errors
    ///
    /// Returns an error if a required block hash or certificate cannot be found locally, or
    /// if the validator rejects data in a way that cannot be repaired. Failures to forward a
    /// proposal or a locking block are only logged.
    pub async fn send_chain_information(
        &mut self,
        chain_id: ChainId,
        target_block_height: BlockHeight,
        delivery: CrossChainMessageDelivery,
        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
    ) -> Result<(), chain_client::Error> {
        // `try_sub_one` fails only for a target height of zero, i.e. when there is no block
        // to send.
        let info = if let Ok(height) = target_block_height.try_sub_one() {
            // Optimistically send only the last certificate first.
            let certificate = if let Some(cert) = latest_certificate {
                cert
            } else {
                let hash = self
                    .client
                    .local_node
                    .get_block_hashes(chain_id, vec![height])
                    .await?
                    .into_iter()
                    .next()
                    .ok_or_else(|| {
                        chain_client::Error::InternalError(
                            "send_chain_information called with invalid target_block_height",
                        )
                    })?;
                self.client
                    .local_node
                    .storage_client()
                    .read_certificate(hash)
                    .await?
                    .ok_or_else(|| chain_client::Error::MissingConfirmedBlock(hash))?
            };
            let info = match self.send_confirmed_certificate(certificate, delivery).await {
                Ok(info) => info,
                Err(error) => {
                    tracing::debug!(
                        address = self.remote_node.address(), %error,
                        "validator failed to handle confirmed certificate; sending whole chain",
                    );
                    // Ask the validator where it is, to find out which blocks it misses.
                    let query = ChainInfoQuery::new(chain_id);
                    self.remote_node.handle_chain_info_query(query).await?
                }
            };
            // Heights from the validator's next block up to the target (exclusive). This
            // range is empty if the validator is already at or beyond the target.
            let heights: Vec<_> = (info.next_block_height.0..target_block_height.0)
                .map(BlockHeight)
                .collect();
            let validator_missing_hashes = self
                .client
                .local_node
                .get_block_hashes(chain_id, heights)
                .await?;
            if !validator_missing_hashes.is_empty() {
                let certificates = self
                    .client
                    .local_node
                    .storage_client()
                    .read_certificates(validator_missing_hashes.clone())
                    .await?;
                let certificates =
                    match ResultReadCertificates::new(certificates, validator_missing_hashes) {
                        ResultReadCertificates::Certificates(certificates) => certificates,
                        ResultReadCertificates::InvalidHashes(hashes) => {
                            return Err(chain_client::Error::ReadCertificatesError(hashes))
                        }
                    };
                // Send the missing certificates one after the other.
                for certificate in certificates {
                    self.send_confirmed_certificate(certificate, delivery)
                        .await?;
                }
            }
            // NOTE(review): `info` was obtained before the missing certificates above were
            // sent, so the height compared further below may be outdated in that case —
            // confirm that this is intended.
            info
        } else {
            // Target height zero: no block of this chain has to be sent. Instead, look up
            // the blob state of the chain description blob and send the chains recorded
            // there.
            let blob_states = self
                .client
                .local_node
                .read_blob_states_from_storage(&[BlobId::new(
                    chain_id.0,
                    BlobType::ChainDescription,
                )])
                .await?;
            let mut chain_heights = BTreeMap::new();
            for blob_state in blob_states {
                let block_chain_id = blob_state.chain_id;
                let block_height = blob_state.block_height.try_add_one()?;
                chain_heights
                    .entry(block_chain_id)
                    .and_modify(|h| *h = block_height.max(*h))
                    .or_insert(block_height);
            }
            // NOTE(review): this uses non-blocking delivery rather than `delivery`.
            self.send_chain_info_up_to_heights(
                chain_heights,
                CrossChainMessageDelivery::NonBlocking,
            )
            .await?;
            let query = ChainInfoQuery::new(chain_id);
            self.remote_node.handle_chain_info_query(query).await?
        };
        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let local_info = match self.client.local_node.handle_chain_info_query(query).await {
            Ok(response) => response.info,
            // Missing blobs on the local node are not treated as an error here; the manager
            // state is simply not forwarded.
            Err(LocalNodeError::BlobsNotFound(_)) => return Ok(()),
            Err(error) => return Err(error.into()),
        };
        let manager = local_info.manager;
        // The manager state is only forwarded if the validator is at the same height as the
        // local node and in an earlier round.
        if local_info.next_block_height != remote_height || manager.current_round <= remote_round {
            return Ok(());
        }
        // Try the proposals for the current round; stop after the first one that is accepted.
        for proposal in manager
            .requested_proposed
            .into_iter()
            .chain(manager.requested_signed_proposal)
        {
            if proposal.content.round == manager.current_round {
                if let Err(error) = self.remote_node.handle_block_proposal(proposal).await {
                    tracing::info!(%error, "failed to send block proposal");
                } else {
                    return Ok(());
                }
            }
        }
        // Otherwise try the locking block, if it is a validated block from the current round.
        if let Some(LockingBlock::Regular(validated)) = manager.requested_locking.map(|b| *b) {
            if validated.round == manager.current_round {
                if let Err(error) = self
                    .remote_node
                    .handle_optimized_validated_certificate(
                        &validated,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await
                {
                    tracing::info!(%error, "failed to send locking block");
                } else {
                    return Ok(());
                }
            }
        }
        // Finally, send the timeout certificate if it is for the validator's round or a
        // later one.
        if let Some(cert) = manager.timeout {
            if cert.round >= remote_round {
                tracing::debug!(round = %cert.round, "sending timeout");
                self.remote_node.handle_timeout_certificate(*cert).await?;
            }
        }
        Ok(())
    }
752
753 async fn send_chain_info_up_to_heights(
754 &mut self,
755 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
756 delivery: CrossChainMessageDelivery,
757 ) -> Result<(), chain_client::Error> {
758 FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
759 let mut updater = self.clone();
760 async move {
761 updater
762 .send_chain_information(chain_id, height, delivery, None)
763 .await
764 }
765 }))
766 .try_collect::<Vec<_>>()
767 .await?;
768 Ok(())
769 }
770
771 pub async fn send_chain_update(
772 &mut self,
773 action: CommunicateAction,
774 ) -> Result<LiteVote, chain_client::Error> {
775 let chain_id = match &action {
776 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
777 CommunicateAction::FinalizeBlock { certificate, .. } => {
778 certificate.inner().block().header.chain_id
779 }
780 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
781 };
782 let vote = match action {
784 CommunicateAction::SubmitBlock { proposal, blob_ids } => {
785 let info = self.send_block_proposal(proposal, blob_ids).await?;
786 info.manager.pending.ok_or_else(|| {
787 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
788 })?
789 }
790 CommunicateAction::FinalizeBlock {
791 certificate,
792 delivery,
793 } => {
794 let info = self
795 .send_validated_certificate(*certificate, delivery)
796 .await?;
797 info.manager.pending.ok_or_else(|| {
798 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
799 })?
800 }
801 CommunicateAction::RequestTimeout { round, height, .. } => {
802 let info = self.request_timeout(chain_id, round, height).await?;
803 info.manager.timeout_vote.ok_or_else(|| {
804 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
805 })?
806 }
807 };
808 vote.check(self.remote_node.public_key)?;
809 Ok(vote)
810 }
811}