linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12    data_types::{ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round},
13    identifiers::{BlobId, ChainId, EventId},
14};
15use linera_chain::{
16    data_types::BlockProposal,
17    types::{
18        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
19        ValidatedBlock,
20    },
21    ChainError,
22};
23use linera_execution::{committee::Committee, ExecutionError};
24use linera_version::VersionInfo;
25use linera_views::ViewError;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29use crate::{
30    data_types::{ChainInfoQuery, ChainInfoResponse},
31    worker::{Notification, WorkerError},
32};
33
/// A pinned, boxed [`Stream`] of [`Notification`]s.
///
/// `BoxStream` is `futures::stream::BoxStream` on non-web builds and
/// `futures::stream::LocalBoxStream` on web builds (see the `cfg`-gated imports above).
pub type NotificationStream = BoxStream<'static, Notification>;
36
/// Whether to wait for the delivery of outgoing cross-chain messages.
#[derive(Debug, Default, Clone, Copy)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages; this is the default.
    #[default]
    NonBlocking,
    /// Wait for outgoing messages.
    Blocking,
}
44
/// How to communicate with a validator node.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ValidatorNode {
    /// The stream of notifications returned by [`ValidatorNode::subscribe`].
    #[cfg(not(web))]
    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
    /// The stream of notifications returned by [`ValidatorNode::subscribe`].
    #[cfg(web)]
    type NotificationStream: Stream<Item = Notification> + Unpin;

    /// Proposes a new block.
    async fn handle_block_proposal(
        &self,
        proposal: BlockProposal,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a certificate without a value.
    async fn handle_lite_certificate(
        &self,
        certificate: LiteCertificate<'_>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a confirmed certificate.
    async fn handle_confirmed_certificate(
        &self,
        certificate: GenericCertificate<ConfirmedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a validated certificate.
    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a timeout certificate.
    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Handles information queries for this chain.
    async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Gets the version info for this validator node.
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;

    /// Gets the network's description.
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;

    /// Subscribes to receiving notifications for a collection of chains.
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;

    /// Uploads a blob. Returns an error if the validator has not seen a
    /// certificate using this blob.
    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;

    /// Uploads the blobs to the validator.
    ///
    /// Each blob is sent through [`ValidatorNode::upload_blob`]; the resulting futures are
    /// combined with `futures::future::try_join_all`, so the returned future resolves to
    /// the collected blob IDs or to an error if an upload fails.
    // Unfortunately, this doesn't compile as an async function: async functions in traits
    // don't play well with default implementations, apparently.
    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
    fn upload_blobs(
        &self,
        blobs: Vec<Blob>,
    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
        // Build all the upload futures first; nothing runs until the joined future is polled.
        let tasks: Vec<_> = blobs
            .into_iter()
            .map(|blob| self.upload_blob(blob.into()))
            .collect();
        futures::future::try_join_all(tasks)
    }

    /// Downloads a blob. Returns an error if the validator does not have the blob.
    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;

    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError>;

    /// Handles a blob that belongs to a pending proposal or validated block certificate.
    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Requests a single confirmed block certificate from the validator by its hash.
    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Requests a batch of certificates from the validator.
    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Requests a batch of certificates from a specific chain by heights.
    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Returns the hash of the `Certificate` that last used a blob.
    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;

    /// Returns the certificate that last used the blob.
    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Returns the missing `Blob`s by their IDs.
    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
}
167
168/// Turn an address into a validator node.
169#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
170pub trait ValidatorNodeProvider: 'static {
171    #[cfg(not(web))]
172    type Node: ValidatorNode + Send + Sync + Clone + 'static;
173    #[cfg(web)]
174    type Node: ValidatorNode + Clone + 'static;
175
176    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
177
178    fn make_nodes(
179        &self,
180        committee: &Committee,
181    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
182        let validator_addresses: Vec<_> = committee
183            .validator_addresses()
184            .map(|(node, name)| (node, name.to_owned()))
185            .collect();
186        self.make_nodes_from_list(validator_addresses)
187    }
188
189    fn make_nodes_from_list<A>(
190        &self,
191        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
192    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
193    where
194        A: AsRef<str>,
195    {
196        Ok(validators
197            .into_iter()
198            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
199            .collect::<Result<Vec<_>, NodeError>>()?
200            .into_iter())
201    }
202}
203
/// Error type for node queries.
///
/// This error is meant to be serialized over the network and aggregated by clients (i.e.
/// clients will track validator votes on each error value).
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
pub enum NodeError {
    /// A `CryptoError`, stored as its display string.
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    /// An `ArithmeticError`, stored as its display string.
    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    /// A `ViewError`, stored as its display string.
    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    /// A `ChainError` that has no dedicated variant here, stored as its display string.
    #[error("Chain error: {error}")]
    ChainError { error: String },

    /// A `WorkerError` that has no dedicated variant here, stored as its display string.
    #[error("Worker error: {error}")]
    WorkerError { error: String },

    // This error must be normalized during conversions.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    #[error(
        "Was expecting block height {expected_block_height} but found {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    // This error must be normalized during conversions.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // This error must be normalized during conversions.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    // Networking errors.
    // TODO(#258): These errors should be defined in linera-rpc.
    #[error("Cannot deserialize")]
    InvalidDecoding,
    #[error("Unexpected message")]
    UnexpectedMessage,
    /// A `tonic::Status`, stored as its display string.
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    #[error("Failed to subscribe; tonic status: {status}")]
    SubscriptionFailed { status: String },

    // NOTE(review): the blob ID is carried in the variant but not included in the message.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },
}
317
318impl From<tonic::Status> for NodeError {
319    fn from(status: tonic::Status) -> Self {
320        Self::GrpcError {
321            error: status.to_string(),
322        }
323    }
324}
325
326impl CrossChainMessageDelivery {
327    pub fn new(wait_for_outgoing_messages: bool) -> Self {
328        if wait_for_outgoing_messages {
329            CrossChainMessageDelivery::Blocking
330        } else {
331            CrossChainMessageDelivery::NonBlocking
332        }
333    }
334
335    pub fn wait_for_outgoing_messages(self) -> bool {
336        match self {
337            CrossChainMessageDelivery::NonBlocking => false,
338            CrossChainMessageDelivery::Blocking => true,
339        }
340    }
341}
342
343impl From<ViewError> for NodeError {
344    fn from(error: ViewError) -> Self {
345        Self::ViewError {
346            error: error.to_string(),
347        }
348    }
349}
350
351impl From<ArithmeticError> for NodeError {
352    fn from(error: ArithmeticError) -> Self {
353        Self::ArithmeticError {
354            error: error.to_string(),
355        }
356    }
357}
358
359impl From<CryptoError> for NodeError {
360    fn from(error: CryptoError) -> Self {
361        Self::CryptoError {
362            error: error.to_string(),
363        }
364    }
365}
366
367impl From<ChainError> for NodeError {
368    fn from(error: ChainError) -> Self {
369        match error {
370            ChainError::MissingCrossChainUpdate {
371                chain_id,
372                origin,
373                height,
374            } => Self::MissingCrossChainUpdate {
375                chain_id,
376                origin,
377                height,
378            },
379            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
380            ChainError::ExecutionError(execution_error, context) => match *execution_error {
381                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
382                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
383                _ => Self::ChainError {
384                    error: ChainError::ExecutionError(execution_error, context).to_string(),
385                },
386            },
387            ChainError::UnexpectedBlockHeight {
388                expected_block_height,
389                found_block_height,
390            } => Self::UnexpectedBlockHeight {
391                expected_block_height,
392                found_block_height,
393            },
394            ChainError::WrongRound(round) => Self::WrongRound(round),
395            error => Self::ChainError {
396                error: error.to_string(),
397            },
398        }
399    }
400}
401
402impl From<WorkerError> for NodeError {
403    fn from(error: WorkerError) -> Self {
404        match error {
405            WorkerError::ChainError(error) => (*error).into(),
406            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
407            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
408            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
409            WorkerError::UnexpectedBlockHeight {
410                expected_block_height,
411                found_block_height,
412            } => NodeError::UnexpectedBlockHeight {
413                expected_block_height,
414                found_block_height,
415            },
416            error => Self::WorkerError {
417                error: error.to_string(),
418            },
419        }
420    }
421}