Skip to main content

linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14    data_types::{
15        ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16    },
17    identifiers::{BlobId, ChainId, EventId},
18    task::{MaybeSend, MaybeSync},
19};
20use linera_cache::Arc as CacheArc;
21use linera_chain::{
22    data_types::BlockProposal,
23    types::{
24        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
25        ValidatedBlock,
26    },
27    ChainError,
28};
29use linera_execution::{committee::Committee, ExecutionError};
30use linera_version::VersionInfo;
31use linera_views::ViewError;
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use crate::{
36    data_types::{ChainInfoQuery, ChainInfoResponse},
37    worker::{Notification, WorkerError},
38};
39
40/// A pinned [`Stream`] of Notifications.
41pub type NotificationStream = BoxStream<'static, Notification>;
42
43/// A pinned [`Stream`] of blob contents returned by batch downloads.
44pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
45
/// Selects whether the delivery of outgoing cross-chain messages is awaited.
#[derive(Clone, Copy, Debug, Default)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages to be delivered. This is the default.
    #[default]
    NonBlocking,
    /// Wait for outgoing messages to be delivered.
    Blocking,
}
53
54/// How to communicate with a validator node.
55#[allow(async_fn_in_trait)]
56#[cfg_attr(not(web), trait_variant::make(Send))]
57pub trait ValidatorNode {
58    type NotificationStream: Stream<Item = Notification> + Unpin + MaybeSend;
59
60    fn address(&self) -> String;
61
62    /// Proposes a new block.
63    async fn handle_block_proposal(
64        &self,
65        proposal: BlockProposal,
66    ) -> Result<ChainInfoResponse, NodeError>;
67
68    /// Processes a certificate without a value.
69    async fn handle_lite_certificate(
70        &self,
71        certificate: LiteCertificate<'_>,
72        delivery: CrossChainMessageDelivery,
73    ) -> Result<ChainInfoResponse, NodeError>;
74
75    /// Processes a confirmed certificate.
76    async fn handle_confirmed_certificate(
77        &self,
78        certificate: CacheArc<GenericCertificate<ConfirmedBlock>>,
79        delivery: CrossChainMessageDelivery,
80    ) -> Result<ChainInfoResponse, NodeError>;
81
82    /// Processes a validated certificate.
83    async fn handle_validated_certificate(
84        &self,
85        certificate: GenericCertificate<ValidatedBlock>,
86    ) -> Result<ChainInfoResponse, NodeError>;
87
88    /// Processes a timeout certificate.
89    async fn handle_timeout_certificate(
90        &self,
91        certificate: GenericCertificate<Timeout>,
92    ) -> Result<ChainInfoResponse, NodeError>;
93
94    /// Handles information queries for this chain.
95    async fn handle_chain_info_query(
96        &self,
97        query: ChainInfoQuery,
98    ) -> Result<ChainInfoResponse, NodeError>;
99
100    /// Gets the version info for this validator node.
101    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
102
103    /// Gets the network's description.
104    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
105
106    /// Subscribes to receiving notifications for a collection of chains.
107    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
108
109    // Uploads a blob. Returns an error if the validator has not seen a
110    // certificate using this blob.
111    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
112
113    /// Uploads the blobs to the validator.
114    // Unfortunately, this doesn't compile as an async function: async functions in traits
115    // don't play well with default implementations, apparently.
116    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
117    fn upload_blobs(
118        &self,
119        blobs: Vec<Arc<Blob>>,
120    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
121        let tasks: Vec<_> = blobs
122            .into_iter()
123            .map(|blob| self.upload_blob(blob.into()))
124            .collect();
125        futures::future::try_join_all(tasks)
126    }
127
128    /// Downloads a blob. Returns an error if the validator does not have the blob.
129    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
130
131    /// Downloads a batch of blobs as a stream. The stream yields one blob per
132    /// requested id, in order. On mid-stream errors, the caller can retry the
133    /// remaining blob ids against another validator.
134    async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;
135
136    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
137    async fn download_pending_blob(
138        &self,
139        chain_id: ChainId,
140        blob_id: BlobId,
141    ) -> Result<BlobContent, NodeError>;
142
143    /// Handles a blob that belongs to a pending proposal or validated block certificate.
144    async fn handle_pending_blob(
145        &self,
146        chain_id: ChainId,
147        blob: BlobContent,
148    ) -> Result<ChainInfoResponse, NodeError>;
149
150    async fn download_certificate(
151        &self,
152        hash: CryptoHash,
153    ) -> Result<ConfirmedBlockCertificate, NodeError>;
154
155    /// Requests a batch of certificates from the validator.
156    async fn download_certificates(
157        &self,
158        hashes: Vec<CryptoHash>,
159    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
160
161    /// Requests a batch of certificates from a specific chain by heights.
162    ///
163    /// Returns certificates in ascending order by height. This method does not guarantee
164    /// that all requested heights will be returned; if some certificates are missing,
165    /// the caller must handle that.
166    async fn download_certificates_by_heights(
167        &self,
168        chain_id: ChainId,
169        heights: Vec<BlockHeight>,
170    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
171
172    /// Returns the hash of the `Certificate` that last used a blob.
173    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
174
175    /// Returns the certificate that last used the blob.
176    async fn blob_last_used_by_certificate(
177        &self,
178        blob_id: BlobId,
179    ) -> Result<ConfirmedBlockCertificate, NodeError>;
180
181    /// Looks up the block heights where the given events were published.
182    /// Returns `None` for events not found in the index.
183    async fn event_block_heights(
184        &self,
185        event_ids: Vec<EventId>,
186    ) -> Result<Vec<Option<BlockHeight>>, NodeError>;
187
188    /// Returns the missing `Blob`s by their IDs.
189    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
190
191    /// Gets shard information for a specific chain.
192    async fn get_shard_info(
193        &self,
194        chain_id: ChainId,
195    ) -> Result<crate::data_types::ShardInfo, NodeError>;
196}
197
198/// Turn an address into a validator node.
199#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
200pub trait ValidatorNodeProvider: 'static {
201    type Node: ValidatorNode + MaybeSend + MaybeSync + Clone + 'static;
202
203    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
204
205    fn make_nodes(
206        &self,
207        committee: &Committee,
208    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
209        let validator_addresses: Vec<_> = committee
210            .validator_addresses()
211            .map(|(node, name)| (node, name.to_owned()))
212            .collect();
213        self.make_nodes_from_list(validator_addresses)
214    }
215
216    fn make_nodes_from_list<A>(
217        &self,
218        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
219    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
220    where
221        A: AsRef<str>,
222    {
223        Ok(validators
224            .into_iter()
225            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
226            .collect::<Result<Vec<_>, NodeError>>()?
227            .into_iter())
228    }
229}
230
231/// Error type for node queries.
232///
233/// This error is meant to be serialized over the network and aggregated by clients (i.e.
234/// clients will track validator votes on each error value).
235#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
236pub enum NodeError {
237    #[error("Cryptographic error: {error}")]
238    CryptoError { error: String },
239
240    #[error("Arithmetic error: {error}")]
241    ArithmeticError { error: String },
242
243    #[error("Error while accessing storage: {error}")]
244    ViewError { error: String },
245
246    #[error("Chain error: {error}")]
247    ChainError { error: String },
248
249    #[error("Worker error: {error}")]
250    WorkerError { error: String },
251
252    // This error must be normalized during conversions.
253    #[error("The chain {0} is not active in validator")]
254    InactiveChain(ChainId),
255
256    #[error("Round number should be {0:?}")]
257    WrongRound(Round),
258
259    #[error(
260        "Chain is expecting a next block at height {expected_block_height} but the given block \
261        is at height {found_block_height} instead"
262    )]
263    UnexpectedBlockHeight {
264        expected_block_height: BlockHeight,
265        found_block_height: BlockHeight,
266    },
267
268    // This error must be normalized during conversions.
269    #[error(
270        "Cannot vote for block proposal of chain {chain_id} because a message \
271         from chain {origin} at height {height} has not been received yet"
272    )]
273    MissingCrossChainUpdate {
274        chain_id: ChainId,
275        origin: ChainId,
276        height: BlockHeight,
277    },
278
279    #[error("Blobs not found: {0:?}")]
280    BlobsNotFound(Vec<BlobId>),
281
282    #[error("Blocks not found: {0:?}")]
283    BlocksNotFound(Vec<CryptoHash>),
284
285    #[error("Events not found: {0:?}")]
286    EventsNotFound(Vec<EventId>),
287
288    // This error must be normalized during conversions.
289    #[error("We don't have the value for the certificate.")]
290    MissingCertificateValue,
291
292    #[error("Response doesn't contain requested certificates: {0:?}")]
293    MissingCertificates(Vec<CryptoHash>),
294
295    #[error("Validator's response failed to include a vote when trying to {0}")]
296    MissingVoteInValidatorResponse(String),
297
298    #[error("The received chain info response is invalid")]
299    InvalidChainInfoResponse,
300    #[error("Unexpected certificate value")]
301    UnexpectedCertificateValue,
302
303    // Networking errors.
304    // TODO(#258): These errors should be defined in linera-rpc.
305    #[error("Cannot deserialize")]
306    InvalidDecoding,
307    #[error("Unexpected message")]
308    UnexpectedMessage,
309    #[error("Grpc error: {error}")]
310    GrpcError { error: String },
311    #[error("Network error while querying service: {error}")]
312    ClientIoError { error: String },
313    #[error("Failed to resolve validator address: {address}")]
314    CannotResolveValidatorAddress { address: String },
315    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
316    SubscriptionError { transport: String },
317    #[error("Failed to subscribe; tonic status: {status:?}")]
318    SubscriptionFailed { status: String },
319
320    #[error("Node failed to provide a 'last used by' certificate for the blob")]
321    InvalidCertificateForBlob(BlobId),
322    #[error("Node returned a BlobsNotFound error with duplicates")]
323    DuplicatesInBlobsNotFound,
324    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
325    UnexpectedEntriesInBlobsNotFound,
326    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
327    UnexpectedCertificates {
328        returned: Vec<CryptoHash>,
329        requested: Vec<CryptoHash>,
330    },
331    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
332    EmptyBlobsNotFound,
333    #[error("Local error handling validator response: {error}")]
334    ResponseHandlingError { error: String },
335
336    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
337    MissingCertificatesByHeights {
338        chain_id: ChainId,
339        heights: Vec<BlockHeight>,
340    },
341
342    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
343    TooManyCertificatesReturned {
344        chain_id: ChainId,
345        remote_node: Box<ValidatorPublicKey>,
346    },
347
348    #[error(
349        "Block timestamp ({block_timestamp}) is further in the future from local time \
350        ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
351    )]
352    InvalidTimestamp {
353        block_timestamp: Timestamp,
354        local_time: Timestamp,
355        block_time_grace_period_ms: u64,
356    },
357
358    #[error("No validators available to handle the request")]
359    NoValidators,
360}
361
362impl NodeError {
363    /// Returns whether this error is an expected part of the protocol flow.
364    ///
365    /// Expected errors are those that validators return during normal operation and that
366    /// the client handles automatically (e.g. by supplying missing data and retrying).
367    /// Unexpected errors indicate genuine network issues, validator misbehavior, or
368    /// internal problems.
369    pub fn is_expected(&self) -> bool {
370        match self {
371            // Expected: validators return these during normal operation and the client
372            // handles them automatically by supplying missing data and retrying.
373            NodeError::BlobsNotFound(_)
374            | NodeError::BlocksNotFound(_)
375            | NodeError::EventsNotFound(_)
376            | NodeError::MissingCrossChainUpdate { .. }
377            | NodeError::WrongRound(_)
378            | NodeError::UnexpectedBlockHeight { .. }
379            | NodeError::InactiveChain(_)
380            | NodeError::InvalidTimestamp { .. }
381            | NodeError::MissingCertificateValue => true,
382
383            // Unexpected: network issues, validator misbehavior, or internal problems.
384            NodeError::CryptoError { .. }
385            | NodeError::ArithmeticError { .. }
386            | NodeError::ViewError { .. }
387            | NodeError::ChainError { .. }
388            | NodeError::WorkerError { .. }
389            | NodeError::MissingCertificates(_)
390            | NodeError::MissingVoteInValidatorResponse(_)
391            | NodeError::InvalidChainInfoResponse
392            | NodeError::UnexpectedCertificateValue
393            | NodeError::InvalidDecoding
394            | NodeError::UnexpectedMessage
395            | NodeError::GrpcError { .. }
396            | NodeError::ClientIoError { .. }
397            | NodeError::CannotResolveValidatorAddress { .. }
398            | NodeError::SubscriptionError { .. }
399            | NodeError::SubscriptionFailed { .. }
400            | NodeError::InvalidCertificateForBlob(_)
401            | NodeError::DuplicatesInBlobsNotFound
402            | NodeError::UnexpectedEntriesInBlobsNotFound
403            | NodeError::UnexpectedCertificates { .. }
404            | NodeError::EmptyBlobsNotFound
405            | NodeError::ResponseHandlingError { .. }
406            | NodeError::MissingCertificatesByHeights { .. }
407            | NodeError::TooManyCertificatesReturned { .. }
408            | NodeError::NoValidators => false,
409        }
410    }
411}
412
413impl From<tonic::Status> for NodeError {
414    fn from(status: tonic::Status) -> Self {
415        Self::GrpcError {
416            error: status.to_string(),
417        }
418    }
419}
420
421impl CrossChainMessageDelivery {
422    pub fn new(wait_for_outgoing_messages: bool) -> Self {
423        if wait_for_outgoing_messages {
424            CrossChainMessageDelivery::Blocking
425        } else {
426            CrossChainMessageDelivery::NonBlocking
427        }
428    }
429
430    pub fn wait_for_outgoing_messages(self) -> bool {
431        match self {
432            CrossChainMessageDelivery::NonBlocking => false,
433            CrossChainMessageDelivery::Blocking => true,
434        }
435    }
436}
437
438impl From<ViewError> for NodeError {
439    fn from(error: ViewError) -> Self {
440        Self::ViewError {
441            error: error.to_string(),
442        }
443    }
444}
445
446impl From<ArithmeticError> for NodeError {
447    fn from(error: ArithmeticError) -> Self {
448        Self::ArithmeticError {
449            error: error.to_string(),
450        }
451    }
452}
453
454impl From<CryptoError> for NodeError {
455    fn from(error: CryptoError) -> Self {
456        Self::CryptoError {
457            error: error.to_string(),
458        }
459    }
460}
461
462impl From<ChainError> for NodeError {
463    fn from(error: ChainError) -> Self {
464        match error {
465            ChainError::MissingCrossChainUpdate {
466                chain_id,
467                origin,
468                height,
469            } => Self::MissingCrossChainUpdate {
470                chain_id,
471                origin,
472                height,
473            },
474            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
475            ChainError::ExecutionError(execution_error, context) => match *execution_error {
476                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
477                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
478                _ => Self::ChainError {
479                    error: ChainError::ExecutionError(execution_error, context).to_string(),
480                },
481            },
482            ChainError::UnexpectedBlockHeight {
483                expected_block_height,
484                found_block_height,
485            } => Self::UnexpectedBlockHeight {
486                expected_block_height,
487                found_block_height,
488            },
489            ChainError::WrongRound(round) => Self::WrongRound(round),
490            error => Self::ChainError {
491                error: error.to_string(),
492            },
493        }
494    }
495}
496
497impl From<WorkerError> for NodeError {
498    fn from(error: WorkerError) -> Self {
499        match error {
500            WorkerError::ChainError(error) => (*error).into(),
501            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
502            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
503            WorkerError::BlocksNotFound(hashes) => Self::BlocksNotFound(hashes),
504            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
505            WorkerError::UnexpectedBlockHeight {
506                expected_block_height,
507                found_block_height,
508            } => NodeError::UnexpectedBlockHeight {
509                expected_block_height,
510                found_block_height,
511            },
512            WorkerError::InvalidTimestamp {
513                block_timestamp,
514                local_time,
515                block_time_grace_period,
516            } => NodeError::InvalidTimestamp {
517                block_timestamp,
518                local_time,
519                block_time_grace_period_ms: u64::try_from(block_time_grace_period.as_millis())
520                    .unwrap_or(u64::MAX),
521            },
522            error => Self::WorkerError {
523                error: error.to_string(),
524            },
525        }
526    }
527}