1mod attempted_changes;
7mod temporary_changes;
8
9use std::{
10 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
11 iter,
12 sync::{self, Arc},
13};
14
15use linera_base::{
16 crypto::{CryptoHash, ValidatorPublicKey},
17 data_types::{ApplicationDescription, Blob, BlockHeight, Epoch},
18 ensure,
19 hashed::Hashed,
20 identifiers::{ApplicationId, BlobId, BlobType, ChainId},
21};
22use linera_chain::{
23 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24 manager,
25 types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
26 ChainError, ChainStateView,
27};
28use linera_execution::{ExecutionStateView, Query, QueryOutcome, ServiceRuntimeEndpoint};
29use linera_storage::{Clock as _, Storage};
30use linera_views::views::ClonableView;
31use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock};
32
33#[cfg(test)]
34pub(crate) use self::attempted_changes::CrossChainUpdateHelper;
35use self::{
36 attempted_changes::ChainWorkerStateWithAttemptedChanges,
37 temporary_changes::ChainWorkerStateWithTemporaryChanges,
38};
39use super::{ChainWorkerConfig, DeliveryNotifier};
40use crate::{
41 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
42 value_cache::ValueCache,
43 worker::{NetworkActions, WorkerError},
44};
45
46pub struct ChainWorkerState<StorageClient>
48where
49 StorageClient: Storage + Clone + Send + Sync + 'static,
50{
51 config: ChainWorkerConfig,
52 storage: StorageClient,
53 chain: ChainStateView<StorageClient::Context>,
54 shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
55 service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
56 block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
57 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
58 tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
59 delivery_notifier: DeliveryNotifier,
60 knows_chain_is_active: bool,
61}
62
63impl<StorageClient> ChainWorkerState<StorageClient>
64where
65 StorageClient: Storage + Clone + Send + Sync + 'static,
66{
67 #[expect(clippy::too_many_arguments)]
69 pub async fn load(
70 config: ChainWorkerConfig,
71 storage: StorageClient,
72 block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
73
74 execution_state_cache: Arc<
75 ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
76 >,
77 tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
78 delivery_notifier: DeliveryNotifier,
79 chain_id: ChainId,
80 service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
81 ) -> Result<Self, WorkerError> {
82 let chain = storage.load_chain(chain_id).await?;
83
84 Ok(ChainWorkerState {
85 config,
86 storage,
87 chain,
88 shared_chain_view: None,
89 service_runtime_endpoint,
90 block_values,
91 execution_state_cache,
92 tracked_chains,
93 delivery_notifier,
94 knows_chain_is_active: false,
95 })
96 }
97
98 pub fn chain_id(&self) -> ChainId {
100 self.chain.chain_id()
101 }
102
103 pub(super) async fn chain_state_view(
108 &mut self,
109 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
110 if self.shared_chain_view.is_none() {
111 self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?)));
112 }
113
114 Ok(self
115 .shared_chain_view
116 .as_ref()
117 .expect("`shared_chain_view` should be initialized above")
118 .clone()
119 .read_owned()
120 .await)
121 }
122
/// Reads the confirmed block certificate at `height`, if there is one.
///
/// Only compiled with the `with_testing` configuration. The lookup is delegated to
/// `ChainWorkerStateWithTemporaryChanges::read_certificate`.
#[cfg(with_testing)]
pub(super) async fn read_certificate(
    &mut self,
    height: BlockHeight,
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
    let temporary_changes = ChainWorkerStateWithTemporaryChanges::new(self);
    temporary_changes.await.read_certificate(height).await
}
134
/// Looks up a message bundle in the inbox `inbox_id`, identified by `certificate_hash`,
/// `height` and `index`.
///
/// Only compiled with the `with_testing` configuration. The lookup is delegated to
/// `ChainWorkerStateWithTemporaryChanges::find_bundle_in_inbox`.
#[cfg(with_testing)]
pub(super) async fn find_bundle_in_inbox(
    &mut self,
    inbox_id: ChainId,
    certificate_hash: CryptoHash,
    height: BlockHeight,
    index: u32,
) -> Result<Option<MessageBundle>, WorkerError> {
    let temporary_changes = ChainWorkerStateWithTemporaryChanges::new(self);
    temporary_changes
        .await
        .find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
        .await
}
149
150 pub(super) async fn query_application(
152 &mut self,
153 query: Query,
154 ) -> Result<QueryOutcome, WorkerError> {
155 ChainWorkerStateWithTemporaryChanges::new(self)
156 .await
157 .query_application(query)
158 .await
159 }
160
161 pub(super) async fn describe_application(
163 &mut self,
164 application_id: ApplicationId,
165 ) -> Result<ApplicationDescription, WorkerError> {
166 ChainWorkerStateWithTemporaryChanges::new(self)
167 .await
168 .describe_application(application_id)
169 .await
170 }
171
172 pub(super) async fn stage_block_execution(
174 &mut self,
175 block: ProposedBlock,
176 round: Option<u32>,
177 published_blobs: &[Blob],
178 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
179 let (block, response) = ChainWorkerStateWithTemporaryChanges::new(self)
180 .await
181 .stage_block_execution(block, round, published_blobs)
182 .await?;
183 Ok((block, response))
184 }
185
186 pub(super) async fn process_timeout(
188 &mut self,
189 certificate: TimeoutCertificate,
190 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
191 ChainWorkerStateWithAttemptedChanges::new(self)
192 .await
193 .process_timeout(certificate)
194 .await
195 }
196
197 #[tracing::instrument(level = "debug", skip(self))]
199 pub(super) async fn handle_block_proposal(
200 &mut self,
201 proposal: BlockProposal,
202 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
203 self.ensure_is_active().await?;
204 if ChainWorkerStateWithTemporaryChanges::new(&mut *self)
205 .await
206 .check_proposed_block(&proposal)
207 .await?
208 == manager::Outcome::Skip
209 {
210 let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
212 return Ok((info, NetworkActions::default()));
213 };
214 let published_blobs = ChainWorkerStateWithAttemptedChanges::new(&mut *self)
215 .await
216 .load_proposal_blobs(&proposal)
217 .await?;
218 let validation_outcome = ChainWorkerStateWithTemporaryChanges::new(self)
219 .await
220 .validate_proposal_content(&proposal.content, &published_blobs)
221 .await?;
222
223 let actions = if let Some((outcome, local_time)) = validation_outcome {
224 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
225 .await
226 .vote_for_block_proposal(proposal, outcome, local_time)
227 .await?;
228 self.create_network_actions().await?
230 } else {
231 NetworkActions::default()
233 };
234
235 let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
236 Ok((info, actions))
237 }
238
239 #[tracing::instrument(level = "debug", skip(self))]
241 pub(super) async fn process_validated_block(
242 &mut self,
243 certificate: ValidatedBlockCertificate,
244 ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
245 ChainWorkerStateWithAttemptedChanges::new(self)
246 .await
247 .process_validated_block(certificate)
248 .await
249 }
250
251 #[tracing::instrument(level = "debug", skip(self))]
253 pub(super) async fn process_confirmed_block(
254 &mut self,
255 certificate: ConfirmedBlockCertificate,
256 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
257 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
258 ChainWorkerStateWithAttemptedChanges::new(self)
259 .await
260 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
261 .await
262 }
263
264 #[tracing::instrument(level = "debug", skip(self))]
266 pub(super) async fn preprocess_certificate(
267 &mut self,
268 certificate: ConfirmedBlockCertificate,
269 ) -> Result<NetworkActions, WorkerError> {
270 ChainWorkerStateWithAttemptedChanges::new(self)
271 .await
272 .preprocess_certificate(certificate)
273 .await
274 }
275
276 #[tracing::instrument(level = "debug", skip(self))]
278 pub(super) async fn process_cross_chain_update(
279 &mut self,
280 origin: ChainId,
281 bundles: Vec<(Epoch, MessageBundle)>,
282 ) -> Result<Option<BlockHeight>, WorkerError> {
283 ChainWorkerStateWithAttemptedChanges::new(self)
284 .await
285 .process_cross_chain_update(origin, bundles)
286 .await
287 }
288
289 pub(super) async fn confirm_updated_recipient(
291 &mut self,
292 recipient: ChainId,
293 latest_height: BlockHeight,
294 ) -> Result<(), WorkerError> {
295 ChainWorkerStateWithAttemptedChanges::new(self)
296 .await
297 .confirm_updated_recipient(recipient, latest_height)
298 .await
299 }
300
301 #[tracing::instrument(level = "debug", skip(self))]
303 pub(super) async fn handle_chain_info_query(
304 &mut self,
305 query: ChainInfoQuery,
306 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
307 if query.request_leader_timeout {
308 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
309 .await
310 .vote_for_leader_timeout()
311 .await?;
312 }
313 if query.request_fallback {
314 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
315 .await
316 .vote_for_fallback()
317 .await?;
318 }
319 let response = ChainWorkerStateWithTemporaryChanges::new(self)
320 .await
321 .prepare_chain_info_response(query)
322 .await?;
323 let actions = self.create_network_actions().await?;
325 Ok((response, actions))
326 }
327
328 pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
330 if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
331 return Ok(blob);
332 }
333 let blob = self.storage.read_blob(blob_id).await?;
334 blob.ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
335 }
336
337 #[tracing::instrument(level = "debug", skip(self))]
339 pub(super) async fn handle_pending_blob(
340 &mut self,
341 blob: Blob,
342 ) -> Result<ChainInfoResponse, WorkerError> {
343 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
344 .await
345 .handle_pending_blob(blob)
346 .await
347 }
348
349 async fn ensure_is_active(&mut self) -> Result<(), WorkerError> {
351 if !self.knows_chain_is_active {
352 let local_time = self.storage.clock().current_time();
353 self.chain.ensure_is_active(local_time).await?;
354 self.knows_chain_is_active = true;
355 }
356 Ok(())
357 }
358
359 async fn get_required_blobs(
362 &self,
363 required_blob_ids: impl IntoIterator<Item = BlobId>,
364 created_blobs: &BTreeMap<BlobId, Blob>,
365 ) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
366 let maybe_blobs = self
367 .maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
368 .await?;
369 let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
370 ensure!(
371 not_found_blob_ids.is_empty(),
372 WorkerError::BlobsNotFound(not_found_blob_ids)
373 );
374 Ok(maybe_blobs
375 .into_iter()
376 .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
377 .collect())
378 }
379
380 async fn maybe_get_required_blobs(
382 &self,
383 blob_ids: impl IntoIterator<Item = BlobId>,
384 created_blobs: Option<&BTreeMap<BlobId, Blob>>,
385 ) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
386 let mut maybe_blobs = BTreeMap::from_iter(blob_ids.into_iter().zip(iter::repeat(None)));
387
388 for (blob_id, maybe_blob) in &mut maybe_blobs {
389 if let Some(blob) = created_blobs.and_then(|blob_map| blob_map.get(blob_id)) {
390 *maybe_blob = Some(blob.clone());
391 } else if let Some(blob) = self.chain.manager.pending_blob(blob_id).await? {
392 *maybe_blob = Some(blob);
393 } else if let Some(blob) = self.chain.pending_validated_blobs.get(blob_id).await? {
394 *maybe_blob = Some(blob);
395 } else {
396 for (_, pending_blobs) in self
397 .chain
398 .pending_proposed_blobs
399 .try_load_all_entries()
400 .await?
401 {
402 if let Some(blob) = pending_blobs.get(blob_id).await? {
403 *maybe_blob = Some(blob);
404 break;
405 }
406 }
407 }
408 }
409 let missing_blob_ids = missing_blob_ids(&maybe_blobs);
410 let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?;
411 for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) {
412 maybe_blobs.insert(blob_id, maybe_blob);
413 }
414 Ok(maybe_blobs)
415 }
416
417 fn track_newly_created_chains(
423 &self,
424 proposed_block: &ProposedBlock,
425 outcome: &BlockExecutionOutcome,
426 ) {
427 if let Some(tracked_chains) = self.tracked_chains.as_ref() {
428 if !tracked_chains
429 .read()
430 .expect("Panics should not happen while holding a lock to `tracked_chains`")
431 .contains(&proposed_block.chain_id)
432 {
433 return; }
435 let new_chain_ids = outcome
436 .created_blobs_ids()
437 .into_iter()
438 .filter(|blob_id| blob_id.blob_type == BlobType::ChainDescription)
439 .map(|blob_id| ChainId(blob_id.hash));
440
441 tracked_chains
442 .write()
443 .expect("Panics should not happen while holding a lock to `tracked_chains`")
444 .extend(new_chain_ids);
445 }
446 }
447
448 async fn create_network_actions(&self) -> Result<NetworkActions, WorkerError> {
450 let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
451 let mut targets = self.chain.outboxes.indices().await?;
452 if let Some(tracked_chains) = self.tracked_chains.as_ref() {
453 let tracked_chains = tracked_chains
454 .read()
455 .expect("Panics should not happen while holding a lock to `tracked_chains`");
456 targets.retain(|target| tracked_chains.contains(target));
457 }
458 let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
459 for (target, outbox) in targets.into_iter().zip(outboxes) {
460 let outbox = outbox.expect("Only existing outboxes should be referenced by `indices`");
461 let heights = outbox.queue.elements().await?;
462 heights_by_recipient.insert(target, heights);
463 }
464 self.create_cross_chain_requests(heights_by_recipient).await
465 }
466
467 async fn create_cross_chain_requests(
468 &self,
469 heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
470 ) -> Result<NetworkActions, WorkerError> {
471 let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
473 let next_block_height = self.chain.tip_state.get().next_block_height;
474 let log_heights = heights
475 .range(..next_block_height)
476 .copied()
477 .map(usize::try_from)
478 .collect::<Result<Vec<_>, _>>()?;
479 let mut hashes = self
480 .chain
481 .confirmed_log
482 .multi_get(log_heights)
483 .await?
484 .into_iter()
485 .zip(&heights)
486 .map(|(maybe_hash, height)| {
487 maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
488 height: *height,
489 chain_id: self.chain_id(),
490 })
491 })
492 .collect::<Result<Vec<_>, _>>()?;
493 for height in heights.range(next_block_height..) {
494 hashes.push(
495 self.chain
496 .preprocessed_blocks
497 .get(height)
498 .await?
499 .ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
500 height: *height,
501 chain_id: self.chain_id(),
502 })?,
503 );
504 }
505 let certificates = self.storage.read_certificates(hashes).await?;
506 let certificates = heights
507 .into_iter()
508 .zip(certificates)
509 .collect::<HashMap<_, _>>();
510 let mut actions = NetworkActions::default();
512 for (recipient, heights) in heights_by_recipient {
513 let mut bundles = Vec::new();
514 for height in heights {
515 let cert = certificates
516 .get(&height)
517 .ok_or_else(|| ChainError::InternalError("missing certificates".to_string()))?;
518 bundles.extend(cert.message_bundles_for(recipient));
519 }
520 let request = CrossChainRequest::UpdateRecipient {
521 sender: self.chain.chain_id(),
522 recipient,
523 bundles,
524 };
525 actions.cross_chain_requests.push(request);
526 }
527 Ok(actions)
528 }
529
530 pub async fn all_messages_to_tracked_chains_delivered_up_to(
533 &self,
534 height: BlockHeight,
535 ) -> Result<bool, WorkerError> {
536 if self.chain.all_messages_delivered_up_to(height) {
537 return Ok(true);
538 }
539 let Some(tracked_chains) = self.tracked_chains.as_ref() else {
540 return Ok(false);
541 };
542 let mut targets = self.chain.outboxes.indices().await?;
543 {
544 let tracked_chains = tracked_chains.read().unwrap();
545 targets.retain(|target| tracked_chains.contains(target));
546 }
547 let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
548 for outbox in outboxes {
549 let outbox = outbox.expect("Only existing outboxes should be referenced by `indices`");
550 let front = outbox.queue.front();
551 if front.is_some_and(|key| *key <= height) {
552 return Ok(false);
553 }
554 }
555 Ok(true)
556 }
557
558 pub async fn update_received_certificate_trackers(
560 &mut self,
561 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
562 ) -> Result<(), WorkerError> {
563 ChainWorkerStateWithAttemptedChanges::new(self)
564 .await
565 .update_received_certificate_trackers(new_trackers)
566 .await
567 }
568}
569
570fn missing_blob_ids(maybe_blobs: &BTreeMap<BlobId, Option<Blob>>) -> Vec<BlobId> {
572 maybe_blobs
573 .iter()
574 .filter(|(_, maybe_blob)| maybe_blob.is_none())
575 .map(|(blob_id, _)| *blob_id)
576 .collect()
577}
578
579fn check_block_epoch(
581 chain_epoch: Epoch,
582 block_chain: ChainId,
583 block_epoch: Epoch,
584) -> Result<(), WorkerError> {
585 ensure!(
586 block_epoch == chain_epoch,
587 WorkerError::InvalidEpoch {
588 chain_id: block_chain,
589 epoch: block_epoch,
590 chain_epoch
591 }
592 );
593 Ok(())
594}