linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12    data_types::{ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round},
13    identifiers::{BlobId, ChainId, EventId},
14};
15use linera_chain::{
16    data_types::BlockProposal,
17    types::{
18        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19        ValidatedBlock,
20    },
21    ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30    data_types::{ChainInfoQuery, ChainInfoResponse},
31    worker::{Notification, WorkerError},
32};
33
/// A pinned, boxed [`Stream`] of [`Notification`]s.
///
/// `BoxStream` is `futures::stream::BoxStream` on non-`web` builds and
/// `futures::stream::LocalBoxStream` on `web` builds, following the `cfg`-gated imports at the
/// top of this file.
pub type NotificationStream = BoxStream<'static, Notification>;
36
/// Selects whether a request waits for the delivery of outgoing cross-chain messages.
#[derive(Clone, Copy, Debug, Default)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for delivery. This is the default mode.
    #[default]
    NonBlocking,
    /// Wait for delivery.
    Blocking,
}
44
45/// How to communicate with a validator node.
46#[allow(async_fn_in_trait)]
47#[cfg_attr(not(web), trait_variant::make(Send))]
48pub trait ValidatorNode {
49    #[cfg(not(web))]
50    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
51    #[cfg(web)]
52    type NotificationStream: Stream<Item = Notification> + Unpin;
53
54    /// Proposes a new block.
55    async fn handle_block_proposal(
56        &self,
57        proposal: BlockProposal,
58    ) -> Result<ChainInfoResponse, NodeError>;
59
60    /// Processes a certificate without a value.
61    async fn handle_lite_certificate(
62        &self,
63        certificate: LiteCertificate<'_>,
64        delivery: CrossChainMessageDelivery,
65    ) -> Result<ChainInfoResponse, NodeError>;
66
67    /// Processes a confirmed certificate.
68    async fn handle_confirmed_certificate(
69        &self,
70        certificate: GenericCertificate<ConfirmedBlock>,
71        delivery: CrossChainMessageDelivery,
72    ) -> Result<ChainInfoResponse, NodeError>;
73
74    /// Processes a validated certificate.
75    async fn handle_validated_certificate(
76        &self,
77        certificate: GenericCertificate<ValidatedBlock>,
78    ) -> Result<ChainInfoResponse, NodeError>;
79
80    /// Processes a timeout certificate.
81    async fn handle_timeout_certificate(
82        &self,
83        certificate: GenericCertificate<Timeout>,
84    ) -> Result<ChainInfoResponse, NodeError>;
85
86    /// Handles information queries for this chain.
87    async fn handle_chain_info_query(
88        &self,
89        query: ChainInfoQuery,
90    ) -> Result<ChainInfoResponse, NodeError>;
91
92    /// Gets the version info for this validator node.
93    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
94
95    /// Gets the network's description.
96    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
97
98    /// Subscribes to receiving notifications for a collection of chains.
99    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
100
101    // Uploads a blob. Returns an error if the validator has not seen a
102    // certificate using this blob.
103    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
104
105    /// Uploads the blobs to the validator.
106    // Unfortunately, this doesn't compile as an async function: async functions in traits
107    // don't play well with default implementations, apparently.
108    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
109    fn upload_blobs(
110        &self,
111        blobs: Vec<Blob>,
112    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
113        let tasks: Vec<_> = blobs
114            .into_iter()
115            .map(|blob| self.upload_blob(blob.into()))
116            .collect();
117        futures::future::try_join_all(tasks)
118    }
119
120    /// Downloads a blob. Returns an error if the validator does not have the blob.
121    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
122
123    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
124    async fn download_pending_blob(
125        &self,
126        chain_id: ChainId,
127        blob_id: BlobId,
128    ) -> Result<BlobContent, NodeError>;
129
130    /// Handles a blob that belongs to a pending proposal or validated block certificate.
131    async fn handle_pending_blob(
132        &self,
133        chain_id: ChainId,
134        blob: BlobContent,
135    ) -> Result<ChainInfoResponse, NodeError>;
136
137    async fn download_certificate(
138        &self,
139        hash: CryptoHash,
140    ) -> Result<ConfirmedBlockCertificate, NodeError>;
141
142    /// Requests a batch of certificates from the validator.
143    async fn download_certificates(
144        &self,
145        hashes: Vec<CryptoHash>,
146    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
147
148    /// Requests a batch of certificates from a specific chain by heights.
149    async fn download_certificates_by_heights(
150        &self,
151        chain_id: ChainId,
152        heights: Vec<BlockHeight>,
153    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
154
155    /// Returns the hash of the `Certificate` that last used a blob.
156    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
157
158    /// Returns the certificate that last used the blob.
159    async fn blob_last_used_by_certificate(
160        &self,
161        blob_id: BlobId,
162    ) -> Result<ConfirmedBlockCertificate, NodeError>;
163
164    /// Returns the missing `Blob`s by their IDs.
165    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
166
167    /// Gets shard information for a specific chain.
168    async fn get_shard_info(
169        &self,
170        chain_id: ChainId,
171    ) -> Result<crate::data_types::ShardInfo, NodeError>;
172}
173
174/// Turn an address into a validator node.
175#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
176pub trait ValidatorNodeProvider: 'static {
177    #[cfg(not(web))]
178    type Node: ValidatorNode + Send + Sync + Clone + 'static;
179    #[cfg(web)]
180    type Node: ValidatorNode + Clone + 'static;
181
182    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
183
184    fn make_nodes(
185        &self,
186        committee: &Committee,
187    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
188        let validator_addresses: Vec<_> = committee
189            .validator_addresses()
190            .map(|(node, name)| (node, name.to_owned()))
191            .collect();
192        self.make_nodes_from_list(validator_addresses)
193    }
194
195    fn make_nodes_from_list<A>(
196        &self,
197        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
198    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
199    where
200        A: AsRef<str>,
201    {
202        Ok(validators
203            .into_iter()
204            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
205            .collect::<Result<Vec<_>, NodeError>>()?
206            .into_iter())
207    }
208}
209
210/// Error type for node queries.
211///
212/// This error is meant to be serialized over the network and aggregated by clients (i.e.
213/// clients will track validator votes on each error value).
214#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
215pub enum NodeError {
216    #[error("Cryptographic error: {error}")]
217    CryptoError { error: String },
218
219    #[error("Arithmetic error: {error}")]
220    ArithmeticError { error: String },
221
222    #[error("Error while accessing storage: {error}")]
223    ViewError { error: String },
224
225    #[error("Chain error: {error}")]
226    ChainError { error: String },
227
228    #[error("Worker error: {error}")]
229    WorkerError { error: String },
230
231    // This error must be normalized during conversions.
232    #[error("The chain {0} is not active in validator")]
233    InactiveChain(ChainId),
234
235    #[error("Round number should be {0:?}")]
236    WrongRound(Round),
237
238    #[error(
239        "Chain is expecting a next block at height {expected_block_height} but the given block \
240        is at height {found_block_height} instead"
241    )]
242    UnexpectedBlockHeight {
243        expected_block_height: BlockHeight,
244        found_block_height: BlockHeight,
245    },
246
247    // This error must be normalized during conversions.
248    #[error(
249        "Cannot vote for block proposal of chain {chain_id} because a message \
250         from chain {origin} at height {height} has not been received yet"
251    )]
252    MissingCrossChainUpdate {
253        chain_id: ChainId,
254        origin: ChainId,
255        height: BlockHeight,
256    },
257
258    #[error("Blobs not found: {0:?}")]
259    BlobsNotFound(Vec<BlobId>),
260
261    #[error("Events not found: {0:?}")]
262    EventsNotFound(Vec<EventId>),
263
264    // This error must be normalized during conversions.
265    #[error("We don't have the value for the certificate.")]
266    MissingCertificateValue,
267
268    #[error("Response doesn't contain requested certificates: {0:?}")]
269    MissingCertificates(Vec<CryptoHash>),
270
271    #[error("Validator's response failed to include a vote when trying to {0}")]
272    MissingVoteInValidatorResponse(String),
273
274    #[error("The received chain info response is invalid")]
275    InvalidChainInfoResponse,
276    #[error("Unexpected certificate value")]
277    UnexpectedCertificateValue,
278
279    // Networking errors.
280    // TODO(#258): These errors should be defined in linera-rpc.
281    #[error("Cannot deserialize")]
282    InvalidDecoding,
283    #[error("Unexpected message")]
284    UnexpectedMessage,
285    #[error("Grpc error: {error}")]
286    GrpcError { error: String },
287    #[error("Network error while querying service: {error}")]
288    ClientIoError { error: String },
289    #[error("Failed to resolve validator address: {address}")]
290    CannotResolveValidatorAddress { address: String },
291    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
292    SubscriptionError { transport: String },
293    #[error("Failed to subscribe; tonic status: {status:?}")]
294    SubscriptionFailed { status: String },
295
296    #[error("Node failed to provide a 'last used by' certificate for the blob")]
297    InvalidCertificateForBlob(BlobId),
298    #[error("Node returned a BlobsNotFound error with duplicates")]
299    DuplicatesInBlobsNotFound,
300    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
301    UnexpectedEntriesInBlobsNotFound,
302    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
303    UnexpectedCertificates {
304        returned: Vec<CryptoHash>,
305        requested: Vec<CryptoHash>,
306    },
307    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
308    EmptyBlobsNotFound,
309    #[error("Local error handling validator response: {error}")]
310    ResponseHandlingError { error: String },
311
312    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
313    MissingCertificatesByHeights {
314        chain_id: ChainId,
315        heights: Vec<BlockHeight>,
316    },
317
318    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
319    TooManyCertificatesReturned {
320        chain_id: ChainId,
321        remote_node: Box<ValidatorPublicKey>,
322    },
323}
324
325impl From<tonic::Status> for NodeError {
326    fn from(status: tonic::Status) -> Self {
327        Self::GrpcError {
328            error: status.to_string(),
329        }
330    }
331}
332
333impl CrossChainMessageDelivery {
334    pub fn new(wait_for_outgoing_messages: bool) -> Self {
335        if wait_for_outgoing_messages {
336            CrossChainMessageDelivery::Blocking
337        } else {
338            CrossChainMessageDelivery::NonBlocking
339        }
340    }
341
342    pub fn wait_for_outgoing_messages(self) -> bool {
343        match self {
344            CrossChainMessageDelivery::NonBlocking => false,
345            CrossChainMessageDelivery::Blocking => true,
346        }
347    }
348}
349
350impl From<ViewError> for NodeError {
351    fn from(error: ViewError) -> Self {
352        Self::ViewError {
353            error: error.to_string(),
354        }
355    }
356}
357
358impl From<ArithmeticError> for NodeError {
359    fn from(error: ArithmeticError) -> Self {
360        Self::ArithmeticError {
361            error: error.to_string(),
362        }
363    }
364}
365
366impl From<CryptoError> for NodeError {
367    fn from(error: CryptoError) -> Self {
368        Self::CryptoError {
369            error: error.to_string(),
370        }
371    }
372}
373
374impl From<ChainError> for NodeError {
375    fn from(error: ChainError) -> Self {
376        match error {
377            ChainError::MissingCrossChainUpdate {
378                chain_id,
379                origin,
380                height,
381            } => Self::MissingCrossChainUpdate {
382                chain_id,
383                origin,
384                height,
385            },
386            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
387            ChainError::ExecutionError(execution_error, context) => match *execution_error {
388                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
389                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
390                _ => Self::ChainError {
391                    error: ChainError::ExecutionError(execution_error, context).to_string(),
392                },
393            },
394            ChainError::UnexpectedBlockHeight {
395                expected_block_height,
396                found_block_height,
397            } => Self::UnexpectedBlockHeight {
398                expected_block_height,
399                found_block_height,
400            },
401            ChainError::WrongRound(round) => Self::WrongRound(round),
402            error => Self::ChainError {
403                error: error.to_string(),
404            },
405        }
406    }
407}
408
409impl From<WorkerError> for NodeError {
410    fn from(error: WorkerError) -> Self {
411        match error {
412            WorkerError::ChainError(error) => (*error).into(),
413            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
414            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
415            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
416            WorkerError::UnexpectedBlockHeight {
417                expected_block_height,
418                found_block_height,
419            } => NodeError::UnexpectedBlockHeight {
420                expected_block_height,
421                found_block_height,
422            },
423            error => Self::WorkerError {
424                error: error.to_string(),
425            },
426        }
427    }
428}