1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, ConfirmedBlockCertificate, GenericCertificate},
19 ChainError, ChainExecutionContext, StreamCounts,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::{Arc as CacheArc, Storage};
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28 chain_worker::ProcessConfirmedBlockMode,
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53#[allow(missing_docs)]
54pub enum LocalNodeError {
55 #[error(transparent)]
56 ArithmeticError(#[from] ArithmeticError),
57
58 #[error(transparent)]
59 ViewError(#[from] ViewError),
60
61 #[error("Worker operation failed: {0}")]
62 WorkerError(WorkerError),
63
64 #[error("The local node doesn't have an active chain {0}")]
65 InactiveChain(ChainId),
66
67 #[error("The chain info response received from the local node is invalid")]
68 InvalidChainInfoResponse,
69
70 #[error("Blobs not found: {0:?}")]
71 BlobsNotFound(Vec<BlobId>),
72
73 #[error("Blocks not found: {0:?}")]
74 BlocksNotFound(Vec<CryptoHash>),
75
76 #[error("Events not found: {0:?}")]
77 EventsNotFound(Vec<EventId>),
78}
79
80impl From<ExecutionError> for LocalNodeError {
81 fn from(error: ExecutionError) -> Self {
82 match error {
83 ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
84 ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
85 ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
86 error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
87 Box::new(error),
88 ChainExecutionContext::Block,
89 ))),
90 }
91 }
92}
93
94impl From<WorkerError> for LocalNodeError {
95 fn from(error: WorkerError) -> Self {
96 match error {
97 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
98 WorkerError::BlocksNotFound(hashes) => LocalNodeError::BlocksNotFound(hashes),
99 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
100 error => LocalNodeError::WorkerError(error),
101 }
102 }
103}
104
105impl<S> LocalNodeClient<S>
106where
107 S: Storage + Clone + 'static,
108{
109 #[instrument(level = "trace", skip_all)]
110 pub async fn handle_block_proposal(
111 &self,
112 proposal: BlockProposal,
113 ) -> Result<ChainInfoResponse, LocalNodeError> {
114 let (response, _actions) = Box::pin(self.node.state.handle_block_proposal(proposal)).await;
116 Ok(response?)
117 }
118
119 #[instrument(level = "trace", skip_all)]
120 pub async fn handle_certificate<T>(
121 &self,
122 certificate: GenericCertificate<T>,
123 notifier: &impl Notifier,
124 ) -> Result<ChainInfoResponse, LocalNodeError>
125 where
126 T: ProcessableCertificate,
127 {
128 Ok(Box::pin(
129 self.node
130 .state
131 .fully_handle_certificate_with_notifications(certificate, notifier),
132 )
133 .await?)
134 }
135
136 #[instrument(level = "trace", skip_all)]
140 pub async fn handle_confirmed_certificate(
141 &self,
142 certificate: ConfirmedBlockCertificate,
143 mode: ProcessConfirmedBlockMode,
144 notifier: &impl Notifier,
145 ) -> Result<ChainInfoResponse, LocalNodeError> {
146 Ok(Box::pin(
147 self.node
148 .state
149 .fully_handle_confirmed_certificate_with_notifications(certificate, mode, notifier),
150 )
151 .await?)
152 }
153
/// Test-only: asks the worker state for the confirmed block certificate of `chain_id` at
/// `height`; the worker's `Option` result is returned unchanged.
#[cfg(with_testing)]
pub async fn read_certificate(
    &self,
    chain_id: ChainId,
    height: BlockHeight,
) -> Result<Option<CacheArc<ConfirmedBlockCertificate>>, LocalNodeError> {
    let certificate = self.node.state.read_certificate(chain_id, height).await?;
    Ok(certificate)
}
165
166 #[instrument(level = "trace", skip_all)]
167 pub async fn handle_chain_info_query(
168 &self,
169 query: ChainInfoQuery,
170 ) -> Result<ChainInfoResponse, LocalNodeError> {
171 Ok(self.node.state.handle_chain_info_query(query).await?)
172 }
173
174 #[instrument(level = "trace", skip_all)]
175 pub fn new(state: WorkerState<S>) -> Self {
176 Self {
177 node: Arc::new(LocalNode { state }),
178 }
179 }
180
181 #[instrument(level = "trace", skip_all)]
182 pub(crate) fn storage_client(&self) -> S {
183 self.node.state.storage_client().clone()
184 }
185
186 #[instrument(level = "trace", skip_all)]
191 pub async fn stage_block_execution(
192 &self,
193 block: ProposedBlock,
194 round: Option<u32>,
195 published_blobs: Vec<Blob>,
196 policy: BundleExecutionPolicy,
197 ) -> Result<
198 (
199 ProposedBlock,
200 Block,
201 ChainInfoResponse,
202 ResourceTracker,
203 HashSet<ChainId>,
204 ),
205 LocalNodeError,
206 > {
207 Ok(self
208 .node
209 .state
210 .stage_block_execution(block, round, published_blobs, policy)
211 .await?)
212 }
213
214 pub async fn read_blobs_from_storage(
216 &self,
217 blob_ids: &[BlobId],
218 ) -> Result<Option<Vec<CacheArc<Blob>>>, LocalNodeError> {
219 let storage = self.storage_client();
220 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
221 }
222
223 pub async fn read_blob_states_from_storage(
225 &self,
226 blob_ids: &[BlobId],
227 ) -> Result<Vec<BlobState>, LocalNodeError> {
228 let storage = self.storage_client();
229 let mut blobs_not_found = Vec::new();
230 let mut blob_states = Vec::new();
231 for (blob_state, blob_id) in storage
232 .read_blob_states(blob_ids)
233 .await?
234 .into_iter()
235 .zip(blob_ids)
236 {
237 match blob_state {
238 None => blobs_not_found.push(*blob_id),
239 Some(blob_state) => blob_states.push(blob_state),
240 }
241 }
242 if !blobs_not_found.is_empty() {
243 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
244 }
245 Ok(blob_states)
246 }
247
248 pub async fn get_locking_blobs(
251 &self,
252 blob_ids: impl IntoIterator<Item = &BlobId>,
253 chain_id: ChainId,
254 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
255 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
256 Ok(self
257 .node
258 .state
259 .get_locking_blobs(chain_id, blob_ids_vec)
260 .await?)
261 }
262
263 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
265 let storage = self.storage_client();
266 storage.maybe_write_blobs(blobs).await?;
267 Ok(())
268 }
269
270 pub async fn handle_pending_blobs(
271 &self,
272 chain_id: ChainId,
273 blobs: Vec<Blob>,
274 ) -> Result<(), LocalNodeError> {
275 for blob in blobs {
276 self.node.state.handle_pending_blob(chain_id, blob).await?;
277 }
278 Ok(())
279 }
280
281 #[instrument(level = "trace", skip(self))]
287 pub async fn chain_state_view(
288 &self,
289 chain_id: ChainId,
290 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
291 Ok(self.node.state.chain_state_view(chain_id).await?)
292 }
293
294 #[instrument(level = "trace", skip(self))]
295 pub(crate) async fn chain_info(
296 &self,
297 chain_id: ChainId,
298 ) -> Result<Box<ChainInfo>, LocalNodeError> {
299 let query = ChainInfoQuery::new(chain_id);
300 Ok(self.handle_chain_info_query(query).await?.info)
301 }
302
303 #[instrument(level = "trace", skip(self, query))]
304 pub async fn query_application(
305 &self,
306 chain_id: ChainId,
307 query: Query,
308 block_hash: Option<CryptoHash>,
309 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
310 let result = self
311 .node
312 .state
313 .query_application(chain_id, query, block_hash)
314 .await?;
315 Ok(result)
316 }
317
318 #[instrument(level = "trace", skip(self, notifier))]
326 pub async fn retry_pending_cross_chain_requests(
327 &self,
328 sender_chain: ChainId,
329 notifier: &impl Notifier,
330 ) -> Result<(), LocalNodeError> {
331 let actions = self
332 .node
333 .state
334 .cross_chain_network_actions(sender_chain)
335 .await?;
336 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
337 while let Some(request) = requests.pop_front() {
338 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
339 notifier.notify(&new_actions.notifications);
340 requests.extend(new_actions.cross_chain_requests);
341 }
342 Ok(())
343 }
344
345 pub async fn next_outbox_heights(
349 &self,
350 chain_ids: impl IntoIterator<Item = &ChainId>,
351 receiver_id: ChainId,
352 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
353 let futures = chain_ids
354 .into_iter()
355 .map(|chain_id| async move {
356 let (next_block_height, next_height_to_schedule) = match self
357 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
358 .await
359 {
360 Ok(info) => info,
361 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
362 return Ok((*chain_id, BlockHeight::ZERO))
363 }
364 Err(err) => Err(err)?,
365 };
366 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
367 next_block_height.max(scheduled_height)
368 } else {
369 next_block_height
370 };
371 Ok::<_, LocalNodeError>((*chain_id, next_height))
372 })
373 .collect::<FuturesUnordered<_>>();
374 futures.try_collect().await
375 }
376
377 pub async fn update_received_certificate_trackers(
378 &self,
379 chain_id: ChainId,
380 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
381 ) -> Result<(), LocalNodeError> {
382 self.node
383 .state
384 .update_received_certificate_trackers(chain_id, new_trackers)
385 .await?;
386 Ok(())
387 }
388
389 pub async fn get_preprocessed_block_hashes(
390 &self,
391 chain_id: ChainId,
392 start: BlockHeight,
393 end: BlockHeight,
394 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
395 Ok(self
396 .node
397 .state
398 .get_preprocessed_block_hashes(chain_id, start, end)
399 .await?)
400 }
401
402 pub async fn get_inbox_next_height(
403 &self,
404 chain_id: ChainId,
405 origin: ChainId,
406 ) -> Result<BlockHeight, LocalNodeError> {
407 Ok(self
408 .node
409 .state
410 .get_inbox_next_height(chain_id, origin)
411 .await?)
412 }
413
414 pub async fn get_block_hashes(
416 &self,
417 chain_id: ChainId,
418 heights: Vec<BlockHeight>,
419 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
420 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
421 }
422
423 pub async fn get_proposed_blobs(
425 &self,
426 chain_id: ChainId,
427 blob_ids: Vec<BlobId>,
428 ) -> Result<Vec<Blob>, LocalNodeError> {
429 Ok(self
430 .node
431 .state
432 .get_proposed_blobs(chain_id, blob_ids)
433 .await?)
434 }
435
436 pub async fn get_event_subscriptions(
438 &self,
439 chain_id: ChainId,
440 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
441 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
442 }
443
444 pub async fn get_stream_indices(
447 &self,
448 chain_id: ChainId,
449 stream_id: StreamId,
450 ) -> Result<StreamCounts, LocalNodeError> {
451 Ok(self
452 .node
453 .state
454 .get_stream_indices(chain_id, stream_id)
455 .await?)
456 }
457
458 pub async fn next_expected_events(
460 &self,
461 chain_id: ChainId,
462 stream_ids: Vec<StreamId>,
463 ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
464 Ok(self
465 .node
466 .state
467 .next_expected_events(chain_id, stream_ids)
468 .await?)
469 }
470
/// Test-only: calls the worker state's `reset_and_reexecute_chain` for `chain_id` and returns
/// the cross-chain requests it produces.
#[cfg(with_testing)]
pub async fn reset_and_reexecute_chain(
    &self,
    chain_id: ChainId,
) -> Result<Vec<crate::data_types::CrossChainRequest>, LocalNodeError> {
    self.node
        .state
        .reset_and_reexecute_chain(chain_id)
        .await
        .map_err(LocalNodeError::from)
}
479
480 pub async fn get_received_certificate_trackers(
482 &self,
483 chain_id: ChainId,
484 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
485 Ok(self
486 .node
487 .state
488 .get_received_certificate_trackers(chain_id)
489 .await?)
490 }
491
492 pub async fn get_tip_state_and_outbox_info(
494 &self,
495 chain_id: ChainId,
496 receiver_id: ChainId,
497 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
498 Ok(self
499 .node
500 .state
501 .get_tip_state_and_outbox_info(chain_id, receiver_id)
502 .await?)
503 }
504
505 pub async fn get_next_height_to_preprocess(
507 &self,
508 chain_id: ChainId,
509 ) -> Result<BlockHeight, LocalNodeError> {
510 Ok(self
511 .node
512 .state
513 .get_next_height_to_preprocess(chain_id)
514 .await?)
515 }
516}