1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19 ChainError, ChainExecutionContext,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28 chain_worker::ProcessConfirmedBlockMode,
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71
72 #[error("Blocks not found: {0:?}")]
73 BlocksNotFound(Vec<CryptoHash>),
74
75 #[error("Events not found: {0:?}")]
76 EventsNotFound(Vec<EventId>),
77}
78
79impl From<ExecutionError> for LocalNodeError {
80 fn from(error: ExecutionError) -> Self {
81 match error {
82 ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
83 ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
84 ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
85 error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
86 Box::new(error),
87 ChainExecutionContext::Block,
88 ))),
89 }
90 }
91}
92
93impl From<WorkerError> for LocalNodeError {
94 fn from(error: WorkerError) -> Self {
95 match error {
96 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
97 WorkerError::BlocksNotFound(hashes) => LocalNodeError::BlocksNotFound(hashes),
98 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
99 error => LocalNodeError::WorkerError(error),
100 }
101 }
102}
103
104impl<S> LocalNodeClient<S>
105where
106 S: Storage + Clone + 'static,
107{
108 #[instrument(level = "trace", skip_all)]
109 pub async fn handle_block_proposal(
110 &self,
111 proposal: BlockProposal,
112 ) -> Result<ChainInfoResponse, LocalNodeError> {
113 let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
115 Ok(response?)
116 }
117
118 #[instrument(level = "trace", skip_all)]
119 pub async fn handle_certificate<T>(
120 &self,
121 certificate: GenericCertificate<T>,
122 notifier: &impl Notifier,
123 ) -> Result<ChainInfoResponse, LocalNodeError>
124 where
125 T: ProcessableCertificate,
126 {
127 Ok(Box::pin(
128 self.node
129 .state
130 .fully_handle_certificate_with_notifications(certificate, notifier),
131 )
132 .await?)
133 }
134
135 #[instrument(level = "trace", skip_all)]
139 pub async fn handle_confirmed_certificate(
140 &self,
141 certificate: ConfirmedBlockCertificate,
142 mode: ProcessConfirmedBlockMode,
143 notifier: &impl Notifier,
144 ) -> Result<ChainInfoResponse, LocalNodeError> {
145 Ok(Box::pin(
146 self.node
147 .state
148 .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
149 )
150 .await?)
151 }
152
153 #[cfg(with_testing)]
157 pub async fn read_certificate(
158 &self,
159 chain_id: linera_base::identifiers::ChainId,
160 height: linera_base::data_types::BlockHeight,
161 ) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, LocalNodeError> {
162 Ok(self.node.state.read_certificate(chain_id, height).await?)
163 }
164
165 #[instrument(level = "trace", skip_all)]
166 pub async fn handle_chain_info_query(
167 &self,
168 query: ChainInfoQuery,
169 ) -> Result<ChainInfoResponse, LocalNodeError> {
170 Ok(self.node.state.handle_chain_info_query(query).await?)
171 }
172
173 #[instrument(level = "trace", skip_all)]
174 pub fn new(state: WorkerState<S>) -> Self {
175 Self {
176 node: Arc::new(LocalNode { state }),
177 }
178 }
179
180 #[instrument(level = "trace", skip_all)]
181 pub(crate) fn storage_client(&self) -> S {
182 self.node.state.storage_client().clone()
183 }
184
185 #[instrument(level = "trace", skip_all)]
190 pub async fn stage_block_execution(
191 &self,
192 block: ProposedBlock,
193 round: Option<u32>,
194 published_blobs: Vec<Blob>,
195 policy: BundleExecutionPolicy,
196 ) -> Result<
197 (
198 ProposedBlock,
199 Block,
200 ChainInfoResponse,
201 ResourceTracker,
202 HashSet<ChainId>,
203 ),
204 LocalNodeError,
205 > {
206 Ok(self
207 .node
208 .state
209 .stage_block_execution(block, round, published_blobs, policy)
210 .await?)
211 }
212
213 pub async fn read_blobs_from_storage(
215 &self,
216 blob_ids: &[BlobId],
217 ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
218 let storage = self.storage_client();
219 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
220 }
221
222 pub async fn read_blob_states_from_storage(
224 &self,
225 blob_ids: &[BlobId],
226 ) -> Result<Vec<BlobState>, LocalNodeError> {
227 let storage = self.storage_client();
228 let mut blobs_not_found = Vec::new();
229 let mut blob_states = Vec::new();
230 for (blob_state, blob_id) in storage
231 .read_blob_states(blob_ids)
232 .await?
233 .into_iter()
234 .zip(blob_ids)
235 {
236 match blob_state {
237 None => blobs_not_found.push(*blob_id),
238 Some(blob_state) => blob_states.push(blob_state),
239 }
240 }
241 if !blobs_not_found.is_empty() {
242 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
243 }
244 Ok(blob_states)
245 }
246
247 pub async fn get_locking_blobs(
250 &self,
251 blob_ids: impl IntoIterator<Item = &BlobId>,
252 chain_id: ChainId,
253 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
254 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
255 Ok(self
256 .node
257 .state
258 .get_locking_blobs(chain_id, blob_ids_vec)
259 .await?)
260 }
261
262 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
264 let storage = self.storage_client();
265 storage.maybe_write_blobs(blobs).await?;
266 Ok(())
267 }
268
269 pub async fn handle_pending_blobs(
270 &self,
271 chain_id: ChainId,
272 blobs: Vec<Blob>,
273 ) -> Result<(), LocalNodeError> {
274 for blob in blobs {
275 self.node.state.handle_pending_blob(chain_id, blob).await?;
276 }
277 Ok(())
278 }
279
280 #[instrument(level = "trace", skip(self))]
286 pub async fn chain_state_view(
287 &self,
288 chain_id: ChainId,
289 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
290 Ok(self.node.state.chain_state_view(chain_id).await?)
291 }
292
293 #[instrument(level = "trace", skip(self))]
294 pub(crate) async fn chain_info(
295 &self,
296 chain_id: ChainId,
297 ) -> Result<Box<ChainInfo>, LocalNodeError> {
298 let query = ChainInfoQuery::new(chain_id);
299 Ok(self.handle_chain_info_query(query).await?.info)
300 }
301
302 #[instrument(level = "trace", skip(self, query))]
303 pub async fn query_application(
304 &self,
305 chain_id: ChainId,
306 query: Query,
307 block_hash: Option<CryptoHash>,
308 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
309 let result = self
310 .node
311 .state
312 .query_application(chain_id, query, block_hash)
313 .await?;
314 Ok(result)
315 }
316
317 #[instrument(level = "trace", skip(self, notifier))]
325 pub async fn retry_pending_cross_chain_requests(
326 &self,
327 sender_chain: ChainId,
328 notifier: &impl Notifier,
329 ) -> Result<(), LocalNodeError> {
330 let actions = self
331 .node
332 .state
333 .cross_chain_network_actions(sender_chain)
334 .await?;
335 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
336 while let Some(request) = requests.pop_front() {
337 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
338 notifier.notify(&new_actions.notifications);
339 requests.extend(new_actions.cross_chain_requests);
340 }
341 Ok(())
342 }
343
344 pub async fn next_outbox_heights(
348 &self,
349 chain_ids: impl IntoIterator<Item = &ChainId>,
350 receiver_id: ChainId,
351 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
352 let futures = chain_ids
353 .into_iter()
354 .map(|chain_id| async move {
355 let (next_block_height, next_height_to_schedule) = match self
356 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
357 .await
358 {
359 Ok(info) => info,
360 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
361 return Ok((*chain_id, BlockHeight::ZERO))
362 }
363 Err(err) => Err(err)?,
364 };
365 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
366 next_block_height.max(scheduled_height)
367 } else {
368 next_block_height
369 };
370 Ok::<_, LocalNodeError>((*chain_id, next_height))
371 })
372 .collect::<FuturesUnordered<_>>();
373 futures.try_collect().await
374 }
375
376 pub async fn update_received_certificate_trackers(
377 &self,
378 chain_id: ChainId,
379 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
380 ) -> Result<(), LocalNodeError> {
381 self.node
382 .state
383 .update_received_certificate_trackers(chain_id, new_trackers)
384 .await?;
385 Ok(())
386 }
387
388 pub async fn get_preprocessed_block_hashes(
389 &self,
390 chain_id: ChainId,
391 start: BlockHeight,
392 end: BlockHeight,
393 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
394 Ok(self
395 .node
396 .state
397 .get_preprocessed_block_hashes(chain_id, start, end)
398 .await?)
399 }
400
401 pub async fn get_inbox_next_height(
402 &self,
403 chain_id: ChainId,
404 origin: ChainId,
405 ) -> Result<BlockHeight, LocalNodeError> {
406 Ok(self
407 .node
408 .state
409 .get_inbox_next_height(chain_id, origin)
410 .await?)
411 }
412
413 pub async fn get_block_hashes(
415 &self,
416 chain_id: ChainId,
417 heights: Vec<BlockHeight>,
418 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
419 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
420 }
421
422 pub async fn get_proposed_blobs(
424 &self,
425 chain_id: ChainId,
426 blob_ids: Vec<BlobId>,
427 ) -> Result<Vec<Blob>, LocalNodeError> {
428 Ok(self
429 .node
430 .state
431 .get_proposed_blobs(chain_id, blob_ids)
432 .await?)
433 }
434
435 pub async fn get_event_subscriptions(
437 &self,
438 chain_id: ChainId,
439 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
440 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
441 }
442
443 pub async fn get_next_expected_event(
445 &self,
446 chain_id: ChainId,
447 stream_id: StreamId,
448 ) -> Result<Option<u32>, LocalNodeError> {
449 Ok(self
450 .node
451 .state
452 .get_next_expected_event(chain_id, stream_id)
453 .await?)
454 }
455
456 pub async fn next_expected_events(
458 &self,
459 chain_id: ChainId,
460 stream_ids: Vec<StreamId>,
461 ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
462 Ok(self
463 .node
464 .state
465 .next_expected_events(chain_id, stream_ids)
466 .await?)
467 }
468
469 pub async fn get_received_certificate_trackers(
471 &self,
472 chain_id: ChainId,
473 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
474 Ok(self
475 .node
476 .state
477 .get_received_certificate_trackers(chain_id)
478 .await?)
479 }
480
481 pub async fn get_tip_state_and_outbox_info(
483 &self,
484 chain_id: ChainId,
485 receiver_id: ChainId,
486 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
487 Ok(self
488 .node
489 .state
490 .get_tip_state_and_outbox_info(chain_id, receiver_id)
491 .await?)
492 }
493
494 pub async fn get_next_height_to_preprocess(
496 &self,
497 chain_id: ChainId,
498 ) -> Result<BlockHeight, LocalNodeError> {
499 Ok(self
500 .node
501 .state
502 .get_next_height_to_preprocess(chain_id)
503 .await?)
504 }
505}