1#[cfg(not(web))]
6use futures::stream::BoxStream;
7#[cfg(web)]
8use futures::stream::LocalBoxStream as BoxStream;
9use futures::stream::Stream;
10use linera_base::{
11 crypto::{CryptoError, CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ArithmeticError, Blob, BlobContent, BlockHeight, NetworkDescription, Round, Timestamp,
14 },
15 identifiers::{BlobId, ChainId, EventId},
16};
17use linera_chain::{
18 data_types::BlockProposal,
19 types::{
20 ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate, Timeout,
21 ValidatedBlock,
22 },
23 ChainError,
24};
25use linera_execution::{committee::Committee, ExecutionError};
26use linera_version::VersionInfo;
27use linera_views::ViewError;
28use serde::{Deserialize, Serialize};
29use thiserror::Error;
30
31use crate::{
32 data_types::{ChainInfoQuery, ChainInfoResponse},
33 worker::{Notification, WorkerError},
34};
35
/// Boxed, `'static` stream of [`Notification`] values.
///
/// `BoxStream` is imported under `cfg`: on non-web builds it is
/// `futures::stream::BoxStream`, on web builds it is `futures::stream::LocalBoxStream`.
pub type NotificationStream = BoxStream<'static, Notification>;
38
/// Selects whether an operation waits for outgoing messages.
///
/// The mapping to and from a boolean flag is given by
/// `CrossChainMessageDelivery::new` and `wait_for_outgoing_messages`.
#[derive(Debug, Default, Clone, Copy)]
pub enum CrossChainMessageDelivery {
    /// Do not wait for outgoing messages (`wait_for_outgoing_messages()` is `false`).
    /// This is the `Default` value.
    #[default]
    NonBlocking,
    /// Wait for outgoing messages (`wait_for_outgoing_messages()` is `true`).
    Blocking,
}
46
/// Asynchronous operations exposed by a validator node.
///
/// Every fallible method reports failure as a [`NodeError`].
// `async fn` in a public trait triggers the `async_fn_in_trait` lint, which is silenced here.
// On non-web builds the trait is additionally processed by `trait_variant::make(Send)`;
// NOTE(review): per the `trait_variant` documentation this adds `Send` bounds to the futures
// returned by the async methods — confirm against the crate version in use.
#[allow(async_fn_in_trait)]
#[cfg_attr(not(web), trait_variant::make(Send))]
pub trait ValidatorNode {
    /// Stream type returned by [`Self::subscribe`] (non-web builds: must also be `Send`).
    #[cfg(not(web))]
    type NotificationStream: Stream<Item = Notification> + Unpin + Send;
    /// Stream type returned by [`Self::subscribe`] (web builds: no `Send` requirement).
    #[cfg(web)]
    type NotificationStream: Stream<Item = Notification> + Unpin;

    /// Returns the address of this node as a `String`.
    fn address(&self) -> String;

    /// Handles a [`BlockProposal`] and returns the resulting [`ChainInfoResponse`].
    async fn handle_block_proposal(
        &self,
        proposal: BlockProposal,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Handles a [`LiteCertificate`], using the given cross-chain `delivery` mode.
    async fn handle_lite_certificate(
        &self,
        certificate: LiteCertificate<'_>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Handles a certificate for a [`ConfirmedBlock`], using the given cross-chain
    /// `delivery` mode.
    async fn handle_confirmed_certificate(
        &self,
        certificate: GenericCertificate<ConfirmedBlock>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Handles a certificate for a [`ValidatedBlock`].
    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Handles a certificate for a [`Timeout`].
    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Answers a [`ChainInfoQuery`] with a [`ChainInfoResponse`].
    async fn handle_chain_info_query(
        &self,
        query: ChainInfoQuery,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Retrieves the node's [`VersionInfo`].
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError>;

    /// Retrieves the [`NetworkDescription`].
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError>;

    /// Subscribes to notifications for the given `chains` and returns the notification stream.
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;

    /// Uploads the content of a single blob and returns a [`BlobId`].
    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;

    /// Uploads several blobs by issuing one [`Self::upload_blob`] call per blob.
    ///
    /// The per-blob futures are combined with `futures::future::try_join_all`, so the
    /// returned future yields the collected [`BlobId`]s on success or an error as soon as
    /// one upload fails.
    // Declared as a plain `fn` returning `impl Future` rather than as an `async fn`.
    fn upload_blobs(
        &self,
        blobs: Vec<Blob>,
    ) -> impl futures::Future<Output = Result<Vec<BlobId>, NodeError>> {
        // Create every upload future up front; nothing runs until the joined future is polled.
        let tasks: Vec<_> = blobs
            .into_iter()
            .map(|blob| self.upload_blob(blob.into()))
            .collect();
        futures::future::try_join_all(tasks)
    }

    /// Downloads the content of the blob with the given ID.
    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;

    /// Downloads the content of a pending blob, identified by `chain_id` and `blob_id`.
    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError>;

    /// Handles the content of a pending blob for the chain `chain_id`.
    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError>;

    /// Downloads the [`ConfirmedBlockCertificate`] identified by `hash`.
    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Downloads the [`ConfirmedBlockCertificate`]s identified by `hashes`.
    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Downloads the [`ConfirmedBlockCertificate`]s of chain `chain_id` at the given `heights`.
    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError>;

    /// Returns a [`CryptoHash`] for the given blob.
    // NOTE(review): presumably the hash of the certificate that last used the blob — confirm.
    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;

    /// Returns a [`ConfirmedBlockCertificate`] for the given blob.
    // NOTE(review): presumably the certificate that last used the blob — confirm.
    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError>;

    /// Takes a list of blob IDs and returns a list of blob IDs.
    // NOTE(review): presumably the subset of `blob_ids` that the node is missing — confirm.
    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError>;

    /// Returns the `ShardInfo` for the chain `chain_id`.
    async fn get_shard_info(
        &self,
        chain_id: ChainId,
    ) -> Result<crate::data_types::ShardInfo, NodeError>;
}
181
182#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
184pub trait ValidatorNodeProvider: 'static {
185 #[cfg(not(web))]
186 type Node: ValidatorNode + Send + Sync + Clone + 'static;
187 #[cfg(web)]
188 type Node: ValidatorNode + Clone + 'static;
189
190 fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>;
191
192 fn make_nodes(
193 &self,
194 committee: &Committee,
195 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)> + '_, NodeError> {
196 let validator_addresses: Vec<_> = committee
197 .validator_addresses()
198 .map(|(node, name)| (node, name.to_owned()))
199 .collect();
200 self.make_nodes_from_list(validator_addresses)
201 }
202
203 fn make_nodes_from_list<A>(
204 &self,
205 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
206 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
207 where
208 A: AsRef<str>,
209 {
210 Ok(validators
211 .into_iter()
212 .map(|(name, address)| Ok((name, self.make_node(address.as_ref())?)))
213 .collect::<Result<Vec<_>, NodeError>>()?
214 .into_iter())
215 }
216}
217
/// Error returned by the node operations declared in this file.
///
/// The type derives `Serialize`/`Deserialize` as well as `Eq` and `Hash`, so values can be
/// serialized, compared, and used as hash keys. Variants that stand for another error type
/// store only that error's rendered message as a `String` (see the `From` conversions in
/// this file).
// NOTE(review): for positional serialization formats the variant order is presumably part
// of the encoding — confirm before reordering or inserting variants.
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize, Error, Hash)]
pub enum NodeError {
    /// A cryptographic error, kept as its message.
    #[error("Cryptographic error: {error}")]
    CryptoError { error: String },

    /// An arithmetic error, kept as its message.
    #[error("Arithmetic error: {error}")]
    ArithmeticError { error: String },

    /// A storage (view) error, kept as its message.
    #[error("Error while accessing storage: {error}")]
    ViewError { error: String },

    /// A chain error without a dedicated variant, kept as its message.
    #[error("Chain error: {error}")]
    ChainError { error: String },

    /// A worker error without a dedicated variant, kept as its message.
    #[error("Worker error: {error}")]
    WorkerError { error: String },

    /// The given chain is not active in the validator.
    #[error("The chain {0} is not active in validator")]
    InactiveChain(ChainId),

    /// The round number was wrong; carries the round it should have been.
    #[error("Round number should be {0:?}")]
    WrongRound(Round),

    /// The block height did not match the next height the chain expects.
    #[error(
        "Chain is expecting a next block at height {expected_block_height} but the given block \
        is at height {found_block_height} instead"
    )]
    UnexpectedBlockHeight {
        expected_block_height: BlockHeight,
        found_block_height: BlockHeight,
    },

    /// A block proposal cannot be voted on because a message from `origin` at `height`
    /// has not been received yet.
    #[error(
        "Cannot vote for block proposal of chain {chain_id} because a message \
         from chain {origin} at height {height} has not been received yet"
    )]
    MissingCrossChainUpdate {
        chain_id: ChainId,
        origin: ChainId,
        height: BlockHeight,
    },

    /// The listed blobs were not found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The listed events were not found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),

    /// The value belonging to a certificate is not available.
    #[error("We don't have the value for the certificate.")]
    MissingCertificateValue,

    /// A response did not contain the requested certificates (identified by hash).
    #[error("Response doesn't contain requested certificates: {0:?}")]
    MissingCertificates(Vec<CryptoHash>),

    /// A validator's response lacked a vote; the string describes the attempted action.
    #[error("Validator's response failed to include a vote when trying to {0}")]
    MissingVoteInValidatorResponse(String),

    /// The received chain info response is invalid.
    #[error("The received chain info response is invalid")]
    InvalidChainInfoResponse,
    /// A certificate carried an unexpected value.
    #[error("Unexpected certificate value")]
    UnexpectedCertificateValue,

    /// Data could not be deserialized.
    #[error("Cannot deserialize")]
    InvalidDecoding,
    /// An unexpected message was received.
    #[error("Unexpected message")]
    UnexpectedMessage,
    /// A gRPC error, kept as its message (see `From<tonic::Status>`).
    #[error("Grpc error: {error}")]
    GrpcError { error: String },
    /// A network error occurred while querying a service.
    #[error("Network error while querying service: {error}")]
    ClientIoError { error: String },
    /// The validator address could not be resolved.
    #[error("Failed to resolve validator address: {address}")]
    CannotResolveValidatorAddress { address: String },
    /// A subscription was attempted over a transport other than gRPC.
    #[error("Subscription error due to incorrect transport. Was expecting gRPC, instead found: {transport}")]
    SubscriptionError { transport: String },
    /// Subscribing failed; `status` holds the reported status text.
    #[error("Failed to subscribe; tonic status: {status:?}")]
    SubscriptionFailed { status: String },

    /// The node did not provide a valid "last used by" certificate for the blob.
    /// The blob ID is carried in the variant but is not part of the message.
    #[error("Node failed to provide a 'last used by' certificate for the blob")]
    InvalidCertificateForBlob(BlobId),
    /// A `BlobsNotFound` error from the node listed the same blob more than once.
    #[error("Node returned a BlobsNotFound error with duplicates")]
    DuplicatesInBlobsNotFound,
    /// A `BlobsNotFound` error from the node listed unexpected blob IDs.
    #[error("Node returned a BlobsNotFound error with unexpected blob IDs")]
    UnexpectedEntriesInBlobsNotFound,
    /// The node returned certificates other than the requested ones.
    #[error("Node returned certificates {returned:?}, but we requested {requested:?}")]
    UnexpectedCertificates {
        returned: Vec<CryptoHash>,
        requested: Vec<CryptoHash>,
    },
    /// A `BlobsNotFound` error from the node listed no blob IDs at all.
    #[error("Node returned a BlobsNotFound error with an empty list of missing blob IDs")]
    EmptyBlobsNotFound,
    /// A local error occurred while handling a validator's response.
    #[error("Local error handling validator response: {error}")]
    ResponseHandlingError { error: String },

    /// Certificates for `chain_id` at the given `heights` are missing.
    #[error("Missing certificates for chain {chain_id} in heights {heights:?}")]
    MissingCertificatesByHeights {
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    },

    /// `remote_node` returned too many certificates for `chain_id`.
    #[error("Too many certificates returned for chain {chain_id} from {remote_node}")]
    TooManyCertificatesReturned {
        chain_id: ChainId,
        remote_node: Box<ValidatorPublicKey>,
    },

    /// The block timestamp is further ahead of local time than the grace period allows.
    /// The grace period is expressed in milliseconds.
    #[error(
        "Block timestamp ({block_timestamp}) is further in the future from local time \
        ({local_time}) than block time grace period ({block_time_grace_period_ms} ms)"
    )]
    InvalidTimestamp {
        block_timestamp: Timestamp,
        local_time: Timestamp,
        block_time_grace_period_ms: u64,
    },
}
342
343impl NodeError {
344 pub fn is_expected(&self) -> bool {
351 match self {
352 NodeError::BlobsNotFound(_)
355 | NodeError::EventsNotFound(_)
356 | NodeError::MissingCrossChainUpdate { .. }
357 | NodeError::WrongRound(_)
358 | NodeError::UnexpectedBlockHeight { .. }
359 | NodeError::InactiveChain(_)
360 | NodeError::InvalidTimestamp { .. }
361 | NodeError::MissingCertificateValue => true,
362
363 NodeError::CryptoError { .. }
365 | NodeError::ArithmeticError { .. }
366 | NodeError::ViewError { .. }
367 | NodeError::ChainError { .. }
368 | NodeError::WorkerError { .. }
369 | NodeError::MissingCertificates(_)
370 | NodeError::MissingVoteInValidatorResponse(_)
371 | NodeError::InvalidChainInfoResponse
372 | NodeError::UnexpectedCertificateValue
373 | NodeError::InvalidDecoding
374 | NodeError::UnexpectedMessage
375 | NodeError::GrpcError { .. }
376 | NodeError::ClientIoError { .. }
377 | NodeError::CannotResolveValidatorAddress { .. }
378 | NodeError::SubscriptionError { .. }
379 | NodeError::SubscriptionFailed { .. }
380 | NodeError::InvalidCertificateForBlob(_)
381 | NodeError::DuplicatesInBlobsNotFound
382 | NodeError::UnexpectedEntriesInBlobsNotFound
383 | NodeError::UnexpectedCertificates { .. }
384 | NodeError::EmptyBlobsNotFound
385 | NodeError::ResponseHandlingError { .. }
386 | NodeError::MissingCertificatesByHeights { .. }
387 | NodeError::TooManyCertificatesReturned { .. } => false,
388 }
389 }
390}
391
392impl From<tonic::Status> for NodeError {
393 fn from(status: tonic::Status) -> Self {
394 Self::GrpcError {
395 error: status.to_string(),
396 }
397 }
398}
399
400impl CrossChainMessageDelivery {
401 pub fn new(wait_for_outgoing_messages: bool) -> Self {
402 if wait_for_outgoing_messages {
403 CrossChainMessageDelivery::Blocking
404 } else {
405 CrossChainMessageDelivery::NonBlocking
406 }
407 }
408
409 pub fn wait_for_outgoing_messages(self) -> bool {
410 match self {
411 CrossChainMessageDelivery::NonBlocking => false,
412 CrossChainMessageDelivery::Blocking => true,
413 }
414 }
415}
416
417impl From<ViewError> for NodeError {
418 fn from(error: ViewError) -> Self {
419 Self::ViewError {
420 error: error.to_string(),
421 }
422 }
423}
424
425impl From<ArithmeticError> for NodeError {
426 fn from(error: ArithmeticError) -> Self {
427 Self::ArithmeticError {
428 error: error.to_string(),
429 }
430 }
431}
432
433impl From<CryptoError> for NodeError {
434 fn from(error: CryptoError) -> Self {
435 Self::CryptoError {
436 error: error.to_string(),
437 }
438 }
439}
440
441impl From<ChainError> for NodeError {
442 fn from(error: ChainError) -> Self {
443 match error {
444 ChainError::MissingCrossChainUpdate {
445 chain_id,
446 origin,
447 height,
448 } => Self::MissingCrossChainUpdate {
449 chain_id,
450 origin,
451 height,
452 },
453 ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
454 ChainError::ExecutionError(execution_error, context) => match *execution_error {
455 ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
456 ExecutionError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
457 _ => Self::ChainError {
458 error: ChainError::ExecutionError(execution_error, context).to_string(),
459 },
460 },
461 ChainError::UnexpectedBlockHeight {
462 expected_block_height,
463 found_block_height,
464 } => Self::UnexpectedBlockHeight {
465 expected_block_height,
466 found_block_height,
467 },
468 ChainError::WrongRound(round) => Self::WrongRound(round),
469 error => Self::ChainError {
470 error: error.to_string(),
471 },
472 }
473 }
474}
475
476impl From<WorkerError> for NodeError {
477 fn from(error: WorkerError) -> Self {
478 match error {
479 WorkerError::ChainError(error) => (*error).into(),
480 WorkerError::MissingCertificateValue => Self::MissingCertificateValue,
481 WorkerError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
482 WorkerError::EventsNotFound(event_ids) => Self::EventsNotFound(event_ids),
483 WorkerError::UnexpectedBlockHeight {
484 expected_block_height,
485 found_block_height,
486 } => NodeError::UnexpectedBlockHeight {
487 expected_block_height,
488 found_block_height,
489 },
490 WorkerError::InvalidTimestamp {
491 block_timestamp,
492 local_time,
493 block_time_grace_period,
494 } => NodeError::InvalidTimestamp {
495 block_timestamp,
496 local_time,
497 block_time_grace_period_ms: block_time_grace_period.as_millis() as u64,
498 },
499 error => Self::WorkerError {
500 error: error.to_string(),
501 },
502 }
503 }
504}