1use std::{
6 collections::{BTreeMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{future::Either, stream, StreamExt as _, TryStreamExt as _};
11use linera_base::{
12 crypto::ValidatorPublicKey,
13 data_types::{ApplicationDescription, ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{ApplicationId, BlobId, ChainId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, ProposedBlock},
18 types::{Block, ConfirmedBlockCertificate, GenericCertificate, LiteCertificate},
19 ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74 fn from(error: WorkerError) -> Self {
75 match error {
76 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77 error => LocalNodeError::WorkerError(error),
78 }
79 }
80}
81
82impl<S> LocalNodeClient<S>
83where
84 S: Storage + Clone + Send + Sync + 'static,
85{
86 #[instrument(level = "trace", skip_all)]
87 pub async fn handle_block_proposal(
88 &self,
89 proposal: BlockProposal,
90 ) -> Result<ChainInfoResponse, LocalNodeError> {
91 let (response, _actions) = self.node.state.handle_block_proposal(proposal).await?;
93 Ok(response)
94 }
95
96 #[instrument(level = "trace", skip_all)]
97 pub async fn handle_lite_certificate(
98 &self,
99 certificate: LiteCertificate<'_>,
100 notifier: &impl Notifier,
101 ) -> Result<ChainInfoResponse, LocalNodeError> {
102 match self.node.state.full_certificate(certificate).await? {
103 Either::Left(confirmed) => Ok(self.handle_certificate(confirmed, notifier).await?),
104 Either::Right(validated) => Ok(self.handle_certificate(validated, notifier).await?),
105 }
106 }
107
108 #[instrument(level = "trace", skip_all)]
109 pub async fn handle_certificate<T>(
110 &self,
111 certificate: GenericCertificate<T>,
112 notifier: &impl Notifier,
113 ) -> Result<ChainInfoResponse, LocalNodeError>
114 where
115 T: ProcessableCertificate,
116 {
117 Ok(Box::pin(
118 self.node
119 .state
120 .fully_handle_certificate_with_notifications(certificate, notifier),
121 )
122 .await?)
123 }
124
125 #[instrument(level = "trace", skip_all)]
127 pub async fn preprocess_certificate(
128 &self,
129 certificate: ConfirmedBlockCertificate,
130 notifier: &impl Notifier,
131 ) -> Result<(), LocalNodeError> {
132 self.node
133 .state
134 .fully_preprocess_certificate_with_notifications(certificate, notifier)
135 .await?;
136 Ok(())
137 }
138
139 #[instrument(level = "trace", skip_all)]
140 pub async fn handle_chain_info_query(
141 &self,
142 query: ChainInfoQuery,
143 ) -> Result<ChainInfoResponse, LocalNodeError> {
144 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
146 Ok(response)
147 }
148
149 #[instrument(level = "trace", skip_all)]
150 pub fn new(state: WorkerState<S>) -> Self {
151 Self {
152 node: Arc::new(LocalNode { state }),
153 }
154 }
155
156 #[instrument(level = "trace", skip_all)]
157 pub(crate) fn storage_client(&self) -> S {
158 self.node.state.storage_client().clone()
159 }
160
161 #[instrument(level = "trace", skip_all)]
162 pub async fn stage_block_execution(
163 &self,
164 block: ProposedBlock,
165 round: Option<u32>,
166 published_blobs: Vec<Blob>,
167 ) -> Result<(Block, ChainInfoResponse), LocalNodeError> {
168 Ok(self
169 .node
170 .state
171 .stage_block_execution(block, round, published_blobs)
172 .await?)
173 }
174
175 pub async fn read_blobs_from_storage(
177 &self,
178 blob_ids: &[BlobId],
179 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
180 let storage = self.storage_client();
181 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
182 }
183
184 pub async fn read_blob_states_from_storage(
186 &self,
187 blob_ids: &[BlobId],
188 ) -> Result<Vec<BlobState>, LocalNodeError> {
189 let storage = self.storage_client();
190 let mut blobs_not_found = Vec::new();
191 let mut blob_states = Vec::new();
192 for (blob_state, blob_id) in storage
193 .read_blob_states(blob_ids)
194 .await?
195 .into_iter()
196 .zip(blob_ids)
197 {
198 match blob_state {
199 None => blobs_not_found.push(*blob_id),
200 Some(blob_state) => blob_states.push(blob_state),
201 }
202 }
203 if !blobs_not_found.is_empty() {
204 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
205 }
206 Ok(blob_states)
207 }
208
209 pub async fn get_locking_blobs(
212 &self,
213 blob_ids: impl IntoIterator<Item = &BlobId>,
214 chain_id: ChainId,
215 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
216 let chain = self.chain_state_view(chain_id).await?;
217 let mut blobs = Vec::new();
218 for blob_id in blob_ids {
219 match chain.manager.locking_blobs.get(blob_id).await? {
220 None => return Ok(None),
221 Some(blob) => blobs.push(blob),
222 }
223 }
224 Ok(Some(blobs))
225 }
226
227 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
229 let storage = self.storage_client();
230 storage.maybe_write_blobs(blobs).await?;
231 Ok(())
232 }
233
234 pub async fn handle_pending_blobs(
235 &self,
236 chain_id: ChainId,
237 blobs: Vec<Blob>,
238 ) -> Result<(), LocalNodeError> {
239 for blob in blobs {
240 self.node.state.handle_pending_blob(chain_id, blob).await?;
241 }
242 Ok(())
243 }
244
245 #[instrument(level = "trace", skip(self))]
251 pub async fn chain_state_view(
252 &self,
253 chain_id: ChainId,
254 ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
255 Ok(self.node.state.chain_state_view(chain_id).await?)
256 }
257
258 #[instrument(level = "trace", skip(self))]
259 pub(crate) async fn chain_info(
260 &self,
261 chain_id: ChainId,
262 ) -> Result<Box<ChainInfo>, LocalNodeError> {
263 let query = ChainInfoQuery::new(chain_id);
264 Ok(self.handle_chain_info_query(query).await?.info)
265 }
266
267 #[instrument(level = "trace", skip(self, query))]
268 pub async fn query_application(
269 &self,
270 chain_id: ChainId,
271 query: Query,
272 ) -> Result<QueryOutcome, LocalNodeError> {
273 let outcome = self.node.state.query_application(chain_id, query).await?;
274 Ok(outcome)
275 }
276
277 #[instrument(level = "trace", skip(self))]
278 pub async fn describe_application(
279 &self,
280 chain_id: ChainId,
281 application_id: ApplicationId,
282 ) -> Result<ApplicationDescription, LocalNodeError> {
283 let response = self
284 .node
285 .state
286 .describe_application(chain_id, application_id)
287 .await?;
288 Ok(response)
289 }
290
291 #[instrument(level = "trace", skip(self))]
293 pub async fn retry_pending_cross_chain_requests(
294 &self,
295 sender_chain: ChainId,
296 ) -> Result<(), LocalNodeError> {
297 let (_response, actions) = self
298 .node
299 .state
300 .handle_chain_info_query(ChainInfoQuery::new(sender_chain))
301 .await?;
302 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
303 while let Some(request) = requests.pop_front() {
304 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
305 requests.extend(new_actions.cross_chain_requests);
306 }
307 Ok(())
308 }
309
310 pub async fn next_outbox_heights(
316 &self,
317 chain_ids: impl IntoIterator<Item = &ChainId>,
318 chain_worker_limit: usize,
319 receiver_id: ChainId,
320 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
321 let futures = chain_ids
322 .into_iter()
323 .map(|chain_id| async move {
324 let chain = self.chain_state_view(*chain_id).await?;
325 let mut next_height = chain.tip_state.get().next_block_height;
326 if let Some(outbox) = chain.outboxes.try_load_entry(&receiver_id).await? {
327 next_height = next_height.max(*outbox.next_height_to_schedule.get());
328 }
329 Ok::<_, LocalNodeError>((*chain_id, next_height))
330 })
331 .collect::<Vec<_>>();
332 stream::iter(futures)
333 .buffer_unordered(chain_worker_limit)
334 .try_collect()
335 .await
336 }
337
338 pub async fn update_received_certificate_trackers(
339 &self,
340 chain_id: ChainId,
341 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
342 ) -> Result<(), LocalNodeError> {
343 self.node
344 .state
345 .update_received_certificate_trackers(chain_id, new_trackers)
346 .await?;
347 Ok(())
348 }
349}
350
351pub trait LocalChainInfoExt {
354 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
356
357 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
359
360 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
362}
363
364impl LocalChainInfoExt for ChainInfo {
365 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
366 self.requested_committees
367 .ok_or(LocalNodeError::InvalidChainInfoResponse)
368 }
369
370 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
371 self.requested_committees
372 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
373 .remove(&self.epoch)
374 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
375 }
376
377 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
378 self.requested_committees
379 .as_ref()
380 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
381 .get(&self.epoch)
382 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
383 }
384}