linera_core/
remote_node.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{HashSet, VecDeque};
5
6use custom_debug_derive::Debug;
7use futures::future::try_join_all;
8use linera_base::{
9    crypto::ValidatorPublicKey,
10    data_types::{Blob, BlockHeight},
11    ensure,
12    identifiers::{BlobId, ChainId},
13};
14use linera_chain::{
15    data_types::BlockProposal,
16    types::{
17        CertificateValue, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate,
18        TimeoutCertificate, ValidatedBlockCertificate,
19    },
20};
21use tracing::{debug, info, instrument};
22
23use crate::{
24    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
25    node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
26};
27
/// A validator node together with the validator's public key.
///
/// The derived `Clone` duplicates both fields. `Debug` comes from
/// `custom_debug_derive`, which honors the `#[debug(skip)]` attribute below.
#[derive(Clone, Debug)]
pub struct RemoteNode<N> {
    /// The validator's public key. Equality between `RemoteNode`s is decided by
    /// this key alone.
    pub public_key: ValidatorPublicKey,
    /// The handle used to talk to the validator; left out of the `Debug` output.
    #[debug(skip)]
    pub node: N,
}
35
36impl<N: ValidatorNode> RemoteNode<N> {
37    pub(crate) async fn handle_chain_info_query(
38        &self,
39        query: ChainInfoQuery,
40    ) -> Result<Box<ChainInfo>, NodeError> {
41        let chain_id = query.chain_id;
42        let response = self.node.handle_chain_info_query(query).await?;
43        self.check_and_return_info(response, chain_id)
44    }
45
46    #[instrument(level = "trace")]
47    pub(crate) async fn handle_block_proposal(
48        &self,
49        proposal: Box<BlockProposal>,
50    ) -> Result<Box<ChainInfo>, NodeError> {
51        let chain_id = proposal.content.block.chain_id;
52        let response = self.node.handle_block_proposal(*proposal).await?;
53        self.check_and_return_info(response, chain_id)
54    }
55
56    pub(crate) async fn handle_timeout_certificate(
57        &self,
58        certificate: TimeoutCertificate,
59    ) -> Result<Box<ChainInfo>, NodeError> {
60        let chain_id = certificate.inner().chain_id();
61        let response = self.node.handle_timeout_certificate(certificate).await?;
62        self.check_and_return_info(response, chain_id)
63    }
64
65    pub(crate) async fn handle_confirmed_certificate(
66        &self,
67        certificate: ConfirmedBlockCertificate,
68        delivery: CrossChainMessageDelivery,
69    ) -> Result<Box<ChainInfo>, NodeError> {
70        let chain_id = certificate.inner().chain_id();
71        let response = self
72            .node
73            .handle_confirmed_certificate(certificate, delivery)
74            .await?;
75        self.check_and_return_info(response, chain_id)
76    }
77
78    pub(crate) async fn handle_validated_certificate(
79        &self,
80        certificate: ValidatedBlockCertificate,
81    ) -> Result<Box<ChainInfo>, NodeError> {
82        let chain_id = certificate.inner().chain_id();
83        let response = self.node.handle_validated_certificate(certificate).await?;
84        self.check_and_return_info(response, chain_id)
85    }
86
87    #[instrument(level = "trace")]
88    pub(crate) async fn handle_lite_certificate(
89        &self,
90        certificate: LiteCertificate<'_>,
91        delivery: CrossChainMessageDelivery,
92    ) -> Result<Box<ChainInfo>, NodeError> {
93        let chain_id = certificate.value.chain_id;
94        let response = self
95            .node
96            .handle_lite_certificate(certificate, delivery)
97            .await?;
98        self.check_and_return_info(response, chain_id)
99    }
100
101    pub(crate) async fn handle_optimized_validated_certificate(
102        &mut self,
103        certificate: &ValidatedBlockCertificate,
104        delivery: CrossChainMessageDelivery,
105    ) -> Result<Box<ChainInfo>, NodeError> {
106        if certificate.is_signed_by(&self.public_key) {
107            let result = self
108                .handle_lite_certificate(certificate.lite_certificate(), delivery)
109                .await;
110            match result {
111                Err(NodeError::MissingCertificateValue) => {
112                    debug!(
113                        address = self.address(),
114                        certificate_hash = %certificate.hash(),
115                        "validator forgot a validated block value that they signed before",
116                    );
117                }
118                _ => return result,
119            }
120        }
121        self.handle_validated_certificate(certificate.clone()).await
122    }
123
124    pub(crate) async fn handle_optimized_confirmed_certificate(
125        &mut self,
126        certificate: &ConfirmedBlockCertificate,
127        delivery: CrossChainMessageDelivery,
128    ) -> Result<Box<ChainInfo>, NodeError> {
129        if certificate.is_signed_by(&self.public_key) {
130            let result = self
131                .handle_lite_certificate(certificate.lite_certificate(), delivery)
132                .await;
133            match result {
134                Err(NodeError::MissingCertificateValue) => {
135                    debug!(
136                        address = self.address(),
137                        certificate_hash = %certificate.hash(),
138                        "validator forgot a confirmed block value that they signed before",
139                    );
140                }
141                _ => return result,
142            }
143        }
144        self.handle_confirmed_certificate(certificate.clone(), delivery)
145            .await
146    }
147
148    fn check_and_return_info(
149        &self,
150        response: ChainInfoResponse,
151        chain_id: ChainId,
152    ) -> Result<Box<ChainInfo>, NodeError> {
153        let manager = &response.info.manager;
154        let proposed = manager.requested_proposed.as_ref();
155        let locking = manager.requested_locking.as_ref();
156        ensure!(
157            proposed.is_none_or(|proposal| proposal.content.block.chain_id == chain_id)
158                && locking.is_none_or(|cert| cert.chain_id() == chain_id)
159                && response.check(self.public_key).is_ok(),
160            NodeError::InvalidChainInfoResponse
161        );
162        Ok(response.info)
163    }
164
165    #[instrument(level = "trace")]
166    pub(crate) async fn download_certificate_for_blob(
167        &self,
168        blob_id: BlobId,
169    ) -> Result<ConfirmedBlockCertificate, NodeError> {
170        let certificate = self.node.blob_last_used_by_certificate(blob_id).await?;
171        if !certificate.block().requires_or_creates_blob(&blob_id) {
172            info!(
173                address = self.address(),
174                %blob_id,
175                "got invalid last used by certificate for blob from validator",
176            );
177            return Err(NodeError::InvalidCertificateForBlob(blob_id));
178        }
179        Ok(certificate)
180    }
181
182    /// Sends a pending validated block's blobs to the validator.
183    #[instrument(level = "trace")]
184    pub(crate) async fn send_pending_blobs(
185        &self,
186        chain_id: ChainId,
187        blobs: Vec<Blob>,
188    ) -> Result<(), NodeError> {
189        let tasks = blobs
190            .into_iter()
191            .map(|blob| self.node.handle_pending_blob(chain_id, blob.into_content()));
192        try_join_all(tasks).await?;
193        Ok(())
194    }
195
196    #[instrument(level = "trace")]
197    pub async fn download_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, NodeError> {
198        match self.node.download_blob(blob_id).await {
199            Ok(blob) => {
200                let blob = Blob::new(blob);
201                if blob.id() != blob_id {
202                    tracing::info!(
203                        address = self.address(),
204                        %blob_id,
205                        "validator sent an invalid blob.",
206                    );
207                    Ok(None)
208                } else {
209                    Ok(Some(blob))
210                }
211            }
212            Err(NodeError::BlobsNotFound(_error)) => {
213                tracing::debug!(
214                    ?blob_id,
215                    address = self.address(),
216                    "validator is missing the blob",
217                );
218                Ok(None)
219            }
220            Err(error) => Err(error),
221        }
222    }
223
224    /// Downloads a list of certificates from the given chain.
225    #[instrument(level = "trace")]
226    pub async fn download_certificates_by_heights(
227        &self,
228        chain_id: ChainId,
229        heights: Vec<BlockHeight>,
230    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
231        let mut expected_heights = VecDeque::from(heights.clone());
232        let certificates = self
233            .node
234            .download_certificates_by_heights(chain_id, heights)
235            .await?;
236
237        if certificates.len() > expected_heights.len() {
238            return Err(NodeError::TooManyCertificatesReturned {
239                chain_id,
240                remote_node: Box::new(self.public_key),
241            });
242        }
243
244        for certificate in &certificates {
245            ensure!(
246                certificate.inner().chain_id() == chain_id,
247                NodeError::UnexpectedCertificateValue
248            );
249            if let Some(expected_height) = expected_heights.pop_front() {
250                ensure!(
251                    expected_height == certificate.inner().height(),
252                    NodeError::UnexpectedCertificateValue
253                );
254            } else {
255                return Err(NodeError::UnexpectedCertificateValue);
256            }
257        }
258
259        ensure!(
260            expected_heights.is_empty(),
261            NodeError::MissingCertificatesByHeights {
262                chain_id,
263                heights: expected_heights.into_iter().collect(),
264            }
265        );
266        Ok(certificates)
267    }
268
269    /// Checks that requesting these blobs when trying to handle this certificate is legitimate,
270    /// i.e. that there are no duplicates and the blobs are actually required.
271    pub fn check_blobs_not_found<T: CertificateValue>(
272        &self,
273        certificate: &GenericCertificate<T>,
274        blob_ids: &[BlobId],
275    ) -> Result<(), NodeError> {
276        ensure!(!blob_ids.is_empty(), NodeError::EmptyBlobsNotFound);
277        let required = certificate.inner().required_blob_ids();
278        for blob_id in blob_ids {
279            if !required.contains(blob_id) {
280                info!(
281                    address = self.address(),
282                    %blob_id,
283                    "validator requested blob but it is not required",
284                );
285                return Err(NodeError::UnexpectedEntriesInBlobsNotFound);
286            }
287        }
288        let unique_missing_blob_ids = blob_ids.iter().copied().collect::<HashSet<_>>();
289        if blob_ids.len() > unique_missing_blob_ids.len() {
290            info!(
291                address = self.address(),
292                "blobs requested by validator contain duplicates",
293            );
294            return Err(NodeError::DuplicatesInBlobsNotFound);
295        }
296        Ok(())
297    }
298
299    /// Returns the validator's URL.
300    pub fn address(&self) -> String {
301        self.node.address()
302    }
303}
304
305impl<N: ValidatorNode> PartialEq for RemoteNode<N> {
306    fn eq(&self, other: &Self) -> bool {
307        self.public_key == other.public_key
308    }
309}
310
// Marker impl promoting the public-key-based `PartialEq` to `Eq`.
// NOTE(review): assumes `ValidatorPublicKey` equality is a full equivalence relation.
impl<N: ValidatorNode> Eq for RemoteNode<N> {}