Skip to main content

linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14    data_types::{
15        ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16    },
17    identifiers::{BlobId, ChainId, EventId},
18    task::{MaybeSend, MaybeSync},
19};
20use linera_cache::Arc as CacheArc;
21use linera_chain::{
22    data_types::BlockProposal,
23    types::{
24        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
25        ValidatedBlock,
26    },
27    ChainError,
28};
29use linera_execution::{committee::Committee, ExecutionError};
30use linera_version::VersionInfo;
31use linera_views::ViewError;
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use crate::{
36    data_types::{ChainInfoQuery, ChainInfoResponse},
37    worker::{Notification, WorkerError},
38};
39
/// A pinned [`Stream`] of Notifications.
///
/// `BoxStream` is imported conditionally at the top of this file: it is
/// `futures::stream::BoxStream` when not building for `web`, and
/// `futures::stream::LocalBoxStream` when building for `web`.
pub type NotificationStream = BoxStream<'static, Notification>;

/// A pinned [`Stream`] of blob contents returned by batch downloads.
///
/// Each item is a `Result`, so a failure can be reported for an individual element of
/// the stream rather than only for the request as a whole.
pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
45
/// Whether to wait for the delivery of outgoing cross-chain messages.
///
/// Use [`CrossChainMessageDelivery::new`] to build a value from a boolean flag and
/// [`CrossChainMessageDelivery::wait_for_outgoing_messages`] to read it back.
#[derive(Debug, Default, Clone, Copy)]
#[allow(missing_docs)]
pub enum CrossChainMessageDelivery {
    // Do not wait for outgoing cross-chain messages to be delivered. This is the default.
    #[default]
    NonBlocking,
    // Wait for outgoing cross-chain messages to be delivered.
    Blocking,
}
54
/// How to communicate with a validator node.
///
/// Every fallible method reports failures as a [`NodeError`]. When not building for
/// `web`, `trait_variant::make(Send)` is applied to the trait (see the attribute below).
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ValidatorNode {
    /// The type of stream of notifications returned when subscribing.
    type NotificationStream: Stream<Item = Notification> + Unpin + MaybeSend;

    /// Returns the address of this validator node.
    fn address(&self) -> String;

    /// Proposes a new block.
    ///
    /// On success, returns the validator's [`ChainInfoResponse`].
    async fn handle_block_proposal(
        &self,
        proposal: BlockProposal,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a certificate without a value.
    ///
    /// `delivery` states whether to wait for outgoing cross-chain messages to be delivered.
    async fn handle_lite_certificate(
        &self,
        certificate: LiteCertificate<'_>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a confirmed certificate.
    ///
    /// `delivery` states whether to wait for outgoing cross-chain messages to be delivered.
    async fn handle_confirmed_certificate(
        &self,
        certificate: CacheArc<GenericCertificate<ConfirmedBlock>>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a validated certificate.
    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Processes a timeout certificate.
    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Handles information queries for this chain.
    async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Gets the version info for this validator node.
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;

    /// Gets the network's description.
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;

    /// Subscribes to receiving notifications for a collection of chains.
    ///
    /// On success, returns a stream of [`Notification`]s of type
    /// [`Self::NotificationStream`].
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;

    /// Uploads a blob. Returns an error if the validator has not seen a
    /// certificate using this blob.
    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;

    /// Uploads the blobs to the validator.
    ///
    /// The default implementation starts one [`Self::upload_blob`] call per blob and
    /// combines them with `futures::future::try_join_all`, so the returned future
    /// resolves to an error as soon as any single upload fails.
    // Unfortunately, this doesn't compile as an async function: async functions in traits
    // don't play well with default implementations, apparently.
    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
    fn upload_blobs(
        &self,
        blobs: Vec<Arc<Blob>>,
    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
        // Build all the upload futures first; nothing runs until the joined future is polled.
        let tasks: Vec<_> = blobs
            .into_iter()
            .map(|blob| self.upload_blob(blob.into()))
            .collect();
        futures::future::try_join_all(tasks)
    }

    /// Downloads a blob. Returns an error if the validator does not have the blob.
    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;

    /// Downloads a batch of blobs as a stream. The stream yields one blob per
    /// requested id, in order. On mid-stream errors, the caller can retry the
    /// remaining blob ids against another validator.
    async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;

    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError>;

    /// Handles a blob that belongs to a pending proposal or validated block certificate.
    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Downloads the confirmed block certificate with the given hash.
    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Requests a batch of certificates from the validator.
    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Requests a batch of certificates from a specific chain by heights.
    ///
    /// Returns certificates in ascending order by height. This method does not guarantee
    /// that all requested heights will be returned; if some certificates are missing,
    /// the caller must handle that.
    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Returns the hash of the `Certificate` that last used a blob.
    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;

    /// Returns the certificate that last used the blob.
    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Looks up the block heights where the given events were published.
    /// Returns `None` for events not found in the index.
    async fn event_block_heights(
        &self,
        event_ids: Vec<EventId>,
    ) -> Result<Vec<Option<BlockHeight>>, NodeError>;

    /// Returns the missing `Blob`s by their IDs.
    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;

    /// Gets shard information for a specific chain.
    async fn get_shard_info(
        &self,
        chain_id: ChainId,
    ) -> Result<crate::data_types::ShardInfo, NodeError>;
}
201
202/// Turn an address into a validator node.
203#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
204pub trait ValidatorNodeProvider: 'static {
205    /// The type of validator node produced by this provider.
206    type Node: ValidatorNode + MaybeSend + MaybeSync + Clone + 'static;
207
208    /// Creates a node from a validator's address.
209    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
210
211    /// Creates a node for each validator in the committee.
212    fn make_nodes(
213        &self,
214        committee: &Committee,
215    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
216        let validator_addresses: Vec<_> = committee
217            .validator_addresses()
218            .map(|(node, name)| (node, name.to_owned()))
219            .collect();
220        self.make_nodes_from_list(validator_addresses)
221    }
222
223    /// Creates a node for each validator in the given list of public keys and addresses.
224    fn make_nodes_from_list<A>(
225        &self,
226        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
227    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
228    where
229        A: AsRef<str>,
230    {
231        Ok(validators
232            .into_iter()
233            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
234            .collect::<Result<Vec<_>, NodeError>>()?
235            .into_iter())
236    }
237}
238
/// Error type for node queries.
///
/// This error is meant to be serialized over the network and aggregated by clients (i.e.
/// clients will track validator votes on each error value).
///
/// Variants with an `error: String` field carry the `Display` text of the error they
/// were converted from (see the `From` implementations in this file).
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
#[allow(missing_docs)]
pub enum NodeError {
    // Wrappers holding the textual form of an underlying error.
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    #[error("Chain error: {error}")]
    ChainError { error: String },

    #[error("Worker error: {error}")]
    WorkerError { error: String },

    // This error must be normalized during conversions.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    // This error must be normalized during conversions.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    #[error("Blocks not found: {0:?}")]
    BlocksNotFound(Vec<CryptoHash>),

    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    // This error must be normalized during conversions.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    // The payload is a description of the attempted action; it is interpolated after
    // "when trying to" in the message.
    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    // Networking errors.
    // TODO(#258): These errors should be defined in linera-rpc.
    #[error("Cannot deserialize")]
    InvalidDecoding,
    #[error("Unexpected message")]
    UnexpectedMessage,
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    #[error("Failed to subscribe; tonic status: {status:?}")]
    SubscriptionFailed { status: String },

    // Errors about responses that do not match what was requested or expected.
    // Note that the blob ID carried by `InvalidCertificateForBlob` is not part of its message.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },

    // The grace period is stored in milliseconds; the conversion from `WorkerError` in this
    // file saturates it at `u64::MAX`.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period_ms: u64,
    },

    #[error("No validators available to handle the request")]
    NoValidators,
}
370
371impl NodeError {
372    /// Returns whether this error is an expected part of the protocol flow.
373    ///
374    /// Expected errors are those that validators return during normal operation and that
375    /// the client handles automatically (e.g. by supplying missing data and retrying).
376    /// Unexpected errors indicate genuine network issues, validator misbehavior, or
377    /// internal problems.
378    pub fn is_expected(&self) -> bool {
379        match self {
380            // Expected: validators return these during normal operation and the client
381            // handles them automatically by supplying missing data and retrying.
382            NodeError::BlobsNotFound(_)
383            | NodeError::BlocksNotFound(_)
384            | NodeError::EventsNotFound(_)
385            | NodeError::MissingCrossChainUpdate { .. }
386            | NodeError::WrongRound(_)
387            | NodeError::UnexpectedBlockHeight { .. }
388            | NodeError::InactiveChain(_)
389            | NodeError::InvalidTimestamp { .. }
390            | NodeError::MissingCertificateValue => true,
391
392            // Unexpected: network issues, validator misbehavior, or internal problems.
393            NodeError::CryptoError { .. }
394            | NodeError::ArithmeticError { .. }
395            | NodeError::ViewError { .. }
396            | NodeError::ChainError { .. }
397            | NodeError::WorkerError { .. }
398            | NodeError::MissingCertificates(_)
399            | NodeError::MissingVoteInValidatorResponse(_)
400            | NodeError::InvalidChainInfoResponse
401            | NodeError::UnexpectedCertificateValue
402            | NodeError::InvalidDecoding
403            | NodeError::UnexpectedMessage
404            | NodeError::GrpcError { .. }
405            | NodeError::ClientIoError { .. }
406            | NodeError::CannotResolveValidatorAddress { .. }
407            | NodeError::SubscriptionError { .. }
408            | NodeError::SubscriptionFailed { .. }
409            | NodeError::InvalidCertificateForBlob(_)
410            | NodeError::DuplicatesInBlobsNotFound
411            | NodeError::UnexpectedEntriesInBlobsNotFound
412            | NodeError::UnexpectedCertificates { .. }
413            | NodeError::EmptyBlobsNotFound
414            | NodeError::ResponseHandlingError { .. }
415            | NodeError::MissingCertificatesByHeights { .. }
416            | NodeError::TooManyCertificatesReturned { .. }
417            | NodeError::NoValidators => false,
418        }
419    }
420}
421
422impl From<tonic::Status> for NodeError {
423    fn from(status: tonic::Status) -> Self {
424        Self::GrpcError {
425            error: status.to_string(),
426        }
427    }
428}
429
430impl CrossChainMessageDelivery {
431    /// Creates a new value, blocking on outgoing message delivery if requested.
432    pub fn new(wait_for_outgoing_messages: bool) -> Self {
433        if wait_for_outgoing_messages {
434            CrossChainMessageDelivery::Blocking
435        } else {
436            CrossChainMessageDelivery::NonBlocking
437        }
438    }
439
440    /// Returns whether to wait for the delivery of outgoing cross-chain messages.
441    pub fn wait_for_outgoing_messages(self) -> bool {
442        match self {
443            CrossChainMessageDelivery::NonBlocking => false,
444            CrossChainMessageDelivery::Blocking => true,
445        }
446    }
447}
448
449impl From<ViewError> for NodeError {
450    fn from(error: ViewError) -> Self {
451        Self::ViewError {
452            error: error.to_string(),
453        }
454    }
455}
456
457impl From<ArithmeticError> for NodeError {
458    fn from(error: ArithmeticError) -> Self {
459        Self::ArithmeticError {
460            error: error.to_string(),
461        }
462    }
463}
464
465impl From<CryptoError> for NodeError {
466    fn from(error: CryptoError) -> Self {
467        Self::CryptoError {
468            error: error.to_string(),
469        }
470    }
471}
472
473impl From<ChainError> for NodeError {
474    fn from(error: ChainError) -> Self {
475        match error {
476            ChainError::MissingCrossChainUpdate {
477                chain_id,
478                origin,
479                height,
480            } => Self::MissingCrossChainUpdate {
481                chain_id,
482                origin,
483                height,
484            },
485            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
486            ChainError::ExecutionError(execution_error, context) => match *execution_error {
487                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
488                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
489                _ => Self::ChainError {
490                    error: ChainError::ExecutionError(execution_error, context).to_string(),
491                },
492            },
493            ChainError::UnexpectedBlockHeight {
494                expected_block_height,
495                found_block_height,
496            } => Self::UnexpectedBlockHeight {
497                expected_block_height,
498                found_block_height,
499            },
500            ChainError::WrongRound(round) => Self::WrongRound(round),
501            error => Self::ChainError {
502                error: error.to_string(),
503            },
504        }
505    }
506}
507
508impl From<WorkerError> for NodeError {
509    fn from(error: WorkerError) -> Self {
510        match error {
511            WorkerError::ChainError(error) => (*error).into(),
512            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
513            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
514            WorkerError::BlocksNotFound(hashes) => Self::BlocksNotFound(hashes),
515            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
516            WorkerError::UnexpectedBlockHeight {
517                expected_block_height,
518                found_block_height,
519            } => NodeError::UnexpectedBlockHeight {
520                expected_block_height,
521                found_block_height,
522            },
523            WorkerError::InvalidTimestamp {
524                block_timestamp,
525                local_time,
526                block_time_grace_period,
527            } => NodeError::InvalidTimestamp {
528                block_timestamp,
529                local_time,
530                block_time_grace_period_ms: u64::try_from(block_time_grace_period.as_millis())
531                    .unwrap_or(u64::MAX),
532            },
533            error => Self::WorkerError {
534                error: error.to_string(),
535            },
536        }
537    }
538}