1use std::{
6 collections::{BTreeMap, HashMap, HashSet, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, GenericCertificate},
19 ChainError, ChainExecutionContext,
20};
21use linera_execution::{BlobState, ExecutionError, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tracing::{instrument, warn};
26
27use crate::{
28 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
29 notifier::Notifier,
30 worker::{ProcessableCertificate, WorkerError, WorkerState},
31};
32
33pub struct LocalNode<S>
35where
36 S: Storage,
37{
38 state: WorkerState<S>,
39}
40
41#[derive(Clone)]
43pub struct LocalNodeClient<S>
44where
45 S: Storage,
46{
47 node: Arc<LocalNode<S>>,
48}
49
50#[derive(Debug, Error)]
52pub enum LocalNodeError {
53 #[error(transparent)]
54 ArithmeticError(#[from] ArithmeticError),
55
56 #[error(transparent)]
57 ViewError(#[from] ViewError),
58
59 #[error("Worker operation failed: {0}")]
60 WorkerError(WorkerError),
61
62 #[error("The local node doesn't have an active chain {0}")]
63 InactiveChain(ChainId),
64
65 #[error("The chain info response received from the local node is invalid")]
66 InvalidChainInfoResponse,
67
68 #[error("Blobs not found: {0:?}")]
69 BlobsNotFound(Vec<BlobId>),
70
71 #[error("Events not found: {0:?}")]
72 EventsNotFound(Vec<EventId>),
73}
74
75impl From<ExecutionError> for LocalNodeError {
76 fn from(error: ExecutionError) -> Self {
77 match error {
78 ExecutionError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
79 ExecutionError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
80 ExecutionError::ViewError(view_error) => LocalNodeError::ViewError(view_error),
81 error => LocalNodeError::WorkerError(WorkerError::from(ChainError::ExecutionError(
82 Box::new(error),
83 ChainExecutionContext::Block,
84 ))),
85 }
86 }
87}
88
89impl From<WorkerError> for LocalNodeError {
90 fn from(error: WorkerError) -> Self {
91 match error {
92 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
93 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
94 error => LocalNodeError::WorkerError(error),
95 }
96 }
97}
98
99impl<S> LocalNodeClient<S>
100where
101 S: Storage + Clone + 'static,
102{
103 #[instrument(level = "trace", skip_all)]
104 pub async fn handle_block_proposal(
105 &self,
106 proposal: BlockProposal,
107 ) -> Result<ChainInfoResponse, LocalNodeError> {
108 let (response, _actions) =
110 Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
111 Ok(response)
112 }
113
114 #[instrument(level = "trace", skip_all)]
115 pub async fn handle_certificate<T>(
116 &self,
117 certificate: GenericCertificate<T>,
118 notifier: &impl Notifier,
119 ) -> Result<ChainInfoResponse, LocalNodeError>
120 where
121 T: ProcessableCertificate,
122 {
123 Ok(Box::pin(
124 self.node
125 .state
126 .fully_handle_certificate_with_notifications(certificate, notifier),
127 )
128 .await?)
129 }
130
131 #[instrument(level = "trace", skip_all)]
132 pub async fn handle_chain_info_query(
133 &self,
134 query: ChainInfoQuery,
135 ) -> Result<ChainInfoResponse, LocalNodeError> {
136 Ok(self.node.state.handle_chain_info_query(query).await?)
137 }
138
139 #[instrument(level = "trace", skip_all)]
140 pub fn new(state: WorkerState<S>) -> Self {
141 Self {
142 node: Arc::new(LocalNode { state }),
143 }
144 }
145
146 #[instrument(level = "trace", skip_all)]
147 pub(crate) fn storage_client(&self) -> S {
148 self.node.state.storage_client().clone()
149 }
150
151 #[instrument(level = "trace", skip_all)]
156 pub async fn stage_block_execution(
157 &self,
158 block: ProposedBlock,
159 round: Option<u32>,
160 published_blobs: Vec<Blob>,
161 policy: BundleExecutionPolicy,
162 ) -> Result<
163 (
164 ProposedBlock,
165 Block,
166 ChainInfoResponse,
167 ResourceTracker,
168 HashSet<ChainId>,
169 ),
170 LocalNodeError,
171 > {
172 Ok(self
173 .node
174 .state
175 .stage_block_execution(block, round, published_blobs, policy)
176 .await?)
177 }
178
179 pub async fn read_blobs_from_storage(
181 &self,
182 blob_ids: &[BlobId],
183 ) -> Result<Option<Vec<Arc<Blob>>>, LocalNodeError> {
184 let storage = self.storage_client();
185 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
186 }
187
188 pub async fn read_blob_states_from_storage(
190 &self,
191 blob_ids: &[BlobId],
192 ) -> Result<Vec<BlobState>, LocalNodeError> {
193 let storage = self.storage_client();
194 let mut blobs_not_found = Vec::new();
195 let mut blob_states = Vec::new();
196 for (blob_state, blob_id) in storage
197 .read_blob_states(blob_ids)
198 .await?
199 .into_iter()
200 .zip(blob_ids)
201 {
202 match blob_state {
203 None => blobs_not_found.push(*blob_id),
204 Some(blob_state) => blob_states.push(blob_state),
205 }
206 }
207 if !blobs_not_found.is_empty() {
208 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
209 }
210 Ok(blob_states)
211 }
212
213 pub async fn get_locking_blobs(
216 &self,
217 blob_ids: impl IntoIterator<Item = &BlobId>,
218 chain_id: ChainId,
219 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
220 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
221 Ok(self
222 .node
223 .state
224 .get_locking_blobs(chain_id, blob_ids_vec)
225 .await?)
226 }
227
228 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
230 let storage = self.storage_client();
231 storage.maybe_write_blobs(blobs).await?;
232 Ok(())
233 }
234
235 pub async fn handle_pending_blobs(
236 &self,
237 chain_id: ChainId,
238 blobs: Vec<Blob>,
239 ) -> Result<(), LocalNodeError> {
240 for blob in blobs {
241 self.node.state.handle_pending_blob(chain_id, blob).await?;
242 }
243 Ok(())
244 }
245
246 #[instrument(level = "trace", skip(self))]
252 pub async fn chain_state_view(
253 &self,
254 chain_id: ChainId,
255 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
256 Ok(self.node.state.chain_state_view(chain_id).await?)
257 }
258
259 #[instrument(level = "trace", skip(self))]
260 pub(crate) async fn chain_info(
261 &self,
262 chain_id: ChainId,
263 ) -> Result<Box<ChainInfo>, LocalNodeError> {
264 let query = ChainInfoQuery::new(chain_id);
265 Ok(self.handle_chain_info_query(query).await?.info)
266 }
267
268 #[instrument(level = "trace", skip(self, query))]
269 pub async fn query_application(
270 &self,
271 chain_id: ChainId,
272 query: Query,
273 block_hash: Option<CryptoHash>,
274 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
275 let result = self
276 .node
277 .state
278 .query_application(chain_id, query, block_hash)
279 .await?;
280 Ok(result)
281 }
282
283 #[instrument(level = "trace", skip(self, notifier))]
291 pub async fn retry_pending_cross_chain_requests(
292 &self,
293 sender_chain: ChainId,
294 notifier: &impl Notifier,
295 ) -> Result<(), LocalNodeError> {
296 let actions = self
297 .node
298 .state
299 .cross_chain_network_actions(sender_chain)
300 .await?;
301 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
302 while let Some(request) = requests.pop_front() {
303 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
304 notifier.notify(&new_actions.notifications);
305 requests.extend(new_actions.cross_chain_requests);
306 }
307 Ok(())
308 }
309
310 pub async fn next_outbox_heights(
314 &self,
315 chain_ids: impl IntoIterator<Item = &ChainId>,
316 receiver_id: ChainId,
317 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
318 let futures = chain_ids
319 .into_iter()
320 .map(|chain_id| async move {
321 let (next_block_height, next_height_to_schedule) = match self
322 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
323 .await
324 {
325 Ok(info) => info,
326 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
327 return Ok((*chain_id, BlockHeight::ZERO))
328 }
329 Err(err) => Err(err)?,
330 };
331 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
332 next_block_height.max(scheduled_height)
333 } else {
334 next_block_height
335 };
336 Ok::<_, LocalNodeError>((*chain_id, next_height))
337 })
338 .collect::<FuturesUnordered<_>>();
339 futures.try_collect().await
340 }
341
342 pub async fn update_received_certificate_trackers(
343 &self,
344 chain_id: ChainId,
345 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
346 ) -> Result<(), LocalNodeError> {
347 self.node
348 .state
349 .update_received_certificate_trackers(chain_id, new_trackers)
350 .await?;
351 Ok(())
352 }
353
354 pub async fn get_preprocessed_block_hashes(
355 &self,
356 chain_id: ChainId,
357 start: BlockHeight,
358 end: BlockHeight,
359 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
360 Ok(self
361 .node
362 .state
363 .get_preprocessed_block_hashes(chain_id, start, end)
364 .await?)
365 }
366
367 pub async fn get_inbox_next_height(
368 &self,
369 chain_id: ChainId,
370 origin: ChainId,
371 ) -> Result<BlockHeight, LocalNodeError> {
372 Ok(self
373 .node
374 .state
375 .get_inbox_next_height(chain_id, origin)
376 .await?)
377 }
378
379 pub async fn get_block_hashes(
381 &self,
382 chain_id: ChainId,
383 heights: Vec<BlockHeight>,
384 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
385 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
386 }
387
388 pub async fn get_proposed_blobs(
390 &self,
391 chain_id: ChainId,
392 blob_ids: Vec<BlobId>,
393 ) -> Result<Vec<Blob>, LocalNodeError> {
394 Ok(self
395 .node
396 .state
397 .get_proposed_blobs(chain_id, blob_ids)
398 .await?)
399 }
400
401 pub async fn get_event_subscriptions(
403 &self,
404 chain_id: ChainId,
405 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
406 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
407 }
408
409 pub async fn get_next_expected_event(
411 &self,
412 chain_id: ChainId,
413 stream_id: StreamId,
414 ) -> Result<Option<u32>, LocalNodeError> {
415 Ok(self
416 .node
417 .state
418 .get_next_expected_event(chain_id, stream_id)
419 .await?)
420 }
421
422 pub async fn next_expected_events(
424 &self,
425 chain_id: ChainId,
426 stream_ids: Vec<StreamId>,
427 ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
428 Ok(self
429 .node
430 .state
431 .next_expected_events(chain_id, stream_ids)
432 .await?)
433 }
434
435 pub async fn get_received_certificate_trackers(
437 &self,
438 chain_id: ChainId,
439 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
440 Ok(self
441 .node
442 .state
443 .get_received_certificate_trackers(chain_id)
444 .await?)
445 }
446
447 pub async fn get_tip_state_and_outbox_info(
449 &self,
450 chain_id: ChainId,
451 receiver_id: ChainId,
452 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
453 Ok(self
454 .node
455 .state
456 .get_tip_state_and_outbox_info(chain_id, receiver_id)
457 .await?)
458 }
459
460 pub async fn get_next_height_to_preprocess(
462 &self,
463 chain_id: ChainId,
464 ) -> Result<BlockHeight, LocalNodeError> {
465 Ok(self
466 .node
467 .state
468 .get_next_height_to_preprocess(chain_id)
469 .await?)
470 }
471}