1use std::{
6 collections::{BTreeMap, HashMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{BlobId, ChainId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, GenericCertificate},
19};
20use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
21use linera_storage::Storage;
22use linera_views::ViewError;
23use thiserror::Error;
24use tracing::{instrument, warn};
25
26use crate::{
27 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
28 notifier::Notifier,
29 worker::{ProcessableCertificate, WorkerError, WorkerState},
30};
31
32pub struct LocalNode<S>
34where
35 S: Storage,
36{
37 state: WorkerState<S>,
38}
39
40#[derive(Clone)]
42pub struct LocalNodeClient<S>
43where
44 S: Storage,
45{
46 node: Arc<LocalNode<S>>,
47}
48
49#[derive(Debug, Error)]
51pub enum LocalNodeError {
52 #[error(transparent)]
53 ArithmeticError(#[from] ArithmeticError),
54
55 #[error(transparent)]
56 ViewError(#[from] ViewError),
57
58 #[error("Worker operation failed: {0}")]
59 WorkerError(WorkerError),
60
61 #[error("The local node doesn't have an active chain {0}")]
62 InactiveChain(ChainId),
63
64 #[error("The chain info response received from the local node is invalid")]
65 InvalidChainInfoResponse,
66
67 #[error("Blobs not found: {0:?}")]
68 BlobsNotFound(Vec<BlobId>),
69}
70
71impl From<WorkerError> for LocalNodeError {
72 fn from(error: WorkerError) -> Self {
73 match error {
74 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
75 error => LocalNodeError::WorkerError(error),
76 }
77 }
78}
79
80impl<S> LocalNodeClient<S>
81where
82 S: Storage + Clone + 'static,
83{
84 #[instrument(level = "trace", skip_all)]
85 pub async fn handle_block_proposal(
86 &self,
87 proposal: BlockProposal,
88 ) -> Result<ChainInfoResponse, LocalNodeError> {
89 let (response, _actions) =
91 Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
92 Ok(response)
93 }
94
95 #[instrument(level = "trace", skip_all)]
96 pub async fn handle_certificate<T>(
97 &self,
98 certificate: GenericCertificate<T>,
99 notifier: &impl Notifier,
100 ) -> Result<ChainInfoResponse, LocalNodeError>
101 where
102 T: ProcessableCertificate,
103 {
104 Ok(Box::pin(
105 self.node
106 .state
107 .fully_handle_certificate_with_notifications(certificate, notifier),
108 )
109 .await?)
110 }
111
112 #[instrument(level = "trace", skip_all)]
113 pub async fn handle_chain_info_query(
114 &self,
115 query: ChainInfoQuery,
116 ) -> Result<ChainInfoResponse, LocalNodeError> {
117 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
119 Ok(response)
120 }
121
122 #[instrument(level = "trace", skip_all)]
123 pub fn new(state: WorkerState<S>) -> Self {
124 Self {
125 node: Arc::new(LocalNode { state }),
126 }
127 }
128
129 #[instrument(level = "trace", skip_all)]
130 pub(crate) fn storage_client(&self) -> S {
131 self.node.state.storage_client().clone()
132 }
133
134 #[instrument(level = "trace", skip_all)]
135 pub async fn stage_block_execution(
136 &self,
137 block: ProposedBlock,
138 round: Option<u32>,
139 published_blobs: Vec<Blob>,
140 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
141 Ok(self
142 .node
143 .state
144 .stage_block_execution(block, round, published_blobs)
145 .await?)
146 }
147
148 #[instrument(level = "trace", skip_all)]
153 pub async fn stage_block_execution_with_policy(
154 &self,
155 block: ProposedBlock,
156 round: Option<u32>,
157 published_blobs: Vec<Blob>,
158 policy: BundleExecutionPolicy,
159 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
160 Ok(self
161 .node
162 .state
163 .stage_block_execution_with_policy(block, round, published_blobs, policy)
164 .await?)
165 }
166
167 pub async fn read_blobs_from_storage(
169 &self,
170 blob_ids: &[BlobId],
171 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
172 let storage = self.storage_client();
173 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
174 }
175
176 pub async fn read_blob_states_from_storage(
178 &self,
179 blob_ids: &[BlobId],
180 ) -> Result<Vec<BlobState>, LocalNodeError> {
181 let storage = self.storage_client();
182 let mut blobs_not_found = Vec::new();
183 let mut blob_states = Vec::new();
184 for (blob_state, blob_id) in storage
185 .read_blob_states(blob_ids)
186 .await?
187 .into_iter()
188 .zip(blob_ids)
189 {
190 match blob_state {
191 None => blobs_not_found.push(*blob_id),
192 Some(blob_state) => blob_states.push(blob_state),
193 }
194 }
195 if !blobs_not_found.is_empty() {
196 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
197 }
198 Ok(blob_states)
199 }
200
201 pub async fn get_locking_blobs(
204 &self,
205 blob_ids: impl IntoIterator<Item = &BlobId>,
206 chain_id: ChainId,
207 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
208 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
209 Ok(self
210 .node
211 .state
212 .get_locking_blobs(chain_id, blob_ids_vec)
213 .await?)
214 }
215
216 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
218 let storage = self.storage_client();
219 storage.maybe_write_blobs(blobs).await?;
220 Ok(())
221 }
222
223 pub async fn handle_pending_blobs(
224 &self,
225 chain_id: ChainId,
226 blobs: Vec<Blob>,
227 ) -> Result<(), LocalNodeError> {
228 for blob in blobs {
229 self.node.state.handle_pending_blob(chain_id, blob).await?;
230 }
231 Ok(())
232 }
233
234 #[instrument(level = "trace", skip(self))]
240 pub async fn chain_state_view(
241 &self,
242 chain_id: ChainId,
243 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
244 Ok(self.node.state.chain_state_view(chain_id).await?)
245 }
246
247 #[instrument(level = "trace", skip(self))]
248 pub(crate) async fn chain_info(
249 &self,
250 chain_id: ChainId,
251 ) -> Result<Box<ChainInfo>, LocalNodeError> {
252 let query = ChainInfoQuery::new(chain_id);
253 Ok(self.handle_chain_info_query(query).await?.info)
254 }
255
256 #[instrument(level = "trace", skip(self, query))]
257 pub async fn query_application(
258 &self,
259 chain_id: ChainId,
260 query: Query,
261 block_hash: Option<CryptoHash>,
262 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
263 let result = self
264 .node
265 .state
266 .query_application(chain_id, query, block_hash)
267 .await?;
268 Ok(result)
269 }
270
271 #[instrument(level = "trace", skip(self, notifier))]
273 pub async fn retry_pending_cross_chain_requests(
274 &self,
275 sender_chain: ChainId,
276 notifier: &impl Notifier,
277 ) -> Result<(), LocalNodeError> {
278 let (_response, actions) = self
279 .node
280 .state
281 .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
282 .await?;
283 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
284 while let Some(request) = requests.pop_front() {
285 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
286 notifier.notify(&new_actions.notifications);
287 requests.extend(new_actions.cross_chain_requests);
288 }
289 Ok(())
290 }
291
292 pub async fn next_outbox_heights(
296 &self,
297 chain_ids: impl IntoIterator<Item = &ChainId>,
298 receiver_id: ChainId,
299 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
300 let futures =
301 FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
302 let (next_block_height, next_height_to_schedule) = match self
303 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
304 .await
305 {
306 Ok(info) => info,
307 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
308 return Ok((*chain_id, BlockHeight::ZERO))
309 }
310 Err(err) => Err(err)?,
311 };
312 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
313 next_block_height.max(scheduled_height)
314 } else {
315 next_block_height
316 };
317 Ok::<_, LocalNodeError>((*chain_id, next_height))
318 }));
319 futures.try_collect().await
320 }
321
322 pub async fn update_received_certificate_trackers(
323 &self,
324 chain_id: ChainId,
325 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
326 ) -> Result<(), LocalNodeError> {
327 self.node
328 .state
329 .update_received_certificate_trackers(chain_id, new_trackers)
330 .await?;
331 Ok(())
332 }
333
334 pub async fn get_preprocessed_block_hashes(
335 &self,
336 chain_id: ChainId,
337 start: BlockHeight,
338 end: BlockHeight,
339 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
340 Ok(self
341 .node
342 .state
343 .get_preprocessed_block_hashes(chain_id, start, end)
344 .await?)
345 }
346
347 pub async fn get_inbox_next_height(
348 &self,
349 chain_id: ChainId,
350 origin: ChainId,
351 ) -> Result<BlockHeight, LocalNodeError> {
352 Ok(self
353 .node
354 .state
355 .get_inbox_next_height(chain_id, origin)
356 .await?)
357 }
358
359 pub async fn get_block_hashes(
361 &self,
362 chain_id: ChainId,
363 heights: Vec<BlockHeight>,
364 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
365 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
366 }
367
368 pub async fn get_proposed_blobs(
370 &self,
371 chain_id: ChainId,
372 blob_ids: Vec<BlobId>,
373 ) -> Result<Vec<Blob>, LocalNodeError> {
374 Ok(self
375 .node
376 .state
377 .get_proposed_blobs(chain_id, blob_ids)
378 .await?)
379 }
380
381 pub async fn get_event_subscriptions(
383 &self,
384 chain_id: ChainId,
385 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
386 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
387 }
388
389 pub async fn get_next_expected_event(
391 &self,
392 chain_id: ChainId,
393 stream_id: StreamId,
394 ) -> Result<Option<u32>, LocalNodeError> {
395 Ok(self
396 .node
397 .state
398 .get_next_expected_event(chain_id, stream_id)
399 .await?)
400 }
401
402 pub async fn get_received_certificate_trackers(
404 &self,
405 chain_id: ChainId,
406 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
407 Ok(self
408 .node
409 .state
410 .get_received_certificate_trackers(chain_id)
411 .await?)
412 }
413
414 pub async fn get_tip_state_and_outbox_info(
416 &self,
417 chain_id: ChainId,
418 receiver_id: ChainId,
419 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
420 Ok(self
421 .node
422 .state
423 .get_tip_state_and_outbox_info(chain_id, receiver_id)
424 .await?)
425 }
426
427 pub async fn get_next_height_to_preprocess(
429 &self,
430 chain_id: ChainId,
431 ) -> Result<BlockHeight, LocalNodeError> {
432 Ok(self
433 .node
434 .state
435 .get_next_height_to_preprocess(chain_id)
436 .await?)
437 }
438}
439
440pub trait LocalChainInfoExt {
443 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
445
446 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
448
449 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
451}
452
453impl LocalChainInfoExt for ChainInfo {
454 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
455 self.requested_committees
456 .ok_or(LocalNodeError::InvalidChainInfoResponse)
457 }
458
459 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
460 self.requested_committees
461 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
462 .remove(&self.epoch)
463 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
464 }
465
466 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
467 self.requested_committees
468 .as_ref()
469 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
470 .get(&self.epoch)
471 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
472 }
473}