1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19 ChainError, ChainExecutionContext, StreamCounts,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28 chain_worker::ProcessConfirmedBlockMode,
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error, strum::IntoStaticStr)]
53#[allow(missing_docs)]
54pub enum LocalNodeError {
55 #[error(transparent)]
56 ArithmeticError(#[from] ArithmeticError),
57
58 #[error(transparent)]
59 ViewError(#[from] ViewError),
60
61 #[error("Worker operation failed: {0}")]
62 WorkerError(WorkerError),
63
64 #[error("The local node doesn't have an active chain {0}")]
65 InactiveChain(ChainId),
66
67 #[error("The chain info response received from the local node is invalid")]
68 InvalidChainInfoResponse,
69
70 #[error("Blobs not found: {0:?}")]
71 BlobsNotFound(Vec<BlobId>),
72
73 #[error("Blocks not found: {0:?}")]
74 BlocksNotFound(Vec<CryptoHash>),
75
76 #[error("Events not found: {0:?}")]
77 EventsNotFound(Vec<EventId>),
78}
79
80impl LocalNodeError {
81 pub fn error_type(&self) -> String {
84 match self {
85 LocalNodeError::WorkerError(worker_error) => worker_error.error_type(),
86 other => {
87 let variant: &'static str = other.into();
88 format!("LocalNodeError::{variant}")
89 }
90 }
91 }
92}
93
94impl From<ExecutionError> for LocalNodeError {
95 fn from(error: ExecutionError) -> Self {
96 match error {
97 ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
98 ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
99 ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
100 error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
101 Box::new(error),
102 ChainExecutionContext::Block,
103 ))),
104 }
105 }
106}
107
108impl From<WorkerError> for LocalNodeError {
109 fn from(error: WorkerError) -> Self {
110 match error {
111 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
112 WorkerError::BlocksNotFound(hashes) => LocalNodeError::BlocksNotFound(hashes),
113 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
114 error => LocalNodeError::WorkerError(error),
115 }
116 }
117}
118
119impl<S> LocalNodeClient<S>
120where
121 S: Storage + Clone + 'static,
122{
123 #[instrument(level = "trace", skip_all)]
124 pub async fn handle_block_proposal(
125 &self,
126 proposal: BlockProposal,
127 ) -> Result<ChainInfoResponse, LocalNodeError> {
128 let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
130 Ok(response?)
131 }
132
133 #[instrument(level = "trace", skip_all)]
134 pub async fn handle_certificate<T>(
135 &self,
136 certificate: GenericCertificate<T>,
137 notifier: &impl Notifier,
138 ) -> Result<ChainInfoResponse, LocalNodeError>
139 where
140 T: ProcessableCertificate,
141 {
142 Ok(Box::pin(
143 self.node
144 .state
145 .fully_handle_certificate_with_notifications(certificate, notifier),
146 )
147 .await?)
148 }
149
150 #[instrument(level = "trace", skip_all)]
154 pub async fn handle_confirmed_certificate(
155 &self,
156 certificate: ConfirmedBlockCertificate,
157 mode: ProcessConfirmedBlockMode,
158 notifier: &impl Notifier,
159 ) -> Result<ChainInfoResponse, LocalNodeError> {
160 Ok(Box::pin(
161 self.node
162 .state
163 .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
164 )
165 .await?)
166 }
167
168 #[cfg(with_testing)]
172 pub async fn read_certificate(
173 &self,
174 chain_id: linera_base::identifiers::ChainId,
175 height: linera_base::data_types::BlockHeight,
176 ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, LocalNodeError> {
177 Ok(self.node.state.read_certificate(chain_id, height).await?)
178 }
179
180 #[instrument(level = "trace", skip_all)]
181 pub async fn handle_chain_info_query(
182 &self,
183 query: ChainInfoQuery,
184 ) -> Result<ChainInfoResponse, LocalNodeError> {
185 Ok(self.node.state.handle_chain_info_query(query).await?)
186 }
187
188 #[instrument(level = "trace", skip_all)]
189 pub fn new(state: WorkerState<S>) -> Self {
190 Self {
191 node: Arc::new(LocalNode { state }),
192 }
193 }
194
195 #[instrument(level = "trace", skip_all)]
196 pub(crate) fn storage_client(&self) -> S {
197 self.node.state.storage_client().clone()
198 }
199
200 #[instrument(level = "trace", skip_all)]
205 pub async fn stage_block_execution(
206 &self,
207 block: ProposedBlock,
208 round: Option<u32>,
209 published_blobs: Vec<Blob>,
210 policy: BundleExecutionPolicy,
211 ) -> Result<
212 (
213 ProposedBlock,
214 Block,
215 ChainInfoResponse,
216 ResourceTracker,
217 HashSet<ChainId>,
218 ),
219 LocalNodeError,
220 > {
221 Ok(self
222 .node
223 .state
224 .stage_block_execution(block, round, published_blobs, policy)
225 .await?)
226 }
227
228 pub async fn read_blobs_from_storage(
230 &self,
231 blob_ids: &[BlobId],
232 ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
233 let storage = self.storage_client();
234 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
235 }
236
237 pub async fn read_blob_states_from_storage(
239 &self,
240 blob_ids: &[BlobId],
241 ) -> Result<Vec<BlobState>, LocalNodeError> {
242 let storage = self.storage_client();
243 let mut blobs_not_found = Vec::new();
244 let mut blob_states = Vec::new();
245 for (blob_state, blob_id) in storage
246 .read_blob_states(blob_ids)
247 .await?
248 .into_iter()
249 .zip(blob_ids)
250 {
251 match blob_state {
252 None => blobs_not_found.push(*blob_id),
253 Some(blob_state) => blob_states.push(blob_state),
254 }
255 }
256 if !blobs_not_found.is_empty() {
257 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
258 }
259 Ok(blob_states)
260 }
261
262 pub async fn get_locking_blobs(
265 &self,
266 blob_ids: impl IntoIterator<Item = &BlobId>,
267 chain_id: ChainId,
268 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
269 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
270 Ok(self
271 .node
272 .state
273 .get_locking_blobs(chain_id, blob_ids_vec)
274 .await?)
275 }
276
277 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
279 let storage = self.storage_client();
280 storage.maybe_write_blobs(blobs).await?;
281 Ok(())
282 }
283
284 pub async fn handle_pending_blobs(
285 &self,
286 chain_id: ChainId,
287 blobs: Vec<Blob>,
288 ) -> Result<(), LocalNodeError> {
289 for blob in blobs {
290 self.node.state.handle_pending_blob(chain_id, blob).await?;
291 }
292 Ok(())
293 }
294
295 #[instrument(level = "trace", skip(self))]
301 pub async fn chain_state_view(
302 &self,
303 chain_id: ChainId,
304 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
305 Ok(self.node.state.chain_state_view(chain_id).await?)
306 }
307
308 #[instrument(level = "trace", skip(self))]
309 pub(crate) async fn chain_info(
310 &self,
311 chain_id: ChainId,
312 ) -> Result<Box<ChainInfo>, LocalNodeError> {
313 let query = ChainInfoQuery::new(chain_id);
314 Ok(self.handle_chain_info_query(query).await?.info)
315 }
316
317 #[instrument(level = "trace", skip(self, query))]
318 pub async fn query_application(
319 &self,
320 chain_id: ChainId,
321 query: Query,
322 block_hash: Option<CryptoHash>,
323 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
324 let result = self
325 .node
326 .state
327 .query_application(chain_id, query, block_hash)
328 .await?;
329 Ok(result)
330 }
331
332 #[instrument(level = "trace", skip(self, notifier))]
340 pub async fn retry_pending_cross_chain_requests(
341 &self,
342 sender_chain: ChainId,
343 notifier: &impl Notifier,
344 ) -> Result<(), LocalNodeError> {
345 let actions = self
346 .node
347 .state
348 .cross_chain_network_actions(sender_chain)
349 .await?;
350 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
351 while let Some(request) = requests.pop_front() {
352 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
353 notifier.notify(&new_actions.notifications);
354 requests.extend(new_actions.cross_chain_requests);
355 }
356 Ok(())
357 }
358
359 pub async fn next_outbox_heights(
363 &self,
364 chain_ids: impl IntoIterator<Item = &ChainId>,
365 receiver_id: ChainId,
366 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
367 let futures = chain_ids
368 .into_iter()
369 .map(|chain_id| async move {
370 let (next_block_height, next_height_to_schedule) = match self
371 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
372 .await
373 {
374 Ok(info) => info,
375 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
376 return Ok((*chain_id, BlockHeight::ZERO))
377 }
378 Err(err) => Err(err)?,
379 };
380 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
381 next_block_height.max(scheduled_height)
382 } else {
383 next_block_height
384 };
385 Ok::<_, LocalNodeError>((*chain_id, next_height))
386 })
387 .collect::<FuturesUnordered<_>>();
388 futures.try_collect().await
389 }
390
391 pub async fn update_received_certificate_trackers(
392 &self,
393 chain_id: ChainId,
394 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
395 ) -> Result<(), LocalNodeError> {
396 self.node
397 .state
398 .update_received_certificate_trackers(chain_id, new_trackers)
399 .await?;
400 Ok(())
401 }
402
403 pub async fn get_preprocessed_block_hashes(
404 &self,
405 chain_id: ChainId,
406 start: BlockHeight,
407 end: BlockHeight,
408 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
409 Ok(self
410 .node
411 .state
412 .get_preprocessed_block_hashes(chain_id, start, end)
413 .await?)
414 }
415
416 pub async fn get_inbox_next_height(
417 &self,
418 chain_id: ChainId,
419 origin: ChainId,
420 ) -> Result<BlockHeight, LocalNodeError> {
421 Ok(self
422 .node
423 .state
424 .get_inbox_next_height(chain_id, origin)
425 .await?)
426 }
427
428 pub async fn get_block_hashes(
430 &self,
431 chain_id: ChainId,
432 heights: Vec<BlockHeight>,
433 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
434 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
435 }
436
437 pub async fn get_proposed_blobs(
439 &self,
440 chain_id: ChainId,
441 blob_ids: Vec<BlobId>,
442 ) -> Result<Vec<Blob>, LocalNodeError> {
443 Ok(self
444 .node
445 .state
446 .get_proposed_blobs(chain_id, blob_ids)
447 .await?)
448 }
449
450 pub async fn get_event_subscriptions(
452 &self,
453 chain_id: ChainId,
454 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
455 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
456 }
457
458 pub async fn get_stream_indices(
461 &self,
462 chain_id: ChainId,
463 stream_id: StreamId,
464 ) -> Result<StreamCounts, LocalNodeError> {
465 Ok(self
466 .node
467 .state
468 .get_stream_indices(chain_id, stream_id)
469 .await?)
470 }
471
472 pub async fn next_expected_events(
474 &self,
475 chain_id: ChainId,
476 stream_ids: Vec<StreamId>,
477 ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
478 Ok(self
479 .node
480 .state
481 .next_expected_events(chain_id, stream_ids)
482 .await?)
483 }
484
485 #[cfg(with_testing)]
487 pub async fn reset_and_reexecute_chain(
488 &self,
489 chain_id: ChainId,
490 ) -> Result<Vec<crate::data_types::CrossChainRequest>, LocalNodeError> {
491 Ok(self.node.state.reset_and_reexecute_chain(chain_id).await?)
492 }
493
494 pub async fn get_received_certificate_trackers(
496 &self,
497 chain_id: ChainId,
498 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
499 Ok(self
500 .node
501 .state
502 .get_received_certificate_trackers(chain_id)
503 .await?)
504 }
505
506 pub async fn get_tip_state_and_outbox_info(
508 &self,
509 chain_id: ChainId,
510 receiver_id: ChainId,
511 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
512 Ok(self
513 .node
514 .state
515 .get_tip_state_and_outbox_info(chain_id, receiver_id)
516 .await?)
517 }
518
519 pub async fn get_next_height_to_preprocess(
521 &self,
522 chain_id: ChainId,
523 ) -> Result<BlockHeight, LocalNodeError> {
524 Ok(self
525 .node
526 .state
527 .get_next_height_to_preprocess(chain_id)
528 .await?)
529 }
530}
531
#[cfg(test)]
mod tests {
    use super::*;

    /// A wrapped worker error is labelled by the worker error's own `error_type()`.
    #[test]
    fn error_type_delegates_to_worker_error() {
        let error = LocalNodeError::WorkerError(WorkerError::InvalidOwner);
        assert_eq!(error.error_type(), "WorkerError::InvalidOwner");
    }

    /// Any other variant is labelled `LocalNodeError::` plus the variant name.
    #[test]
    fn error_type_falls_back_to_local_node_variant() {
        let error = LocalNodeError::InvalidChainInfoResponse;
        assert_eq!(
            error.error_type(),
            "LocalNodeError::InvalidChainInfoResponse"
        );
    }
}