1mod attempted_changes;
7mod temporary_changes;
8
9use std::{
10 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
11 iter,
12 sync::{self, Arc},
13};
14
15use linera_base::{
16 crypto::{CryptoHash, ValidatorPublicKey},
17 data_types::{ApplicationDescription, Blob, BlockHeight, Epoch},
18 ensure,
19 hashed::Hashed,
20 identifiers::{ApplicationId, BlobId, BlobType, ChainId},
21};
22use linera_chain::{
23 data_types::{BlockExecutionOutcome, BlockProposal, MessageBundle, ProposedBlock},
24 manager,
25 types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
26 ChainError, ChainStateView,
27};
28use linera_execution::{ExecutionStateView, Query, QueryOutcome, ServiceRuntimeEndpoint};
29use linera_storage::{Clock as _, ResultReadCertificates, Storage};
30use linera_views::views::ClonableView;
31use tokio::sync::{oneshot, OwnedRwLockReadGuard, RwLock, RwLockWriteGuard};
32use tracing::{instrument, warn};
33
34#[cfg(test)]
35pub(crate) use self::attempted_changes::CrossChainUpdateHelper;
36use self::{
37 attempted_changes::ChainWorkerStateWithAttemptedChanges,
38 temporary_changes::ChainWorkerStateWithTemporaryChanges,
39};
40use super::{ChainWorkerConfig, ChainWorkerRequest, DeliveryNotifier};
41use crate::{
42 data_types::{ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
43 value_cache::ValueCache,
44 worker::{NetworkActions, WorkerError},
45};
46
47pub struct ChainWorkerState<StorageClient>
49where
50 StorageClient: Storage + Clone + Send + Sync + 'static,
51{
52 config: ChainWorkerConfig,
53 storage: StorageClient,
54 chain: ChainStateView<StorageClient::Context>,
55 shared_chain_view: Option<Arc<RwLock<ChainStateView<StorageClient::Context>>>>,
56 service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
57 block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
58 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
59 tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
60 delivery_notifier: DeliveryNotifier,
61 knows_chain_is_active: bool,
62}
63
64impl<StorageClient> ChainWorkerState<StorageClient>
65where
66 StorageClient: Storage + Clone + Send + Sync + 'static,
67{
68 #[expect(clippy::too_many_arguments)]
70 pub async fn load(
71 config: ChainWorkerConfig,
72 storage: StorageClient,
73 block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
74 execution_state_cache: Arc<
75 ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
76 >,
77 tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
78 delivery_notifier: DeliveryNotifier,
79 chain_id: ChainId,
80 service_runtime_endpoint: Option<ServiceRuntimeEndpoint>,
81 ) -> Result<Self, WorkerError> {
82 let chain = storage.load_chain(chain_id).await?;
83
84 Ok(ChainWorkerState {
85 config,
86 storage,
87 chain,
88 shared_chain_view: None,
89 service_runtime_endpoint,
90 block_values,
91 execution_state_cache,
92 tracked_chains,
93 delivery_notifier,
94 knows_chain_is_active: false,
95 })
96 }
97
98 pub fn chain_id(&self) -> ChainId {
100 self.chain.chain_id()
101 }
102
103 #[instrument(skip_all)]
105 pub async fn handle_request(&mut self, request: ChainWorkerRequest<StorageClient::Context>) {
106 tracing::trace!("Handling chain worker request: {request:?}");
107 let responded = match request {
109 #[cfg(with_testing)]
110 ChainWorkerRequest::ReadCertificate { height, callback } => {
111 callback.send(self.read_certificate(height).await).is_ok()
112 }
113 #[cfg(with_testing)]
114 ChainWorkerRequest::FindBundleInInbox {
115 inbox_id,
116 certificate_hash,
117 height,
118 index,
119 callback,
120 } => callback
121 .send(
122 self.find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
123 .await,
124 )
125 .is_ok(),
126 ChainWorkerRequest::GetChainStateView { callback } => {
127 callback.send(self.chain_state_view().await).is_ok()
128 }
129 ChainWorkerRequest::QueryApplication { query, callback } => {
130 callback.send(self.query_application(query).await).is_ok()
131 }
132 ChainWorkerRequest::DescribeApplication {
133 application_id,
134 callback,
135 } => callback
136 .send(self.describe_application(application_id).await)
137 .is_ok(),
138 ChainWorkerRequest::StageBlockExecution {
139 block,
140 round,
141 published_blobs,
142 callback,
143 } => callback
144 .send(
145 self.stage_block_execution(block, round, &published_blobs)
146 .await,
147 )
148 .is_ok(),
149 ChainWorkerRequest::ProcessTimeout {
150 certificate,
151 callback,
152 } => callback
153 .send(self.process_timeout(certificate).await)
154 .is_ok(),
155 ChainWorkerRequest::HandleBlockProposal { proposal, callback } => callback
156 .send(self.handle_block_proposal(proposal).await)
157 .is_ok(),
158 ChainWorkerRequest::ProcessValidatedBlock {
159 certificate,
160 callback,
161 } => callback
162 .send(self.process_validated_block(certificate).await)
163 .is_ok(),
164 ChainWorkerRequest::ProcessConfirmedBlock {
165 certificate,
166 notify_when_messages_are_delivered,
167 callback,
168 } => callback
169 .send(
170 self.process_confirmed_block(certificate, notify_when_messages_are_delivered)
171 .await,
172 )
173 .is_ok(),
174 ChainWorkerRequest::ProcessCrossChainUpdate {
175 origin,
176 bundles,
177 callback,
178 } => callback
179 .send(self.process_cross_chain_update(origin, bundles).await)
180 .is_ok(),
181 ChainWorkerRequest::ConfirmUpdatedRecipient {
182 recipient,
183 latest_height,
184 callback,
185 } => callback
186 .send(
187 self.confirm_updated_recipient(recipient, latest_height)
188 .await,
189 )
190 .is_ok(),
191 ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback
192 .send(self.handle_chain_info_query(query).await)
193 .is_ok(),
194 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback
195 .send(self.download_pending_blob(blob_id).await)
196 .is_ok(),
197 ChainWorkerRequest::HandlePendingBlob { blob, callback } => {
198 callback.send(self.handle_pending_blob(blob).await).is_ok()
199 }
200 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
201 new_trackers,
202 callback,
203 } => callback
204 .send(
205 self.update_received_certificate_trackers(new_trackers)
206 .await,
207 )
208 .is_ok(),
209 };
210
211 if !responded {
212 warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
213 }
214 }
215
216 pub(super) async fn chain_state_view(
221 &mut self,
222 ) -> Result<OwnedRwLockReadGuard<ChainStateView<StorageClient::Context>>, WorkerError> {
223 if self.shared_chain_view.is_none() {
224 self.shared_chain_view = Some(Arc::new(RwLock::new(self.chain.clone_unchecked()?)));
225 }
226
227 Ok(self
228 .shared_chain_view
229 .as_ref()
230 .expect("`shared_chain_view` should be initialized above")
231 .clone()
232 .read_owned()
233 .await)
234 }
235
/// Reads the confirmed block certificate at `height`, if there is one.
///
/// Only compiled with testing support; the work is delegated to
/// [`ChainWorkerStateWithTemporaryChanges`].
#[cfg(with_testing)]
pub(super) async fn read_certificate(
    &mut self,
    height: BlockHeight,
) -> Result<Option<ConfirmedBlockCertificate>, WorkerError> {
    let temporary_state = ChainWorkerStateWithTemporaryChanges::new(self);
    temporary_state.await.read_certificate(height).await
}
247
/// Looks for a message bundle in the inbox identified by `inbox_id`, using the given
/// certificate hash, block height and index.
///
/// Only compiled with testing support; the work is delegated to
/// [`ChainWorkerStateWithTemporaryChanges`].
#[cfg(with_testing)]
pub(super) async fn find_bundle_in_inbox(
    &mut self,
    inbox_id: ChainId,
    certificate_hash: CryptoHash,
    height: BlockHeight,
    index: u32,
) -> Result<Option<MessageBundle>, WorkerError> {
    let temporary_state = ChainWorkerStateWithTemporaryChanges::new(self);
    temporary_state
        .await
        .find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
        .await
}
262
263 pub(super) async fn query_application(
265 &mut self,
266 query: Query,
267 ) -> Result<QueryOutcome, WorkerError> {
268 ChainWorkerStateWithTemporaryChanges::new(self)
269 .await
270 .query_application(query)
271 .await
272 }
273
274 pub(super) async fn describe_application(
276 &mut self,
277 application_id: ApplicationId,
278 ) -> Result<ApplicationDescription, WorkerError> {
279 ChainWorkerStateWithTemporaryChanges::new(self)
280 .await
281 .describe_application(application_id)
282 .await
283 }
284
285 pub(super) async fn stage_block_execution(
287 &mut self,
288 block: ProposedBlock,
289 round: Option<u32>,
290 published_blobs: &[Blob],
291 ) -> Result<(Block, ChainInfoResponse), WorkerError> {
292 let (block, response) = ChainWorkerStateWithTemporaryChanges::new(self)
293 .await
294 .stage_block_execution(block, round, published_blobs)
295 .await?;
296 Ok((block, response))
297 }
298
299 pub(super) async fn process_timeout(
301 &mut self,
302 certificate: TimeoutCertificate,
303 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
304 ChainWorkerStateWithAttemptedChanges::new(self)
305 .await
306 .process_timeout(certificate)
307 .await
308 }
309
310 #[tracing::instrument(level = "debug", skip(self))]
312 pub(super) async fn handle_block_proposal(
313 &mut self,
314 proposal: BlockProposal,
315 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
316 self.ensure_is_active().await?;
317 if ChainWorkerStateWithTemporaryChanges::new(&mut *self)
318 .await
319 .check_proposed_block(&proposal)
320 .await?
321 == manager::Outcome::Skip
322 {
323 let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
325 return Ok((info, NetworkActions::default()));
326 };
327 let published_blobs = ChainWorkerStateWithAttemptedChanges::new(&mut *self)
328 .await
329 .load_proposal_blobs(&proposal)
330 .await?;
331 let validation_outcome = ChainWorkerStateWithTemporaryChanges::new(self)
332 .await
333 .validate_proposal_content(&proposal.content, &published_blobs)
334 .await?;
335
336 let actions = if let Some((outcome, local_time)) = validation_outcome {
337 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
338 .await
339 .vote_for_block_proposal(proposal, outcome, local_time)
340 .await?;
341 self.create_network_actions().await?
343 } else {
344 NetworkActions::default()
346 };
347
348 let info = ChainInfoResponse::new(&self.chain, self.config.key_pair());
349 Ok((info, actions))
350 }
351
352 pub(super) async fn clear_shared_chain_view(&mut self) {
360 if let Some(shared_chain_view) = self.shared_chain_view.take() {
361 let _: RwLockWriteGuard<_> = shared_chain_view.write().await;
362 }
363 }
364
365 #[tracing::instrument(level = "debug", skip(self))]
367 pub(super) async fn process_validated_block(
368 &mut self,
369 certificate: ValidatedBlockCertificate,
370 ) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
371 ChainWorkerStateWithAttemptedChanges::new(self)
372 .await
373 .process_validated_block(certificate)
374 .await
375 }
376
377 #[tracing::instrument(level = "debug", skip(self))]
379 pub(super) async fn process_confirmed_block(
380 &mut self,
381 certificate: ConfirmedBlockCertificate,
382 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
383 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
384 ChainWorkerStateWithAttemptedChanges::new(self)
385 .await
386 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
387 .await
388 }
389
390 #[tracing::instrument(level = "debug", skip(self))]
392 pub(super) async fn process_cross_chain_update(
393 &mut self,
394 origin: ChainId,
395 bundles: Vec<(Epoch, MessageBundle)>,
396 ) -> Result<Option<BlockHeight>, WorkerError> {
397 ChainWorkerStateWithAttemptedChanges::new(self)
398 .await
399 .process_cross_chain_update(origin, bundles)
400 .await
401 }
402
403 pub(super) async fn confirm_updated_recipient(
405 &mut self,
406 recipient: ChainId,
407 latest_height: BlockHeight,
408 ) -> Result<(), WorkerError> {
409 ChainWorkerStateWithAttemptedChanges::new(self)
410 .await
411 .confirm_updated_recipient(recipient, latest_height)
412 .await
413 }
414
415 #[tracing::instrument(level = "debug", skip(self))]
417 pub(super) async fn handle_chain_info_query(
418 &mut self,
419 query: ChainInfoQuery,
420 ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
421 if query.request_leader_timeout {
422 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
423 .await
424 .vote_for_leader_timeout()
425 .await?;
426 }
427 if query.request_fallback {
428 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
429 .await
430 .vote_for_fallback()
431 .await?;
432 }
433 let response = ChainWorkerStateWithTemporaryChanges::new(self)
434 .await
435 .prepare_chain_info_response(query)
436 .await?;
437 let actions = self.create_network_actions().await?;
439 Ok((response, actions))
440 }
441
442 pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
444 if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
445 return Ok(blob);
446 }
447 let blob = self.storage.read_blob(blob_id).await?;
448 blob.ok_or(WorkerError::BlobsNotFound(vec![blob_id]))
449 }
450
451 #[tracing::instrument(level = "debug", skip(self))]
453 pub(super) async fn handle_pending_blob(
454 &mut self,
455 blob: Blob,
456 ) -> Result<ChainInfoResponse, WorkerError> {
457 ChainWorkerStateWithAttemptedChanges::new(&mut *self)
458 .await
459 .handle_pending_blob(blob)
460 .await
461 }
462
463 async fn ensure_is_active(&mut self) -> Result<(), WorkerError> {
465 if !self.knows_chain_is_active {
466 let local_time = self.storage.clock().current_time();
467 self.chain.ensure_is_active(local_time).await?;
468 self.knows_chain_is_active = true;
469 }
470 Ok(())
471 }
472
473 async fn get_required_blobs(
476 &self,
477 required_blob_ids: impl IntoIterator<Item = BlobId>,
478 created_blobs: &BTreeMap<BlobId, Blob>,
479 ) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
480 let maybe_blobs = self
481 .maybe_get_required_blobs(required_blob_ids, Some(created_blobs))
482 .await?;
483 let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
484 ensure!(
485 not_found_blob_ids.is_empty(),
486 WorkerError::BlobsNotFound(not_found_blob_ids)
487 );
488 Ok(maybe_blobs
489 .into_iter()
490 .filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
491 .collect())
492 }
493
494 async fn maybe_get_required_blobs(
496 &self,
497 blob_ids: impl IntoIterator<Item = BlobId>,
498 created_blobs: Option<&BTreeMap<BlobId, Blob>>,
499 ) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
500 let mut maybe_blobs = BTreeMap::from_iter(blob_ids.into_iter().zip(iter::repeat(None)));
501
502 for (blob_id, maybe_blob) in &mut maybe_blobs {
503 if let Some(blob) = created_blobs.and_then(|blob_map| blob_map.get(blob_id)) {
504 *maybe_blob = Some(blob.clone());
505 } else if let Some(blob) = self.chain.manager.pending_blob(blob_id).await? {
506 *maybe_blob = Some(blob);
507 } else if let Some(blob) = self.chain.pending_validated_blobs.get(blob_id).await? {
508 *maybe_blob = Some(blob);
509 } else {
510 for (_, pending_blobs) in self
511 .chain
512 .pending_proposed_blobs
513 .try_load_all_entries()
514 .await?
515 {
516 if let Some(blob) = pending_blobs.get(blob_id).await? {
517 *maybe_blob = Some(blob);
518 break;
519 }
520 }
521 }
522 }
523 let missing_blob_ids = missing_blob_ids(&maybe_blobs);
524 let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?;
525 for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) {
526 maybe_blobs.insert(blob_id, maybe_blob);
527 }
528 Ok(maybe_blobs)
529 }
530
531 fn track_newly_created_chains(
537 &self,
538 proposed_block: &ProposedBlock,
539 outcome: &BlockExecutionOutcome,
540 ) {
541 if let Some(tracked_chains) = self.tracked_chains.as_ref() {
542 if !tracked_chains
543 .read()
544 .expect("Panics should not happen while holding a lock to `tracked_chains`")
545 .contains(&proposed_block.chain_id)
546 {
547 return; }
549 let new_chain_ids = outcome
550 .created_blobs_ids()
551 .into_iter()
552 .filter(|blob_id| blob_id.blob_type == BlobType::ChainDescription)
553 .map(|blob_id| ChainId(blob_id.hash));
554
555 tracked_chains
556 .write()
557 .expect("Panics should not happen while holding a lock to `tracked_chains`")
558 .extend(new_chain_ids);
559 }
560 }
561
562 async fn create_network_actions(&self) -> Result<NetworkActions, WorkerError> {
564 let mut heights_by_recipient = BTreeMap::<_, Vec<_>>::new();
565 let mut targets = self.chain.nonempty_outbox_chain_ids();
566 if let Some(tracked_chains) = self.tracked_chains.as_ref() {
567 let tracked_chains = tracked_chains
568 .read()
569 .expect("Panics should not happen while holding a lock to `tracked_chains`");
570 targets.retain(|target| tracked_chains.contains(target));
571 }
572 let outboxes = self.chain.load_outboxes(&targets).await?;
573 for (target, outbox) in targets.into_iter().zip(outboxes) {
574 let heights = outbox.queue.elements().await?;
575 heights_by_recipient.insert(target, heights);
576 }
577 self.create_cross_chain_requests(heights_by_recipient).await
578 }
579
580 async fn create_cross_chain_requests(
581 &self,
582 heights_by_recipient: BTreeMap<ChainId, Vec<BlockHeight>>,
583 ) -> Result<NetworkActions, WorkerError> {
584 let heights = BTreeSet::from_iter(heights_by_recipient.values().flatten().copied());
586 let next_block_height = self.chain.tip_state.get().next_block_height;
587 let log_heights = heights
588 .range(..next_block_height)
589 .copied()
590 .map(usize::try_from)
591 .collect::<Result<Vec<_>, _>>()?;
592 let mut hashes = self
593 .chain
594 .confirmed_log
595 .multi_get(log_heights)
596 .await?
597 .into_iter()
598 .zip(&heights)
599 .map(|(maybe_hash, height)| {
600 maybe_hash.ok_or_else(|| WorkerError::ConfirmedLogEntryNotFound {
601 height: *height,
602 chain_id: self.chain_id(),
603 })
604 })
605 .collect::<Result<Vec<_>, _>>()?;
606 for height in heights.range(next_block_height..) {
607 hashes.push(
608 self.chain
609 .preprocessed_blocks
610 .get(height)
611 .await?
612 .ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
613 height: *height,
614 chain_id: self.chain_id(),
615 })?,
616 );
617 }
618 let certificates = self.storage.read_certificates(hashes.clone()).await?;
619 let certificates = match ResultReadCertificates::new(certificates, hashes) {
620 ResultReadCertificates::Certificates(certificates) => certificates,
621 ResultReadCertificates::InvalidHashes(hashes) => {
622 return Err(WorkerError::ReadCertificatesError(hashes))
623 }
624 };
625 let certificates = heights
626 .into_iter()
627 .zip(certificates)
628 .collect::<HashMap<_, _>>();
629 let mut actions = NetworkActions::default();
631 for (recipient, heights) in heights_by_recipient {
632 let mut bundles = Vec::new();
633 for height in heights {
634 let cert = certificates
635 .get(&height)
636 .ok_or_else(|| ChainError::InternalError("missing certificates".to_string()))?;
637 bundles.extend(cert.message_bundles_for(recipient));
638 }
639 let request = CrossChainRequest::UpdateRecipient {
640 sender: self.chain.chain_id(),
641 recipient,
642 bundles,
643 };
644 actions.cross_chain_requests.push(request);
645 }
646 Ok(actions)
647 }
648
649 pub async fn all_messages_to_tracked_chains_delivered_up_to(
652 &self,
653 height: BlockHeight,
654 ) -> Result<bool, WorkerError> {
655 if self.chain.all_messages_delivered_up_to(height) {
656 return Ok(true);
657 }
658 let Some(tracked_chains) = self.tracked_chains.as_ref() else {
659 return Ok(false);
660 };
661 let mut targets = self.chain.nonempty_outbox_chain_ids();
662 {
663 let tracked_chains = tracked_chains.read().unwrap();
664 targets.retain(|target| tracked_chains.contains(target));
665 }
666 let outboxes = self.chain.load_outboxes(&targets).await?;
667 for outbox in outboxes {
668 let front = outbox.queue.front();
669 if front.is_some_and(|key| *key <= height) {
670 return Ok(false);
671 }
672 }
673 Ok(true)
674 }
675
676 pub async fn update_received_certificate_trackers(
678 &mut self,
679 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
680 ) -> Result<(), WorkerError> {
681 ChainWorkerStateWithAttemptedChanges::new(self)
682 .await
683 .update_received_certificate_trackers(new_trackers)
684 .await
685 }
686}
687
688fn missing_blob_ids(maybe_blobs: &BTreeMap<BlobId, Option<Blob>>) -> Vec<BlobId> {
690 maybe_blobs
691 .iter()
692 .filter(|(_, maybe_blob)| maybe_blob.is_none())
693 .map(|(blob_id, _)| *blob_id)
694 .collect()
695}
696
697fn check_block_epoch(
699 chain_epoch: Epoch,
700 block_chain: ChainId,
701 block_epoch: Epoch,
702) -> Result<(), WorkerError> {
703 ensure!(
704 block_epoch == chain_epoch,
705 WorkerError::InvalidEpoch {
706 chain_id: block_chain,
707 epoch: block_epoch,
708 chain_epoch
709 }
710 );
711 Ok(())
712}