linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14    data_types::{
15        ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16    },
17    identifiers::{BlobId, ChainId, EventId},
18    task::{MaybeSend, MaybeSync},
19};
20use linera_chain::{
21    data_types::BlockProposal,
22    types::{
23        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
24        ValidatedBlock,
25    },
26    ChainError,
27};
28use linera_execution::{committee::Committee, ExecutionError};
29use linera_version::VersionInfo;
30use linera_views::ViewError;
31use serde::{Deserialize, Serialize};
32use thiserror::Error;
33
34use crate::{
35    data_types::{ChainInfoQuery, ChainInfoResponse},
36    worker::{Notification, WorkerError},
37};
38
/// A pinned, boxed [`Stream`] of [`Notification`]s.
///
/// `BoxStream` is imported as `futures::stream::BoxStream` on non-web targets and as
/// `futures::stream::LocalBoxStream` when compiled with `cfg(web)`.
pub type NotificationStream = BoxStream<'static, Notification>;
41
/// A pinned, boxed [`Stream`] of blob contents returned by batch downloads.
///
/// Every item is a `Result`, so a failure can be reported for an individual item while
/// the stream is being consumed.
pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
44
/// Whether to wait for the delivery of outgoing cross-chain messages.
#[derive(Clone, Copy, Debug, Default)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing cross-chain messages to be delivered. This is the default.
    #[default]
    NonBlocking,
    /// Wait for outgoing cross-chain messages to be delivered.
    Blocking,
}
52
53/// How to communicate with a validator node.
54#[allow(async_fn_in_trait)]
55#[cfg_attr(not(web), trait_variant::make(Send))]
56pub trait ValidatorNode {
57    type NotificationStream: Stream<Item = Notification> + Unpin + MaybeSend;
58
59    fn address(&self) -> String;
60
61    /// Proposes a new block.
62    async fn handle_block_proposal(
63        &self,
64        proposal: BlockProposal,
65    ) -> Result<ChainInfoResponse, NodeError>;
66
67    /// Processes a certificate without a value.
68    async fn handle_lite_certificate(
69        &self,
70        certificate: LiteCertificate<'_>,
71        delivery: CrossChainMessageDelivery,
72    ) -> Result<ChainInfoResponse, NodeError>;
73
74    /// Processes a confirmed certificate.
75    async fn handle_confirmed_certificate(
76        &self,
77        certificate: Arc<GenericCertificate<ConfirmedBlock>>,
78        delivery: CrossChainMessageDelivery,
79    ) -> Result<ChainInfoResponse, NodeError>;
80
81    /// Processes a validated certificate.
82    async fn handle_validated_certificate(
83        &self,
84        certificate: GenericCertificate<ValidatedBlock>,
85    ) -> Result<ChainInfoResponse, NodeError>;
86
87    /// Processes a timeout certificate.
88    async fn handle_timeout_certificate(
89        &self,
90        certificate: GenericCertificate<Timeout>,
91    ) -> Result<ChainInfoResponse, NodeError>;
92
93    /// Handles information queries for this chain.
94    async fn handle_chain_info_query(
95        &self,
96        query: ChainInfoQuery,
97    ) -> Result<ChainInfoResponse, NodeError>;
98
99    /// Gets the version info for this validator node.
100    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
101
102    /// Gets the network's description.
103    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
104
105    /// Subscribes to receiving notifications for a collection of chains.
106    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
107
108    // Uploads a blob. Returns an error if the validator has not seen a
109    // certificate using this blob.
110    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
111
112    /// Uploads the blobs to the validator.
113    // Unfortunately, this doesn't compile as an async function: async functions in traits
114    // don't play well with default implementations, apparently.
115    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
116    fn upload_blobs(
117        &self,
118        blobs: Vec<Arc<Blob>>,
119    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
120        let tasks: Vec<_> = blobs
121            .into_iter()
122            .map(|blob| self.upload_blob(blob.into()))
123            .collect();
124        futures::future::try_join_all(tasks)
125    }
126
127    /// Downloads a blob. Returns an error if the validator does not have the blob.
128    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
129
130    /// Downloads a batch of blobs as a stream. The stream yields one blob per
131    /// requested id, in order. On mid-stream errors, the caller can retry the
132    /// remaining blob ids against another validator.
133    async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;
134
135    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
136    async fn download_pending_blob(
137        &self,
138        chain_id: ChainId,
139        blob_id: BlobId,
140    ) -> Result<BlobContent, NodeError>;
141
142    /// Handles a blob that belongs to a pending proposal or validated block certificate.
143    async fn handle_pending_blob(
144        &self,
145        chain_id: ChainId,
146        blob: BlobContent,
147    ) -> Result<ChainInfoResponse, NodeError>;
148
149    async fn download_certificate(
150        &self,
151        hash: CryptoHash,
152    ) -> Result<ConfirmedBlockCertificate, NodeError>;
153
154    /// Requests a batch of certificates from the validator.
155    async fn download_certificates(
156        &self,
157        hashes: Vec<CryptoHash>,
158    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
159
160    /// Requests a batch of certificates from a specific chain by heights.
161    ///
162    /// Returns certificates in ascending order by height. This method does not guarantee
163    /// that all requested heights will be returned; if some certificates are missing,
164    /// the caller must handle that.
165    async fn download_certificates_by_heights(
166        &self,
167        chain_id: ChainId,
168        heights: Vec<BlockHeight>,
169    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
170
171    /// Returns the hash of the `Certificate` that last used a blob.
172    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
173
174    /// Returns the certificate that last used the blob.
175    async fn blob_last_used_by_certificate(
176        &self,
177        blob_id: BlobId,
178    ) -> Result<ConfirmedBlockCertificate, NodeError>;
179
180    /// Looks up the block heights where the given events were published.
181    /// Returns `None` for events not found in the index.
182    async fn event_block_heights(
183        &self,
184        event_ids: Vec<EventId>,
185    ) -> Result<Vec<Option<BlockHeight>>, NodeError>;
186
187    /// Returns the missing `Blob`s by their IDs.
188    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
189
190    /// Gets shard information for a specific chain.
191    async fn get_shard_info(
192        &self,
193        chain_id: ChainId,
194    ) -> Result<crate::data_types::ShardInfo, NodeError>;
195}
196
197/// Turn an address into a validator node.
198#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
199pub trait ValidatorNodeProvider: 'static {
200    type Node: ValidatorNode + MaybeSend + MaybeSync + Clone + 'static;
201
202    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
203
204    fn make_nodes(
205        &self,
206        committee: &Committee,
207    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
208        let validator_addresses: Vec<_> = committee
209            .validator_addresses()
210            .map(|(node, name)| (node, name.to_owned()))
211            .collect();
212        self.make_nodes_from_list(validator_addresses)
213    }
214
215    fn make_nodes_from_list<A>(
216        &self,
217        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
218    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
219    where
220        A: AsRef<str>,
221    {
222        Ok(validators
223            .into_iter()
224            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
225            .collect::<Result<Vec<_>, NodeError>>()?
226            .into_iter())
227    }
228}
229
230/// Error type for node queries.
231///
232/// This error is meant to be serialized over the network and aggregated by clients (i.e.
233/// clients will track validator votes on each error value).
234#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
235pub enum NodeError {
236    #[error("Cryptographic error: {error}")]
237    CryptoError { error: String },
238
239    #[error("Arithmetic error: {error}")]
240    ArithmeticError { error: String },
241
242    #[error("Error while accessing storage: {error}")]
243    ViewError { error: String },
244
245    #[error("Chain error: {error}")]
246    ChainError { error: String },
247
248    #[error("Worker error: {error}")]
249    WorkerError { error: String },
250
251    // This error must be normalized during conversions.
252    #[error("The chain {0} is not active in validator")]
253    InactiveChain(ChainId),
254
255    #[error("Round number should be {0:?}")]
256    WrongRound(Round),
257
258    #[error(
259        "Chain is expecting a next block at height {expected_block_height} but the given block \
260        is at height {found_block_height} instead"
261    )]
262    UnexpectedBlockHeight {
263        expected_block_height: BlockHeight,
264        found_block_height: BlockHeight,
265    },
266
267    // This error must be normalized during conversions.
268    #[error(
269        "Cannot vote for block proposal of chain {chain_id} because a message \
270         from chain {origin} at height {height} has not been received yet"
271    )]
272    MissingCrossChainUpdate {
273        chain_id: ChainId,
274        origin: ChainId,
275        height: BlockHeight,
276    },
277
278    #[error("Blobs not found: {0:?}")]
279    BlobsNotFound(Vec<BlobId>),
280
281    #[error("Events not found: {0:?}")]
282    EventsNotFound(Vec<EventId>),
283
284    // This error must be normalized during conversions.
285    #[error("We don't have the value for the certificate.")]
286    MissingCertificateValue,
287
288    #[error("Response doesn't contain requested certificates: {0:?}")]
289    MissingCertificates(Vec<CryptoHash>),
290
291    #[error("Validator's response failed to include a vote when trying to {0}")]
292    MissingVoteInValidatorResponse(String),
293
294    #[error("The received chain info response is invalid")]
295    InvalidChainInfoResponse,
296    #[error("Unexpected certificate value")]
297    UnexpectedCertificateValue,
298
299    // Networking errors.
300    // TODO(#258): These errors should be defined in linera-rpc.
301    #[error("Cannot deserialize")]
302    InvalidDecoding,
303    #[error("Unexpected message")]
304    UnexpectedMessage,
305    #[error("Grpc error: {error}")]
306    GrpcError { error: String },
307    #[error("Network error while querying service: {error}")]
308    ClientIoError { error: String },
309    #[error("Failed to resolve validator address: {address}")]
310    CannotResolveValidatorAddress { address: String },
311    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
312    SubscriptionError { transport: String },
313    #[error("Failed to subscribe; tonic status: {status:?}")]
314    SubscriptionFailed { status: String },
315
316    #[error("Node failed to provide a 'last used by' certificate for the blob")]
317    InvalidCertificateForBlob(BlobId),
318    #[error("Node returned a BlobsNotFound error with duplicates")]
319    DuplicatesInBlobsNotFound,
320    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
321    UnexpectedEntriesInBlobsNotFound,
322    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
323    UnexpectedCertificates {
324        returned: Vec<CryptoHash>,
325        requested: Vec<CryptoHash>,
326    },
327    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
328    EmptyBlobsNotFound,
329    #[error("Local error handling validator response: {error}")]
330    ResponseHandlingError { error: String },
331
332    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
333    MissingCertificatesByHeights {
334        chain_id: ChainId,
335        heights: Vec<BlockHeight>,
336    },
337
338    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
339    TooManyCertificatesReturned {
340        chain_id: ChainId,
341        remote_node: Box<ValidatorPublicKey>,
342    },
343
344    #[error(
345        "Block timestamp ({block_timestamp}) is further in the future from local time \
346        ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
347    )]
348    InvalidTimestamp {
349        block_timestamp: Timestamp,
350        local_time: Timestamp,
351        block_time_grace_period_ms: u64,
352    },
353
354    #[error("No validators available to handle the request")]
355    NoValidators,
356}
357
358impl NodeError {
359    /// Returns whether this error is an expected part of the protocol flow.
360    ///
361    /// Expected errors are those that validators return during normal operation and that
362    /// the client handles automatically (e.g. by supplying missing data and retrying).
363    /// Unexpected errors indicate genuine network issues, validator misbehavior, or
364    /// internal problems.
365    pub fn is_expected(&self) -> bool {
366        match self {
367            // Expected: validators return these during normal operation and the client
368            // handles them automatically by supplying missing data and retrying.
369            NodeError::BlobsNotFound(_)
370            | NodeError::EventsNotFound(_)
371            | NodeError::MissingCrossChainUpdate { .. }
372            | NodeError::WrongRound(_)
373            | NodeError::UnexpectedBlockHeight { .. }
374            | NodeError::InactiveChain(_)
375            | NodeError::InvalidTimestamp { .. }
376            | NodeError::MissingCertificateValue => true,
377
378            // Unexpected: network issues, validator misbehavior, or internal problems.
379            NodeError::CryptoError { .. }
380            | NodeError::ArithmeticError { .. }
381            | NodeError::ViewError { .. }
382            | NodeError::ChainError { .. }
383            | NodeError::WorkerError { .. }
384            | NodeError::MissingCertificates(_)
385            | NodeError::MissingVoteInValidatorResponse(_)
386            | NodeError::InvalidChainInfoResponse
387            | NodeError::UnexpectedCertificateValue
388            | NodeError::InvalidDecoding
389            | NodeError::UnexpectedMessage
390            | NodeError::GrpcError { .. }
391            | NodeError::ClientIoError { .. }
392            | NodeError::CannotResolveValidatorAddress { .. }
393            | NodeError::SubscriptionError { .. }
394            | NodeError::SubscriptionFailed { .. }
395            | NodeError::InvalidCertificateForBlob(_)
396            | NodeError::DuplicatesInBlobsNotFound
397            | NodeError::UnexpectedEntriesInBlobsNotFound
398            | NodeError::UnexpectedCertificates { .. }
399            | NodeError::EmptyBlobsNotFound
400            | NodeError::ResponseHandlingError { .. }
401            | NodeError::MissingCertificatesByHeights { .. }
402            | NodeError::TooManyCertificatesReturned { .. }
403            | NodeError::NoValidators => false,
404        }
405    }
406}
407
408impl From<tonic::Status> for NodeError {
409    fn from(status: tonic::Status) -> Self {
410        Self::GrpcError {
411            error: status.to_string(),
412        }
413    }
414}
415
416impl CrossChainMessageDelivery {
417    pub fn new(wait_for_outgoing_messages: bool) -> Self {
418        if wait_for_outgoing_messages {
419            CrossChainMessageDelivery::Blocking
420        } else {
421            CrossChainMessageDelivery::NonBlocking
422        }
423    }
424
425    pub fn wait_for_outgoing_messages(self) -> bool {
426        match self {
427            CrossChainMessageDelivery::NonBlocking => false,
428            CrossChainMessageDelivery::Blocking => true,
429        }
430    }
431}
432
433impl From<ViewError> for NodeError {
434    fn from(error: ViewError) -> Self {
435        Self::ViewError {
436            error: error.to_string(),
437        }
438    }
439}
440
441impl From<ArithmeticError> for NodeError {
442    fn from(error: ArithmeticError) -> Self {
443        Self::ArithmeticError {
444            error: error.to_string(),
445        }
446    }
447}
448
449impl From<CryptoError> for NodeError {
450    fn from(error: CryptoError) -> Self {
451        Self::CryptoError {
452            error: error.to_string(),
453        }
454    }
455}
456
457impl From<ChainError> for NodeError {
458    fn from(error: ChainError) -> Self {
459        match error {
460            ChainError::MissingCrossChainUpdate {
461                chain_id,
462                origin,
463                height,
464            } => Self::MissingCrossChainUpdate {
465                chain_id,
466                origin,
467                height,
468            },
469            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
470            ChainError::ExecutionError(execution_error, context) => match *execution_error {
471                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
472                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
473                _ => Self::ChainError {
474                    error: ChainError::ExecutionError(execution_error, context).to_string(),
475                },
476            },
477            ChainError::UnexpectedBlockHeight {
478                expected_block_height,
479                found_block_height,
480            } => Self::UnexpectedBlockHeight {
481                expected_block_height,
482                found_block_height,
483            },
484            ChainError::WrongRound(round) => Self::WrongRound(round),
485            error => Self::ChainError {
486                error: error.to_string(),
487            },
488        }
489    }
490}
491
492impl From<WorkerError> for NodeError {
493    fn from(error: WorkerError) -> Self {
494        match error {
495            WorkerError::ChainError(error) => (*error).into(),
496            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
497            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
498            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
499            WorkerError::UnexpectedBlockHeight {
500                expected_block_height,
501                found_block_height,
502            } => NodeError::UnexpectedBlockHeight {
503                expected_block_height,
504                found_block_height,
505            },
506            WorkerError::InvalidTimestamp {
507                block_timestamp,
508                local_time,
509                block_time_grace_period,
510            } => NodeError::InvalidTimestamp {
511                block_timestamp,
512                local_time,
513                block_time_grace_period_ms: block_time_grace_period.as_millis() as u64,
514            },
515            error => Self::WorkerError {
516                error: error.to_string(),
517            },
518        }
519    }
520}