linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12    data_types::{ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription},
13    identifiers::{BlobId, ChainId},
14};
15use linera_chain::{
16    data_types::BlockProposal,
17    types::{
18        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19        ValidatedBlock,
20    },
21    ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30    data_types::{ChainInfoQuery, ChainInfoResponse},
31    worker::{Notification, WorkerError},
32};
33
34/// A pinned [`Stream`] of Notifications.
35pub type NotificationStream = BoxStream<'static, Notification>;
36
37/// Whether to wait for the delivery of outgoing cross-chain messages.
38#[derive(Debug, Default, Clone, Copy)]
39pub enum CrossChainMessageDelivery {
40    #[default]
41    NonBlocking,
42    Blocking,
43}
44
45/// How to communicate with a validator node.
46#[allow(async_fn_in_trait)]
47#[cfg_attr(not(web), trait_variant::make(Send))]
48pub trait ValidatorNode {
49    #[cfg(not(web))]
50    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
51    #[cfg(web)]
52    type NotificationStream: Stream<Item = Notification> + Unpin;
53
54    /// Proposes a new block.
55    async fn handle_block_proposal(
56        &self,
57        proposal: BlockProposal,
58    ) -> Result<ChainInfoResponse, NodeError>;
59
60    /// Processes a certificate without a value.
61    async fn handle_lite_certificate(
62        &self,
63        certificate: LiteCertificate<'_>,
64        delivery: CrossChainMessageDelivery,
65    ) -> Result<ChainInfoResponse, NodeError>;
66
67    /// Processes a confirmed certificate.
68    async fn handle_confirmed_certificate(
69        &self,
70        certificate: GenericCertificate<ConfirmedBlock>,
71        delivery: CrossChainMessageDelivery,
72    ) -> Result<ChainInfoResponse, NodeError>;
73
74    /// Processes a validated certificate.
75    async fn handle_validated_certificate(
76        &self,
77        certificate: GenericCertificate<ValidatedBlock>,
78    ) -> Result<ChainInfoResponse, NodeError>;
79
80    /// Processes a timeout certificate.
81    async fn handle_timeout_certificate(
82        &self,
83        certificate: GenericCertificate<Timeout>,
84    ) -> Result<ChainInfoResponse, NodeError>;
85
86    /// Handles information queries for this chain.
87    async fn handle_chain_info_query(
88        &self,
89        query: ChainInfoQuery,
90    ) -> Result<ChainInfoResponse, NodeError>;
91
92    /// Gets the version info for this validator node.
93    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
94
95    /// Gets the network's description.
96    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
97
98    /// Subscribes to receiving notifications for a collection of chains.
99    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
100
101    // Uploads a blob. Returns an error if the validator has not seen a
102    // certificate using this blob.
103    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
104
105    /// Uploads the blobs to the validator.
106    // Unfortunately, this doesn't compile as an async function: async functions in traits
107    // don't play well with default implementations, apparently.
108    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
109    fn upload_blobs(
110        &self,
111        blobs: Vec<Blob>,
112    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
113        let tasks: Vec<_> = blobs
114            .into_iter()
115            .map(|blob| self.upload_blob(blob.into()))
116            .collect();
117        futures::future::try_join_all(tasks)
118    }
119
120    /// Downloads a blob. Returns an error if the validator does not have the blob.
121    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
122
123    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
124    async fn download_pending_blob(
125        &self,
126        chain_id: ChainId,
127        blob_id: BlobId,
128    ) -> Result<BlobContent, NodeError>;
129
130    /// Handles a blob that belongs to a pending proposal or validated block certificate.
131    async fn handle_pending_blob(
132        &self,
133        chain_id: ChainId,
134        blob: BlobContent,
135    ) -> Result<ChainInfoResponse, NodeError>;
136
137    async fn download_certificate(
138        &self,
139        hash: CryptoHash,
140    ) -> Result<ConfirmedBlockCertificate, NodeError>;
141
142    /// Requests a batch of certificates from the validator.
143    async fn download_certificates(
144        &self,
145        hashes: Vec<CryptoHash>,
146    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
147
148    /// Returns the hash of the `Certificate` that last used a blob.
149    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
150
151    /// Returns the missing `Blob`s by their IDs.
152    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
153}
154
155/// Turn an address into a validator node.
156#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
157pub trait ValidatorNodeProvider: 'static {
158    #[cfg(not(web))]
159    type Node: ValidatorNode + Send + Sync + Clone + 'static;
160    #[cfg(web)]
161    type Node: ValidatorNode + Clone + 'static;
162
163    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
164
165    fn make_nodes(
166        &self,
167        committee: &Committee,
168    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
169        let validator_addresses: Vec<_> = committee
170            .validator_addresses()
171            .map(|(node, name)| (node, name.to_owned()))
172            .collect();
173        self.make_nodes_from_list(validator_addresses)
174    }
175
176    fn make_nodes_from_list<A>(
177        &self,
178        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
179    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
180    where
181        A: AsRef<str>,
182    {
183        Ok(validators
184            .into_iter()
185            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
186            .collect::<Result<Vec<_>, NodeError>>()?
187            .into_iter())
188    }
189}
190
191/// Error type for node queries.
192///
193/// This error is meant to be serialized over the network and aggregated by clients (i.e.
194/// clients will track validator votes on each error value).
195#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
196pub enum NodeError {
197    #[error("Cryptographic error: {error}")]
198    CryptoError { error: String },
199
200    #[error("Arithmetic error: {error}")]
201    ArithmeticError { error: String },
202
203    #[error("Error while accessing storage: {error}")]
204    ViewError { error: String },
205
206    #[error("Chain error: {error}")]
207    ChainError { error: String },
208
209    #[error("Worker error: {error}")]
210    WorkerError { error: String },
211
212    // This error must be normalized during conversions.
213    #[error("The chain {0} is not active in validator")]
214    InactiveChain(ChainId),
215
216    // This error must be normalized during conversions.
217    #[error(
218        "Cannot vote for block proposal of chain {chain_id:?} because a message \
219         from chain {origin:?} at height {height:?} has not been received yet"
220    )]
221    MissingCrossChainUpdate {
222        chain_id: ChainId,
223        origin: ChainId,
224        height: BlockHeight,
225    },
226
227    #[error("Blobs not found: {0:?}")]
228    BlobsNotFound(Vec<BlobId>),
229
230    // This error must be normalized during conversions.
231    #[error("We don't have the value for the certificate.")]
232    MissingCertificateValue,
233
234    #[error("Response doesn't contain requested certificates: {0:?}")]
235    MissingCertificates(Vec<CryptoHash>),
236
237    #[error("Validator's response to block proposal failed to include a vote")]
238    MissingVoteInValidatorResponse,
239
240    #[error("The received chain info response is invalid")]
241    InvalidChainInfoResponse,
242    #[error("Unexpected certificate value")]
243    UnexpectedCertificateValue,
244
245    // Networking errors.
246    // TODO(#258): These errors should be defined in linera-rpc.
247    #[error("Cannot deserialize")]
248    InvalidDecoding,
249    #[error("Unexpected message")]
250    UnexpectedMessage,
251    #[error("Grpc error: {error}")]
252    GrpcError { error: String },
253    #[error("Network error while querying service: {error}")]
254    ClientIoError { error: String },
255    #[error("Failed to resolve validator address: {address}")]
256    CannotResolveValidatorAddress { address: String },
257    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
258    SubscriptionError { transport: String },
259    #[error("Failed to subscribe; tonic status: {status}")]
260    SubscriptionFailed { status: String },
261
262    #[error("Node failed to provide a 'last used by' certificate for the blob")]
263    InvalidCertificateForBlob(BlobId),
264    #[error("Node returned a BlobsNotFound error with duplicates")]
265    DuplicatesInBlobsNotFound,
266    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
267    UnexpectedEntriesInBlobsNotFound,
268    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
269    UnexpectedCertificates {
270        returned: Vec<CryptoHash>,
271        requested: Vec<CryptoHash>,
272    },
273    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
274    EmptyBlobsNotFound,
275    #[error("Local error handling validator response: {error}")]
276    ResponseHandlingError { error: String },
277}
278
279impl From<tonic::Status> for NodeError {
280    fn from(status: tonic::Status) -> Self {
281        Self::GrpcError {
282            error: status.to_string(),
283        }
284    }
285}
286
287impl CrossChainMessageDelivery {
288    pub fn new(wait_for_outgoing_messages: bool) -> Self {
289        if wait_for_outgoing_messages {
290            CrossChainMessageDelivery::Blocking
291        } else {
292            CrossChainMessageDelivery::NonBlocking
293        }
294    }
295
296    pub fn wait_for_outgoing_messages(self) -> bool {
297        match self {
298            CrossChainMessageDelivery::NonBlocking => false,
299            CrossChainMessageDelivery::Blocking => true,
300        }
301    }
302}
303
304impl From<ViewError> for NodeError {
305    fn from(error: ViewError) -> Self {
306        Self::ViewError {
307            error: error.to_string(),
308        }
309    }
310}
311
312impl From<ArithmeticError> for NodeError {
313    fn from(error: ArithmeticError) -> Self {
314        Self::ArithmeticError {
315            error: error.to_string(),
316        }
317    }
318}
319
320impl From<CryptoError> for NodeError {
321    fn from(error: CryptoError) -> Self {
322        Self::CryptoError {
323            error: error.to_string(),
324        }
325    }
326}
327
328impl From<ChainError> for NodeError {
329    fn from(error: ChainError) -> Self {
330        match error {
331            ChainError::MissingCrossChainUpdate {
332                chain_id,
333                origin,
334                height,
335            } => Self::MissingCrossChainUpdate {
336                chain_id,
337                origin,
338                height,
339            },
340            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
341            ChainError::ExecutionError(execution_error, context) => {
342                if let ExecutionError::BlobsNotFound(blob_ids) = *execution_error {
343                    Self::BlobsNotFound(blob_ids)
344                } else {
345                    Self::ChainError {
346                        error: ChainError::ExecutionError(execution_error, context).to_string(),
347                    }
348                }
349            }
350            error => Self::ChainError {
351                error: error.to_string(),
352            },
353        }
354    }
355}
356
357impl From<WorkerError> for NodeError {
358    fn from(error: WorkerError) -> Self {
359        match error {
360            WorkerError::ChainError(error) => (*error).into(),
361            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
362            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
363            error => Self::WorkerError {
364                error: error.to_string(),
365            },
366        }
367    }
368}