1use std::{
6 collections::{BTreeMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::ValidatorPublicKey,
13 data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{BlobId, ChainId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, ProposedBlock},
18 types::{Block, GenericCertificate},
19 ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74 fn from(error: WorkerError) -> Self {
75 match error {
76 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77 error => LocalNodeError::WorkerError(error),
78 }
79 }
80}
81
82impl<S> LocalNodeClient<S>
83where
84 S: Storage + Clone + Send + Sync + 'static,
85{
86 #[instrument(level = "trace", skip_all)]
87 pub async fn handle_block_proposal(
88 &self,
89 proposal: BlockProposal,
90 ) -> Result<ChainInfoResponse, LocalNodeError> {
91 let (response, _actions) = self.node.state.handle_block_proposal(proposal).await?;
93 Ok(response)
94 }
95
96 #[instrument(level = "trace", skip_all)]
97 pub async fn handle_certificate<T>(
98 &self,
99 certificate: GenericCertificate<T>,
100 notifier: &impl Notifier,
101 ) -> Result<ChainInfoResponse, LocalNodeError>
102 where
103 T: ProcessableCertificate,
104 {
105 Ok(Box::pin(
106 self.node
107 .state
108 .fully_handle_certificate_with_notifications(certificate, notifier),
109 )
110 .await?)
111 }
112
113 #[instrument(level = "trace", skip_all)]
114 pub async fn handle_chain_info_query(
115 &self,
116 query: ChainInfoQuery,
117 ) -> Result<ChainInfoResponse, LocalNodeError> {
118 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
120 Ok(response)
121 }
122
123 #[instrument(level = "trace", skip_all)]
124 pub fn new(state: WorkerState<S>) -> Self {
125 Self {
126 node: Arc::new(LocalNode { state }),
127 }
128 }
129
130 #[instrument(level = "trace", skip_all)]
131 pub(crate) fn storage_client(&self) -> S {
132 self.node.state.storage_client().clone()
133 }
134
135 #[instrument(level = "trace", skip_all)]
136 pub async fn stage_block_execution(
137 &self,
138 block: ProposedBlock,
139 round: Option<u32>,
140 published_blobs: Vec<Blob>,
141 ) -> Result<(Block, ChainInfoResponse), LocalNodeError> {
142 Ok(self
143 .node
144 .state
145 .stage_block_execution(block, round, published_blobs)
146 .await?)
147 }
148
149 pub async fn read_blobs_from_storage(
151 &self,
152 blob_ids: &[BlobId],
153 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
154 let storage = self.storage_client();
155 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
156 }
157
158 pub async fn read_blob_states_from_storage(
160 &self,
161 blob_ids: &[BlobId],
162 ) -> Result<Vec<BlobState>, LocalNodeError> {
163 let storage = self.storage_client();
164 let mut blobs_not_found = Vec::new();
165 let mut blob_states = Vec::new();
166 for (blob_state, blob_id) in storage
167 .read_blob_states(blob_ids)
168 .await?
169 .into_iter()
170 .zip(blob_ids)
171 {
172 match blob_state {
173 None => blobs_not_found.push(*blob_id),
174 Some(blob_state) => blob_states.push(blob_state),
175 }
176 }
177 if !blobs_not_found.is_empty() {
178 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
179 }
180 Ok(blob_states)
181 }
182
183 pub async fn get_locking_blobs(
186 &self,
187 blob_ids: impl IntoIterator<Item = &BlobId>,
188 chain_id: ChainId,
189 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
190 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
191 Ok(self
192 .node
193 .state
194 .get_locking_blobs(chain_id, blob_ids_vec)
195 .await?)
196 }
197
198 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
200 let storage = self.storage_client();
201 storage.maybe_write_blobs(blobs).await?;
202 Ok(())
203 }
204
205 pub async fn handle_pending_blobs(
206 &self,
207 chain_id: ChainId,
208 blobs: Vec<Blob>,
209 ) -> Result<(), LocalNodeError> {
210 for blob in blobs {
211 self.node.state.handle_pending_blob(chain_id, blob).await?;
212 }
213 Ok(())
214 }
215
216 #[instrument(level = "trace", skip(self))]
222 pub async fn chain_state_view(
223 &self,
224 chain_id: ChainId,
225 ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
226 Ok(self.node.state.chain_state_view(chain_id).await?)
227 }
228
229 #[instrument(level = "trace", skip(self))]
230 pub(crate) async fn chain_info(
231 &self,
232 chain_id: ChainId,
233 ) -> Result<Box<ChainInfo>, LocalNodeError> {
234 let query = ChainInfoQuery::new(chain_id);
235 Ok(self.handle_chain_info_query(query).await?.info)
236 }
237
238 #[instrument(level = "trace", skip(self, query))]
239 pub async fn query_application(
240 &self,
241 chain_id: ChainId,
242 query: Query,
243 ) -> Result<QueryOutcome, LocalNodeError> {
244 let outcome = self.node.state.query_application(chain_id, query).await?;
245 Ok(outcome)
246 }
247
248 #[instrument(level = "trace", skip(self))]
250 pub async fn retry_pending_cross_chain_requests(
251 &self,
252 sender_chain: ChainId,
253 ) -> Result<(), LocalNodeError> {
254 let (_response, actions) = self
255 .node
256 .state
257 .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
258 .await?;
259 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
260 while let Some(request) = requests.pop_front() {
261 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
262 requests.extend(new_actions.cross_chain_requests);
263 }
264 Ok(())
265 }
266
267 pub async fn next_outbox_heights(
271 &self,
272 chain_ids: impl IntoIterator<Item = &ChainId>,
273 receiver_id: ChainId,
274 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
275 let futures =
276 FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
277 let chain = match self.chain_state_view(*chain_id).await {
278 Ok(chain) => chain,
279 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
280 return Ok((*chain_id, BlockHeight::ZERO))
281 }
282 Err(err) => Err(err)?,
283 };
284 let mut next_height = chain.tip_state.get().next_block_height;
285 if let Some(outbox) = chain.outboxes.try_load_entry(&receiver_id).await? {
286 next_height = next_height.max(*outbox.next_height_to_schedule.get());
287 }
288 Ok::<_, LocalNodeError>((*chain_id, next_height))
289 }));
290 futures.try_collect().await
291 }
292
293 pub async fn update_received_certificate_trackers(
294 &self,
295 chain_id: ChainId,
296 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
297 ) -> Result<(), LocalNodeError> {
298 self.node
299 .state
300 .update_received_certificate_trackers(chain_id, new_trackers)
301 .await?;
302 Ok(())
303 }
304
305 pub async fn get_preprocessed_block_hashes(
306 &self,
307 chain_id: ChainId,
308 start: BlockHeight,
309 end: BlockHeight,
310 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
311 Ok(self
312 .node
313 .state
314 .get_preprocessed_block_hashes(chain_id, start, end)
315 .await?)
316 }
317
318 pub async fn get_inbox_next_height(
319 &self,
320 chain_id: ChainId,
321 origin: ChainId,
322 ) -> Result<BlockHeight, LocalNodeError> {
323 Ok(self
324 .node
325 .state
326 .get_inbox_next_height(chain_id, origin)
327 .await?)
328 }
329}
330
331pub trait LocalChainInfoExt {
334 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
336
337 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
339
340 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
342}
343
344impl LocalChainInfoExt for ChainInfo {
345 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
346 self.requested_committees
347 .ok_or(LocalNodeError::InvalidChainInfoResponse)
348 }
349
350 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
351 self.requested_committees
352 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
353 .remove(&self.epoch)
354 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
355 }
356
357 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
358 self.requested_committees
359 .as_ref()
360 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
361 .get(&self.epoch)
362 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
363 }
364}