1use std::{
6 collections::{BTreeMap, BTreeSet, HashMap},
7 fmt,
8 hash::Hash,
9 mem,
10};
11
12use futures::{stream, stream::TryStreamExt, Future, StreamExt};
13use linera_base::{
14 data_types::{BlockHeight, Round},
15 identifiers::{BlobId, ChainId},
16 time::{timer::timeout, Duration, Instant},
17};
18use linera_chain::{
19 data_types::{BlockProposal, LiteVote},
20 types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
21};
22use linera_execution::committee::Committee;
23use linera_storage::Storage;
24use thiserror::Error;
25
26use crate::{
27 client::ChainClientError,
28 data_types::{ChainInfo, ChainInfoQuery},
29 local_node::LocalNodeClient,
30 node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
31 remote_node::RemoteNode,
32};
33
34pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
37const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); #[derive(Clone)]
42pub enum CommunicateAction {
43 SubmitBlock {
44 proposal: Box<BlockProposal>,
45 blob_ids: Vec<BlobId>,
46 },
47 FinalizeBlock {
48 certificate: Box<ValidatedBlockCertificate>,
49 delivery: CrossChainMessageDelivery,
50 },
51 RequestTimeout {
52 chain_id: ChainId,
53 height: BlockHeight,
54 round: Round,
55 },
56}
57
58impl CommunicateAction {
59 pub fn round(&self) -> Round {
61 match self {
62 CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
63 CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
64 CommunicateAction::RequestTimeout { round, .. } => *round,
65 }
66 }
67}
68
69#[derive(Clone)]
70pub struct ValidatorUpdater<A, S>
71where
72 S: Storage,
73{
74 pub chain_worker_count: usize,
75 pub remote_node: RemoteNode<A>,
76 pub local_node: LocalNodeClient<S>,
77}
78
79#[derive(Error, Debug)]
81pub enum CommunicationError<E: fmt::Debug> {
82 #[error(
85 "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
86 )]
87 NoConsensus(u64, Vec<(u64, usize)>),
88 #[error("Failed to communicate with a quorum of validators: {0}")]
91 Trusted(E),
92 #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
95 Sample(Vec<(E, u64)>),
96}
97
98pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
104 validator_clients: &'a [RemoteNode<A>],
105 committee: &Committee,
106 group_by: G,
107 execute: F,
108 grace_period: f64,
110) -> Result<(K, Vec<V>), CommunicationError<NodeError>>
111where
112 A: ValidatorNode + Clone + 'static,
113 F: Clone + Fn(RemoteNode<A>) -> R,
114 R: Future<Output = Result<V, ChainClientError>> + 'a,
115 G: Fn(&V) -> K,
116 K: Hash + PartialEq + Eq + Clone + 'static,
117 V: 'static,
118{
119 let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
120 .iter()
121 .filter_map(|remote_node| {
122 if committee.weight(&remote_node.public_key) == 0 {
123 return None;
126 }
127 let execute = execute.clone();
128 let remote_node = remote_node.clone();
129 Some(async move { (remote_node.public_key, execute(remote_node).await) })
130 })
131 .collect();
132
133 let start_time = Instant::now();
134 let mut end_time: Option<Instant> = None;
135 let mut remaining_votes = committee.total_votes();
136 let mut highest_key_score = 0;
137 let mut value_scores = HashMap::new();
138 let mut error_scores = HashMap::new();
139
140 'vote_wait: while let Ok(Some((name, result))) = timeout(
141 end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
142 responses.next(),
143 )
144 .await
145 {
146 remaining_votes -= committee.weight(&name);
147 match result {
148 Ok(value) => {
149 let key = group_by(&value);
150 let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
151 entry.0 += committee.weight(&name);
152 entry.1.push(value);
153 highest_key_score = highest_key_score.max(entry.0);
154 }
155 Err(err) => {
156 let err = match err {
158 ChainClientError::RemoteNodeError(err) => err,
159 err => NodeError::ResponseHandlingError {
160 error: err.to_string(),
161 },
162 };
163 let entry = error_scores.entry(err.clone()).or_insert(0);
164 *entry += committee.weight(&name);
165 if *entry >= committee.validity_threshold() {
166 return Err(CommunicationError::Trusted(err));
169 }
170 }
171 }
172 if highest_key_score + remaining_votes < committee.quorum_threshold() {
174 break 'vote_wait;
175 }
176
177 if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
180 end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
181 }
182 }
183
184 let scores = value_scores
185 .values()
186 .map(|(weight, values)| (*weight, values.len()))
187 .collect();
188 if let Some((key, (_, values))) = value_scores
190 .into_iter()
191 .find(|(_, (score, _))| *score >= committee.quorum_threshold())
192 {
193 return Ok((key, values));
194 }
195
196 if error_scores.is_empty() {
197 return Err(CommunicationError::NoConsensus(
198 committee.quorum_threshold(),
199 scores,
200 ));
201 }
202
203 let mut sample = error_scores.into_iter().collect::<Vec<_>>();
205 sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
206 sample.truncate(4);
207 Err(CommunicationError::Sample(sample))
208}
209
210impl<A, S> ValidatorUpdater<A, S>
211where
212 A: ValidatorNode + Clone + 'static,
213 S: Storage + Clone + Send + Sync + 'static,
214{
215 async fn send_confirmed_certificate(
216 &mut self,
217 certificate: GenericCertificate<ConfirmedBlock>,
218 delivery: CrossChainMessageDelivery,
219 ) -> Result<Box<ChainInfo>, ChainClientError> {
220 let result = self
221 .remote_node
222 .handle_optimized_confirmed_certificate(&certificate, delivery)
223 .await;
224
225 Ok(match &result {
226 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
227 self.remote_node
228 .check_blobs_not_found(&certificate, blob_ids)?;
229 let maybe_blobs = self.local_node.read_blobs_from_storage(blob_ids).await?;
231 let blobs = maybe_blobs.ok_or_else(|| original_err.clone())?;
232 self.remote_node.upload_blobs(blobs.clone()).await?;
233 self.remote_node
234 .handle_confirmed_certificate(certificate, delivery)
235 .await
236 }
237 _ => result,
238 }?)
239 }
240
241 async fn send_validated_certificate(
242 &mut self,
243 certificate: GenericCertificate<ValidatedBlock>,
244 delivery: CrossChainMessageDelivery,
245 ) -> Result<Box<ChainInfo>, ChainClientError> {
246 let result = self
247 .remote_node
248 .handle_optimized_validated_certificate(&certificate, delivery)
249 .await;
250
251 Ok(match &result {
252 Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
253 self.remote_node
254 .check_blobs_not_found(&certificate, blob_ids)?;
255 let chain_id = certificate.inner().chain_id();
256 let blobs = self
259 .local_node
260 .get_locking_blobs(blob_ids, chain_id)
261 .await?
262 .ok_or_else(|| original_err.clone())?;
263 self.remote_node.send_pending_blobs(chain_id, blobs).await?;
264 self.remote_node
265 .handle_validated_certificate(certificate)
266 .await
267 }
268 _ => result,
269 }?)
270 }
271
272 async fn send_block_proposal(
273 &mut self,
274 proposal: Box<BlockProposal>,
275 mut blob_ids: Vec<BlobId>,
276 ) -> Result<Box<ChainInfo>, ChainClientError> {
277 let chain_id = proposal.content.block.chain_id;
278 let mut sent_cross_chain_updates = false;
279 loop {
280 match self
281 .remote_node
282 .handle_block_proposal(proposal.clone())
283 .await
284 {
285 Ok(info) => return Ok(info),
286 Err(NodeError::MissingCrossChainUpdate { .. })
287 | Err(NodeError::InactiveChain(_))
288 if !sent_cross_chain_updates =>
289 {
290 sent_cross_chain_updates = true;
291 self.send_chain_information_for_senders(chain_id).await?;
295 }
296 Err(NodeError::BlobsNotFound(_)) if !blob_ids.is_empty() => {
297 let published_blob_ids =
301 BTreeSet::from_iter(proposal.content.block.published_blob_ids());
302 blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
303 let mut published_blobs = Vec::new();
304 {
305 let chain = self.local_node.chain_state_view(chain_id).await?;
306 for blob_id in published_blob_ids {
307 published_blobs
308 .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
309 }
310 }
311 self.remote_node
312 .send_pending_blobs(chain_id, published_blobs)
313 .await?;
314 let missing_blob_ids = self
315 .remote_node
316 .node
317 .missing_blob_ids(mem::take(&mut blob_ids))
318 .await?;
319 let blob_states = self
320 .local_node
321 .read_blob_states_from_storage(&missing_blob_ids)
322 .await?;
323 let mut chain_heights = BTreeMap::new();
324 for blob_state in blob_states {
325 let block_chain_id = blob_state.chain_id;
326 let block_height = blob_state.block_height.try_add_one()?;
327 chain_heights
328 .entry(block_chain_id)
329 .and_modify(|h| *h = block_height.max(*h))
330 .or_insert(block_height);
331 }
332
333 self.send_chain_info_up_to_heights(
334 chain_heights,
335 CrossChainMessageDelivery::NonBlocking,
336 )
337 .await?;
338 }
339 Err(err) => return Err(err.into()),
341 }
342 }
343 }
344
345 pub async fn send_chain_information(
346 &mut self,
347 chain_id: ChainId,
348 target_block_height: BlockHeight,
349 delivery: CrossChainMessageDelivery,
350 ) -> Result<(), ChainClientError> {
351 let query = ChainInfoQuery::new(chain_id);
353 let remote_info = self.remote_node.handle_chain_info_query(query).await?;
354 let initial_block_height = remote_info.next_block_height;
355 let range = initial_block_height..target_block_height;
357 let (keys, timeout) = {
358 let chain = self.local_node.chain_state_view(chain_id).await?;
359 (
360 chain.block_hashes(range).await?,
361 chain.manager.timeout.get().clone(),
362 )
363 };
364 if !keys.is_empty() {
365 let storage = self.local_node.storage_client();
367 let certs = storage.read_certificates(keys.into_iter()).await?;
368 for cert in certs {
369 self.send_confirmed_certificate(cert, delivery).await?;
370 }
371 }
372 if let Some(cert) = timeout {
373 if cert.value().chain_id() == chain_id {
374 self.remote_node.handle_timeout_certificate(cert).await?;
377 }
378 }
379 Ok(())
380 }
381
382 async fn send_chain_info_up_to_heights(
383 &mut self,
384 chain_heights: BTreeMap<ChainId, BlockHeight>,
385 delivery: CrossChainMessageDelivery,
386 ) -> Result<(), ChainClientError> {
387 let stream = stream::iter(chain_heights)
388 .map(|(chain_id, height)| {
389 let mut updater = self.clone();
390 async move {
391 updater
392 .send_chain_information(chain_id, height, delivery)
393 .await
394 }
395 })
396 .buffer_unordered(self.chain_worker_count);
397 stream.try_collect::<Vec<_>>().await?;
398 Ok(())
399 }
400
401 async fn send_chain_information_for_senders(
402 &mut self,
403 chain_id: ChainId,
404 ) -> Result<(), ChainClientError> {
405 let mut sender_heights = BTreeMap::new();
406 {
407 let chain = self.local_node.chain_state_view(chain_id).await?;
408 let pairs = chain.inboxes.try_load_all_entries().await?;
409 for (origin, inbox) in pairs {
410 let inbox_next_height = inbox.next_block_height_to_receive()?;
411 sender_heights
412 .entry(origin)
413 .and_modify(|h| *h = inbox_next_height.max(*h))
414 .or_insert(inbox_next_height);
415 }
416 }
417
418 self.send_chain_info_up_to_heights(sender_heights, CrossChainMessageDelivery::Blocking)
419 .await?;
420 Ok(())
421 }
422
423 pub async fn send_chain_update(
424 &mut self,
425 action: CommunicateAction,
426 ) -> Result<LiteVote, ChainClientError> {
427 let (target_block_height, chain_id) = match &action {
428 CommunicateAction::SubmitBlock { proposal, .. } => {
429 let block = &proposal.content.block;
430 (block.height, block.chain_id)
431 }
432 CommunicateAction::FinalizeBlock { certificate, .. } => (
433 certificate.inner().block().header.height,
434 certificate.inner().block().header.chain_id,
435 ),
436 CommunicateAction::RequestTimeout {
437 height, chain_id, ..
438 } => (*height, *chain_id),
439 };
440 let delivery = CrossChainMessageDelivery::NonBlocking;
442 self.send_chain_information(chain_id, target_block_height, delivery)
443 .await?;
444 let vote = match action {
446 CommunicateAction::SubmitBlock { proposal, blob_ids } => {
447 let info = self.send_block_proposal(proposal, blob_ids).await?;
448 info.manager.pending
449 }
450 CommunicateAction::FinalizeBlock {
451 certificate,
452 delivery,
453 } => {
454 let info = self
455 .send_validated_certificate(*certificate, delivery)
456 .await?;
457 info.manager.pending
458 }
459 CommunicateAction::RequestTimeout { .. } => {
460 let query = ChainInfoQuery::new(chain_id).with_timeout();
461 let info = self.remote_node.handle_chain_info_query(query).await?;
462 info.manager.timeout_vote
463 }
464 };
465 match vote {
466 Some(vote) if vote.public_key == self.remote_node.public_key => {
467 vote.check()?;
468 Ok(vote)
469 }
470 Some(_) | None => Err(NodeError::MissingVoteInValidatorResponse.into()),
471 }
472 }
473}