1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10};
11
12use futures::{
13 stream::{FuturesUnordered, TryStreamExt},
14 Future, StreamExt,
15};
16use linera_base::{
17 crypto::ValidatorPublicKey,
18 data_types::{BlockHeight, Round},
19 ensure,
20 identifiers::{BlobId, ChainId, StreamId},
21 time::{timer::timeout, Duration, Instant},
22};
23use linera_chain::{
24 data_types::{BlockProposal, LiteVote},
25 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
26};
27use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
28use linera_storage::{ResultReadCertificates, Storage};
29use thiserror::Error;
30use tracing::{instrument, Level};
31
32use crate::{
33 client::ChainClientError,
34 data_types::{ChainInfo, ChainInfoQuery},
35 local_node::LocalNodeClient,
36 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
37 remote_node::RemoteNode,
38};
39
/// Default grace period.
// NOTE(review): not referenced in this part of the file; presumably the default for the
// `grace_period` argument of `communicate_with_quorum` (a fraction of the time taken to
// reach a quorum) — confirm against callers.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// Upper bound (24 hours) on a single wait for the next validator response in
/// `communicate_with_quorum` while no grace-period deadline has been set.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);

/// A request that `ValidatorUpdater::send_chain_update` sends to a validator in order to
/// obtain a vote.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs handed to `send_block_proposal`, which consults them when the validator
        /// reports missing blobs or an inactive chain.
        blob_ids: Vec<BlobId>,
    },
    /// Send a validated block certificate to the validator.
    FinalizeBlock {
        /// The validated block certificate to send.
        certificate: Box<ValidatedBlockCertificate>,
        /// Cross-chain delivery mode forwarded together with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote via a chain info query.
    RequestTimeout {
        /// The chain the query is about.
        chain_id: ChainId,
        /// The block height passed to the timeout query.
        height: BlockHeight,
        /// The round passed to the timeout query.
        round: Round,
    },
}
63
64impl CommunicateAction {
65 pub fn round(&self) -> Round {
67 match self {
68 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
69 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
70 CommunicateAction::RequestTimeout { round, .. } => *round,
71 }
72 }
73}
74
/// Bundles what is needed to send chain updates to a single validator: the validator's
/// remote handle, the local node that supplies the data, and the admin chain ID.
#[derive(Clone)]
pub struct ValidatorUpdater<A, S>
where
    S: Storage,
{
    /// The validator that proposals, certificates, blobs and queries are sent to.
    pub remote_node: RemoteNode<A>,
    /// The local node from which chain state, certificates and blobs are read.
    pub local_node: LocalNodeClient<S>,
    /// ID of the admin chain; used to recognize missing epoch events and to push the admin
    /// chain to the validator in `update_admin_chain`.
    pub admin_id: ChainId,
}
84
/// Errors returned by `communicate_with_quorum`.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No group of matching responses reached the quorum threshold, and no validator
    /// returned an error. Holds the quorum threshold and, for every group of responses with
    /// the same key, its accumulated weight and its number of responses.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// The same error was returned by validators whose combined weight reached the
    /// committee's validity threshold.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No quorum was reached and no single error reached the validity threshold. Holds at
    /// most four distinct errors with their accumulated weights, heaviest first.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
103
104pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
110 validator_clients: &'a [RemoteNode<A>],
111 committee: &Committee,
112 group_by: G,
113 execute: F,
114 grace_period: f64,
116) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
117where
118 A: ValidatorNode + Clone + 'static,
119 F: Clone + Fn(RemoteNode<A>) -> R,
120 R: Future<Output = Result<V, ChainClientError>> + 'a,
121 G: Fn(&V) -> K,
122 K: Hash + PartialEq + Eq + Clone + 'static,
123 V: 'static,
124{
125 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
126 .iter()
127 .filter_map(|remote_node| {
128 if committee.weight(&remote_node.public_key) == 0 {
129 return None;
132 }
133 let execute = execute.clone();
134 let remote_node = remote_node.clone();
135 Some(async move { (remote_node.public_key, execute(remote_node).await) })
136 })
137 .collect();
138
139 let start_time = Instant::now();
140 let mut end_time: Option<Instant> = None;
141 let mut remaining_votes = committee.total_votes();
142 let mut highest_key_score = 0;
143 let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
144 let mut error_scores = HashMap::new();
145
146 'vote_wait: while let Ok(Some((name, result))) = timeout(
147 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
148 responses.next(),
149 )
150 .await
151 {
152 remaining_votes -= committee.weight(&name);
153 match result {
154 Ok(value) => {
155 let key = group_by(&value);
156 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
157 entry.0 += committee.weight(&name);
158 entry.1.push((name, value));
159 highest_key_score = highest_key_score.max(entry.0);
160 }
161 Err(err) => {
162 let err = match err {
164 ChainClientError::RemoteNodeError(err) => err,
165 err => NodeError::ResponseHandlingError {
166 error: err.to_string(),
167 },
168 };
169 let entry = error_scores.entry(err.clone()).or_insert(0);
170 *entry += committee.weight(&name);
171 if *entry >= committee.validity_threshold() {
172 return Err(CommunicationError::Trusted(err));
175 }
176 }
177 }
178 if highest_key_score + remaining_votes < committee.quorum_threshold() {
180 break 'vote_wait;
181 }
182
183 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
186 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
187 }
188 }
189
190 let scores = value_scores
191 .values()
192 .map(|(weight, values)| (*weight, values.len()))
193 .collect();
194 if let Some((key, (_, values))) = value_scores
196 .into_iter()
197 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
198 {
199 return Ok((key, values));
200 }
201
202 if error_scores.is_empty() {
203 return Err(CommunicationError::NoConsensus(
204 committee.quorum_threshold(),
205 scores,
206 ));
207 }
208
209 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
211 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
212 sample.truncate(4);
213 Err(CommunicationError::Sample(sample))
214}
215
216impl<A, S> ValidatorUpdater<A, S>
217where
218 A: ValidatorNode + Clone + 'static,
219 S: Storage + Clone + Send + Sync + 'static,
220{
221 #[instrument(
222 level = "trace", skip_all, err(level = Level::WARN),
223 fields(chain_id = %certificate.block().header.chain_id)
224 )]
225 async fn send_confirmed_certificate(
226 &mut self,
227 certificate: GenericCertificate<ConfirmedBlock>,
228 delivery: CrossChainMessageDelivery,
229 ) -> Result<Box<ChainInfo>, ChainClientError> {
230 let mut result = self
231 .remote_node
232 .handle_optimized_confirmed_certificate(&certificate, delivery)
233 .await;
234
235 let mut sent_admin_chain = false;
236 let mut sent_blobs = false;
237 loop {
238 result = match result {
239 Err(NodeError::EventsNotFound(event_ids))
240 if !sent_admin_chain
241 && certificate.inner().chain_id() != self.admin_id
242 && event_ids.iter().all(|event_id| {
243 event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
244 && event_id.chain_id == self.admin_id
245 }) =>
246 {
247 self.update_admin_chain().await?;
249 sent_admin_chain = true;
250 self.remote_node
251 .handle_confirmed_certificate(certificate.clone(), delivery)
252 .await
253 }
254 Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
255 self.remote_node
257 .check_blobs_not_found(&certificate, &blob_ids)?;
258 let maybe_blobs = self.local_node.read_blobs_from_storage(&blob_ids).await?;
260 let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
261 self.remote_node.node.upload_blobs(blobs).await?;
262 sent_blobs = true;
263 self.remote_node
264 .handle_confirmed_certificate(certificate.clone(), delivery)
265 .await
266 }
267 result => return Ok(result?),
268 };
269 }
270 }
271
272 async fn send_validated_certificate(
273 &mut self,
274 certificate: GenericCertificate<ValidatedBlock>,
275 delivery: CrossChainMessageDelivery,
276 ) -> Result<Box<ChainInfo>, ChainClientError> {
277 let result = self
278 .remote_node
279 .handle_optimized_validated_certificate(&certificate, delivery)
280 .await;
281
282 Ok(match &result {
283 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
284 self.remote_node
285 .check_blobs_not_found(&certificate, blob_ids)?;
286 let chain_id = certificate.inner().chain_id();
287 let blobs = self
290 .local_node
291 .get_locking_blobs(blob_ids, chain_id)
292 .await?
293 .ok_or_else(|| original_err.clone())?;
294 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
295 self.remote_node
296 .handle_validated_certificate(certificate)
297 .await
298 }
299 _ => result,
300 }?)
301 }
302
303 async fn send_block_proposal(
304 &mut self,
305 proposal: Box<BlockProposal>,
306 mut blob_ids: Vec<BlobId>,
307 ) -> Result<Box<ChainInfo>, ChainClientError> {
308 let chain_id = proposal.content.block.chain_id;
309 let mut sent_cross_chain_updates = false;
310 let mut publisher_chain_ids_sent = BTreeSet::new();
311 loop {
312 match self
313 .remote_node
314 .handle_block_proposal(proposal.clone())
315 .await
316 {
317 Ok(info) => return Ok(info),
318 Err(NodeError::WrongRound(_round)) => {
319 self.send_chain_information(
322 chain_id,
323 proposal.content.block.height,
324 CrossChainMessageDelivery::NonBlocking,
325 )
326 .await?;
327 }
328 Err(NodeError::UnexpectedBlockHeight {
329 expected_block_height,
330 found_block_height,
331 }) if expected_block_height < found_block_height => {
332 self.send_chain_information(
334 chain_id,
335 found_block_height,
336 CrossChainMessageDelivery::NonBlocking,
337 )
338 .await?;
339 }
340 Err(NodeError::MissingCrossChainUpdate { .. }) if !sent_cross_chain_updates => {
341 self.send_chain_information_for_senders(chain_id).await?;
345 sent_cross_chain_updates = true;
346 }
347 Err(NodeError::EventsNotFound(event_ids)) => {
348 let mut publisher_heights = BTreeMap::new();
349 let new_chain_ids = event_ids
350 .iter()
351 .map(|event_id| event_id.chain_id)
352 .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
353 .collect::<BTreeSet<_>>();
354 ensure!(
355 !new_chain_ids.is_empty(),
356 NodeError::EventsNotFound(event_ids)
357 );
358 for chain_id in new_chain_ids {
359 let height = self
360 .local_node
361 .chain_state_view(chain_id)
362 .await?
363 .next_height_to_preprocess()
364 .await?;
365 publisher_heights.insert(chain_id, height);
366 publisher_chain_ids_sent.insert(chain_id);
367 }
368 self.send_chain_info_up_to_heights(
369 publisher_heights,
370 CrossChainMessageDelivery::NonBlocking,
371 )
372 .await?;
373 }
374 Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
375 if !blob_ids.is_empty() =>
376 {
377 let published_blob_ids =
381 BTreeSet::from_iter(proposal.content.block.published_blob_ids());
382 blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
383 let mut published_blobs = Vec::new();
384 {
385 let chain = self.local_node.chain_state_view(chain_id).await?;
386 for blob_id in published_blob_ids {
387 published_blobs
388 .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
389 }
390 }
391 self.remote_node
392 .send_pending_blobs(chain_id, published_blobs)
393 .await?;
394 let missing_blob_ids = self
395 .remote_node
396 .node
397 .missing_blob_ids(mem::take(&mut blob_ids))
398 .await?;
399 let blob_states = self
400 .local_node
401 .read_blob_states_from_storage(&missing_blob_ids)
402 .await?;
403 let mut chain_heights = BTreeMap::new();
404 for blob_state in blob_states {
405 let block_chain_id = blob_state.chain_id;
406 let block_height = blob_state.block_height.try_add_one()?;
407 chain_heights
408 .entry(block_chain_id)
409 .and_modify(|h| *h = block_height.max(*h))
410 .or_insert(block_height);
411 }
412
413 self.send_chain_info_up_to_heights(
414 chain_heights,
415 CrossChainMessageDelivery::NonBlocking,
416 )
417 .await?;
418 }
419 Err(err) => return Err(err.into()),
421 }
422 }
423 }
424
425 async fn update_admin_chain(&mut self) -> Result<(), ChainClientError> {
426 let local_admin_info = self.local_node.chain_info(self.admin_id).await?;
427 Box::pin(self.send_chain_information(
428 self.admin_id,
429 local_admin_info.next_block_height,
430 CrossChainMessageDelivery::NonBlocking,
431 ))
432 .await
433 }
434
435 pub async fn send_chain_information(
436 &mut self,
437 chain_id: ChainId,
438 target_block_height: BlockHeight,
439 delivery: CrossChainMessageDelivery,
440 ) -> Result<(), ChainClientError> {
441 let Ok(height) = target_block_height.try_sub_one() else {
442 if let Some(cert) = self.local_node.chain_info(chain_id).await?.manager.timeout {
443 self.remote_node.handle_timeout_certificate(*cert).await?;
444 }
445 return Ok(());
446 };
447 let hash = self
450 .local_node
451 .chain_state_view(chain_id)
452 .await?
453 .block_hashes(height..=height)
454 .await?
455 .into_iter()
456 .next()
457 .ok_or_else(|| {
458 ChainClientError::InternalError(
459 "send_chain_information called with invalid target_block_height",
460 )
461 })?;
462 let certificate = self
463 .local_node
464 .storage_client()
465 .read_certificate(hash)
466 .await?
467 .ok_or_else(|| ChainClientError::MissingConfirmedBlock(hash))?;
468 let info = match self.send_confirmed_certificate(certificate, delivery).await {
469 Err(ChainClientError::RemoteNodeError(NodeError::EventsNotFound(event_ids)))
470 if event_ids.iter().all(|event_id| {
471 event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
472 && event_id.chain_id == self.admin_id
473 }) =>
474 {
475 let query = ChainInfoQuery::new(chain_id);
477 self.remote_node.handle_chain_info_query(query).await?
478 }
479 Err(err) => return Err(err),
480 Ok(info) => info,
481 };
482 let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
483 let range = remote_height..target_block_height;
485 let validator_missing_hashes = self
486 .local_node
487 .chain_state_view(chain_id)
488 .await?
489 .block_hashes(range)
490 .await?;
491 if !validator_missing_hashes.is_empty() {
492 let certificates = self
494 .local_node
495 .storage_client()
496 .read_certificates(validator_missing_hashes.clone())
497 .await?;
498 let certificates =
499 match ResultReadCertificates::new(certificates, validator_missing_hashes) {
500 ResultReadCertificates::Certificates(certificates) => certificates,
501 ResultReadCertificates::InvalidHashes(hashes) => {
502 return Err(ChainClientError::ReadCertificatesError(hashes))
503 }
504 };
505 for certificate in certificates {
506 self.send_confirmed_certificate(certificate, delivery)
507 .await?;
508 }
509 }
510 let local_info = self.local_node.chain_info(chain_id).await?;
512 if let Some(cert) = local_info.manager.timeout {
513 if (local_info.next_block_height, cert.round) >= (remote_height, remote_round) {
514 self.remote_node.handle_timeout_certificate(*cert).await?;
515 }
516 }
517 Ok(())
518 }
519
520 async fn send_chain_info_up_to_heights(
521 &mut self,
522 chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
523 delivery: CrossChainMessageDelivery,
524 ) -> Result<(), ChainClientError> {
525 FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
526 let mut updater = self.clone();
527 async move {
528 updater
529 .send_chain_information(chain_id, height, delivery)
530 .await
531 }
532 }))
533 .try_collect::<Vec<_>>()
534 .await?;
535 Ok(())
536 }
537
538 async fn send_chain_information_for_senders(
540 &mut self,
541 chain_id: ChainId,
542 ) -> Result<(), ChainClientError> {
543 let sender_heights = self
544 .local_node
545 .chain_state_view(chain_id)
546 .await?
547 .inboxes
548 .try_load_all_entries()
549 .await?
550 .iter()
551 .map(|(origin, inbox)| {
552 let next_height = inbox.next_block_height_to_receive()?;
553 Ok((*origin, next_height))
554 })
555 .collect::<Result<Vec<(ChainId, BlockHeight)>, ChainClientError>>()?;
556
557 self.send_chain_info_up_to_heights(sender_heights, CrossChainMessageDelivery::Blocking)
558 .await?;
559 Ok(())
560 }
561
562 pub async fn send_chain_update(
563 &mut self,
564 action: CommunicateAction,
565 ) -> Result<LiteVote, ChainClientError> {
566 let chain_id = match &action {
567 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
568 CommunicateAction::FinalizeBlock { certificate, .. } => {
569 certificate.inner().block().header.chain_id
570 }
571 CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
572 };
573 let vote = match action {
575 CommunicateAction::SubmitBlock { proposal, blob_ids } => {
576 let info = self.send_block_proposal(proposal, blob_ids).await?;
577 info.manager.pending.ok_or_else(|| {
578 NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
579 })?
580 }
581 CommunicateAction::FinalizeBlock {
582 certificate,
583 delivery,
584 } => {
585 let info = self
586 .send_validated_certificate(*certificate, delivery)
587 .await?;
588 info.manager.pending.ok_or_else(|| {
589 NodeError::MissingVoteInValidatorResponse("finalize a block".into())
590 })?
591 }
592 CommunicateAction::RequestTimeout { round, height, .. } => {
593 let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
594 let info = self.remote_node.handle_chain_info_query(query).await?;
595 info.manager.timeout_vote.ok_or_else(|| {
596 NodeError::MissingVoteInValidatorResponse("request a timeout".into())
597 })?
598 }
599 };
600 vote.check(self.remote_node.public_key)?;
601 Ok(vote)
602 }
603}