1use std::{
6 collections::{BTreeMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{future::Either, stream, StreamExt as _, TryStreamExt as _};
11use linera_base::{
12 crypto::ValidatorPublicKey,
13 data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{ApplicationId, BlobId, ChainId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, ProposedBlock},
18 types::{Block, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate},
19 ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("Failed to read blob {blob_id:?} of chain {chain_id:?}")]
64 CannotReadLocalBlob { chain_id: ChainId, blob_id: BlobId },
65
66 #[error("The local node doesn't have an active chain {0}")]
67 InactiveChain(ChainId),
68
69 #[error("The chain info response received from the local node is invalid")]
70 InvalidChainInfoResponse,
71
72 #[error("Blobs not found: {0:?}")]
73 BlobsNotFound(Vec<BlobId>),
74}
75
76impl From<WorkerError> for LocalNodeError {
77 fn from(error: WorkerError) -> Self {
78 match error {
79 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
80 error => LocalNodeError::WorkerError(error),
81 }
82 }
83}
84
85impl<S> LocalNodeClient<S>
86where
87 S: Storage + Clone + Send + Sync + 'static,
88{
89 #[instrument(level = "trace", skip_all)]
90 pub async fn handle_block_proposal(
91 &self,
92 proposal: BlockProposal,
93 ) -> Result<ChainInfoResponse, LocalNodeError> {
94 let (response, _actions) = self.node.state.handle_block_proposal(proposal).await?;
96 Ok(response)
97 }
98
99 #[instrument(level = "trace", skip_all)]
100 pub async fn handle_lite_certificate(
101 &self,
102 certificate: LiteCertificate<'_>,
103 notifier: &impl Notifier,
104 ) -> Result<ChainInfoResponse, LocalNodeError> {
105 match self.node.state.full_certificate(certificate).await? {
106 Either::Left(confirmed) => Ok(self.handle_certificate(confirmed, notifier).await?),
107 Either::Right(validated) => Ok(self.handle_certificate(validated, notifier).await?),
108 }
109 }
110
111 #[instrument(level = "trace", skip_all)]
112 pub async fn handle_certificate<T>(
113 &self,
114 certificate: GenericCertificate<T>,
115 notifier: &impl Notifier,
116 ) -> Result<ChainInfoResponse, LocalNodeError>
117 where
118 T: ProcessableCertificate,
119 {
120 Ok(Box::pin(
121 self.node
122 .state
123 .fully_handle_certificate_with_notifications(certificate, notifier),
124 )
125 .await?)
126 }
127
128 #[instrument(level = "trace", skip_all)]
130 pub async fn preprocess_certificate(
131 &self,
132 certificate: ConfirmedBlockCertificate,
133 notifier: &impl Notifier,
134 ) -> Result<(), LocalNodeError> {
135 self.node
136 .state
137 .fully_preprocess_certificate_with_notifications(certificate, notifier)
138 .await?;
139 Ok(())
140 }
141
142 #[instrument(level = "trace", skip_all)]
143 pub async fn handle_chain_info_query(
144 &self,
145 query: ChainInfoQuery,
146 ) -> Result<ChainInfoResponse, LocalNodeError> {
147 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
149 Ok(response)
150 }
151
152 #[instrument(level = "trace", skip_all)]
153 pub fn new(state: WorkerState<S>) -> Self {
154 Self {
155 node: Arc::new(LocalNode { state }),
156 }
157 }
158
159 #[instrument(level = "trace", skip_all)]
160 pub(crate) fn storage_client(&self) -> S {
161 self.node.state.storage_client().clone()
162 }
163
164 #[instrument(level = "trace", skip_all)]
165 pub async fn stage_block_execution(
166 &self,
167 block: ProposedBlock,
168 round: Option<u32>,
169 published_blobs: Vec<Blob>,
170 ) -> Result<(Block, ChainInfoResponse), LocalNodeError> {
171 Ok(self
172 .node
173 .state
174 .stage_block_execution(block, round, published_blobs)
175 .await?)
176 }
177
178 pub async fn read_blobs_from_storage(
180 &self,
181 blob_ids: &[BlobId],
182 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
183 let storage = self.storage_client();
184 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
185 }
186
187 pub async fn read_blob_states_from_storage(
189 &self,
190 blob_ids: &[BlobId],
191 ) -> Result<Vec<BlobState>, LocalNodeError> {
192 let storage = self.storage_client();
193 let mut blobs_not_found = Vec::new();
194 let mut blob_states = Vec::new();
195 for (blob_state, blob_id) in storage
196 .read_blob_states(blob_ids)
197 .await?
198 .into_iter()
199 .zip(blob_ids)
200 {
201 match blob_state {
202 None => blobs_not_found.push(*blob_id),
203 Some(blob_state) => blob_states.push(blob_state),
204 }
205 }
206 if !blobs_not_found.is_empty() {
207 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
208 }
209 Ok(blob_states)
210 }
211
212 pub async fn get_locking_blobs(
215 &self,
216 blob_ids: impl IntoIterator<Item = &BlobId>,
217 chain_id: ChainId,
218 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
219 let chain = self.chain_state_view(chain_id).await?;
220 let mut blobs = Vec::new();
221 for blob_id in blob_ids {
222 match chain.manager.locking_blobs.get(blob_id).await? {
223 None => return Ok(None),
224 Some(blob) => blobs.push(blob),
225 }
226 }
227 Ok(Some(blobs))
228 }
229
230 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
232 let storage = self.storage_client();
233 storage.maybe_write_blobs(blobs).await?;
234 Ok(())
235 }
236
237 pub async fn handle_pending_blobs(
238 &self,
239 chain_id: ChainId,
240 blobs: Vec<Blob>,
241 ) -> Result<(), LocalNodeError> {
242 for blob in blobs {
243 self.node.state.handle_pending_blob(chain_id, blob).await?;
244 }
245 Ok(())
246 }
247
248 #[instrument(level = "trace", skip(self))]
254 pub async fn chain_state_view(
255 &self,
256 chain_id: ChainId,
257 ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
258 Ok(self.node.state.chain_state_view(chain_id).await?)
259 }
260
261 #[instrument(level = "trace", skip(self))]
262 pub(crate) async fn chain_info(
263 &self,
264 chain_id: ChainId,
265 ) -> Result<Box<ChainInfo>, LocalNodeError> {
266 let query = ChainInfoQuery::new(chain_id);
267 Ok(self.handle_chain_info_query(query).await?.info)
268 }
269
270 #[instrument(level = "trace", skip(self, query))]
271 pub async fn query_application(
272 &self,
273 chain_id: ChainId,
274 query: Query,
275 ) -> Result<QueryOutcome, LocalNodeError> {
276 let outcome = self.node.state.query_application(chain_id, query).await?;
277 Ok(outcome)
278 }
279
280 #[instrument(level = "trace", skip(self))]
281 pub async fn describe_application(
282 &self,
283 chain_id: ChainId,
284 application_id: ApplicationId,
285 ) -> Result<ApplicationDescription, LocalNodeError> {
286 let response = self
287 .node
288 .state
289 .describe_application(chain_id, application_id)
290 .await?;
291 Ok(response)
292 }
293
294 #[instrument(level = "trace", skip(self))]
296 pub async fn retry_pending_cross_chain_requests(
297 &self,
298 sender_chain: ChainId,
299 ) -> Result<(), LocalNodeError> {
300 let (_response, actions) = self
301 .node
302 .state
303 .handle_chain_info_query(ChainInfoQuery::new(sender_chain))
304 .await?;
305 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
306 while let Some(request) = requests.pop_front() {
307 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
308 requests.extend(new_actions.cross_chain_requests);
309 }
310 Ok(())
311 }
312
313 pub async fn next_block_heights(
318 &self,
319 chain_ids: impl IntoIterator<Item = &ChainId>,
320 chain_worker_limit: usize,
321 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
322 let futures = chain_ids
323 .into_iter()
324 .map(|chain_id| async move {
325 let chain = self.chain_state_view(*chain_id).await?;
326 let mut next_height = chain.tip_state.get().next_block_height;
327 while chain.preprocessed_blocks.contains_key(&next_height).await? {
330 next_height.try_add_assign_one()?;
331 }
332 Ok::<_, LocalNodeError>((*chain_id, next_height))
333 })
334 .collect::<Vec<_>>();
335 stream::iter(futures)
336 .buffer_unordered(chain_worker_limit)
337 .try_collect()
338 .await
339 }
340
341 pub async fn update_received_certificate_trackers(
342 &self,
343 chain_id: ChainId,
344 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
345 ) -> Result<(), LocalNodeError> {
346 self.node
347 .state
348 .update_received_certificate_trackers(chain_id, new_trackers)
349 .await?;
350 Ok(())
351 }
352}
353
354pub trait LocalChainInfoExt {
357 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
359
360 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
362
363 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
365}
366
367impl LocalChainInfoExt for ChainInfo {
368 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
369 self.requested_committees
370 .ok_or(LocalNodeError::InvalidChainInfoResponse)
371 }
372
373 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
374 self.requested_committees
375 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
376 .remove(&self.epoch)
377 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
378 }
379
380 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
381 self.requested_committees
382 .as_ref()
383 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
384 .get(&self.epoch)
385 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
386 }
387}