1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19 ChainError, ChainExecutionContext,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28 chain_worker::ProcessConfirmedBlockMode,
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71
72 #[error("Events not found: {0:?}")]
73 EventsNotFound(Vec<EventId>),
74}
75
76impl From<ExecutionError> for LocalNodeError {
77 fn from(error: ExecutionError) -> Self {
78 match error {
79 ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
80 ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
81 ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
82 error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
83 Box::new(error),
84 ChainExecutionContext::Block,
85 ))),
86 }
87 }
88}
89
90impl From<WorkerError> for LocalNodeError {
91 fn from(error: WorkerError) -> Self {
92 match error {
93 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
94 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
95 error => LocalNodeError::WorkerError(error),
96 }
97 }
98}
99
100impl<S> LocalNodeClient<S>
101where
102 S: Storage + Clone + 'static,
103{
104 #[instrument(level = "trace", skip_all)]
105 pub async fn handle_block_proposal(
106 &self,
107 proposal: BlockProposal,
108 ) -> Result<ChainInfoResponse, LocalNodeError> {
109 let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
111 Ok(response?)
112 }
113
114 #[instrument(level = "trace", skip_all)]
115 pub async fn handle_certificate<T>(
116 &self,
117 certificate: GenericCertificate<T>,
118 notifier: &impl Notifier,
119 ) -> Result<ChainInfoResponse, LocalNodeError>
120 where
121 T: ProcessableCertificate,
122 {
123 Ok(Box::pin(
124 self.node
125 .state
126 .fully_handle_certificate_with_notifications(certificate, notifier),
127 )
128 .await?)
129 }
130
131 #[instrument(level = "trace", skip_all)]
135 pub async fn handle_confirmed_certificate(
136 &self,
137 certificate: ConfirmedBlockCertificate,
138 mode: ProcessConfirmedBlockMode,
139 notifier: &impl Notifier,
140 ) -> Result<ChainInfoResponse, LocalNodeError> {
141 Ok(Box::pin(
142 self.node
143 .state
144 .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
145 )
146 .await?)
147 }
148
149 #[instrument(level = "trace", skip_all)]
150 pub async fn handle_chain_info_query(
151 &self,
152 query: ChainInfoQuery,
153 ) -> Result<ChainInfoResponse, LocalNodeError> {
154 Ok(self.node.state.handle_chain_info_query(query).await?)
155 }
156
157 #[instrument(level = "trace", skip_all)]
158 pub fn new(state: WorkerState<S>) -> Self {
159 Self {
160 node: Arc::new(LocalNode { state }),
161 }
162 }
163
164 #[instrument(level = "trace", skip_all)]
165 pub(crate) fn storage_client(&self) -> S {
166 self.node.state.storage_client().clone()
167 }
168
169 #[instrument(level = "trace", skip_all)]
174 pub async fn stage_block_execution(
175 &self,
176 block: ProposedBlock,
177 round: Option<u32>,
178 published_blobs: Vec<Blob>,
179 policy: BundleExecutionPolicy,
180 ) -> Result<
181 (
182 ProposedBlock,
183 Block,
184 ChainInfoResponse,
185 ResourceTracker,
186 HashSet<ChainId>,
187 ),
188 LocalNodeError,
189 > {
190 Ok(self
191 .node
192 .state
193 .stage_block_execution(block, round, published_blobs, policy)
194 .await?)
195 }
196
197 pub async fn read_blobs_from_storage(
199 &self,
200 blob_ids: &[BlobId],
201 ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
202 let storage = self.storage_client();
203 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
204 }
205
206 pub async fn read_blob_states_from_storage(
208 &self,
209 blob_ids: &[BlobId],
210 ) -> Result<Vec<BlobState>, LocalNodeError> {
211 let storage = self.storage_client();
212 let mut blobs_not_found = Vec::new();
213 let mut blob_states = Vec::new();
214 for (blob_state, blob_id) in storage
215 .read_blob_states(blob_ids)
216 .await?
217 .into_iter()
218 .zip(blob_ids)
219 {
220 match blob_state {
221 None => blobs_not_found.push(*blob_id),
222 Some(blob_state) => blob_states.push(blob_state),
223 }
224 }
225 if !blobs_not_found.is_empty() {
226 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
227 }
228 Ok(blob_states)
229 }
230
231 pub async fn get_locking_blobs(
234 &self,
235 blob_ids: impl IntoIterator<Item = &BlobId>,
236 chain_id: ChainId,
237 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
238 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
239 Ok(self
240 .node
241 .state
242 .get_locking_blobs(chain_id, blob_ids_vec)
243 .await?)
244 }
245
246 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
248 let storage = self.storage_client();
249 storage.maybe_write_blobs(blobs).await?;
250 Ok(())
251 }
252
253 pub async fn handle_pending_blobs(
254 &self,
255 chain_id: ChainId,
256 blobs: Vec<Blob>,
257 ) -> Result<(), LocalNodeError> {
258 for blob in blobs {
259 self.node.state.handle_pending_blob(chain_id, blob).await?;
260 }
261 Ok(())
262 }
263
264 #[instrument(level = "trace", skip(self))]
270 pub async fn chain_state_view(
271 &self,
272 chain_id: ChainId,
273 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
274 Ok(self.node.state.chain_state_view(chain_id).await?)
275 }
276
277 #[instrument(level = "trace", skip(self))]
278 pub(crate) async fn chain_info(
279 &self,
280 chain_id: ChainId,
281 ) -> Result<Box<ChainInfo>, LocalNodeError> {
282 let query = ChainInfoQuery::new(chain_id);
283 Ok(self.handle_chain_info_query(query).await?.info)
284 }
285
286 #[instrument(level = "trace", skip(self, query))]
287 pub async fn query_application(
288 &self,
289 chain_id: ChainId,
290 query: Query,
291 block_hash: Option<CryptoHash>,
292 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
293 let result = self
294 .node
295 .state
296 .query_application(chain_id, query, block_hash)
297 .await?;
298 Ok(result)
299 }
300
301 #[instrument(level = "trace", skip(self, notifier))]
309 pub async fn retry_pending_cross_chain_requests(
310 &self,
311 sender_chain: ChainId,
312 notifier: &impl Notifier,
313 ) -> Result<(), LocalNodeError> {
314 let actions = self
315 .node
316 .state
317 .cross_chain_network_actions(sender_chain)
318 .await?;
319 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
320 while let Some(request) = requests.pop_front() {
321 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
322 notifier.notify(&new_actions.notifications);
323 requests.extend(new_actions.cross_chain_requests);
324 }
325 Ok(())
326 }
327
328 pub async fn next_outbox_heights(
332 &self,
333 chain_ids: impl IntoIterator<Item = &ChainId>,
334 receiver_id: ChainId,
335 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
336 let futures = chain_ids
337 .into_iter()
338 .map(|chain_id| async move {
339 let (next_block_height, next_height_to_schedule) = match self
340 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
341 .await
342 {
343 Ok(info) => info,
344 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
345 return Ok((*chain_id, BlockHeight::ZERO))
346 }
347 Err(err) => Err(err)?,
348 };
349 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
350 next_block_height.max(scheduled_height)
351 } else {
352 next_block_height
353 };
354 Ok::<_, LocalNodeError>((*chain_id, next_height))
355 })
356 .collect::<FuturesUnordered<_>>();
357 futures.try_collect().await
358 }
359
360 pub async fn update_received_certificate_trackers(
361 &self,
362 chain_id: ChainId,
363 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
364 ) -> Result<(), LocalNodeError> {
365 self.node
366 .state
367 .update_received_certificate_trackers(chain_id, new_trackers)
368 .await?;
369 Ok(())
370 }
371
372 pub async fn get_preprocessed_block_hashes(
373 &self,
374 chain_id: ChainId,
375 start: BlockHeight,
376 end: BlockHeight,
377 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
378 Ok(self
379 .node
380 .state
381 .get_preprocessed_block_hashes(chain_id, start, end)
382 .await?)
383 }
384
385 pub async fn get_inbox_next_height(
386 &self,
387 chain_id: ChainId,
388 origin: ChainId,
389 ) -> Result<BlockHeight, LocalNodeError> {
390 Ok(self
391 .node
392 .state
393 .get_inbox_next_height(chain_id, origin)
394 .await?)
395 }
396
397 pub async fn get_block_hashes(
399 &self,
400 chain_id: ChainId,
401 heights: Vec<BlockHeight>,
402 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
403 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
404 }
405
406 pub async fn get_proposed_blobs(
408 &self,
409 chain_id: ChainId,
410 blob_ids: Vec<BlobId>,
411 ) -> Result<Vec<Blob>, LocalNodeError> {
412 Ok(self
413 .node
414 .state
415 .get_proposed_blobs(chain_id, blob_ids)
416 .await?)
417 }
418
419 pub async fn get_event_subscriptions(
421 &self,
422 chain_id: ChainId,
423 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
424 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
425 }
426
427 pub async fn get_next_expected_event(
429 &self,
430 chain_id: ChainId,
431 stream_id: StreamId,
432 ) -> Result<Option<u32>, LocalNodeError> {
433 Ok(self
434 .node
435 .state
436 .get_next_expected_event(chain_id, stream_id)
437 .await?)
438 }
439
440 pub async fn next_expected_events(
442 &self,
443 chain_id: ChainId,
444 stream_ids: Vec<StreamId>,
445 ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
446 Ok(self
447 .node
448 .state
449 .next_expected_events(chain_id, stream_ids)
450 .await?)
451 }
452
453 pub async fn get_received_certificate_trackers(
455 &self,
456 chain_id: ChainId,
457 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
458 Ok(self
459 .node
460 .state
461 .get_received_certificate_trackers(chain_id)
462 .await?)
463 }
464
465 pub async fn get_tip_state_and_outbox_info(
467 &self,
468 chain_id: ChainId,
469 receiver_id: ChainId,
470 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
471 Ok(self
472 .node
473 .state
474 .get_tip_state_and_outbox_info(chain_id, receiver_id)
475 .await?)
476 }
477
478 pub async fn get_next_height_to_preprocess(
480 &self,
481 chain_id: ChainId,
482 ) -> Result<BlockHeight, LocalNodeError> {
483 Ok(self
484 .node
485 .state
486 .get_next_height_to_preprocess(chain_id)
487 .await?)
488 }
489}