1use std::sync::Arc;
6
7#[cfg(not(web))]
8use futures::stream::BoxStream;
9#[cfg(web)]
10use futures::stream::LocalBoxStream as BoxStream;
11use futures::stream::Stream;
12use linera_base::{
13 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
14 data_types::{
15 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
16 },
17 identifiers::{BlobId, ChainId, EventId},
18 task::{MaybeSend, MaybeSync},
19};
20use linera_cache::Arc as CacheArc;
21use linera_chain::{
22 data_types::BlockProposal,
23 types::{
24 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
25 ValidatedBlock,
26 },
27 ChainError,
28};
29use linera_execution::{committee::Committee, ExecutionError};
30use linera_version::VersionInfo;
31use linera_views::ViewError;
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35use crate::{
36 data_types::{ChainInfoQuery, ChainInfoResponse},
37 worker::{Notification, WorkerError},
38};
39
/// A boxed, `'static` stream of [`Notification`]s.
///
/// `BoxStream` is `futures::stream::BoxStream` on non-`web` builds and
/// `futures::stream::LocalBoxStream` on `web` builds (see the imports at the top of the file).
pub type NotificationStream = BoxStream<'static, Notification>;

/// A boxed, `'static` stream whose items are either a [`BlobContent`] or a [`NodeError`].
pub type BlobStream = BoxStream<'static, Result<BlobContent, NodeError>>;
45
/// Selects whether a request waits for outgoing cross-chain messages.
///
/// The mapping to and from a plain `bool` is provided by `CrossChainMessageDelivery::new` and
/// `CrossChainMessageDelivery::wait_for_outgoing_messages`.
///
/// `PartialEq`/`Eq` are derived so that values can be compared directly (`==`, `assert_eq!`)
/// instead of requiring a `match`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages. This is the default.
    #[default]
    NonBlocking,
    /// Wait for outgoing messages.
    Blocking,
}
53
54#[allow(async_fn_in_trait)]
56#[cfg_attr(not(web), trait_variant::make(Send))]
57pub trait ValidatorNode {
58 type NotificationStream: Stream<Item = Notification> + Unpin + MaybeSend;
59
60 fn address(&self) -> String;
61
62 async fn handle_block_proposal(
64 &self,
65 proposal: BlockProposal,
66 ) -> Result<ChainInfoResponse, NodeError>;
67
68 async fn handle_lite_certificate(
70 &self,
71 certificate: LiteCertificate<'_>,
72 delivery: CrossChainMessageDelivery,
73 ) -> Result<ChainInfoResponse, NodeError>;
74
75 async fn handle_confirmed_certificate(
77 &self,
78 certificate: CacheArc<GenericCertificate<ConfirmedBlock>>,
79 delivery: CrossChainMessageDelivery,
80 ) -> Result<ChainInfoResponse, NodeError>;
81
82 async fn handle_validated_certificate(
84 &self,
85 certificate: GenericCertificate<ValidatedBlock>,
86 ) -> Result<ChainInfoResponse, NodeError>;
87
88 async fn handle_timeout_certificate(
90 &self,
91 certificate: GenericCertificate<Timeout>,
92 ) -> Result<ChainInfoResponse, NodeError>;
93
94 async fn handle_chain_info_query(
96 &self,
97 query: ChainInfoQuery,
98 ) -> Result<ChainInfoResponse, NodeError>;
99
100 async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;
102
103 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;
105
106 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
108
109 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
112
113 fn upload_blobs(
118 &self,
119 blobs: Vec<Arc<Blob>>,
120 ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
121 let tasks: Vec<_> = blobs
122 .into_iter()
123 .map(|blob| self.upload_blob(blob.into()))
124 .collect();
125 futures::future::try_join_all(tasks)
126 }
127
128 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
130
131 async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError>;
135
136 async fn download_pending_blob(
138 &self,
139 chain_id: ChainId,
140 blob_id: BlobId,
141 ) -> Result<BlobContent, NodeError>;
142
143 async fn handle_pending_blob(
145 &self,
146 chain_id: ChainId,
147 blob: BlobContent,
148 ) -> Result<ChainInfoResponse, NodeError>;
149
150 async fn download_certificate(
151 &self,
152 hash: CryptoHash,
153 ) -> Result<ConfirmedBlockCertificate, NodeError>;
154
155 async fn download_certificates(
157 &self,
158 hashes: Vec<CryptoHash>,
159 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
160
161 async fn download_certificates_by_heights(
167 &self,
168 chain_id: ChainId,
169 heights: Vec<BlockHeight>,
170 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;
171
172 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
174
175 async fn blob_last_used_by_certificate(
177 &self,
178 blob_id: BlobId,
179 ) -> Result<ConfirmedBlockCertificate, NodeError>;
180
181 async fn event_block_heights(
184 &self,
185 event_ids: Vec<EventId>,
186 ) -> Result<Vec<Option<BlockHeight>>, NodeError>;
187
188 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;
190
191 async fn get_shard_info(
193 &self,
194 chain_id: ChainId,
195 ) -> Result<crate::data_types::ShardInfo, NodeError>;
196}
197
198#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
200pub trait ValidatorNodeProvider: 'static {
201 type Node: ValidatorNode + MaybeSend + MaybeSync + Clone + 'static;
202
203 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
204
205 fn make_nodes(
206 &self,
207 committee: &Committee,
208 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
209 let validator_addresses: Vec<_> = committee
210 .validator_addresses()
211 .map(|(node, name)| (node, name.to_owned()))
212 .collect();
213 self.make_nodes_from_list(validator_addresses)
214 }
215
216 fn make_nodes_from_list<A>(
217 &self,
218 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
219 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
220 where
221 A: AsRef<str>,
222 {
223 Ok(validators
224 .into_iter()
225 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
226 .collect::<Result<Vec<_>, NodeError>>()?
227 .into_iter())
228 }
229}
230
231#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
236pub enum NodeError {
237 #[error("Cryptographic error: {error}")]
238 CryptoError { error: String },
239
240 #[error("Arithmetic error: {error}")]
241 ArithmeticError { error: String },
242
243 #[error("Error while accessing storage: {error}")]
244 ViewError { error: String },
245
246 #[error("Chain error: {error}")]
247 ChainError { error: String },
248
249 #[error("Worker error: {error}")]
250 WorkerError { error: String },
251
252 #[error("The chain {0} is not active in validator")]
254 InactiveChain(ChainId),
255
256 #[error("Round number should be {0:?}")]
257 WrongRound(Round),
258
259 #[error(
260 "Chain is expecting a next block at height {expected_block_height} but the given block \
261 is at height {found_block_height} instead"
262 )]
263 UnexpectedBlockHeight {
264 expected_block_height: BlockHeight,
265 found_block_height: BlockHeight,
266 },
267
268 #[error(
270 "Cannot vote for block proposal of chain {chain_id} because a message \
271 from chain {origin} at height {height} has not been received yet"
272 )]
273 MissingCrossChainUpdate {
274 chain_id: ChainId,
275 origin: ChainId,
276 height: BlockHeight,
277 },
278
279 #[error("Blobs not found: {0:?}")]
280 BlobsNotFound(Vec<BlobId>),
281
282 #[error("Blocks not found: {0:?}")]
283 BlocksNotFound(Vec<CryptoHash>),
284
285 #[error("Events not found: {0:?}")]
286 EventsNotFound(Vec<EventId>),
287
288 #[error("We don't have the value for the certificate.")]
290 MissingCertificateValue,
291
292 #[error("Response doesn't contain requested certificates: {0:?}")]
293 MissingCertificates(Vec<CryptoHash>),
294
295 #[error("Validator's response failed to include a vote when trying to {0}")]
296 MissingVoteInValidatorResponse(String),
297
298 #[error("The received chain info response is invalid")]
299 InvalidChainInfoResponse,
300 #[error("Unexpected certificate value")]
301 UnexpectedCertificateValue,
302
303 #[error("Cannot deserialize")]
306 InvalidDecoding,
307 #[error("Unexpected message")]
308 UnexpectedMessage,
309 #[error("Grpc error: {error}")]
310 GrpcError { error: String },
311 #[error("Network error while querying service: {error}")]
312 ClientIoError { error: String },
313 #[error("Failed to resolve validator address: {address}")]
314 CannotResolveValidatorAddress { address: String },
315 #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
316 SubscriptionError { transport: String },
317 #[error("Failed to subscribe; tonic status: {status:?}")]
318 SubscriptionFailed { status: String },
319
320 #[error("Node failed to provide a 'last used by' certificate for the blob")]
321 InvalidCertificateForBlob(BlobId),
322 #[error("Node returned a BlobsNotFound error with duplicates")]
323 DuplicatesInBlobsNotFound,
324 #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
325 UnexpectedEntriesInBlobsNotFound,
326 #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
327 UnexpectedCertificates {
328 returned: Vec<CryptoHash>,
329 requested: Vec<CryptoHash>,
330 },
331 #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
332 EmptyBlobsNotFound,
333 #[error("Local error handling validator response: {error}")]
334 ResponseHandlingError { error: String },
335
336 #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
337 MissingCertificatesByHeights {
338 chain_id: ChainId,
339 heights: Vec<BlockHeight>,
340 },
341
342 #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
343 TooManyCertificatesReturned {
344 chain_id: ChainId,
345 remote_node: Box<ValidatorPublicKey>,
346 },
347
348 #[error(
349 "Block timestamp ({block_timestamp}) is further in the future from local time \
350 ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
351 )]
352 InvalidTimestamp {
353 block_timestamp: Timestamp,
354 local_time: Timestamp,
355 block_time_grace_period_ms: u64,
356 },
357
358 #[error("No validators available to handle the request")]
359 NoValidators,
360}
361
362impl NodeError {
363 pub fn is_expected(&self) -> bool {
370 match self {
371 NodeError::BlobsNotFound(_)
374 | NodeError::BlocksNotFound(_)
375 | NodeError::EventsNotFound(_)
376 | NodeError::MissingCrossChainUpdate { .. }
377 | NodeError::WrongRound(_)
378 | NodeError::UnexpectedBlockHeight { .. }
379 | NodeError::InactiveChain(_)
380 | NodeError::InvalidTimestamp { .. }
381 | NodeError::MissingCertificateValue => true,
382
383 NodeError::CryptoError { .. }
385 | NodeError::ArithmeticError { .. }
386 | NodeError::ViewError { .. }
387 | NodeError::ChainError { .. }
388 | NodeError::WorkerError { .. }
389 | NodeError::MissingCertificates(_)
390 | NodeError::MissingVoteInValidatorResponse(_)
391 | NodeError::InvalidChainInfoResponse
392 | NodeError::UnexpectedCertificateValue
393 | NodeError::InvalidDecoding
394 | NodeError::UnexpectedMessage
395 | NodeError::GrpcError { .. }
396 | NodeError::ClientIoError { .. }
397 | NodeError::CannotResolveValidatorAddress { .. }
398 | NodeError::SubscriptionError { .. }
399 | NodeError::SubscriptionFailed { .. }
400 | NodeError::InvalidCertificateForBlob(_)
401 | NodeError::DuplicatesInBlobsNotFound
402 | NodeError::UnexpectedEntriesInBlobsNotFound
403 | NodeError::UnexpectedCertificates { .. }
404 | NodeError::EmptyBlobsNotFound
405 | NodeError::ResponseHandlingError { .. }
406 | NodeError::MissingCertificatesByHeights { .. }
407 | NodeError::TooManyCertificatesReturned { .. }
408 | NodeError::NoValidators => false,
409 }
410 }
411}
412
413impl From<tonic::Status> for NodeError {
414 fn from(status: tonic::Status) -> Self {
415 Self::GrpcError {
416 error: status.to_string(),
417 }
418 }
419}
420
421impl CrossChainMessageDelivery {
422 pub fn new(wait_for_outgoing_messages: bool) -> Self {
423 if wait_for_outgoing_messages {
424 CrossChainMessageDelivery::Blocking
425 } else {
426 CrossChainMessageDelivery::NonBlocking
427 }
428 }
429
430 pub fn wait_for_outgoing_messages(self) -> bool {
431 match self {
432 CrossChainMessageDelivery::NonBlocking => false,
433 CrossChainMessageDelivery::Blocking => true,
434 }
435 }
436}
437
438impl From<ViewError> for NodeError {
439 fn from(error: ViewError) -> Self {
440 Self::ViewError {
441 error: error.to_string(),
442 }
443 }
444}
445
446impl From<ArithmeticError> for NodeError {
447 fn from(error: ArithmeticError) -> Self {
448 Self::ArithmeticError {
449 error: error.to_string(),
450 }
451 }
452}
453
454impl From<CryptoError> for NodeError {
455 fn from(error: CryptoError) -> Self {
456 Self::CryptoError {
457 error: error.to_string(),
458 }
459 }
460}
461
462impl From<ChainError> for NodeError {
463 fn from(error: ChainError) -> Self {
464 match error {
465 ChainError::MissingCrossChainUpdate {
466 chain_id,
467 origin,
468 height,
469 } => Self::MissingCrossChainUpdate {
470 chain_id,
471 origin,
472 height,
473 },
474 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
475 ChainError::ExecutionError(execution_error, context) => match *execution_error {
476 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
477 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
478 _ => Self::ChainError {
479 error: ChainError::ExecutionError(execution_error, context).to_string(),
480 },
481 },
482 ChainError::UnexpectedBlockHeight {
483 expected_block_height,
484 found_block_height,
485 } => Self::UnexpectedBlockHeight {
486 expected_block_height,
487 found_block_height,
488 },
489 ChainError::WrongRound(round) => Self::WrongRound(round),
490 error => Self::ChainError {
491 error: error.to_string(),
492 },
493 }
494 }
495}
496
497impl From<WorkerError> for NodeError {
498 fn from(error: WorkerError) -> Self {
499 match error {
500 WorkerError::ChainError(error) => (*error).into(),
501 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
502 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
503 WorkerError::BlocksNotFound(hashes) => Self::BlocksNotFound(hashes),
504 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
505 WorkerError::UnexpectedBlockHeight {
506 expected_block_height,
507 found_block_height,
508 } => NodeError::UnexpectedBlockHeight {
509 expected_block_height,
510 found_block_height,
511 },
512 WorkerError::InvalidTimestamp {
513 block_timestamp,
514 local_time,
515 block_time_grace_period,
516 } => NodeError::InvalidTimestamp {
517 block_timestamp,
518 local_time,
519 block_time_grace_period_ms: u64::try_from(block_time_grace_period.as_millis())
520 .unwrap_or(u64::MAX),
521 },
522 error => Self::WorkerError {
523 error: error.to_string(),
524 },
525 }
526 }
527}