linera_core/
node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11    crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
14    },
15    identifiers::{BlobId, ChainId, EventId},
16};
17use linera_chain::{
18    data_types::BlockProposal,
19    types::{
20        ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
21        ValidatedBlock,
22    },
23    ChainError,
24};
25use linera_execution::{committee::Committee, ExecutionError};
26use linera_version::VersionInfo;
27use linera_views::ViewError;
28use serde::{Deserialize, Serialize};
29use thiserror::Error;
30
31use crate::{
32    data_types::{ChainInfoQuery, ChainInfoResponse},
33    worker::{Notification, WorkerError},
34};
35
36/// A pinned [`Stream`] of Notifications.
37pub type NotificationStream = BoxStream<'static, Notification>;
38
39/// Whether to wait for the delivery of outgoing cross-chain messages.
40#[derive(Debug, Default, Clone, Copy)]
41pub enum CrossChainMessageDelivery {
42    #[default]
43    NonBlocking,
44    Blocking,
45}
46
47/// How to communicate with a validator node.
48#[allow(async_fn_in_trait)]
49#[cfg_attr(not(web), trait_variant::make(Send))]
50pub trait ValidatorNode {
51    #[cfg(not(web))]
52    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
53    #[cfg(web)]
54    type NotificationStream: Stream<Item = Notification> + Unpin;
55
56    fn address(&self) -> String;
57
58    /// Proposes a new block.
59    async fn handle_block_proposal(
60        &self,
61        proposal: BlockProposal,
62    ) -> Result<ChainInfoResponse, NodeError>;
63
64    /// Processes a certificate without a value.
65    async fn handle_lite_certificate(
66        &self,
67        certificate: LiteCertificate<'_>,
68        delivery: CrossChainMessageDelivery,
69    ) -> Result<ChainInfoResponse, NodeError>;
70
71    /// Processes a confirmed certificate.
72    async fn handle_confirmed_certificate(
73        &self,
74        certificate: GenericCertificate<ConfirmedBlock>,
75        delivery: CrossChainMessageDelivery,
76    ) -> Result<ChainInfoResponse, NodeError>;
77
78    /// Processes a validated certificate.
79    async fn handle_validated_certificate(
80        &self,
81        certificate: GenericCertificate<ValidatedBlock>,
82    ) -> Result<ChainInfoResponse, NodeError>;
83
84    /// Processes a timeout certificate.
85    async fn handle_timeout_certificate(
86        &self,
87        certificate: GenericCertificate<Timeout>,
88    ) -> Result<ChainInfoResponse, NodeError>;
89
90    /// Handles information queries for this chain.
91    async fn handle_chain_info_query(
92        &self,
93        query: ChainInfoQuery,
94    ) -> Result<ChainInfoResponse, NodeError>;
95
96    /// Gets the version info for this validator node.
97    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
98
99    /// Gets the network's description.
100    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
101
102    /// Subscribes to receiving notifications for a collection of chains.
103    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
104
105    // Uploads a blob. Returns an error if the validator has not seen a
106    // certificate using this blob.
107    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
108
109    /// Uploads the blobs to the validator.
110    // Unfortunately, this doesn't compile as an async function: async functions in traits
111    // don't play well with default implementations, apparently.
112    // See also https://github.com/rust-lang/impl-trait-utils/issues/17
113    fn upload_blobs(
114        &self,
115        blobs: Vec<Blob>,
116    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
117        let tasks: Vec<_> = blobs
118            .into_iter()
119            .map(|blob| self.upload_blob(blob.into()))
120            .collect();
121        futures::future::try_join_all(tasks)
122    }
123
124    /// Downloads a blob. Returns an error if the validator does not have the blob.
125    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
126
127    /// Downloads a blob that belongs to a pending proposal or the locking block on a chain.
128    async fn download_pending_blob(
129        &self,
130        chain_id: ChainId,
131        blob_id: BlobId,
132    ) -> Result<BlobContent, NodeError>;
133
134    /// Handles a blob that belongs to a pending proposal or validated block certificate.
135    async fn handle_pending_blob(
136        &self,
137        chain_id: ChainId,
138        blob: BlobContent,
139    ) -> Result<ChainInfoResponse, NodeError>;
140
141    async fn download_certificate(
142        &self,
143        hash: CryptoHash,
144    ) -> Result<ConfirmedBlockCertificate, NodeError>;
145
146    /// Requests a batch of certificates from the validator.
147    async fn download_certificates(
148        &self,
149        hashes: Vec<CryptoHash>,
150    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
151
152    /// Requests a batch of certificates from a specific chain by heights.
153    ///
154    /// Returns certificates in ascending order by height. This method does not guarantee
155    /// that all requested heights will be returned; if some certificates are missing,
156    /// the caller must handle that.
157    async fn download_certificates_by_heights(
158        &self,
159        chain_id: ChainId,
160        heights: Vec<BlockHeight>,
161    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
162
163    /// Returns the hash of the `Certificate` that last used a blob.
164    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
165
166    /// Returns the certificate that last used the blob.
167    async fn blob_last_used_by_certificate(
168        &self,
169        blob_id: BlobId,
170    ) -> Result<ConfirmedBlockCertificate, NodeError>;
171
172    /// Returns the missing `Blob`s by their IDs.
173    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
174
175    /// Gets shard information for a specific chain.
176    async fn get_shard_info(
177        &self,
178        chain_id: ChainId,
179    ) -> Result<crate::data_types::ShardInfo, NodeError>;
180}
181
182/// Turn an address into a validator node.
183#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
184pub trait ValidatorNodeProvider: 'static {
185    #[cfg(not(web))]
186    type Node: ValidatorNode + Send + Sync + Clone + 'static;
187    #[cfg(web)]
188    type Node: ValidatorNode + Clone + 'static;
189
190    fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
191
192    fn make_nodes(
193        &self,
194        committee: &Committee,
195    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
196        let validator_addresses: Vec<_> = committee
197            .validator_addresses()
198            .map(|(node, name)| (node, name.to_owned()))
199            .collect();
200        self.make_nodes_from_list(validator_addresses)
201    }
202
203    fn make_nodes_from_list<A>(
204        &self,
205        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
206    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
207    where
208        A: AsRef<str>,
209    {
210        Ok(validators
211            .into_iter()
212            .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
213            .collect::<Result<Vec<_>, NodeError>>()?
214            .into_iter())
215    }
216}
217
218/// Error type for node queries.
219///
220/// This error is meant to be serialized over the network and aggregated by clients (i.e.
221/// clients will track validator votes on each error value).
222#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
223pub enum NodeError {
224    #[error("Cryptographic error: {error}")]
225    CryptoError { error: String },
226
227    #[error("Arithmetic error: {error}")]
228    ArithmeticError { error: String },
229
230    #[error("Error while accessing storage: {error}")]
231    ViewError { error: String },
232
233    #[error("Chain error: {error}")]
234    ChainError { error: String },
235
236    #[error("Worker error: {error}")]
237    WorkerError { error: String },
238
239    // This error must be normalized during conversions.
240    #[error("The chain {0} is not active in validator")]
241    InactiveChain(ChainId),
242
243    #[error("Round number should be {0:?}")]
244    WrongRound(Round),
245
246    #[error(
247        "Chain is expecting a next block at height {expected_block_height} but the given block \
248        is at height {found_block_height} instead"
249    )]
250    UnexpectedBlockHeight {
251        expected_block_height: BlockHeight,
252        found_block_height: BlockHeight,
253    },
254
255    // This error must be normalized during conversions.
256    #[error(
257        "Cannot vote for block proposal of chain {chain_id} because a message \
258         from chain {origin} at height {height} has not been received yet"
259    )]
260    MissingCrossChainUpdate {
261        chain_id: ChainId,
262        origin: ChainId,
263        height: BlockHeight,
264    },
265
266    #[error("Blobs not found: {0:?}")]
267    BlobsNotFound(Vec<BlobId>),
268
269    #[error("Events not found: {0:?}")]
270    EventsNotFound(Vec<EventId>),
271
272    // This error must be normalized during conversions.
273    #[error("We don't have the value for the certificate.")]
274    MissingCertificateValue,
275
276    #[error("Response doesn't contain requested certificates: {0:?}")]
277    MissingCertificates(Vec<CryptoHash>),
278
279    #[error("Validator's response failed to include a vote when trying to {0}")]
280    MissingVoteInValidatorResponse(String),
281
282    #[error("The received chain info response is invalid")]
283    InvalidChainInfoResponse,
284    #[error("Unexpected certificate value")]
285    UnexpectedCertificateValue,
286
287    // Networking errors.
288    // TODO(#258): These errors should be defined in linera-rpc.
289    #[error("Cannot deserialize")]
290    InvalidDecoding,
291    #[error("Unexpected message")]
292    UnexpectedMessage,
293    #[error("Grpc error: {error}")]
294    GrpcError { error: String },
295    #[error("Network error while querying service: {error}")]
296    ClientIoError { error: String },
297    #[error("Failed to resolve validator address: {address}")]
298    CannotResolveValidatorAddress { address: String },
299    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
300    SubscriptionError { transport: String },
301    #[error("Failed to subscribe; tonic status: {status:?}")]
302    SubscriptionFailed { status: String },
303
304    #[error("Node failed to provide a 'last used by' certificate for the blob")]
305    InvalidCertificateForBlob(BlobId),
306    #[error("Node returned a BlobsNotFound error with duplicates")]
307    DuplicatesInBlobsNotFound,
308    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
309    UnexpectedEntriesInBlobsNotFound,
310    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
311    UnexpectedCertificates {
312        returned: Vec<CryptoHash>,
313        requested: Vec<CryptoHash>,
314    },
315    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
316    EmptyBlobsNotFound,
317    #[error("Local error handling validator response: {error}")]
318    ResponseHandlingError { error: String },
319
320    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
321    MissingCertificatesByHeights {
322        chain_id: ChainId,
323        heights: Vec<BlockHeight>,
324    },
325
326    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
327    TooManyCertificatesReturned {
328        chain_id: ChainId,
329        remote_node: Box<ValidatorPublicKey>,
330    },
331
332    #[error(
333        "Block timestamp ({block_timestamp}) is further in the future from local time \
334        ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
335    )]
336    InvalidTimestamp {
337        block_timestamp: Timestamp,
338        local_time: Timestamp,
339        block_time_grace_period_ms: u64,
340    },
341}
342
343impl NodeError {
344    /// Returns whether this error is an expected part of the protocol flow.
345    ///
346    /// Expected errors are those that validators return during normal operation and that
347    /// the client handles automatically (e.g. by supplying missing data and retrying).
348    /// Unexpected errors indicate genuine network issues, validator misbehavior, or
349    /// internal problems.
350    pub fn is_expected(&self) -> bool {
351        match self {
352            // Expected: validators return these during normal operation and the client
353            // handles them automatically by supplying missing data and retrying.
354            NodeError::BlobsNotFound(_)
355            | NodeError::EventsNotFound(_)
356            | NodeError::MissingCrossChainUpdate { .. }
357            | NodeError::WrongRound(_)
358            | NodeError::UnexpectedBlockHeight { .. }
359            | NodeError::InactiveChain(_)
360            | NodeError::InvalidTimestamp { .. }
361            | NodeError::MissingCertificateValue => true,
362
363            // Unexpected: network issues, validator misbehavior, or internal problems.
364            NodeError::CryptoError { .. }
365            | NodeError::ArithmeticError { .. }
366            | NodeError::ViewError { .. }
367            | NodeError::ChainError { .. }
368            | NodeError::WorkerError { .. }
369            | NodeError::MissingCertificates(_)
370            | NodeError::MissingVoteInValidatorResponse(_)
371            | NodeError::InvalidChainInfoResponse
372            | NodeError::UnexpectedCertificateValue
373            | NodeError::InvalidDecoding
374            | NodeError::UnexpectedMessage
375            | NodeError::GrpcError { .. }
376            | NodeError::ClientIoError { .. }
377            | NodeError::CannotResolveValidatorAddress { .. }
378            | NodeError::SubscriptionError { .. }
379            | NodeError::SubscriptionFailed { .. }
380            | NodeError::InvalidCertificateForBlob(_)
381            | NodeError::DuplicatesInBlobsNotFound
382            | NodeError::UnexpectedEntriesInBlobsNotFound
383            | NodeError::UnexpectedCertificates { .. }
384            | NodeError::EmptyBlobsNotFound
385            | NodeError::ResponseHandlingError { .. }
386            | NodeError::MissingCertificatesByHeights { .. }
387            | NodeError::TooManyCertificatesReturned { .. } => false,
388        }
389    }
390}
391
392impl From<tonic::Status> for NodeError {
393    fn from(status: tonic::Status) -> Self {
394        Self::GrpcError {
395            error: status.to_string(),
396        }
397    }
398}
399
400impl CrossChainMessageDelivery {
401    pub fn new(wait_for_outgoing_messages: bool) -> Self {
402        if wait_for_outgoing_messages {
403            CrossChainMessageDelivery::Blocking
404        } else {
405            CrossChainMessageDelivery::NonBlocking
406        }
407    }
408
409    pub fn wait_for_outgoing_messages(self) -> bool {
410        match self {
411            CrossChainMessageDelivery::NonBlocking => false,
412            CrossChainMessageDelivery::Blocking => true,
413        }
414    }
415}
416
417impl From<ViewError> for NodeError {
418    fn from(error: ViewError) -> Self {
419        Self::ViewError {
420            error: error.to_string(),
421        }
422    }
423}
424
425impl From<ArithmeticError> for NodeError {
426    fn from(error: ArithmeticError) -> Self {
427        Self::ArithmeticError {
428            error: error.to_string(),
429        }
430    }
431}
432
433impl From<CryptoError> for NodeError {
434    fn from(error: CryptoError) -> Self {
435        Self::CryptoError {
436            error: error.to_string(),
437        }
438    }
439}
440
441impl From<ChainError> for NodeError {
442    fn from(error: ChainError) -> Self {
443        match error {
444            ChainError::MissingCrossChainUpdate {
445                chain_id,
446                origin,
447                height,
448            } => Self::MissingCrossChainUpdate {
449                chain_id,
450                origin,
451                height,
452            },
453            ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
454            ChainError::ExecutionError(execution_error, context) => match *execution_error {
455                ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
456                ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
457                _ => Self::ChainError {
458                    error: ChainError::ExecutionError(execution_error, context).to_string(),
459                },
460            },
461            ChainError::UnexpectedBlockHeight {
462                expected_block_height,
463                found_block_height,
464            } => Self::UnexpectedBlockHeight {
465                expected_block_height,
466                found_block_height,
467            },
468            ChainError::WrongRound(round) => Self::WrongRound(round),
469            error => Self::ChainError {
470                error: error.to_string(),
471            },
472        }
473    }
474}
475
476impl From<WorkerError> for NodeError {
477    fn from(error: WorkerError) -> Self {
478        match error {
479            WorkerError::ChainError(error) => (*error).into(),
480            WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
481            WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
482            WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
483            WorkerError::UnexpectedBlockHeight {
484                expected_block_height,
485                found_block_height,
486            } => NodeError::UnexpectedBlockHeight {
487                expected_block_height,
488                found_block_height,
489            },
490            WorkerError::InvalidTimestamp {
491                block_timestamp,
492                local_time,
493                block_time_grace_period,
494            } => NodeError::InvalidTimestamp {
495                block_timestamp,
496                local_time,
497                block_time_grace_period_ms: block_time_grace_period.as_millis() as u64,
498            },
499            error => Self::WorkerError {
500                error: error.to_string(),
501            },
502        }
503    }
504}