1use std::{
6 collections::{BTreeMap, HashMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{BlobId, ChainId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, ProposedBlock},
18 types::{Block, GenericCertificate},
19 ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74 fn from(error: WorkerError) -> Self {
75 match error {
76 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77 error => LocalNodeError::WorkerError(error),
78 }
79 }
80}
81
82impl<S> LocalNodeClient<S>
83where
84 S: Storage + Clone + 'static,
85{
86 #[instrument(level = "trace", skip_all)]
87 pub async fn handle_block_proposal(
88 &self,
89 proposal: BlockProposal,
90 ) -> Result<ChainInfoResponse, LocalNodeError> {
91 let (response, _actions) =
93 Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
94 Ok(response)
95 }
96
97 #[instrument(level = "trace", skip_all)]
98 pub async fn handle_certificate<T>(
99 &self,
100 certificate: GenericCertificate<T>,
101 notifier: &impl Notifier,
102 ) -> Result<ChainInfoResponse, LocalNodeError>
103 where
104 T: ProcessableCertificate,
105 {
106 Ok(Box::pin(
107 self.node
108 .state
109 .fully_handle_certificate_with_notifications(certificate, notifier),
110 )
111 .await?)
112 }
113
114 #[instrument(level = "trace", skip_all)]
115 pub async fn handle_chain_info_query(
116 &self,
117 query: ChainInfoQuery,
118 ) -> Result<ChainInfoResponse, LocalNodeError> {
119 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
121 Ok(response)
122 }
123
124 #[instrument(level = "trace", skip_all)]
125 pub fn new(state: WorkerState<S>) -> Self {
126 Self {
127 node: Arc::new(LocalNode { state }),
128 }
129 }
130
131 #[instrument(level = "trace", skip_all)]
132 pub(crate) fn storage_client(&self) -> S {
133 self.node.state.storage_client().clone()
134 }
135
136 #[instrument(level = "trace", skip_all)]
137 pub async fn stage_block_execution(
138 &self,
139 block: ProposedBlock,
140 round: Option<u32>,
141 published_blobs: Vec<Blob>,
142 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
143 Ok(self
144 .node
145 .state
146 .stage_block_execution(block, round, published_blobs)
147 .await?)
148 }
149
150 pub async fn read_blobs_from_storage(
152 &self,
153 blob_ids: &[BlobId],
154 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
155 let storage = self.storage_client();
156 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
157 }
158
159 pub async fn read_blob_states_from_storage(
161 &self,
162 blob_ids: &[BlobId],
163 ) -> Result<Vec<BlobState>, LocalNodeError> {
164 let storage = self.storage_client();
165 let mut blobs_not_found = Vec::new();
166 let mut blob_states = Vec::new();
167 for (blob_state, blob_id) in storage
168 .read_blob_states(blob_ids)
169 .await?
170 .into_iter()
171 .zip(blob_ids)
172 {
173 match blob_state {
174 None => blobs_not_found.push(*blob_id),
175 Some(blob_state) => blob_states.push(blob_state),
176 }
177 }
178 if !blobs_not_found.is_empty() {
179 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
180 }
181 Ok(blob_states)
182 }
183
184 pub async fn get_locking_blobs(
187 &self,
188 blob_ids: impl IntoIterator<Item = &BlobId>,
189 chain_id: ChainId,
190 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
191 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
192 Ok(self
193 .node
194 .state
195 .get_locking_blobs(chain_id, blob_ids_vec)
196 .await?)
197 }
198
199 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
201 let storage = self.storage_client();
202 storage.maybe_write_blobs(blobs).await?;
203 Ok(())
204 }
205
206 pub async fn handle_pending_blobs(
207 &self,
208 chain_id: ChainId,
209 blobs: Vec<Blob>,
210 ) -> Result<(), LocalNodeError> {
211 for blob in blobs {
212 self.node.state.handle_pending_blob(chain_id, blob).await?;
213 }
214 Ok(())
215 }
216
217 #[instrument(level = "trace", skip(self))]
223 pub async fn chain_state_view(
224 &self,
225 chain_id: ChainId,
226 ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
227 Ok(self.node.state.chain_state_view(chain_id).await?)
228 }
229
230 #[instrument(level = "trace", skip(self))]
231 pub(crate) async fn chain_info(
232 &self,
233 chain_id: ChainId,
234 ) -> Result<Box<ChainInfo>, LocalNodeError> {
235 let query = ChainInfoQuery::new(chain_id);
236 Ok(self.handle_chain_info_query(query).await?.info)
237 }
238
239 #[instrument(level = "trace", skip(self, query))]
240 pub async fn query_application(
241 &self,
242 chain_id: ChainId,
243 query: Query,
244 block_hash: Option<CryptoHash>,
245 ) -> Result<QueryOutcome, LocalNodeError> {
246 let outcome = self
247 .node
248 .state
249 .query_application(chain_id, query, block_hash)
250 .await?;
251 Ok(outcome)
252 }
253
254 #[instrument(level = "trace", skip(self))]
256 pub async fn retry_pending_cross_chain_requests(
257 &self,
258 sender_chain: ChainId,
259 ) -> Result<(), LocalNodeError> {
260 let (_response, actions) = self
261 .node
262 .state
263 .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
264 .await?;
265 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
266 while let Some(request) = requests.pop_front() {
267 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
268 requests.extend(new_actions.cross_chain_requests);
269 }
270 Ok(())
271 }
272
273 pub async fn next_outbox_heights(
277 &self,
278 chain_ids: impl IntoIterator<Item = &ChainId>,
279 receiver_id: ChainId,
280 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
281 let futures =
282 FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
283 let (next_block_height, next_height_to_schedule) = match self
284 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
285 .await
286 {
287 Ok(info) => info,
288 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
289 return Ok((*chain_id, BlockHeight::ZERO))
290 }
291 Err(err) => Err(err)?,
292 };
293 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
294 next_block_height.max(scheduled_height)
295 } else {
296 next_block_height
297 };
298 Ok::<_, LocalNodeError>((*chain_id, next_height))
299 }));
300 futures.try_collect().await
301 }
302
303 pub async fn update_received_certificate_trackers(
304 &self,
305 chain_id: ChainId,
306 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
307 ) -> Result<(), LocalNodeError> {
308 self.node
309 .state
310 .update_received_certificate_trackers(chain_id, new_trackers)
311 .await?;
312 Ok(())
313 }
314
315 pub async fn get_preprocessed_block_hashes(
316 &self,
317 chain_id: ChainId,
318 start: BlockHeight,
319 end: BlockHeight,
320 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
321 Ok(self
322 .node
323 .state
324 .get_preprocessed_block_hashes(chain_id, start, end)
325 .await?)
326 }
327
328 pub async fn get_inbox_next_height(
329 &self,
330 chain_id: ChainId,
331 origin: ChainId,
332 ) -> Result<BlockHeight, LocalNodeError> {
333 Ok(self
334 .node
335 .state
336 .get_inbox_next_height(chain_id, origin)
337 .await?)
338 }
339
340 pub async fn get_block_hashes(
342 &self,
343 chain_id: ChainId,
344 heights: Vec<BlockHeight>,
345 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
346 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
347 }
348
349 pub async fn get_proposed_blobs(
351 &self,
352 chain_id: ChainId,
353 blob_ids: Vec<BlobId>,
354 ) -> Result<Vec<Blob>, LocalNodeError> {
355 Ok(self
356 .node
357 .state
358 .get_proposed_blobs(chain_id, blob_ids)
359 .await?)
360 }
361
362 pub async fn get_event_subscriptions(
364 &self,
365 chain_id: ChainId,
366 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
367 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
368 }
369
370 pub async fn get_next_expected_event(
372 &self,
373 chain_id: ChainId,
374 stream_id: StreamId,
375 ) -> Result<Option<u32>, LocalNodeError> {
376 Ok(self
377 .node
378 .state
379 .get_next_expected_event(chain_id, stream_id)
380 .await?)
381 }
382
383 pub async fn get_received_certificate_trackers(
385 &self,
386 chain_id: ChainId,
387 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
388 Ok(self
389 .node
390 .state
391 .get_received_certificate_trackers(chain_id)
392 .await?)
393 }
394
395 pub async fn get_tip_state_and_outbox_info(
397 &self,
398 chain_id: ChainId,
399 receiver_id: ChainId,
400 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
401 Ok(self
402 .node
403 .state
404 .get_tip_state_and_outbox_info(chain_id, receiver_id)
405 .await?)
406 }
407
408 pub async fn get_next_height_to_preprocess(
410 &self,
411 chain_id: ChainId,
412 ) -> Result<BlockHeight, LocalNodeError> {
413 Ok(self
414 .node
415 .state
416 .get_next_height_to_preprocess(chain_id)
417 .await?)
418 }
419}
420
421pub trait LocalChainInfoExt {
424 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
426
427 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
429
430 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
432}
433
434impl LocalChainInfoExt for ChainInfo {
435 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
436 self.requested_committees
437 .ok_or(LocalNodeError::InvalidChainInfoResponse)
438 }
439
440 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
441 self.requested_committees
442 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
443 .remove(&self.epoch)
444 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
445 }
446
447 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
448 self.requested_committees
449 .as_ref()
450 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
451 .get(&self.epoch)
452 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
453 }
454}