linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12    data_types::{ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round},
13    identifiers::{BlobId, ChainId, EventId},
14};
15use linera_chain::{
16    data_types::BlockProposal,
17    types::{
18        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19        ValidatedBlock,
20    },
21    ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30    data_types::{ChainInfoQuery, ChainInfoResponse},
31    worker::{Notification, WorkerError},
32};
33
34/// A pinned [`Stream`] of Notifications.
35pub type NotificationStream = BoxStream<'static, Notification>;
36
37/// Whether to wait for the delivery of outgoing cross-chain messages.
38#[derive(Debug, Default, Clone, Copy)]
39pub enum CrossChainMessageDelivery {
40    #[default]
41    NonBlocking,
42    Blocking,
43}
44
45/// How to communicate with a validator node.
46#[allow(async_fn_in_trait)]
47#[cfg_attr(not(web), trait_variant::make(Send))]
48pub trait ValidatorNode {
49    #[cfg(not(web))]
50    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
51    #[cfg(web)]
52    type NotificationStream: Stream<Item = Notification> + Unpin;
53
54    fn address(&self) -> String;
55
56    /// Proposes a new block.
57    async fn handle_block_proposal(
58        &self,
59        proposal: BlockProposal,
60    ) -> Result<ChainInfoResponse, NodeError>;
61
62    /// Processes a certificate without a value.
63    async fn handle_lite_certificate(
64        &self,
65        certificate: LiteCertificate<'_>,
66        delivery: CrossChainMessageDelivery,
67    ) -> Result<ChainInfoResponse, NodeError>;
68
69    /// Processes a confirmed certificate.
70    async fn handle_confirmed_certificate(
71        &self,
72        certificate: GenericCertificate<ConfirmedBlock>,
73        delivery: CrossChainMessageDelivery,
74    ) -> Result<ChainInfoResponse, NodeError>;
75
76    /// Processes a validated certificate.
77    async fn handle_validated_certificate(
78        &self,
79        certificate: GenericCertificate<ValidatedBlock>,
80    ) -> Result<ChainInfoResponse, NodeError>;
81
82    /// Processes a timeout certificate.
83    async fn handle_timeout_certificate(
84        &self,
85        certificate: GenericCertificate<Timeout>,
86    ) -> Result<ChainInfoResponse, NodeError>;
87
88    /// Handles information queries for this chain.
89    async fn handle_chain_info_query(
90        &self,
91        query: ChainInfoQuery,
92    ) -> Result<ChainInfoResponse, NodeError>;
93
94    /// Gets the version info for this validator node.
95    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
96
97    /// Gets the network's description.
98    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
99
100    /// Subscribes to receiving notifications for a collection of chains.
101    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
102
103    // Uploads a blob. Returns an error if the validator has not seen a
104    // certificate using this blob.
105    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
106
107    /// Uploads the blobs to the validator.
108    // Unfortunately, this doesn't compile as an async function: async functions in traits
109    // don't play well with default implementations, apparently.
110    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
111    fn upload_blobs(
112        &self,
113        blobs: Vec<Blob>,
114    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
115        let tasks: Vec<_> = blobs
116            .into_iter()
117            .map(|blob| self.upload_blob(blob.into()))
118            .collect();
119        futures::future::try_join_all(tasks)
120    }
121
122    /// Downloads a blob. Returns an error if the validator does not have the blob.
123    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
124
125    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
126    async fn download_pending_blob(
127        &self,
128        chain_id: ChainId,
129        blob_id: BlobId,
130    ) -> Result<BlobContent, NodeError>;
131
132    /// Handles a blob that belongs to a pending proposal or validated block certificate.
133    async fn handle_pending_blob(
134        &self,
135        chain_id: ChainId,
136        blob: BlobContent,
137    ) -> Result<ChainInfoResponse, NodeError>;
138
139    async fn download_certificate(
140        &self,
141        hash: CryptoHash,
142    ) -> Result<ConfirmedBlockCertificate, NodeError>;
143
144    /// Requests a batch of certificates from the validator.
145    async fn download_certificates(
146        &self,
147        hashes: Vec<CryptoHash>,
148    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
149
150    /// Requests a batch of certificates from a specific chain by heights.
151    async fn download_certificates_by_heights(
152        &self,
153        chain_id: ChainId,
154        heights: Vec<BlockHeight>,
155    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
156
157    /// Returns the hash of the `Certificate` that last used a blob.
158    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
159
160    /// Returns the certificate that last used the blob.
161    async fn blob_last_used_by_certificate(
162        &self,
163        blob_id: BlobId,
164    ) -> Result<ConfirmedBlockCertificate, NodeError>;
165
166    /// Returns the missing `Blob`s by their IDs.
167    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
168
169    /// Gets shard information for a specific chain.
170    async fn get_shard_info(
171        &self,
172        chain_id: ChainId,
173    ) -> Result<crate::data_types::ShardInfo, NodeError>;
174}
175
176/// Turn an address into a validator node.
177#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
178pub trait ValidatorNodeProvider: 'static {
179    #[cfg(not(web))]
180    type Node: ValidatorNode + Send + Sync + Clone + 'static;
181    #[cfg(web)]
182    type Node: ValidatorNode + Clone + 'static;
183
184    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
185
186    fn make_nodes(
187        &self,
188        committee: &Committee,
189    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
190        let validator_addresses: Vec<_> = committee
191            .validator_addresses()
192            .map(|(node, name)| (node, name.to_owned()))
193            .collect();
194        self.make_nodes_from_list(validator_addresses)
195    }
196
197    fn make_nodes_from_list<A>(
198        &self,
199        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
200    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
201    where
202        A: AsRef<str>,
203    {
204        Ok(validators
205            .into_iter()
206            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
207            .collect::<Result<Vec<_>, NodeError>>()?
208            .into_iter())
209    }
210}
211
212/// Error type for node queries.
213///
214/// This error is meant to be serialized over the network and aggregated by clients (i.e.
215/// clients will track validator votes on each error value).
216#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
217pub enum NodeError {
218    #[error("Cryptographic error: {error}")]
219    CryptoError { error: String },
220
221    #[error("Arithmetic error: {error}")]
222    ArithmeticError { error: String },
223
224    #[error("Error while accessing storage: {error}")]
225    ViewError { error: String },
226
227    #[error("Chain error: {error}")]
228    ChainError { error: String },
229
230    #[error("Worker error: {error}")]
231    WorkerError { error: String },
232
233    // This error must be normalized during conversions.
234    #[error("The chain {0} is not active in validator")]
235    InactiveChain(ChainId),
236
237    #[error("Round number should be {0:?}")]
238    WrongRound(Round),
239
240    #[error(
241        "Chain is expecting a next block at height {expected_block_height} but the given block \
242        is at height {found_block_height} instead"
243    )]
244    UnexpectedBlockHeight {
245        expected_block_height: BlockHeight,
246        found_block_height: BlockHeight,
247    },
248
249    // This error must be normalized during conversions.
250    #[error(
251        "Cannot vote for block proposal of chain {chain_id} because a message \
252         from chain {origin} at height {height} has not been received yet"
253    )]
254    MissingCrossChainUpdate {
255        chain_id: ChainId,
256        origin: ChainId,
257        height: BlockHeight,
258    },
259
260    #[error("Blobs not found: {0:?}")]
261    BlobsNotFound(Vec<BlobId>),
262
263    #[error("Events not found: {0:?}")]
264    EventsNotFound(Vec<EventId>),
265
266    // This error must be normalized during conversions.
267    #[error("We don't have the value for the certificate.")]
268    MissingCertificateValue,
269
270    #[error("Response doesn't contain requested certificates: {0:?}")]
271    MissingCertificates(Vec<CryptoHash>),
272
273    #[error("Validator's response failed to include a vote when trying to {0}")]
274    MissingVoteInValidatorResponse(String),
275
276    #[error("The received chain info response is invalid")]
277    InvalidChainInfoResponse,
278    #[error("Unexpected certificate value")]
279    UnexpectedCertificateValue,
280
281    // Networking errors.
282    // TODO(#258): These errors should be defined in linera-rpc.
283    #[error("Cannot deserialize")]
284    InvalidDecoding,
285    #[error("Unexpected message")]
286    UnexpectedMessage,
287    #[error("Grpc error: {error}")]
288    GrpcError { error: String },
289    #[error("Network error while querying service: {error}")]
290    ClientIoError { error: String },
291    #[error("Failed to resolve validator address: {address}")]
292    CannotResolveValidatorAddress { address: String },
293    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
294    SubscriptionError { transport: String },
295    #[error("Failed to subscribe; tonic status: {status:?}")]
296    SubscriptionFailed { status: String },
297
298    #[error("Node failed to provide a 'last used by' certificate for the blob")]
299    InvalidCertificateForBlob(BlobId),
300    #[error("Node returned a BlobsNotFound error with duplicates")]
301    DuplicatesInBlobsNotFound,
302    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
303    UnexpectedEntriesInBlobsNotFound,
304    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
305    UnexpectedCertificates {
306        returned: Vec<CryptoHash>,
307        requested: Vec<CryptoHash>,
308    },
309    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
310    EmptyBlobsNotFound,
311    #[error("Local error handling validator response: {error}")]
312    ResponseHandlingError { error: String },
313
314    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
315    MissingCertificatesByHeights {
316        chain_id: ChainId,
317        heights: Vec<BlockHeight>,
318    },
319
320    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
321    TooManyCertificatesReturned {
322        chain_id: ChainId,
323        remote_node: Box<ValidatorPublicKey>,
324    },
325}
326
327impl From<tonic::Status> for NodeError {
328    fn from(status: tonic::Status) -> Self {
329        Self::GrpcError {
330            error: status.to_string(),
331        }
332    }
333}
334
335impl CrossChainMessageDelivery {
336    pub fn new(wait_for_outgoing_messages: bool) -> Self {
337        if wait_for_outgoing_messages {
338            CrossChainMessageDelivery::Blocking
339        } else {
340            CrossChainMessageDelivery::NonBlocking
341        }
342    }
343
344    pub fn wait_for_outgoing_messages(self) -> bool {
345        match self {
346            CrossChainMessageDelivery::NonBlocking => false,
347            CrossChainMessageDelivery::Blocking => true,
348        }
349    }
350}
351
352impl From<ViewError> for NodeError {
353    fn from(error: ViewError) -> Self {
354        Self::ViewError {
355            error: error.to_string(),
356        }
357    }
358}
359
360impl From<ArithmeticError> for NodeError {
361    fn from(error: ArithmeticError) -> Self {
362        Self::ArithmeticError {
363            error: error.to_string(),
364        }
365    }
366}
367
368impl From<CryptoError> for NodeError {
369    fn from(error: CryptoError) -> Self {
370        Self::CryptoError {
371            error: error.to_string(),
372        }
373    }
374}
375
376impl From<ChainError> for NodeError {
377    fn from(error: ChainError) -> Self {
378        match error {
379            ChainError::MissingCrossChainUpdate {
380                chain_id,
381                origin,
382                height,
383            } => Self::MissingCrossChainUpdate {
384                chain_id,
385                origin,
386                height,
387            },
388            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
389            ChainError::ExecutionError(execution_error, context) => match *execution_error {
390                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
391                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
392                _ => Self::ChainError {
393                    error: ChainError::ExecutionError(execution_error, context).to_string(),
394                },
395            },
396            ChainError::UnexpectedBlockHeight {
397                expected_block_height,
398                found_block_height,
399            } => Self::UnexpectedBlockHeight {
400                expected_block_height,
401                found_block_height,
402            },
403            ChainError::WrongRound(round) => Self::WrongRound(round),
404            error => Self::ChainError {
405                error: error.to_string(),
406            },
407        }
408    }
409}
410
411impl From<WorkerError> for NodeError {
412    fn from(error: WorkerError) -> Self {
413        match error {
414            WorkerError::ChainError(error) => (*error).into(),
415            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
416            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
417            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
418            WorkerError::UnexpectedBlockHeight {
419                expected_block_height,
420                found_block_height,
421            } => NodeError::UnexpectedBlockHeight {
422                expected_block_height,
423                found_block_height,
424            },
425            error => Self::WorkerError {
426                error: error.to_string(),
427            },
428        }
429    }
430}