linera_core/
updater.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10};
11
12use futures::{
13    stream::{FuturesUnordered, TryStreamExt},
14    Future, StreamExt,
15};
16use linera_base::{
17    crypto::ValidatorPublicKey,
18    data_types::{BlockHeight, Round},
19    ensure,
20    identifiers::{BlobId, ChainId, StreamId},
21    time::{timer::timeout, Duration, Instant},
22};
23use linera_chain::{
24    data_types::{BlockProposal, LiteVote},
25    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
26};
27use linera_execution::{committee::Committee, system::EPOCH_STREAM_NAME};
28use linera_storage::{ResultReadCertificates, Storage};
29use thiserror::Error;
30use tracing::{instrument, Level};
31
32use crate::{
33    client::ChainClientError,
34    data_types::{ChainInfo, ChainInfoQuery},
35    local_node::LocalNodeClient,
36    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
37    remote_node::RemoteNode,
38};
39
/// Default grace period: the extra time granted to additional validators to contribute
/// to the result, expressed as a fraction of the time it took to reach a quorum.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// Longest time a request to a stake-weighted quorum waits for the next response while
/// no quorum has been reached: one day.
const MAX_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
45
/// Used for `communicate_chain_action`
///
/// Describes the request that is sent to a validator in order to obtain its vote.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Submit a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs consulted when the validator rejects the proposal because of missing
        /// blobs or an inactive chain, in order to bring the validator up to date.
        blob_ids: Vec<BlobId>,
    },
    /// Send the certificate of a validated block to the validator.
    FinalizeBlock {
        /// The validated block certificate to send.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain message delivery mode passed along with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Ask the validator for a timeout vote.
    RequestTimeout {
        /// The chain for which the timeout is requested.
        chain_id: ChainId,
        /// The block height for which the timeout is requested.
        height: BlockHeight,
        /// The round for which the timeout is requested.
        round: Round,
    },
}
63
64impl CommunicateAction {
65    /// The round to which this action pertains.
66    pub fn round(&self) -> Round {
67        match self {
68            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
69            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
70            CommunicateAction::RequestTimeout { round, .. } => *round,
71        }
72    }
73}
74
/// Brings a single remote validator up to date using data read from the local node.
#[derive(Clone)]
pub struct ValidatorUpdater<A, S>
where
    S: Storage,
{
    /// The validator that receives proposals, certificates and blobs.
    pub remote_node: RemoteNode<A>,
    /// The local node from which chain state, certificates and blobs are read.
    pub local_node: LocalNodeClient<S>,
    /// The ID of the admin chain; its blocks are sent to the validator when the validator
    /// reports missing epoch events.
    pub admin_id: ChainId,
}
84
/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block
    ///
    /// The first field is the quorum threshold. The second field contains, for every
    /// distinct result, its accumulated weight and the number of validators that returned it.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold so we're returning a sample of
    /// errors for debugging purposes, together with their weight.
    ///
    /// The sample is ordered by decreasing weight.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
103
104/// Executes a sequence of actions in parallel for all validators.
105///
106/// Tries to stop early when a quorum is reached. If `grace_period` is specified, other validators
107/// are given additional time to contribute to the result. The grace period is calculated as a fraction
108/// (defaulting to `DEFAULT_GRACE_PERIOD`) of the time taken to reach quorum.
109pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
110    validator_clients: &'a [RemoteNode<A>],
111    committee: &Committee,
112    group_by: G,
113    execute: F,
114    // Grace period as a fraction of time taken to reach quorum
115    grace_period: f64,
116) -> Result<(K, Vec<(ValidatorPublicKey, V)>), CommunicationError<NodeError>>
117where
118    A: ValidatorNode + Clone + 'static,
119    F: Clone + Fn(RemoteNode<A>) -> R,
120    R: Future<Output = Result<V, ChainClientError>> + 'a,
121    G: Fn(&V) -> K,
122    K: Hash + PartialEq + Eq + Clone + 'static,
123    V: 'static,
124{
125    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
126        .iter()
127        .filter_map(|remote_node| {
128            if committee.weight(&remote_node.public_key) == 0 {
129                // This should not happen but better prevent it because certificates
130                // are not allowed to include votes with weight 0.
131                return None;
132            }
133            let execute = execute.clone();
134            let remote_node = remote_node.clone();
135            Some(async move { (remote_node.public_key, execute(remote_node).await) })
136        })
137        .collect();
138
139    let start_time = Instant::now();
140    let mut end_time: Option<Instant> = None;
141    let mut remaining_votes = committee.total_votes();
142    let mut highest_key_score = 0;
143    let mut value_scores: HashMap<K, (u64, Vec<(ValidatorPublicKey, V)>)> = HashMap::new();
144    let mut error_scores = HashMap::new();
145
146    'vote_wait: while let Ok(Some((name, result))) = timeout(
147        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
148        responses.next(),
149    )
150    .await
151    {
152        remaining_votes -= committee.weight(&name);
153        match result {
154            Ok(value) => {
155                let key = group_by(&value);
156                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
157                entry.0 += committee.weight(&name);
158                entry.1.push((name, value));
159                highest_key_score = highest_key_score.max(entry.0);
160            }
161            Err(err) => {
162                // TODO(#2857): Handle non-remote errors properly.
163                let err = match err {
164                    ChainClientError::RemoteNodeError(err) => err,
165                    err => NodeError::ResponseHandlingError {
166                        error: err.to_string(),
167                    },
168                };
169                let entry = error_scores.entry(err.clone()).or_insert(0);
170                *entry += committee.weight(&name);
171                if *entry >= committee.validity_threshold() {
172                    // At least one honest node returned this error.
173                    // No quorum can be reached, so return early.
174                    return Err(CommunicationError::Trusted(err));
175                }
176            }
177        }
178        // If it becomes clear that no key can reach a quorum, break early.
179        if highest_key_score + remaining_votes < committee.quorum_threshold() {
180            break 'vote_wait;
181        }
182
183        // If a key reaches a quorum, wait for the grace period to collect more values
184        // or error information and then stop.
185        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
186            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
187        }
188    }
189
190    let scores = value_scores
191        .values()
192        .map(|(weight, values)| (*weight, values.len()))
193        .collect();
194    // If a key has a quorum, return it with its values.
195    if let Some((key, (_, values))) = value_scores
196        .into_iter()
197        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
198    {
199        return Ok((key, values));
200    }
201
202    if error_scores.is_empty() {
203        return Err(CommunicationError::NoConsensus(
204            committee.quorum_threshold(),
205            scores,
206        ));
207    }
208
209    // No specific error is available to report reliably.
210    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
211    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
212    sample.truncate(4);
213    Err(CommunicationError::Sample(sample))
214}
215
216impl<A, S> ValidatorUpdater<A, S>
217where
218    A: ValidatorNode + Clone + 'static,
219    S: Storage + Clone + Send + Sync + 'static,
220{
221    #[instrument(
222        level = "trace", skip_all, err(level = Level::WARN),
223        fields(chain_id = %certificate.block().header.chain_id)
224    )]
225    async fn send_confirmed_certificate(
226        &mut self,
227        certificate: GenericCertificate<ConfirmedBlock>,
228        delivery: CrossChainMessageDelivery,
229    ) -> Result<Box<ChainInfo>, ChainClientError> {
230        let mut result = self
231            .remote_node
232            .handle_optimized_confirmed_certificate(&certificate, delivery)
233            .await;
234
235        let mut sent_admin_chain = false;
236        let mut sent_blobs = false;
237        loop {
238            result = match result {
239                Err(NodeError::EventsNotFound(event_ids))
240                    if !sent_admin_chain
241                        && certificate.inner().chain_id() != self.admin_id
242                        && event_ids.iter().all(|event_id| {
243                            event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
244                                && event_id.chain_id == self.admin_id
245                        }) =>
246                {
247                    // The validator doesn't have the committee that signed the certificate.
248                    self.update_admin_chain().await?;
249                    sent_admin_chain = true;
250                    self.remote_node
251                        .handle_confirmed_certificate(certificate.clone(), delivery)
252                        .await
253                }
254                Err(NodeError::BlobsNotFound(blob_ids)) if !sent_blobs => {
255                    // The validator is missing the blobs required by the certificate.
256                    self.remote_node
257                        .check_blobs_not_found(&certificate, &blob_ids)?;
258                    // The certificate is confirmed, so the blobs must be in storage.
259                    let maybe_blobs = self.local_node.read_blobs_from_storage(&blob_ids).await?;
260                    let blobs = maybe_blobs.ok_or(NodeError::BlobsNotFound(blob_ids))?;
261                    self.remote_node.node.upload_blobs(blobs).await?;
262                    sent_blobs = true;
263                    self.remote_node
264                        .handle_confirmed_certificate(certificate.clone(), delivery)
265                        .await
266                }
267                result => return Ok(result?),
268            };
269        }
270    }
271
272    async fn send_validated_certificate(
273        &mut self,
274        certificate: GenericCertificate<ValidatedBlock>,
275        delivery: CrossChainMessageDelivery,
276    ) -> Result<Box<ChainInfo>, ChainClientError> {
277        let result = self
278            .remote_node
279            .handle_optimized_validated_certificate(&certificate, delivery)
280            .await;
281
282        Ok(match &result {
283            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
284                self.remote_node
285                    .check_blobs_not_found(&certificate, blob_ids)?;
286                let chain_id = certificate.inner().chain_id();
287                // The certificate is for a validated block, i.e. for our locking block.
288                // Take the missing blobs from our local chain manager.
289                let blobs = self
290                    .local_node
291                    .get_locking_blobs(blob_ids, chain_id)
292                    .await?
293                    .ok_or_else(|| original_err.clone())?;
294                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
295                self.remote_node
296                    .handle_validated_certificate(certificate)
297                    .await
298            }
299            _ => result,
300        }?)
301    }
302
303    async fn send_block_proposal(
304        &mut self,
305        proposal: Box<BlockProposal>,
306        mut blob_ids: Vec<BlobId>,
307    ) -> Result<Box<ChainInfo>, ChainClientError> {
308        let chain_id = proposal.content.block.chain_id;
309        let mut sent_cross_chain_updates = false;
310        let mut publisher_chain_ids_sent = BTreeSet::new();
311        loop {
312            match self
313                .remote_node
314                .handle_block_proposal(proposal.clone())
315                .await
316            {
317                Ok(info) => return Ok(info),
318                Err(NodeError::WrongRound(_round)) => {
319                    // The proposal is for a different round, so we need to update the validator.
320                    // TODO: this should probably be more specific as to which rounds are retried.
321                    self.send_chain_information(
322                        chain_id,
323                        proposal.content.block.height,
324                        CrossChainMessageDelivery::NonBlocking,
325                    )
326                    .await?;
327                }
328                Err(NodeError::UnexpectedBlockHeight {
329                    expected_block_height,
330                    found_block_height,
331                }) if expected_block_height < found_block_height => {
332                    // The proposal is for a later block height, so we need to update the validator.
333                    self.send_chain_information(
334                        chain_id,
335                        found_block_height,
336                        CrossChainMessageDelivery::NonBlocking,
337                    )
338                    .await?;
339                }
340                Err(NodeError::MissingCrossChainUpdate { .. }) if !sent_cross_chain_updates => {
341                    // Some received certificates may be missing for this validator
342                    // (e.g. to create the chain or make the balance sufficient) so we are going to
343                    // synchronize them now and retry.
344                    self.send_chain_information_for_senders(chain_id).await?;
345                    sent_cross_chain_updates = true;
346                }
347                Err(NodeError::EventsNotFound(event_ids)) => {
348                    let mut publisher_heights = BTreeMap::new();
349                    let new_chain_ids = event_ids
350                        .iter()
351                        .map(|event_id| event_id.chain_id)
352                        .filter(|chain_id| !publisher_chain_ids_sent.contains(chain_id))
353                        .collect::<BTreeSet<_>>();
354                    ensure!(
355                        !new_chain_ids.is_empty(),
356                        NodeError::EventsNotFound(event_ids)
357                    );
358                    for chain_id in new_chain_ids {
359                        let height = self
360                            .local_node
361                            .chain_state_view(chain_id)
362                            .await?
363                            .next_height_to_preprocess()
364                            .await?;
365                        publisher_heights.insert(chain_id, height);
366                        publisher_chain_ids_sent.insert(chain_id);
367                    }
368                    self.send_chain_info_up_to_heights(
369                        publisher_heights,
370                        CrossChainMessageDelivery::NonBlocking,
371                    )
372                    .await?;
373                }
374                Err(NodeError::BlobsNotFound(_) | NodeError::InactiveChain(_))
375                    if !blob_ids.is_empty() =>
376                {
377                    // For `BlobsNotFound`, we assume that the local node should already be
378                    // updated with the needed blobs, so sending the chain information about the
379                    // certificates that last used the blobs to the validator node should be enough.
380                    let published_blob_ids =
381                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
382                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
383                    let mut published_blobs = Vec::new();
384                    {
385                        let chain = self.local_node.chain_state_view(chain_id).await?;
386                        for blob_id in published_blob_ids {
387                            published_blobs
388                                .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
389                        }
390                    }
391                    self.remote_node
392                        .send_pending_blobs(chain_id, published_blobs)
393                        .await?;
394                    let missing_blob_ids = self
395                        .remote_node
396                        .node
397                        .missing_blob_ids(mem::take(&mut blob_ids))
398                        .await?;
399                    let blob_states = self
400                        .local_node
401                        .read_blob_states_from_storage(&missing_blob_ids)
402                        .await?;
403                    let mut chain_heights = BTreeMap::new();
404                    for blob_state in blob_states {
405                        let block_chain_id = blob_state.chain_id;
406                        let block_height = blob_state.block_height.try_add_one()?;
407                        chain_heights
408                            .entry(block_chain_id)
409                            .and_modify(|h| *h = block_height.max(*h))
410                            .or_insert(block_height);
411                    }
412
413                    self.send_chain_info_up_to_heights(
414                        chain_heights,
415                        CrossChainMessageDelivery::NonBlocking,
416                    )
417                    .await?;
418                }
419                // Fail immediately on other errors.
420                Err(err) => return Err(err.into()),
421            }
422        }
423    }
424
425    async fn update_admin_chain(&mut self) -> Result<(), ChainClientError> {
426        let local_admin_info = self.local_node.chain_info(self.admin_id).await?;
427        Box::pin(self.send_chain_information(
428            self.admin_id,
429            local_admin_info.next_block_height,
430            CrossChainMessageDelivery::NonBlocking,
431        ))
432        .await
433    }
434
435    pub async fn send_chain_information(
436        &mut self,
437        chain_id: ChainId,
438        target_block_height: BlockHeight,
439        delivery: CrossChainMessageDelivery,
440    ) -> Result<(), ChainClientError> {
441        let Ok(height) = target_block_height.try_sub_one() else {
442            if let Some(cert) = self.local_node.chain_info(chain_id).await?.manager.timeout {
443                self.remote_node.handle_timeout_certificate(*cert).await?;
444            }
445            return Ok(());
446        };
447        // Figure out which certificates this validator is missing. In many cases, it's just the
448        // last one, so we optimistically send that one right away.
449        let hash = self
450            .local_node
451            .chain_state_view(chain_id)
452            .await?
453            .block_hashes(height..=height)
454            .await?
455            .into_iter()
456            .next()
457            .ok_or_else(|| {
458                ChainClientError::InternalError(
459                    "send_chain_information called with invalid target_block_height",
460                )
461            })?;
462        let certificate = self
463            .local_node
464            .storage_client()
465            .read_certificate(hash)
466            .await?
467            .ok_or_else(|| ChainClientError::MissingConfirmedBlock(hash))?;
468        let info = match self.send_confirmed_certificate(certificate, delivery).await {
469            Err(ChainClientError::RemoteNodeError(NodeError::EventsNotFound(event_ids)))
470                if event_ids.iter().all(|event_id| {
471                    event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)
472                        && event_id.chain_id == self.admin_id
473                }) =>
474            {
475                // The chain is missing epoch events. Send all blocks.
476                let query = ChainInfoQuery::new(chain_id);
477                self.remote_node.handle_chain_info_query(query).await?
478            }
479            Err(err) => return Err(err),
480            Ok(info) => info,
481        };
482        let (remote_height, remote_round) = (info.next_block_height, info.manager.current_round);
483        // Obtain the missing blocks and the manager state from the local node.
484        let range = remote_height..target_block_height;
485        let validator_missing_hashes = self
486            .local_node
487            .chain_state_view(chain_id)
488            .await?
489            .block_hashes(range)
490            .await?;
491        if !validator_missing_hashes.is_empty() {
492            // Send the requested certificates in order.
493            let certificates = self
494                .local_node
495                .storage_client()
496                .read_certificates(validator_missing_hashes.clone())
497                .await?;
498            let certificates =
499                match ResultReadCertificates::new(certificates, validator_missing_hashes) {
500                    ResultReadCertificates::Certificates(certificates) => certificates,
501                    ResultReadCertificates::InvalidHashes(hashes) => {
502                        return Err(ChainClientError::ReadCertificatesError(hashes))
503                    }
504                };
505            for certificate in certificates {
506                self.send_confirmed_certificate(certificate, delivery)
507                    .await?;
508            }
509        }
510        // If the remote node is missing a timeout certificate, send it as well.
511        let local_info = self.local_node.chain_info(chain_id).await?;
512        if let Some(cert) = local_info.manager.timeout {
513            if (local_info.next_block_height, cert.round) >= (remote_height, remote_round) {
514                self.remote_node.handle_timeout_certificate(*cert).await?;
515            }
516        }
517        Ok(())
518    }
519
520    async fn send_chain_info_up_to_heights(
521        &mut self,
522        chain_heights: impl IntoIterator<Item = (ChainId, BlockHeight)>,
523        delivery: CrossChainMessageDelivery,
524    ) -> Result<(), ChainClientError> {
525        FuturesUnordered::from_iter(chain_heights.into_iter().map(|(chain_id, height)| {
526            let mut updater = self.clone();
527            async move {
528                updater
529                    .send_chain_information(chain_id, height, delivery)
530                    .await
531            }
532        }))
533        .try_collect::<Vec<_>>()
534        .await?;
535        Ok(())
536    }
537
538    /// Updates validator with certificates for all chains that have sent messages to `chain_id`.
539    async fn send_chain_information_for_senders(
540        &mut self,
541        chain_id: ChainId,
542    ) -> Result<(), ChainClientError> {
543        let sender_heights = self
544            .local_node
545            .chain_state_view(chain_id)
546            .await?
547            .inboxes
548            .try_load_all_entries()
549            .await?
550            .iter()
551            .map(|(origin, inbox)| {
552                let next_height = inbox.next_block_height_to_receive()?;
553                Ok((*origin, next_height))
554            })
555            .collect::<Result<Vec<(ChainId, BlockHeight)>, ChainClientError>>()?;
556
557        self.send_chain_info_up_to_heights(sender_heights, CrossChainMessageDelivery::Blocking)
558            .await?;
559        Ok(())
560    }
561
562    pub async fn send_chain_update(
563        &mut self,
564        action: CommunicateAction,
565    ) -> Result<LiteVote, ChainClientError> {
566        let chain_id = match &action {
567            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.block.chain_id,
568            CommunicateAction::FinalizeBlock { certificate, .. } => {
569                certificate.inner().block().header.chain_id
570            }
571            CommunicateAction::RequestTimeout { chain_id, .. } => *chain_id,
572        };
573        // Send the block proposal, certificate or timeout request and return a vote.
574        let vote = match action {
575            CommunicateAction::SubmitBlock { proposal, blob_ids } => {
576                let info = self.send_block_proposal(proposal, blob_ids).await?;
577                info.manager.pending.ok_or_else(|| {
578                    NodeError::MissingVoteInValidatorResponse("submit a block proposal".into())
579                })?
580            }
581            CommunicateAction::FinalizeBlock {
582                certificate,
583                delivery,
584            } => {
585                let info = self
586                    .send_validated_certificate(*certificate, delivery)
587                    .await?;
588                info.manager.pending.ok_or_else(|| {
589                    NodeError::MissingVoteInValidatorResponse("finalize a block".into())
590                })?
591            }
592            CommunicateAction::RequestTimeout { round, height, .. } => {
593                let query = ChainInfoQuery::new(chain_id).with_timeout(height, round);
594                let info = self.remote_node.handle_chain_info_query(query).await?;
595                info.manager.timeout_vote.ok_or_else(|| {
596                    NodeError::MissingVoteInValidatorResponse("request a timeout".into())
597                })?
598            }
599        };
600        vote.check(self.remote_node.public_key)?;
601        Ok(vote)
602    }
603}