1use std::collections::BTreeMap;
72
73use custom_debug_derive::Debug;
74use futures::future::Either;
75use linera_base::{
76 crypto::{AccountPublicKey, CryptoError, ValidatorSecretKey},
77 data_types::{Blob, BlockHeight, Epoch, Round, Timestamp},
78 ensure,
79 identifiers::{AccountOwner, BlobId, ChainId},
80 ownership::ChainOwnership,
81};
82use linera_execution::ExecutionRuntimeContext;
83use linera_views::{
84 context::Context,
85 map_view::MapView,
86 register_view::RegisterView,
87 views::{ClonableView, View},
88 ViewError,
89};
90use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
91use rand_distr::{Distribution, WeightedAliasIndex};
92use serde::{Deserialize, Serialize};
93
94use crate::{
95 block::{Block, ConfirmedBlock, Timeout, ValidatedBlock},
96 data_types::{BlockProposal, LiteVote, OriginalProposal, ProposedBlock, Vote},
97 types::{TimeoutCertificate, ValidatedBlockCertificate},
98 ChainError,
99};
100
/// The verdict reached after checking an incoming proposal or certificate.
#[derive(PartialEq, Eq)]
pub enum Outcome {
    /// The item passed every check and should be processed.
    Accept,
    /// The item has already been handled; there is nothing more to do.
    Skip,
}
107
108pub type ValidatedOrConfirmedVote<'a> = Either<&'a Vote<ValidatedBlock>, &'a Vote<ConfirmedBlock>>;
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
114#[cfg_attr(with_testing, derive(Eq, PartialEq))]
115pub enum LockingBlock {
116 Fast(BlockProposal),
118 Regular(ValidatedBlockCertificate),
120}
121
122impl LockingBlock {
123 pub fn round(&self) -> Round {
126 match self {
127 Self::Fast(_) => Round::Fast,
128 Self::Regular(certificate) => certificate.round,
129 }
130 }
131
132 pub fn chain_id(&self) -> ChainId {
133 match self {
134 Self::Fast(proposal) => proposal.content.block.chain_id,
135 Self::Regular(certificate) => certificate.value().chain_id(),
136 }
137 }
138}
139
140#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject), graphql(complex))]
142#[derive(Debug, View, ClonableView)]
143pub struct ChainManager<C>
144where
145 C: Clone + Context + Send + Sync + 'static,
146{
147 pub ownership: RegisterView<C, ChainOwnership>,
149 pub seed: RegisterView<C, u64>,
151 #[cfg_attr(with_graphql, graphql(skip))] pub distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
154 #[cfg_attr(with_graphql, graphql(skip))] pub fallback_distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
157 #[cfg_attr(with_graphql, graphql(skip))]
160 pub proposed: RegisterView<C, Option<BlockProposal>>,
161 pub proposed_blobs: MapView<C, BlobId, Blob>,
163 #[cfg_attr(with_graphql, graphql(skip))]
166 pub locking_block: RegisterView<C, Option<LockingBlock>>,
167 pub locking_blobs: MapView<C, BlobId, Blob>,
169 #[cfg_attr(with_graphql, graphql(skip))]
171 pub timeout: RegisterView<C, Option<TimeoutCertificate>>,
172 #[cfg_attr(with_graphql, graphql(skip))]
174 pub confirmed_vote: RegisterView<C, Option<Vote<ConfirmedBlock>>>,
175 #[cfg_attr(with_graphql, graphql(skip))]
177 pub validated_vote: RegisterView<C, Option<Vote<ValidatedBlock>>>,
178 #[cfg_attr(with_graphql, graphql(skip))]
180 pub timeout_vote: RegisterView<C, Option<Vote<Timeout>>>,
181 #[cfg_attr(with_graphql, graphql(skip))]
183 pub fallback_vote: RegisterView<C, Option<Vote<Timeout>>>,
184 pub round_timeout: RegisterView<C, Option<Timestamp>>,
186 #[cfg_attr(with_graphql, graphql(skip))]
193 pub current_round: RegisterView<C, Round>,
194 pub fallback_owners: RegisterView<C, BTreeMap<AccountOwner, u64>>,
196}
197
#[cfg(with_graphql)]
#[async_graphql::ComplexObject]
impl<C> ChainManager<C>
where
    C: Context + Clone + Send + Sync + 'static,
{
    /// Exposes the current round to GraphQL under the field name `current_round`.
    #[graphql(derived(name = "current_round"))]
    async fn _current_round(&self) -> Round {
        let &round = self.current_round.get();
        round
    }
}
215
216impl<C> ChainManager<C>
217where
218 C: Context + Clone + Send + Sync + 'static,
219{
220 pub fn reset<'a>(
222 &mut self,
223 ownership: ChainOwnership,
224 height: BlockHeight,
225 local_time: Timestamp,
226 fallback_owners: impl Iterator<Item = (AccountPublicKey, u64)> + 'a,
227 ) -> Result<(), ChainError> {
228 let distribution = if !ownership.owners.is_empty() {
229 let weights = ownership.owners.values().copied().collect();
230 Some(WeightedAliasIndex::new(weights)?)
231 } else {
232 None
233 };
234 let fallback_owners = fallback_owners
235 .map(|(pub_key, weight)| (AccountOwner::from(pub_key), weight))
236 .collect::<BTreeMap<_, _>>();
237 let fallback_distribution = if !fallback_owners.is_empty() {
238 let weights = fallback_owners.values().copied().collect();
239 Some(WeightedAliasIndex::new(weights)?)
240 } else {
241 None
242 };
243
244 let current_round = ownership.first_round();
245 let round_duration = ownership.round_timeout(current_round);
246 let round_timeout = round_duration.map(|rd| local_time.saturating_add(rd));
247
248 self.clear();
249 self.seed.set(height.0);
250 self.ownership.set(ownership);
251 self.distribution.set(distribution);
252 self.fallback_distribution.set(fallback_distribution);
253 self.fallback_owners.set(fallback_owners);
254 self.current_round.set(current_round);
255 self.round_timeout.set(round_timeout);
256 Ok(())
257 }
258
259 pub fn confirmed_vote(&self) -> Option<&Vote<ConfirmedBlock>> {
261 self.confirmed_vote.get().as_ref()
262 }
263
264 pub fn validated_vote(&self) -> Option<&Vote<ValidatedBlock>> {
266 self.validated_vote.get().as_ref()
267 }
268
269 pub fn timeout_vote(&self) -> Option<&Vote<Timeout>> {
271 self.timeout_vote.get().as_ref()
272 }
273
274 pub fn fallback_vote(&self) -> Option<&Vote<Timeout>> {
276 self.fallback_vote.get().as_ref()
277 }
278
279 pub fn current_round(&self) -> Round {
286 *self.current_round.get()
287 }
288
289 pub fn check_proposed_block(&self, proposal: &BlockProposal) -> Result<Outcome, ChainError> {
291 let new_block = &proposal.content.block;
292 let new_round = proposal.content.round;
293 if let Some(old_proposal) = self.proposed.get() {
294 if old_proposal.content == proposal.content {
295 return Ok(Outcome::Skip); }
297 }
298 ensure!(
300 new_block.height < BlockHeight::MAX,
301 ChainError::InvalidBlockHeight
302 );
303 let current_round = self.current_round();
304 match new_round {
305 Round::Fast => {}
308 Round::MultiLeader(_) | Round::SingleLeader(0) => {
309 ensure!(
312 self.is_super(&proposal.owner()) || !current_round.is_fast(),
313 ChainError::WrongRound(current_round)
314 );
315 ensure!(
317 new_round >= current_round,
318 ChainError::InsufficientRound(new_round)
319 );
320 }
321 Round::SingleLeader(_) | Round::Validator(_) => {
322 ensure!(
324 new_round == current_round,
325 ChainError::WrongRound(current_round)
326 );
327 }
328 }
329 if let Some(vote) = self.validated_vote() {
331 ensure!(
332 new_round > vote.round,
333 ChainError::InsufficientRoundStrict(vote.round)
334 );
335 }
336 if let Some(locking_block) = self.locking_block.get() {
338 ensure!(
339 locking_block.round() < new_round,
340 ChainError::MustBeNewerThanLockingBlock(new_block.height, locking_block.round())
341 );
342 }
343 if let Some(vote) = self.confirmed_vote() {
346 ensure!(
347 match proposal.original_proposal.as_ref() {
348 None => false,
349 Some(OriginalProposal::Regular { certificate }) =>
350 vote.round <= certificate.round,
351 Some(OriginalProposal::Fast(_)) => {
352 vote.round.is_fast() && vote.value().matches_proposed_block(new_block)
353 }
354 },
355 ChainError::HasIncompatibleConfirmedVote(new_block.height, vote.round)
356 );
357 }
358 Ok(Outcome::Accept)
359 }
360
361 pub fn vote_timeout(
363 &mut self,
364 chain_id: ChainId,
365 height: BlockHeight,
366 epoch: Epoch,
367 key_pair: Option<&ValidatorSecretKey>,
368 local_time: Timestamp,
369 ) -> bool {
370 let Some(key_pair) = key_pair else {
371 return false; };
373 let Some(round_timeout) = *self.round_timeout.get() else {
374 return false; };
376 if local_time < round_timeout || self.ownership.get().owners.is_empty() {
377 return false; }
379 let current_round = self.current_round();
380 if let Some(vote) = self.timeout_vote.get() {
381 if vote.round == current_round {
382 return false; }
384 }
385 let value = Timeout::new(chain_id, height, epoch);
386 self.timeout_vote
387 .set(Some(Vote::new(value, current_round, key_pair)));
388 true
389 }
390
391 pub fn vote_fallback(
396 &mut self,
397 chain_id: ChainId,
398 height: BlockHeight,
399 epoch: Epoch,
400 key_pair: Option<&ValidatorSecretKey>,
401 ) -> bool {
402 let Some(key_pair) = key_pair else {
403 return false; };
405 if self.fallback_vote.get().is_some() || self.current_round() >= Round::Validator(0) {
406 return false; }
408 let value = Timeout::new(chain_id, height, epoch);
409 let last_regular_round = Round::SingleLeader(u32::MAX);
410 self.fallback_vote
411 .set(Some(Vote::new(value, last_regular_round, key_pair)));
412 true
413 }
414
415 pub fn check_validated_block(
417 &self,
418 certificate: &ValidatedBlockCertificate,
419 ) -> Result<Outcome, ChainError> {
420 let new_block = certificate.block();
421 let new_round = certificate.round;
422 if let Some(Vote { value, round, .. }) = self.confirmed_vote.get() {
423 if value.block() == new_block && *round == new_round {
424 return Ok(Outcome::Skip); }
426 }
427
428 if let Some(Vote { round, .. }) = self.validated_vote.get() {
430 ensure!(new_round >= *round, ChainError::InsufficientRound(*round))
431 }
432
433 if let Some(locking) = self.locking_block.get() {
434 if let LockingBlock::Regular(locking_cert) = locking {
435 if locking_cert.hash() == certificate.hash() && locking.round() == new_round {
436 return Ok(Outcome::Skip); }
438 }
439 ensure!(
440 new_round > locking.round(),
441 ChainError::InsufficientRoundStrict(locking.round())
442 );
443 }
444 Ok(Outcome::Accept)
445 }
446
447 pub fn create_vote(
449 &mut self,
450 proposal: BlockProposal,
451 block: Block,
452 key_pair: Option<&ValidatorSecretKey>,
453 local_time: Timestamp,
454 blobs: BTreeMap<BlobId, Blob>,
455 ) -> Result<Option<ValidatedOrConfirmedVote>, ChainError> {
456 let round = proposal.content.round;
457
458 match &proposal.original_proposal {
459 Some(OriginalProposal::Regular { certificate }) => {
461 if self
462 .locking_block
463 .get()
464 .as_ref()
465 .is_none_or(|locking| locking.round() < certificate.round)
466 {
467 let value = ValidatedBlock::new(block.clone());
468 if let Some(certificate) = certificate.clone().with_value(value) {
469 self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?;
470 }
471 }
472 }
473 Some(OriginalProposal::Fast(signature)) => {
476 if self.locking_block.get().is_none() {
477 let original_proposal = BlockProposal {
478 signature: *signature,
479 ..proposal.clone()
480 };
481 self.update_locking(LockingBlock::Fast(original_proposal), blobs.clone())?;
482 }
483 }
484 None => {
487 if round.is_fast() && self.locking_block.get().is_none() {
488 self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?;
490 }
491 }
492 }
493
494 self.update_proposed(proposal.clone(), blobs)?;
496 self.update_current_round(local_time);
497
498 let Some(key_pair) = key_pair else {
499 return Ok(None);
501 };
502
503 if round.is_fast() {
505 self.validated_vote.set(None);
506 let value = ConfirmedBlock::new(block);
507 let vote = Vote::new(value, round, key_pair);
508 Ok(Some(Either::Right(
509 self.confirmed_vote.get_mut().insert(vote),
510 )))
511 } else {
512 let value = ValidatedBlock::new(block);
513 let vote = Vote::new(value, round, key_pair);
514 Ok(Some(Either::Left(
515 self.validated_vote.get_mut().insert(vote),
516 )))
517 }
518 }
519
520 pub fn create_final_vote(
522 &mut self,
523 validated: ValidatedBlockCertificate,
524 key_pair: Option<&ValidatorSecretKey>,
525 local_time: Timestamp,
526 blobs: BTreeMap<BlobId, Blob>,
527 ) -> Result<(), ViewError> {
528 let round = validated.round;
529 let confirmed_block = ConfirmedBlock::new(validated.inner().block().clone());
530 self.update_locking(LockingBlock::Regular(validated), blobs)?;
531 self.update_current_round(local_time);
532 if let Some(key_pair) = key_pair {
533 if self.current_round() != round {
534 return Ok(()); }
536 let vote = Vote::new(confirmed_block, round, key_pair);
538 self.confirmed_vote.set(Some(vote));
540 self.validated_vote.set(None);
541 }
542 Ok(())
543 }
544
545 pub async fn pending_blob(&self, blob_id: &BlobId) -> Result<Option<Blob>, ViewError> {
547 if let Some(blob) = self.proposed_blobs.get(blob_id).await? {
548 return Ok(Some(blob));
549 }
550 self.locking_blobs.get(blob_id).await
551 }
552
553 fn update_current_round(&mut self, local_time: Timestamp) {
557 let current_round = self
558 .timeout
559 .get()
560 .iter()
561 .map(|certificate| {
562 self.ownership
563 .get()
564 .next_round(certificate.round)
565 .unwrap_or(Round::Validator(u32::MAX))
566 })
567 .chain(self.locking_block.get().as_ref().map(LockingBlock::round))
568 .chain(
569 self.proposed
570 .get()
571 .iter()
572 .map(|proposal| proposal.content.round),
573 )
574 .max()
575 .unwrap_or_default()
576 .max(self.ownership.get().first_round());
577 if current_round <= self.current_round() {
578 return;
579 }
580 let round_duration = self.ownership.get().round_timeout(current_round);
581 self.round_timeout
582 .set(round_duration.map(|rd| local_time.saturating_add(rd)));
583 self.current_round.set(current_round);
584 }
585
586 pub fn handle_timeout_certificate(
589 &mut self,
590 certificate: TimeoutCertificate,
591 local_time: Timestamp,
592 ) {
593 let round = certificate.round;
594 if let Some(known_certificate) = self.timeout.get() {
595 if known_certificate.round >= round {
596 return;
597 }
598 }
599 self.timeout.set(Some(certificate));
600 self.update_current_round(local_time);
601 }
602
603 pub fn verify_owner(
606 &self,
607 proposal_owner: &AccountOwner,
608 proposal_round: Round,
609 ) -> Result<bool, CryptoError> {
610 if self.ownership.get().super_owners.contains(proposal_owner) {
611 return Ok(true);
612 }
613
614 Ok(match proposal_round {
615 Round::Fast => {
616 false }
618 Round::MultiLeader(_) => {
619 let ownership = self.ownership.get();
620 ownership.open_multi_leader_rounds || ownership.owners.contains_key(proposal_owner)
622 }
623 Round::SingleLeader(r) => {
624 let Some(index) = self.round_leader_index(r) else {
625 return Ok(false);
626 };
627 self.ownership.get().owners.keys().nth(index) == Some(proposal_owner)
628 }
629 Round::Validator(r) => {
630 let Some(index) = self.fallback_round_leader_index(r) else {
631 return Ok(false);
632 };
633 self.fallback_owners.get().keys().nth(index) == Some(proposal_owner)
634 }
635 })
636 }
637
638 fn round_leader(&self, round: Round) -> Option<&AccountOwner> {
641 match round {
642 Round::SingleLeader(r) => {
643 let index = self.round_leader_index(r)?;
644 self.ownership.get().owners.keys().nth(index)
645 }
646 Round::Validator(r) => {
647 let index = self.fallback_round_leader_index(r)?;
648 self.fallback_owners.get().keys().nth(index)
649 }
650 Round::Fast | Round::MultiLeader(_) => None,
651 }
652 }
653
654 fn round_leader_index(&self, round: u32) -> Option<usize> {
656 let seed = u64::from(round)
657 .rotate_left(32)
658 .wrapping_add(*self.seed.get());
659 let mut rng = ChaCha8Rng::seed_from_u64(seed);
660 Some(self.distribution.get().as_ref()?.sample(&mut rng))
661 }
662
663 fn fallback_round_leader_index(&self, round: u32) -> Option<usize> {
666 let seed = u64::from(round)
667 .rotate_left(32)
668 .wrapping_add(*self.seed.get());
669 let mut rng = ChaCha8Rng::seed_from_u64(seed);
670 Some(self.fallback_distribution.get().as_ref()?.sample(&mut rng))
671 }
672
673 fn is_super(&self, owner: &AccountOwner) -> bool {
675 self.ownership.get().super_owners.contains(owner)
676 }
677
678 fn update_proposed(
680 &mut self,
681 proposal: BlockProposal,
682 blobs: BTreeMap<BlobId, Blob>,
683 ) -> Result<(), ViewError> {
684 if let Some(old_proposal) = self.proposed.get() {
685 if old_proposal.content.round >= proposal.content.round {
686 return Ok(());
687 }
688 }
689 self.proposed.set(Some(proposal));
690 self.proposed_blobs.clear();
691 for (blob_id, blob) in blobs {
692 self.proposed_blobs.insert(&blob_id, blob)?;
693 }
694 Ok(())
695 }
696
697 fn update_locking(
699 &mut self,
700 locking: LockingBlock,
701 blobs: BTreeMap<BlobId, Blob>,
702 ) -> Result<(), ViewError> {
703 if let Some(old_locked) = self.locking_block.get() {
704 if old_locked.round() >= locking.round() {
705 return Ok(());
706 }
707 }
708 self.locking_block.set(Some(locking));
709 self.locking_blobs.clear();
710 for (blob_id, blob) in blobs {
711 self.locking_blobs.insert(&blob_id, blob)?;
712 }
713 Ok(())
714 }
715}
716
717#[derive(Default, Clone, Debug, Serialize, Deserialize)]
719#[cfg_attr(with_testing, derive(Eq, PartialEq))]
720pub struct ChainManagerInfo {
721 pub ownership: ChainOwnership,
723 #[debug(skip_if = Option::is_none)]
725 pub requested_proposed: Option<Box<BlockProposal>>,
726 #[debug(skip_if = Option::is_none)]
729 pub requested_locking: Option<Box<LockingBlock>>,
730 #[debug(skip_if = Option::is_none)]
732 pub timeout: Option<Box<TimeoutCertificate>>,
733 #[debug(skip_if = Option::is_none)]
735 pub pending: Option<LiteVote>,
736 #[debug(skip_if = Option::is_none)]
738 pub timeout_vote: Option<LiteVote>,
739 #[debug(skip_if = Option::is_none)]
741 pub fallback_vote: Option<LiteVote>,
742 #[debug(skip_if = Option::is_none)]
744 pub requested_confirmed: Option<Box<ConfirmedBlock>>,
745 #[debug(skip_if = Option::is_none)]
747 pub requested_validated: Option<Box<ValidatedBlock>>,
748 pub current_round: Round,
750 #[debug(skip_if = Option::is_none)]
753 pub leader: Option<AccountOwner>,
754 #[debug(skip_if = Option::is_none)]
756 pub round_timeout: Option<Timestamp>,
757}
758
759impl<C> From<&ChainManager<C>> for ChainManagerInfo
760where
761 C: Context + Clone + Send + Sync + 'static,
762{
763 fn from(manager: &ChainManager<C>) -> Self {
764 let current_round = manager.current_round();
765 let pending = match (manager.confirmed_vote.get(), manager.validated_vote.get()) {
766 (None, None) => None,
767 (Some(confirmed_vote), Some(validated_vote))
768 if validated_vote.round > confirmed_vote.round =>
769 {
770 Some(validated_vote.lite())
771 }
772 (Some(vote), _) => Some(vote.lite()),
773 (None, Some(vote)) => Some(vote.lite()),
774 };
775 ChainManagerInfo {
776 ownership: manager.ownership.get().clone(),
777 requested_proposed: None,
778 requested_locking: None,
779 timeout: manager.timeout.get().clone().map(Box::new),
780 pending,
781 timeout_vote: manager.timeout_vote.get().as_ref().map(Vote::lite),
782 fallback_vote: manager.fallback_vote.get().as_ref().map(Vote::lite),
783 requested_confirmed: None,
784 requested_validated: None,
785 current_round,
786 leader: manager.round_leader(current_round).cloned(),
787 round_timeout: *manager.round_timeout.get(),
788 }
789 }
790}
791
792impl ChainManagerInfo {
793 pub fn add_values<C>(&mut self, manager: &ChainManager<C>)
795 where
796 C: Context + Clone + Send + Sync + 'static,
797 C::Extra: ExecutionRuntimeContext,
798 {
799 self.requested_proposed = manager.proposed.get().clone().map(Box::new);
800 self.requested_locking = manager.locking_block.get().clone().map(Box::new);
801 self.requested_confirmed = manager
802 .confirmed_vote
803 .get()
804 .as_ref()
805 .map(|vote| Box::new(vote.value.clone()));
806 self.requested_validated = manager
807 .validated_vote
808 .get()
809 .as_ref()
810 .map(|vote| Box::new(vote.value.clone()));
811 }
812
813 pub fn can_propose(&self, identity: &AccountOwner, round: Round) -> bool {
816 match round {
817 Round::Fast => self.ownership.super_owners.contains(identity),
818 Round::MultiLeader(_) => true,
819 Round::SingleLeader(_) | Round::Validator(_) => self.leader.as_ref() == Some(identity),
820 }
821 }
822
823 pub fn already_handled_proposal(&self, round: Round, proposed_block: &ProposedBlock) -> bool {
825 self.requested_proposed.as_ref().is_some_and(|proposal| {
826 proposal.content.round == round && *proposed_block == proposal.content.block
827 })
828 }
829
830 pub fn has_locking_block_in_current_round(&self) -> bool {
832 self.requested_locking
833 .as_ref()
834 .is_some_and(|locking| locking.round() == self.current_round)
835 }
836}