1use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14 data_types::{
15 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16 },
17 identifiers::{BlobId, ChainId, EventId},
18 task::{MaybeSend, MaybeSync},
19};
20use linera_cache::Arc as CacheArc;
21use linera_chain::{
22 data_types::BlockProposal,
23 types::{
24 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
25 ValidatedBlock,
26 },
27 ChainError,
28};
29use linera_execution::{committee::Committee, ExecutionError};
30use linera_version::VersionInfo;
31use linera_views::ViewError;
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use crate::{
36 data_types::{ChainInfoQuery, ChainInfoResponse},
37 worker::{Notification, WorkerError},
38};
39
40pub type NotificationStream = BoxStream<'static, Notification>;
42
43pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
45
46#[derive(Debug, Default, Clone, Copy)]
48#[allow(missing_docs)]
49pub enum CrossChainMessageDelivery {
50 #[default]
51 NonBlocking,
52 Blocking,
53}
54
55#[allow(async_fn_in_trait)]
57#[cfg_attr(not(web), trait_variant::make(Send))]
58pub trait ValidatorNode {
59 type NotificationStream: Stream<Item = Notification> + Unpin + MaybeSend;
61
62 fn address(&self) -> String;
64
65 async fn handle_block_proposal(
67 &self,
68 proposal: BlockProposal,
69 ) -> Result<ChainInfoResponse, NodeError>;
70
71 async fn handle_lite_certificate(
73 &self,
74 certificate: LiteCertificate<'_>,
75 delivery: CrossChainMessageDelivery,
76 ) -> Result<ChainInfoResponse, NodeError>;
77
78 async fn handle_confirmed_certificate(
80 &self,
81 certificate: CacheArc<GenericCertificate<ConfirmedBlock>>,
82 delivery: CrossChainMessageDelivery,
83 ) -> Result<ChainInfoResponse, NodeError>;
84
85 async fn handle_validated_certificate(
87 &self,
88 certificate: GenericCertificate<ValidatedBlock>,
89 ) -> Result<ChainInfoResponse, NodeError>;
90
91 async fn handle_timeout_certificate(
93 &self,
94 certificate: GenericCertificate<Timeout>,
95 ) -> Result<ChainInfoResponse, NodeError>;
96
97 async fn handle_chain_info_query(
99 &self,
100 query: ChainInfoQuery,
101 ) -> Result<ChainInfoResponse, NodeError>;
102
103 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
105
106 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
108
109 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
111
112 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
115
116 fn upload_blobs(
121 &self,
122 blobs: Vec<Arc<Blob>>,
123 ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
124 let tasks: Vec<_> = blobs
125 .into_iter()
126 .map(|blob| self.upload_blob(blob.into()))
127 .collect();
128 futures::future::try_join_all(tasks)
129 }
130
131 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
133
134 async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;
138
139 async fn download_pending_blob(
141 &self,
142 chain_id: ChainId,
143 blob_id: BlobId,
144 ) -> Result<BlobContent, NodeError>;
145
146 async fn handle_pending_blob(
148 &self,
149 chain_id: ChainId,
150 blob: BlobContent,
151 ) -> Result<ChainInfoResponse, NodeError>;
152
153 async fn download_certificate(
155 &self,
156 hash: CryptoHash,
157 ) -> Result<ConfirmedBlockCertificate, NodeError>;
158
159 async fn download_certificates(
161 &self,
162 hashes: Vec<CryptoHash>,
163 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
164
165 async fn download_certificates_by_heights(
171 &self,
172 chain_id: ChainId,
173 heights: Vec<BlockHeight>,
174 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
175
176 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
178
179 async fn blob_last_used_by_certificate(
181 &self,
182 blob_id: BlobId,
183 ) -> Result<ConfirmedBlockCertificate, NodeError>;
184
185 async fn event_block_heights(
188 &self,
189 event_ids: Vec<EventId>,
190 ) -> Result<Vec<Option<BlockHeight>>, NodeError>;
191
192 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
194
195 async fn get_shard_info(
197 &self,
198 chain_id: ChainId,
199 ) -> Result<crate::data_types::ShardInfo, NodeError>;
200}
201
202#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
204pub trait ValidatorNodeProvider: 'static {
205 type Node: ValidatorNode + MaybeSend + MaybeSync + Clone + 'static;
207
208 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
210
211 fn make_nodes(
213 &self,
214 committee: &Committee,
215 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
216 let validator_addresses: Vec<_> = committee
217 .validator_addresses()
218 .map(|(node, name)| (node, name.to_owned()))
219 .collect();
220 self.make_nodes_from_list(validator_addresses)
221 }
222
223 fn make_nodes_from_list<A>(
225 &self,
226 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
227 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
228 where
229 A: AsRef<str>,
230 {
231 Ok(validators
232 .into_iter()
233 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
234 .collect::<Result<Vec<_>, NodeError>>()?
235 .into_iter())
236 }
237}
238
239#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
244#[allow(missing_docs)]
245pub enum NodeError {
246 #[error("Cryptographic error: {error}")]
247 CryptoError { error: String },
248
249 #[error("Arithmetic error: {error}")]
250 ArithmeticError { error: String },
251
252 #[error("Error while accessing storage: {error}")]
253 ViewError { error: String },
254
255 #[error("Chain error: {error}")]
256 ChainError { error: String },
257
258 #[error("Worker error: {error}")]
259 WorkerError { error: String },
260
261 #[error("The chain {0} is not active in validator")]
263 InactiveChain(ChainId),
264
265 #[error("Round number should be {0:?}")]
266 WrongRound(Round),
267
268 #[error(
269 "Chain is expecting a next block at height {expected_block_height} but the given block \
270 is at height {found_block_height} instead"
271 )]
272 UnexpectedBlockHeight {
273 expected_block_height: BlockHeight,
274 found_block_height: BlockHeight,
275 },
276
277 #[error(
279 "Cannot vote for block proposal of chain {chain_id} because a message \
280 from chain {origin} at height {height} has not been received yet"
281 )]
282 MissingCrossChainUpdate {
283 chain_id: ChainId,
284 origin: ChainId,
285 height: BlockHeight,
286 },
287
288 #[error("Blobs not found: {0:?}")]
289 BlobsNotFound(Vec<BlobId>),
290
291 #[error("Blocks not found: {0:?}")]
292 BlocksNotFound(Vec<CryptoHash>),
293
294 #[error("Events not found: {0:?}")]
295 EventsNotFound(Vec<EventId>),
296
297 #[error("We don't have the value for the certificate.")]
299 MissingCertificateValue,
300
301 #[error("Response doesn't contain requested certificates: {0:?}")]
302 MissingCertificates(Vec<CryptoHash>),
303
304 #[error("Validator's response failed to include a vote when trying to {0}")]
305 MissingVoteInValidatorResponse(String),
306
307 #[error("The received chain info response is invalid")]
308 InvalidChainInfoResponse,
309 #[error("Unexpected certificate value")]
310 UnexpectedCertificateValue,
311
312 #[error("Cannot deserialize")]
315 InvalidDecoding,
316 #[error("Unexpected message")]
317 UnexpectedMessage,
318 #[error("Grpc error: {error}")]
319 GrpcError { error: String },
320 #[error("Network error while querying service: {error}")]
321 ClientIoError { error: String },
322 #[error("Failed to resolve validator address: {address}")]
323 CannotResolveValidatorAddress { address: String },
324 #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
325 SubscriptionError { transport: String },
326 #[error("Failed to subscribe; tonic status: {status:?}")]
327 SubscriptionFailed { status: String },
328
329 #[error("Node failed to provide a 'last used by' certificate for the blob")]
330 InvalidCertificateForBlob(BlobId),
331 #[error("Node returned a BlobsNotFound error with duplicates")]
332 DuplicatesInBlobsNotFound,
333 #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
334 UnexpectedEntriesInBlobsNotFound,
335 #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
336 UnexpectedCertificates {
337 returned: Vec<CryptoHash>,
338 requested: Vec<CryptoHash>,
339 },
340 #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
341 EmptyBlobsNotFound,
342 #[error("Local error handling validator response: {error}")]
343 ResponseHandlingError { error: String },
344
345 #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
346 MissingCertificatesByHeights {
347 chain_id: ChainId,
348 heights: Vec<BlockHeight>,
349 },
350
351 #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
352 TooManyCertificatesReturned {
353 chain_id: ChainId,
354 remote_node: Box<ValidatorPublicKey>,
355 },
356
357 #[error(
358 "Block timestamp ({block_timestamp}) is further in the future from local time \
359 ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
360 )]
361 InvalidTimestamp {
362 block_timestamp: Timestamp,
363 local_time: Timestamp,
364 block_time_grace_period_ms: u64,
365 },
366
367 #[error("No validators available to handle the request")]
368 NoValidators,
369}
370
371impl NodeError {
372 pub fn is_expected(&self) -> bool {
379 match self {
380 NodeError::BlobsNotFound(_)
383 | NodeError::BlocksNotFound(_)
384 | NodeError::EventsNotFound(_)
385 | NodeError::MissingCrossChainUpdate { .. }
386 | NodeError::WrongRound(_)
387 | NodeError::UnexpectedBlockHeight { .. }
388 | NodeError::InactiveChain(_)
389 | NodeError::InvalidTimestamp { .. }
390 | NodeError::MissingCertificateValue => true,
391
392 NodeError::CryptoError { .. }
394 | NodeError::ArithmeticError { .. }
395 | NodeError::ViewError { .. }
396 | NodeError::ChainError { .. }
397 | NodeError::WorkerError { .. }
398 | NodeError::MissingCertificates(_)
399 | NodeError::MissingVoteInValidatorResponse(_)
400 | NodeError::InvalidChainInfoResponse
401 | NodeError::UnexpectedCertificateValue
402 | NodeError::InvalidDecoding
403 | NodeError::UnexpectedMessage
404 | NodeError::GrpcError { .. }
405 | NodeError::ClientIoError { .. }
406 | NodeError::CannotResolveValidatorAddress { .. }
407 | NodeError::SubscriptionError { .. }
408 | NodeError::SubscriptionFailed { .. }
409 | NodeError::InvalidCertificateForBlob(_)
410 | NodeError::DuplicatesInBlobsNotFound
411 | NodeError::UnexpectedEntriesInBlobsNotFound
412 | NodeError::UnexpectedCertificates { .. }
413 | NodeError::EmptyBlobsNotFound
414 | NodeError::ResponseHandlingError { .. }
415 | NodeError::MissingCertificatesByHeights { .. }
416 | NodeError::TooManyCertificatesReturned { .. }
417 | NodeError::NoValidators => false,
418 }
419 }
420}
421
422impl From<tonic::Status> for NodeError {
423 fn from(status: tonic::Status) -> Self {
424 Self::GrpcError {
425 error: status.to_string(),
426 }
427 }
428}
429
430impl CrossChainMessageDelivery {
431 pub fn new(wait_for_outgoing_messages: bool) -> Self {
433 if wait_for_outgoing_messages {
434 CrossChainMessageDelivery::Blocking
435 } else {
436 CrossChainMessageDelivery::NonBlocking
437 }
438 }
439
440 pub fn wait_for_outgoing_messages(self) -> bool {
442 match self {
443 CrossChainMessageDelivery::NonBlocking => false,
444 CrossChainMessageDelivery::Blocking => true,
445 }
446 }
447}
448
449impl From<ViewError> for NodeError {
450 fn from(error: ViewError) -> Self {
451 Self::ViewError {
452 error: error.to_string(),
453 }
454 }
455}
456
457impl From<ArithmeticError> for NodeError {
458 fn from(error: ArithmeticError) -> Self {
459 Self::ArithmeticError {
460 error: error.to_string(),
461 }
462 }
463}
464
465impl From<CryptoError> for NodeError {
466 fn from(error: CryptoError) -> Self {
467 Self::CryptoError {
468 error: error.to_string(),
469 }
470 }
471}
472
473impl From<ChainError> for NodeError {
474 fn from(error: ChainError) -> Self {
475 match error {
476 ChainError::MissingCrossChainUpdate {
477 chain_id,
478 origin,
479 height,
480 } => Self::MissingCrossChainUpdate {
481 chain_id,
482 origin,
483 height,
484 },
485 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
486 ChainError::ExecutionError(execution_error, context) => match *execution_error {
487 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
488 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
489 _ => Self::ChainError {
490 error: ChainError::ExecutionError(execution_error, context).to_string(),
491 },
492 },
493 ChainError::UnexpectedBlockHeight {
494 expected_block_height,
495 found_block_height,
496 } => Self::UnexpectedBlockHeight {
497 expected_block_height,
498 found_block_height,
499 },
500 ChainError::WrongRound(round) => Self::WrongRound(round),
501 error => Self::ChainError {
502 error: error.to_string(),
503 },
504 }
505 }
506}
507
508impl From<WorkerError> for NodeError {
509 fn from(error: WorkerError) -> Self {
510 match error {
511 WorkerError::ChainError(error) => (*error).into(),
512 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
513 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
514 WorkerError::BlocksNotFound(hashes) => Self::BlocksNotFound(hashes),
515 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
516 WorkerError::UnexpectedBlockHeight {
517 expected_block_height,
518 found_block_height,
519 } => NodeError::UnexpectedBlockHeight {
520 expected_block_height,
521 found_block_height,
522 },
523 WorkerError::InvalidTimestamp {
524 block_timestamp,
525 local_time,
526 block_time_grace_period,
527 } => NodeError::InvalidTimestamp {
528 block_timestamp,
529 local_time,
530 block_time_grace_period_ms: u64::try_from(block_time_grace_period.as_millis())
531 .unwrap_or(u64::MAX),
532 },
533 error => Self::WorkerError {
534 error: error.to_string(),
535 },
536 }
537 }
538}