linera_core/
remote_node.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashSet, time::Duration};
5
6use custom_debug_derive::Debug;
7use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt};
8use linera_base::{
9    crypto::{CryptoHash, ValidatorPublicKey},
10    data_types::{Blob, BlockHeight},
11    ensure,
12    identifiers::{BlobId, ChainId},
13};
14use linera_chain::{
15    data_types::BlockProposal,
16    types::{
17        CertificateValue, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate,
18        TimeoutCertificate, ValidatedBlockCertificate,
19    },
20};
21use rand::seq::SliceRandom as _;
22use tracing::{instrument, warn};
23
24use crate::{
25    data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse},
26    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
27};
28
/// A validator node together with the validator's name.
///
/// The methods on this type forward requests to `node` and check the replies against
/// `public_key` before handing them back to the caller.
#[derive(Clone, Debug)]
pub struct RemoteNode<N> {
    /// The validator's public key; used in log messages and when checking responses.
    pub public_key: ValidatorPublicKey,
    /// The handle used to send requests to the validator. Skipped in the `Debug` output.
    #[debug(skip)]
    pub node: N,
}
36
37impl<N: ValidatorNode> RemoteNode<N> {
38    pub(crate) async fn handle_chain_info_query(
39        &self,
40        query: ChainInfoQuery,
41    ) -> Result<Box<ChainInfo>, NodeError> {
42        let chain_id = query.chain_id;
43        let response = self.node.handle_chain_info_query(query).await?;
44        self.check_and_return_info(response, chain_id)
45    }
46
47    #[instrument(level = "trace")]
48    pub(crate) async fn handle_block_proposal(
49        &self,
50        proposal: Box<BlockProposal>,
51    ) -> Result<Box<ChainInfo>, NodeError> {
52        let chain_id = proposal.content.block.chain_id;
53        let response = self.node.handle_block_proposal(*proposal).await?;
54        self.check_and_return_info(response, chain_id)
55    }
56
57    pub(crate) async fn handle_timeout_certificate(
58        &self,
59        certificate: TimeoutCertificate,
60    ) -> Result<Box<ChainInfo>, NodeError> {
61        let chain_id = certificate.inner().chain_id();
62        let response = self.node.handle_timeout_certificate(certificate).await?;
63        self.check_and_return_info(response, chain_id)
64    }
65
66    pub(crate) async fn handle_confirmed_certificate(
67        &self,
68        certificate: ConfirmedBlockCertificate,
69        delivery: CrossChainMessageDelivery,
70    ) -> Result<Box<ChainInfo>, NodeError> {
71        let chain_id = certificate.inner().chain_id();
72        let response = self
73            .node
74            .handle_confirmed_certificate(certificate, delivery)
75            .await?;
76        self.check_and_return_info(response, chain_id)
77    }
78
79    pub(crate) async fn handle_validated_certificate(
80        &self,
81        certificate: ValidatedBlockCertificate,
82    ) -> Result<Box<ChainInfo>, NodeError> {
83        let chain_id = certificate.inner().chain_id();
84        let response = self.node.handle_validated_certificate(certificate).await?;
85        self.check_and_return_info(response, chain_id)
86    }
87
88    #[instrument(level = "trace")]
89    pub(crate) async fn handle_lite_certificate(
90        &self,
91        certificate: LiteCertificate<'_>,
92        delivery: CrossChainMessageDelivery,
93    ) -> Result<Box<ChainInfo>, NodeError> {
94        let chain_id = certificate.value.chain_id;
95        let response = self
96            .node
97            .handle_lite_certificate(certificate, delivery)
98            .await?;
99        self.check_and_return_info(response, chain_id)
100    }
101
102    pub(crate) async fn handle_optimized_validated_certificate(
103        &mut self,
104        certificate: &ValidatedBlockCertificate,
105        delivery: CrossChainMessageDelivery,
106    ) -> Result<Box<ChainInfo>, NodeError> {
107        if certificate.is_signed_by(&self.public_key) {
108            let result = self
109                .handle_lite_certificate(certificate.lite_certificate(), delivery)
110                .await;
111            match result {
112                Err(NodeError::MissingCertificateValue) => {
113                    warn!(
114                        "Validator {} forgot a certificate value that they signed before",
115                        self.public_key
116                    );
117                }
118                _ => return result,
119            }
120        }
121        self.handle_validated_certificate(certificate.clone()).await
122    }
123
124    pub(crate) async fn handle_optimized_confirmed_certificate(
125        &mut self,
126        certificate: &ConfirmedBlockCertificate,
127        delivery: CrossChainMessageDelivery,
128    ) -> Result<Box<ChainInfo>, NodeError> {
129        if certificate.is_signed_by(&self.public_key) {
130            let result = self
131                .handle_lite_certificate(certificate.lite_certificate(), delivery)
132                .await;
133            match result {
134                Err(NodeError::MissingCertificateValue) => {
135                    warn!(
136                        "Validator {} forgot a certificate value that they signed before",
137                        self.public_key
138                    );
139                }
140                _ => return result,
141            }
142        }
143        self.handle_confirmed_certificate(certificate.clone(), delivery)
144            .await
145    }
146
147    fn check_and_return_info(
148        &self,
149        response: ChainInfoResponse,
150        chain_id: ChainId,
151    ) -> Result<Box<ChainInfo>, NodeError> {
152        let manager = &response.info.manager;
153        let proposed = manager.requested_proposed.as_ref();
154        let locking = manager.requested_locking.as_ref();
155        ensure!(
156            proposed.is_none_or(|proposal| proposal.content.block.chain_id == chain_id)
157                && locking.is_none_or(|cert| cert.chain_id() == chain_id)
158                && response.check(self.public_key).is_ok(),
159            NodeError::InvalidChainInfoResponse
160        );
161        Ok(response.info)
162    }
163
164    #[instrument(level = "trace", skip_all)]
165    pub(crate) async fn query_certificates_from(
166        &self,
167        chain_id: ChainId,
168        start: BlockHeight,
169        limit: u64,
170    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
171        tracing::debug!(name = ?self.public_key, ?chain_id, ?start, ?limit, "Querying certificates");
172        let range = BlockHeightRange {
173            start,
174            limit: Some(limit),
175        };
176        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range);
177        let info = self.handle_chain_info_query(query).await?;
178        self.node
179            .download_certificates(info.requested_sent_certificate_hashes)
180            .await?
181            .into_iter()
182            .map(|c| {
183                ensure!(
184                    c.inner().chain_id() == chain_id,
185                    NodeError::UnexpectedCertificateValue
186                );
187                ConfirmedBlockCertificate::try_from(c)
188                    .map_err(|_| NodeError::InvalidChainInfoResponse)
189            })
190            .collect()
191    }
192
193    #[instrument(level = "trace")]
194    pub(crate) async fn download_certificate_for_blob(
195        &self,
196        blob_id: BlobId,
197    ) -> Result<ConfirmedBlockCertificate, NodeError> {
198        let last_used_hash = self.node.blob_last_used_by(blob_id).await?;
199        let certificate = self.node.download_certificate(last_used_hash).await?;
200        if !certificate.block().requires_or_creates_blob(&blob_id) {
201            warn!(
202                "Got invalid last used by certificate for blob {} from validator {}",
203                blob_id, self.public_key
204            );
205            return Err(NodeError::InvalidCertificateForBlob(blob_id));
206        }
207        Ok(certificate)
208    }
209
210    /// Sends a pending validated block's blobs to the validator.
211    #[instrument(level = "trace")]
212    pub(crate) async fn send_pending_blobs(
213        &self,
214        chain_id: ChainId,
215        blobs: Vec<Blob>,
216    ) -> Result<(), NodeError> {
217        let tasks = blobs
218            .into_iter()
219            .map(|blob| self.node.handle_pending_blob(chain_id, blob.into_content()));
220        try_join_all(tasks).await?;
221        Ok(())
222    }
223
224    #[instrument(level = "trace")]
225    async fn try_download_blob(&self, blob_id: BlobId) -> Option<Blob> {
226        match self.node.download_blob(blob_id).await {
227            Ok(blob) => {
228                let blob = Blob::new(blob);
229                if blob.id() != blob_id {
230                    tracing::info!(
231                        "Validator {} sent an invalid blob {blob_id}.",
232                        self.public_key
233                    );
234                    None
235                } else {
236                    Some(blob)
237                }
238            }
239            Err(error) => {
240                tracing::debug!(
241                    "Failed to fetch blob {blob_id} from validator {}: {error}",
242                    self.public_key
243                );
244                None
245            }
246        }
247    }
248
249    /// Returns the list of certificate hashes on the given chain in the given range of heights.
250    /// Returns an error if the number of hashes does not match the size of the range.
251    #[instrument(level = "trace")]
252    pub(crate) async fn fetch_sent_certificate_hashes(
253        &self,
254        chain_id: ChainId,
255        range: BlockHeightRange,
256    ) -> Result<Vec<CryptoHash>, NodeError> {
257        let query =
258            ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_in_range(range.clone());
259        let response = self.handle_chain_info_query(query).await?;
260        let hashes = response.requested_sent_certificate_hashes;
261
262        if range
263            .limit
264            .is_some_and(|limit| hashes.len() as u64 != limit)
265        {
266            warn!(
267                ?range,
268                received_num = hashes.len(),
269                "Validator sent invalid number of certificate hashes."
270            );
271            return Err(NodeError::InvalidChainInfoResponse);
272        }
273        Ok(hashes)
274    }
275
276    #[instrument(level = "trace")]
277    pub async fn download_certificates(
278        &self,
279        hashes: Vec<CryptoHash>,
280    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
281        if hashes.is_empty() {
282            return Ok(Vec::new());
283        }
284        let certificates = self.node.download_certificates(hashes.clone()).await?;
285        let returned = certificates
286            .iter()
287            .map(ConfirmedBlockCertificate::hash)
288            .collect();
289        ensure!(
290            returned == hashes,
291            NodeError::UnexpectedCertificates {
292                returned,
293                requested: hashes
294            }
295        );
296        Ok(certificates)
297    }
298
299    /// Downloads a blob, but does not verify if it has actually been published and
300    /// accepted by a quorum of validators.
301    #[instrument(level = "trace", skip(validators))]
302    pub async fn download_blob(
303        validators: &[Self],
304        blob_id: BlobId,
305        timeout: Duration,
306    ) -> Option<Blob> {
307        // Sequentially try each validator in random order.
308        let mut validators = validators.iter().collect::<Vec<_>>();
309        validators.shuffle(&mut rand::thread_rng());
310        let mut stream = validators
311            .into_iter()
312            .zip(0..)
313            .map(|(remote_node, i)| async move {
314                linera_base::time::timer::sleep(timeout * i * i).await;
315                remote_node.try_download_blob(blob_id).await
316            })
317            .collect::<FuturesUnordered<_>>();
318        while let Some(maybe_blob) = stream.next().await {
319            if let Some(blob) = maybe_blob {
320                return Some(blob);
321            }
322        }
323        None
324    }
325
326    /// Downloads the blobs with the given IDs. This is done in one concurrent task per block.
327    /// Each task goes through the validators sequentially in random order and tries to download
328    /// it. Returns `None` if it couldn't find all blobs.
329    #[instrument(level = "trace", skip(validators))]
330    pub async fn download_blobs(
331        blob_ids: &[BlobId],
332        validators: &[Self],
333        timeout: Duration,
334    ) -> Option<Vec<Blob>> {
335        let mut stream = blob_ids
336            .iter()
337            .map(|blob_id| Self::download_blob(validators, *blob_id, timeout))
338            .collect::<FuturesUnordered<_>>();
339        let mut blobs = Vec::new();
340        while let Some(maybe_blob) = stream.next().await {
341            blobs.push(maybe_blob?);
342        }
343        Some(blobs)
344    }
345
346    /// Checks that requesting these blobs when trying to handle this certificate is legitimate,
347    /// i.e. that there are no duplicates and the blobs are actually required.
348    pub fn check_blobs_not_found<T: CertificateValue>(
349        &self,
350        certificate: &GenericCertificate<T>,
351        blob_ids: &[BlobId],
352    ) -> Result<(), NodeError> {
353        ensure!(!blob_ids.is_empty(), NodeError::EmptyBlobsNotFound);
354        let required = certificate.inner().required_blob_ids();
355        let public_key = &self.public_key;
356        for blob_id in blob_ids {
357            if !required.contains(blob_id) {
358                warn!("validator {public_key} requested blob {blob_id:?} but it is not required");
359                return Err(NodeError::UnexpectedEntriesInBlobsNotFound);
360            }
361        }
362        let unique_missing_blob_ids = blob_ids.iter().cloned().collect::<HashSet<_>>();
363        if blob_ids.len() > unique_missing_blob_ids.len() {
364            warn!("blobs requested by validator {public_key} contain duplicates");
365            return Err(NodeError::DuplicatesInBlobsNotFound);
366        }
367        Ok(())
368    }
369}