linera_core/
updater.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt,
8    hash::Hash,
9    mem,
10};
11
12use futures::{stream, stream::TryStreamExt, Future, StreamExt};
13use linera_base::{
14    data_types::{BlockHeight, Round},
15    identifiers::{BlobId, ChainId},
16    time::{timer::timeout, Duration, Instant},
17};
18use linera_chain::{
19    data_types::{BlockProposal, LiteVote},
20    types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
21};
22use linera_execution::committee::Committee;
23use linera_storage::Storage;
24use thiserror::Error;
25
26use crate::{
27    client::ChainClientError,
28    data_types::{ChainInfo, ChainInfoQuery},
29    local_node::LocalNodeClient,
30    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
31    remote_node::RemoteNode,
32};
33
/// The default amount of time we wait for additional validators to contribute
/// to the result, as a fraction of how long it took to reach a quorum.
///
/// For example, with a value of `0.2`, reaching a quorum after 10 seconds leaves the
/// remaining validators 2 more seconds to respond.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// The maximum timeout for requests to a stake-weighted quorum if no quorum is reached.
///
/// `communicate_with_quorum` waits at most this long for the next response as long as
/// no grace-period deadline has been set.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // 1 day.
39
/// An action to perform on a validator. Used for `communicate_chain_action`.
///
/// `ValidatorUpdater::send_chain_update` executes the action against a single validator
/// and expects a vote from that validator in return.
#[derive(Clone)]
pub enum CommunicateAction {
    /// Send a block proposal to the validator.
    SubmitBlock {
        /// The proposal to submit.
        proposal: Box<BlockProposal>,
        /// Blob IDs considered when the validator reports missing blobs for the proposal.
        blob_ids: Vec<BlobId>,
    },
    /// Send the certificate of a validated block to the validator.
    FinalizeBlock {
        /// The validated block certificate to send.
        certificate: Box<ValidatedBlockCertificate>,
        /// The cross-chain message delivery mode passed along with the certificate.
        delivery: CrossChainMessageDelivery,
    },
    /// Query the validator for its timeout vote.
    RequestTimeout {
        /// The chain the request is about.
        chain_id: ChainId,
        /// The block height up to which the validator is synchronized before the request.
        height: BlockHeight,
        /// The round to which the request pertains.
        round: Round,
    },
}
57
58impl CommunicateAction {
59    /// The round to which this action pertains.
60    pub fn round(&self) -> Round {
61        match self {
62            CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
63            CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
64            CommunicateAction::RequestTimeout { round, .. } => *round,
65        }
66    }
67}
68
/// Sends chain information, blobs, proposals and certificates from the local node to a
/// single validator.
#[derive(Clone)]
pub struct ValidatorUpdater<A, S>
where
    S: Storage,
{
    /// Upper bound on the number of chains that are synchronized concurrently in
    /// `send_chain_info_up_to_heights`.
    pub chain_worker_count: usize,
    /// The validator that receives the updates.
    pub remote_node: RemoteNode<A>,
    /// The local node from which chain state, certificates and blobs are read.
    pub local_node: LocalNodeClient<S>,
}
78
/// An error result for requests to a stake-weighted quorum.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
    /// No consensus is possible since validators returned different possibilities
    /// for the next block
    ///
    /// The first field is the quorum threshold. The second field contains one entry per
    /// distinct answer: the total weight of the validators that gave it, and the number
    /// of values collected for it.
    #[error(
        "No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
    )]
    NoConsensus(u64, Vec<(u64, usize)>),
    /// A single error that was returned by a sufficient number of nodes to be trusted as
    /// valid.
    #[error("Failed to communicate with a quorum of validators: {0}")]
    Trusted(E),
    /// No single error reached the validity threshold so we're returning a sample of
    /// errors for debugging purposes, together with their weight.
    ///
    /// Each entry pairs an error with the total weight of the validators that returned it.
    #[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
    Sample(Vec<(E, u64)>),
}
97
98/// Executes a sequence of actions in parallel for all validators.
99///
100/// Tries to stop early when a quorum is reached. If `grace_period` is specified, other validators
101/// are given additional time to contribute to the result. The grace period is calculated as a fraction
102/// (defaulting to `DEFAULT_GRACE_PERIOD`) of the time taken to reach quorum.
103pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
104    validator_clients: &'a [RemoteNode<A>],
105    committee: &Committee,
106    group_by: G,
107    execute: F,
108    // Grace period as a fraction of time taken to reach quorum
109    grace_period: f64,
110) -> Result<(K, Vec<V>), CommunicationError<NodeError>>
111where
112    A: ValidatorNode + Clone + 'static,
113    F: Clone + Fn(RemoteNode<A>) -> R,
114    R: Future<Output = Result<V, ChainClientError>> + 'a,
115    G: Fn(&V) -> K,
116    K: Hash + PartialEq + Eq + Clone + 'static,
117    V: 'static,
118{
119    let mut responses: futures::stream::FuturesUnordered<_> = validator_clients
120        .iter()
121        .filter_map(|remote_node| {
122            if committee.weight(&remote_node.public_key) == 0 {
123                // This should not happen but better prevent it because certificates
124                // are not allowed to include votes with weight 0.
125                return None;
126            }
127            let execute = execute.clone();
128            let remote_node = remote_node.clone();
129            Some(async move { (remote_node.public_key, execute(remote_node).await) })
130        })
131        .collect();
132
133    let start_time = Instant::now();
134    let mut end_time: Option<Instant> = None;
135    let mut remaining_votes = committee.total_votes();
136    let mut highest_key_score = 0;
137    let mut value_scores = HashMap::new();
138    let mut error_scores = HashMap::new();
139
140    'vote_wait: while let Ok(Some((name, result))) = timeout(
141        end_time.map_or(MAX_TIMEOUT, |t| t.saturating_duration_since(Instant::now())),
142        responses.next(),
143    )
144    .await
145    {
146        remaining_votes -= committee.weight(&name);
147        match result {
148            Ok(value) => {
149                let key = group_by(&value);
150                let entry = value_scores.entry(key.clone()).or_insert((0, Vec::new()));
151                entry.0 += committee.weight(&name);
152                entry.1.push(value);
153                highest_key_score = highest_key_score.max(entry.0);
154            }
155            Err(err) => {
156                // TODO(#2857): Handle non-remote errors properly.
157                let err = match err {
158                    ChainClientError::RemoteNodeError(err) => err,
159                    err => NodeError::ResponseHandlingError {
160                        error: err.to_string(),
161                    },
162                };
163                let entry = error_scores.entry(err.clone()).or_insert(0);
164                *entry += committee.weight(&name);
165                if *entry >= committee.validity_threshold() {
166                    // At least one honest node returned this error.
167                    // No quorum can be reached, so return early.
168                    return Err(CommunicationError::Trusted(err));
169                }
170            }
171        }
172        // If it becomes clear that no key can reach a quorum, break early.
173        if highest_key_score + remaining_votes < committee.quorum_threshold() {
174            break 'vote_wait;
175        }
176
177        // If a key reaches a quorum, wait for the grace period to collect more values
178        // or error information and then stop.
179        if end_time.is_none() && highest_key_score >= committee.quorum_threshold() {
180            end_time = Some(Instant::now() + start_time.elapsed().mul_f64(grace_period));
181        }
182    }
183
184    let scores = value_scores
185        .values()
186        .map(|(weight, values)| (*weight, values.len()))
187        .collect();
188    // If a key has a quorum, return it with its values.
189    if let Some((key, (_, values))) = value_scores
190        .into_iter()
191        .find(|(_, (score, _))| *score >= committee.quorum_threshold())
192    {
193        return Ok((key, values));
194    }
195
196    if error_scores.is_empty() {
197        return Err(CommunicationError::NoConsensus(
198            committee.quorum_threshold(),
199            scores,
200        ));
201    }
202
203    // No specific error is available to report reliably.
204    let mut sample = error_scores.into_iter().collect::<Vec<_>>();
205    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
206    sample.truncate(4);
207    Err(CommunicationError::Sample(sample))
208}
209
210impl<A, S> ValidatorUpdater<A, S>
211where
212    A: ValidatorNode + Clone + 'static,
213    S: Storage + Clone + Send + Sync + 'static,
214{
215    async fn send_confirmed_certificate(
216        &mut self,
217        certificate: GenericCertificate<ConfirmedBlock>,
218        delivery: CrossChainMessageDelivery,
219    ) -> Result<Box<ChainInfo>, ChainClientError> {
220        let result = self
221            .remote_node
222            .handle_optimized_confirmed_certificate(&certificate, delivery)
223            .await;
224
225        Ok(match &result {
226            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
227                self.remote_node
228                    .check_blobs_not_found(&certificate, blob_ids)?;
229                // The certificate is confirmed, so the blobs must be in storage.
230                let maybe_blobs = self.local_node.read_blobs_from_storage(blob_ids).await?;
231                let blobs = maybe_blobs.ok_or_else(|| original_err.clone())?;
232                self.remote_node.upload_blobs(blobs.clone()).await?;
233                self.remote_node
234                    .handle_confirmed_certificate(certificate, delivery)
235                    .await
236            }
237            _ => result,
238        }?)
239    }
240
241    async fn send_validated_certificate(
242        &mut self,
243        certificate: GenericCertificate<ValidatedBlock>,
244        delivery: CrossChainMessageDelivery,
245    ) -> Result<Box<ChainInfo>, ChainClientError> {
246        let result = self
247            .remote_node
248            .handle_optimized_validated_certificate(&certificate, delivery)
249            .await;
250
251        Ok(match &result {
252            Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
253                self.remote_node
254                    .check_blobs_not_found(&certificate, blob_ids)?;
255                let chain_id = certificate.inner().chain_id();
256                // The certificate is for a validated block, i.e. for our locking block.
257                // Take the missing blobs from our local chain manager.
258                let blobs = self
259                    .local_node
260                    .get_locking_blobs(blob_ids, chain_id)
261                    .await?
262                    .ok_or_else(|| original_err.clone())?;
263                self.remote_node.send_pending_blobs(chain_id, blobs).await?;
264                self.remote_node
265                    .handle_validated_certificate(certificate)
266                    .await
267            }
268            _ => result,
269        }?)
270    }
271
272    async fn send_block_proposal(
273        &mut self,
274        proposal: Box<BlockProposal>,
275        mut blob_ids: Vec<BlobId>,
276    ) -> Result<Box<ChainInfo>, ChainClientError> {
277        let chain_id = proposal.content.block.chain_id;
278        let mut sent_cross_chain_updates = false;
279        loop {
280            match self
281                .remote_node
282                .handle_block_proposal(proposal.clone())
283                .await
284            {
285                Ok(info) => return Ok(info),
286                Err(NodeError::MissingCrossChainUpdate { .. })
287                | Err(NodeError::InactiveChain(_))
288                    if !sent_cross_chain_updates =>
289                {
290                    sent_cross_chain_updates = true;
291                    // Some received certificates may be missing for this validator
292                    // (e.g. to create the chain or make the balance sufficient) so we are going to
293                    // synchronize them now and retry.
294                    self.send_chain_information_for_senders(chain_id).await?;
295                }
296                Err(NodeError::BlobsNotFound(_)) if !blob_ids.is_empty() => {
297                    // For `BlobsNotFound`, we assume that the local node should already be
298                    // updated with the needed blobs, so sending the chain information about the
299                    // certificates that last used the blobs to the validator node should be enough.
300                    let published_blob_ids =
301                        BTreeSet::from_iter(proposal.content.block.published_blob_ids());
302                    blob_ids.retain(|blob_id| !published_blob_ids.contains(blob_id));
303                    let mut published_blobs = Vec::new();
304                    {
305                        let chain = self.local_node.chain_state_view(chain_id).await?;
306                        for blob_id in published_blob_ids {
307                            published_blobs
308                                .extend(chain.manager.proposed_blobs.get(&blob_id).await?);
309                        }
310                    }
311                    self.remote_node
312                        .send_pending_blobs(chain_id, published_blobs)
313                        .await?;
314                    let missing_blob_ids = self
315                        .remote_node
316                        .node
317                        .missing_blob_ids(mem::take(&mut blob_ids))
318                        .await?;
319                    let blob_states = self
320                        .local_node
321                        .read_blob_states_from_storage(&missing_blob_ids)
322                        .await?;
323                    let mut chain_heights = BTreeMap::new();
324                    for blob_state in blob_states {
325                        let block_chain_id = blob_state.chain_id;
326                        let block_height = blob_state.block_height.try_add_one()?;
327                        chain_heights
328                            .entry(block_chain_id)
329                            .and_modify(|h| *h = block_height.max(*h))
330                            .or_insert(block_height);
331                    }
332
333                    self.send_chain_info_up_to_heights(
334                        chain_heights,
335                        CrossChainMessageDelivery::NonBlocking,
336                    )
337                    .await?;
338                }
339                // Fail immediately on other errors.
340                Err(err) => return Err(err.into()),
341            }
342        }
343    }
344
345    pub async fn send_chain_information(
346        &mut self,
347        chain_id: ChainId,
348        target_block_height: BlockHeight,
349        delivery: CrossChainMessageDelivery,
350    ) -> Result<(), ChainClientError> {
351        // Figure out which certificates this validator is missing.
352        let query = ChainInfoQuery::new(chain_id);
353        let remote_info = self.remote_node.handle_chain_info_query(query).await?;
354        let initial_block_height = remote_info.next_block_height;
355        // Obtain the missing blocks and the manager state from the local node.
356        let range = initial_block_height..target_block_height;
357        let (keys, timeout) = {
358            let chain = self.local_node.chain_state_view(chain_id).await?;
359            (
360                chain.block_hashes(range).await?,
361                chain.manager.timeout.get().clone(),
362            )
363        };
364        if !keys.is_empty() {
365            // Send the requested certificates in order.
366            let storage = self.local_node.storage_client();
367            let certs = storage.read_certificates(keys.into_iter()).await?;
368            for cert in certs {
369                self.send_confirmed_certificate(cert, delivery).await?;
370            }
371        }
372        if let Some(cert) = timeout {
373            if cert.value().chain_id() == chain_id {
374                // Timeouts are small and don't have blobs, so we can call `handle_certificate`
375                // directly.
376                self.remote_node.handle_timeout_certificate(cert).await?;
377            }
378        }
379        Ok(())
380    }
381
382    async fn send_chain_info_up_to_heights(
383        &mut self,
384        chain_heights: BTreeMap<ChainId, BlockHeight>,
385        delivery: CrossChainMessageDelivery,
386    ) -> Result<(), ChainClientError> {
387        let stream = stream::iter(chain_heights)
388            .map(|(chain_id, height)| {
389                let mut updater = self.clone();
390                async move {
391                    updater
392                        .send_chain_information(chain_id, height, delivery)
393                        .await
394                }
395            })
396            .buffer_unordered(self.chain_worker_count);
397        stream.try_collect::<Vec<_>>().await?;
398        Ok(())
399    }
400
401    async fn send_chain_information_for_senders(
402        &mut self,
403        chain_id: ChainId,
404    ) -> Result<(), ChainClientError> {
405        let mut sender_heights = BTreeMap::new();
406        {
407            let chain = self.local_node.chain_state_view(chain_id).await?;
408            let pairs = chain.inboxes.try_load_all_entries().await?;
409            for (origin, inbox) in pairs {
410                let inbox_next_height = inbox.next_block_height_to_receive()?;
411                sender_heights
412                    .entry(origin)
413                    .and_modify(|h| *h = inbox_next_height.max(*h))
414                    .or_insert(inbox_next_height);
415            }
416        }
417
418        self.send_chain_info_up_to_heights(sender_heights, CrossChainMessageDelivery::Blocking)
419            .await?;
420        Ok(())
421    }
422
423    pub async fn send_chain_update(
424        &mut self,
425        action: CommunicateAction,
426    ) -> Result<LiteVote, ChainClientError> {
427        let (target_block_height, chain_id) = match &action {
428            CommunicateAction::SubmitBlock { proposal, .. } => {
429                let block = &proposal.content.block;
430                (block.height, block.chain_id)
431            }
432            CommunicateAction::FinalizeBlock { certificate, .. } => (
433                certificate.inner().block().header.height,
434                certificate.inner().block().header.chain_id,
435            ),
436            CommunicateAction::RequestTimeout {
437                height, chain_id, ..
438            } => (*height, *chain_id),
439        };
440        // Update the validator with missing information, if needed.
441        let delivery = CrossChainMessageDelivery::NonBlocking;
442        self.send_chain_information(chain_id, target_block_height, delivery)
443            .await?;
444        // Send the block proposal, certificate or timeout request and return a vote.
445        let vote = match action {
446            CommunicateAction::SubmitBlock { proposal, blob_ids } => {
447                let info = self.send_block_proposal(proposal, blob_ids).await?;
448                info.manager.pending
449            }
450            CommunicateAction::FinalizeBlock {
451                certificate,
452                delivery,
453            } => {
454                let info = self
455                    .send_validated_certificate(*certificate, delivery)
456                    .await?;
457                info.manager.pending
458            }
459            CommunicateAction::RequestTimeout { .. } => {
460                let query = ChainInfoQuery::new(chain_id).with_timeout();
461                let info = self.remote_node.handle_chain_info_query(query).await?;
462                info.manager.timeout_vote
463            }
464        };
465        match vote {
466            Some(vote) if vote.public_key == self.remote_node.public_key => {
467                vote.check()?;
468                Ok(vote)
469            }
470            Some(_) | None => Err(NodeError::MissingVoteInValidatorResponse.into()),
471        }
472    }
473}