1use std::collections::BTreeMap;
72
73use custom_debug_derive::Debug;
74use futures::future::Either;
75use linera_base::{
76 crypto::{AccountPublicKey, CryptoError, ValidatorSecretKey},
77 data_types::{Blob, BlockHeight, Epoch, Round, Timestamp},
78 ensure,
79 identifiers::{AccountOwner, BlobId, ChainId},
80 ownership::ChainOwnership,
81};
82use linera_execution::ExecutionRuntimeContext;
83use linera_views::{
84 context::Context,
85 map_view::MapView,
86 register_view::RegisterView,
87 views::{ClonableView, View},
88 ViewError,
89};
90use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
91use rand_distr::{Distribution, WeightedAliasIndex};
92use serde::{Deserialize, Serialize};
93
94use crate::{
95 block::{Block, ConfirmedBlock, Timeout, ValidatedBlock},
96 data_types::{BlockProposal, LiteVote, OriginalProposal, ProposedBlock, Vote},
97 types::{TimeoutCertificate, ValidatedBlockCertificate},
98 ChainError,
99};
100
/// The verdict of a pre-vote check on an incoming proposal or certificate.
///
/// Returned by [`ChainManager::check_proposed_block`] and
/// [`ChainManager::check_validated_block`] when no error was found.
#[derive(Eq, PartialEq)]
pub enum Outcome {
    /// All checks passed.
    Accept,
    /// The manager already holds an identical proposal, or has already voted to confirm
    /// the same block in the same round.
    Skip,
}
107
/// A borrowed vote: `Either::Left` for a validated block, `Either::Right` for a confirmed block.
pub type ValidatedOrConfirmedVote<'a> = Either<&'a Vote<ValidatedBlock>, &'a Vote<ConfirmedBlock>>;
109
/// The kinds of block a [`ChainManager`] can store as its locking block: either a block
/// proposal from the fast round or a certificate for a validated block.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
pub enum LockingBlock {
    /// A block proposal; [`LockingBlock::round`] always reports `Round::Fast` for it.
    Fast(BlockProposal),
    /// A validated block certificate; its round is the round stored on the certificate.
    Regular(ValidatedBlockCertificate),
}
121
122impl LockingBlock {
123 pub fn round(&self) -> Round {
126 match self {
127 Self::Fast(_) => Round::Fast,
128 Self::Regular(certificate) => certificate.round,
129 }
130 }
131
132 pub fn chain_id(&self) -> ChainId {
133 match self {
134 Self::Fast(proposal) => proposal.content.block.chain_id,
135 Self::Regular(certificate) => certificate.value().chain_id(),
136 }
137 }
138}
139
/// The persistent consensus state a chain keeps while agreeing on its next block: ownership,
/// leader selection data, received proposals and certificates, and the votes already cast.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject), graphql(complex))]
#[derive(Debug, View, ClonableView)]
pub struct ChainManager<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// The chain's ownership configuration, as passed to `reset`.
    pub ownership: RegisterView<C, ChainOwnership>,
    /// Seed combined with the round number to pick round leaders; `reset` sets it to the
    /// block height.
    pub seed: RegisterView<C, u64>,
    /// Weighted distribution built from the owners' weights, sampled to choose the leader
    /// of a single-leader round. `None` if there are no owners.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
    /// Weighted distribution built from the fallback owners' weights, sampled to choose the
    /// leader of a validator round. `None` if there are no fallback owners.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub fallback_distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
    /// Highest-round proposal recorded through `update_signed_proposal`. It is cleared once
    /// a proposal from an equal or higher round is stored in `proposed`.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub signed_proposal: RegisterView<C, Option<BlockProposal>>,
    /// Highest-round proposal recorded through `create_vote`. A later proposal from the
    /// same round does not replace it.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub proposed: RegisterView<C, Option<BlockProposal>>,
    /// The blobs that were stored together with `proposed`.
    pub proposed_blobs: MapView<C, BlobId, Blob>,
    /// Highest-round locking block: a fast-round proposal or a validated block certificate.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub locking_block: RegisterView<C, Option<LockingBlock>>,
    /// The blobs that were stored together with `locking_block`.
    pub locking_blobs: MapView<C, BlobId, Blob>,
    /// Highest-round timeout certificate received so far.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub timeout: RegisterView<C, Option<TimeoutCertificate>>,
    /// Latest vote signed to confirm a block.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub confirmed_vote: RegisterView<C, Option<Vote<ConfirmedBlock>>>,
    /// Latest vote signed to validate a block.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub validated_vote: RegisterView<C, Option<Vote<ValidatedBlock>>>,
    /// Latest timeout vote signed.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub timeout_vote: RegisterView<C, Option<Vote<Timeout>>>,
    /// The fallback vote signed by `vote_fallback`, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub fallback_vote: RegisterView<C, Option<Vote<Timeout>>>,
    /// The time from which `create_timeout_vote` agrees to sign a timeout vote for the
    /// current round. `None` if the current round has no timeout.
    pub round_timeout: RegisterView<C, Option<Timestamp>>,
    /// The current round. It is set by `reset` and afterwards only ever increased, based on
    /// the timeout certificate, the locking block and the recorded proposal.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub current_round: RegisterView<C, Round>,
    /// The owners, with their weights, from which validator-round leaders are chosen.
    pub fallback_owners: RegisterView<C, BTreeMap<AccountOwner, u64>>,
}
203
// GraphQL-only accessors for fields that the derived object skips.
#[cfg(with_graphql)]
#[async_graphql::ComplexObject]
impl<C> ChainManager<C>
where
    C: Context + Clone + Send + Sync + 'static,
{
    #[graphql(derived(name = "current_round"))]
    /// Returns the current round, as reported by [`ChainManager::current_round`].
    async fn _current_round(&self) -> Round {
        self.current_round()
    }
}
221
222impl<C> ChainManager<C>
223where
224 C: Context + Clone + Send + Sync + 'static,
225{
226 pub fn reset<'a>(
228 &mut self,
229 ownership: ChainOwnership,
230 height: BlockHeight,
231 local_time: Timestamp,
232 fallback_owners: impl Iterator<Item = (AccountPublicKey, u64)> + 'a,
233 ) -> Result<(), ChainError> {
234 let distribution = if !ownership.owners.is_empty() {
235 let weights = ownership.owners.values().copied().collect();
236 Some(WeightedAliasIndex::new(weights)?)
237 } else {
238 None
239 };
240 let fallback_owners = fallback_owners
241 .map(|(pub_key, weight)| (AccountOwner::from(pub_key), weight))
242 .collect::<BTreeMap<_, _>>();
243 let fallback_distribution = if !fallback_owners.is_empty() {
244 let weights = fallback_owners.values().copied().collect();
245 Some(WeightedAliasIndex::new(weights)?)
246 } else {
247 None
248 };
249
250 let current_round = ownership.first_round();
251 let round_duration = ownership.round_timeout(current_round);
252 let round_timeout = round_duration.map(|rd| local_time.saturating_add(rd));
253
254 self.clear();
255 self.seed.set(height.0);
256 self.ownership.set(ownership);
257 self.distribution.set(distribution);
258 self.fallback_distribution.set(fallback_distribution);
259 self.fallback_owners.set(fallback_owners);
260 self.current_round.set(current_round);
261 self.round_timeout.set(round_timeout);
262 Ok(())
263 }
264
265 pub fn confirmed_vote(&self) -> Option<&Vote<ConfirmedBlock>> {
267 self.confirmed_vote.get().as_ref()
268 }
269
270 pub fn validated_vote(&self) -> Option<&Vote<ValidatedBlock>> {
272 self.validated_vote.get().as_ref()
273 }
274
275 pub fn timeout_vote(&self) -> Option<&Vote<Timeout>> {
277 self.timeout_vote.get().as_ref()
278 }
279
280 pub fn fallback_vote(&self) -> Option<&Vote<Timeout>> {
282 self.fallback_vote.get().as_ref()
283 }
284
285 pub fn current_round(&self) -> Round {
292 *self.current_round.get()
293 }
294
295 pub fn check_proposed_block(&self, proposal: &BlockProposal) -> Result<Outcome, ChainError> {
297 let new_block = &proposal.content.block;
298 let new_round = proposal.content.round;
299 if let Some(old_proposal) = self.proposed.get() {
300 if old_proposal.content == proposal.content {
301 return Ok(Outcome::Skip); }
303 }
304 ensure!(
306 new_block.height < BlockHeight::MAX,
307 ChainError::BlockHeightOverflow
308 );
309 let current_round = self.current_round();
310 match new_round {
311 Round::Fast => {}
314 Round::MultiLeader(_) | Round::SingleLeader(0) => {
315 ensure!(
318 self.is_super(&proposal.owner()) || !current_round.is_fast(),
319 ChainError::WrongRound(current_round)
320 );
321 ensure!(
323 new_round >= current_round,
324 ChainError::InsufficientRound(new_round)
325 );
326 }
327 Round::SingleLeader(_) | Round::Validator(_) => {
328 ensure!(
330 new_round == current_round,
331 ChainError::WrongRound(current_round)
332 );
333 }
334 }
335 if let Some(vote) = self.validated_vote() {
337 ensure!(
338 new_round > vote.round,
339 ChainError::InsufficientRoundStrict(vote.round)
340 );
341 }
342 if let Some(locking_block) = self.locking_block.get() {
344 ensure!(
345 locking_block.round() < new_round,
346 ChainError::MustBeNewerThanLockingBlock(new_block.height, locking_block.round())
347 );
348 }
349 if let Some(vote) = self.confirmed_vote() {
352 ensure!(
353 match proposal.original_proposal.as_ref() {
354 None => false,
355 Some(OriginalProposal::Regular { certificate }) =>
356 vote.round <= certificate.round,
357 Some(OriginalProposal::Fast(_)) => {
358 vote.round.is_fast() && vote.value().matches_proposed_block(new_block)
359 }
360 },
361 ChainError::HasIncompatibleConfirmedVote(new_block.height, vote.round)
362 );
363 }
364 Ok(Outcome::Accept)
365 }
366
367 pub fn create_timeout_vote(
370 &mut self,
371 chain_id: ChainId,
372 height: BlockHeight,
373 round: Round,
374 epoch: Epoch,
375 key_pair: Option<&ValidatorSecretKey>,
376 local_time: Timestamp,
377 ) -> Result<bool, ChainError> {
378 let Some(key_pair) = key_pair else {
379 return Ok(false); };
381 ensure!(
382 round == self.current_round(),
383 ChainError::WrongRound(self.current_round())
384 );
385 let Some(round_timeout) = *self.round_timeout.get() else {
386 return Err(ChainError::RoundDoesNotTimeOut);
387 };
388 ensure!(
389 local_time >= round_timeout,
390 ChainError::NotTimedOutYet(round_timeout)
391 );
392 if let Some(vote) = self.timeout_vote.get() {
393 if vote.round == round {
394 return Ok(false); }
396 }
397 let value = Timeout::new(chain_id, height, epoch);
398 self.timeout_vote
399 .set(Some(Vote::new(value, round, key_pair)));
400 Ok(true)
401 }
402
403 pub fn vote_fallback(
408 &mut self,
409 chain_id: ChainId,
410 height: BlockHeight,
411 epoch: Epoch,
412 key_pair: Option<&ValidatorSecretKey>,
413 ) -> bool {
414 let Some(key_pair) = key_pair else {
415 return false; };
417 if self.fallback_vote.get().is_some() || self.current_round() >= Round::Validator(0) {
418 return false; }
420 let value = Timeout::new(chain_id, height, epoch);
421 let last_regular_round = Round::SingleLeader(u32::MAX);
422 self.fallback_vote
423 .set(Some(Vote::new(value, last_regular_round, key_pair)));
424 true
425 }
426
427 pub fn check_validated_block(
429 &self,
430 certificate: &ValidatedBlockCertificate,
431 ) -> Result<Outcome, ChainError> {
432 let new_block = certificate.block();
433 let new_round = certificate.round;
434 if let Some(Vote { value, round, .. }) = self.confirmed_vote.get() {
435 if value.block() == new_block && *round == new_round {
436 return Ok(Outcome::Skip); }
438 }
439
440 if let Some(Vote { round, .. }) = self.validated_vote.get() {
442 ensure!(new_round >= *round, ChainError::InsufficientRound(*round))
443 }
444
445 if let Some(locking) = self.locking_block.get() {
446 ensure!(
447 new_round > locking.round(),
448 ChainError::InsufficientRoundStrict(locking.round())
449 );
450 }
451 Ok(Outcome::Accept)
452 }
453
454 pub fn create_vote(
456 &mut self,
457 proposal: BlockProposal,
458 block: Block,
459 key_pair: Option<&ValidatorSecretKey>,
460 local_time: Timestamp,
461 blobs: BTreeMap<BlobId, Blob>,
462 ) -> Result<Option<ValidatedOrConfirmedVote>, ChainError> {
463 let round = proposal.content.round;
464
465 match &proposal.original_proposal {
466 Some(OriginalProposal::Regular { certificate }) => {
468 if self
469 .locking_block
470 .get()
471 .as_ref()
472 .is_none_or(|locking| locking.round() < certificate.round)
473 {
474 let value = ValidatedBlock::new(block.clone());
475 if let Some(certificate) = certificate.clone().with_value(value) {
476 self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?;
477 }
478 }
479 }
480 Some(OriginalProposal::Fast(signature)) => {
483 if self.locking_block.get().is_none() {
484 let original_proposal = BlockProposal {
485 signature: *signature,
486 ..proposal.clone()
487 };
488 self.update_locking(LockingBlock::Fast(original_proposal), blobs.clone())?;
489 }
490 }
491 None => {
494 if round.is_fast() && self.locking_block.get().is_none() {
495 self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?;
497 }
498 }
499 }
500
501 self.update_proposed(proposal.clone(), blobs)?;
503 self.update_current_round(local_time);
504
505 let Some(key_pair) = key_pair else {
506 return Ok(None);
508 };
509
510 if round.is_fast() {
512 self.validated_vote.set(None);
513 let value = ConfirmedBlock::new(block);
514 let vote = Vote::new(value, round, key_pair);
515 Ok(Some(Either::Right(
516 self.confirmed_vote.get_mut().insert(vote),
517 )))
518 } else {
519 let value = ValidatedBlock::new(block);
520 let vote = Vote::new(value, round, key_pair);
521 Ok(Some(Either::Left(
522 self.validated_vote.get_mut().insert(vote),
523 )))
524 }
525 }
526
527 pub fn create_final_vote(
529 &mut self,
530 validated: ValidatedBlockCertificate,
531 key_pair: Option<&ValidatorSecretKey>,
532 local_time: Timestamp,
533 blobs: BTreeMap<BlobId, Blob>,
534 ) -> Result<(), ViewError> {
535 let round = validated.round;
536 let confirmed_block = ConfirmedBlock::new(validated.inner().block().clone());
537 self.update_locking(LockingBlock::Regular(validated), blobs)?;
538 self.update_current_round(local_time);
539 if let Some(key_pair) = key_pair {
540 if self.current_round() != round {
541 return Ok(()); }
543 let vote = Vote::new(confirmed_block, round, key_pair);
545 self.confirmed_vote.set(Some(vote));
547 self.validated_vote.set(None);
548 }
549 Ok(())
550 }
551
552 pub async fn pending_blob(&self, blob_id: &BlobId) -> Result<Option<Blob>, ViewError> {
554 if let Some(blob) = self.proposed_blobs.get(blob_id).await? {
555 return Ok(Some(blob));
556 }
557 self.locking_blobs.get(blob_id).await
558 }
559
560 fn update_current_round(&mut self, local_time: Timestamp) {
564 let current_round = self
565 .timeout
566 .get()
567 .iter()
568 .map(|certificate| {
569 self.ownership
570 .get()
571 .next_round(certificate.round)
572 .unwrap_or(Round::Validator(u32::MAX))
573 })
574 .chain(self.locking_block.get().as_ref().map(LockingBlock::round))
575 .chain(
576 self.proposed
577 .get()
578 .iter()
579 .map(|proposal| proposal.content.round),
580 )
581 .max()
582 .unwrap_or_default()
583 .max(self.ownership.get().first_round());
584 if current_round <= self.current_round() {
585 return;
586 }
587 let round_duration = self.ownership.get().round_timeout(current_round);
588 self.round_timeout
589 .set(round_duration.map(|rd| local_time.saturating_add(rd)));
590 self.current_round.set(current_round);
591 }
592
593 pub fn handle_timeout_certificate(
596 &mut self,
597 certificate: TimeoutCertificate,
598 local_time: Timestamp,
599 ) {
600 let round = certificate.round;
601 if let Some(known_certificate) = self.timeout.get() {
602 if known_certificate.round >= round {
603 return;
604 }
605 }
606 self.timeout.set(Some(certificate));
607 self.update_current_round(local_time);
608 }
609
610 pub fn verify_owner(
613 &self,
614 proposal_owner: &AccountOwner,
615 proposal_round: Round,
616 ) -> Result<bool, CryptoError> {
617 if self.ownership.get().super_owners.contains(proposal_owner) {
618 return Ok(true);
619 }
620
621 Ok(match proposal_round {
622 Round::Fast => {
623 false }
625 Round::MultiLeader(_) => {
626 let ownership = self.ownership.get();
627 ownership.open_multi_leader_rounds || ownership.owners.contains_key(proposal_owner)
629 }
630 Round::SingleLeader(r) => {
631 let Some(index) = self.round_leader_index(r) else {
632 return Ok(false);
633 };
634 self.ownership.get().owners.keys().nth(index) == Some(proposal_owner)
635 }
636 Round::Validator(r) => {
637 let Some(index) = self.fallback_round_leader_index(r) else {
638 return Ok(false);
639 };
640 self.fallback_owners.get().keys().nth(index) == Some(proposal_owner)
641 }
642 })
643 }
644
645 fn round_leader(&self, round: Round) -> Option<&AccountOwner> {
648 match round {
649 Round::SingleLeader(r) => {
650 let index = self.round_leader_index(r)?;
651 self.ownership.get().owners.keys().nth(index)
652 }
653 Round::Validator(r) => {
654 let index = self.fallback_round_leader_index(r)?;
655 self.fallback_owners.get().keys().nth(index)
656 }
657 Round::Fast | Round::MultiLeader(_) => None,
658 }
659 }
660
661 fn round_leader_index(&self, round: u32) -> Option<usize> {
663 let seed = u64::from(round)
664 .rotate_left(32)
665 .wrapping_add(*self.seed.get());
666 let mut rng = ChaCha8Rng::seed_from_u64(seed);
667 Some(self.distribution.get().as_ref()?.sample(&mut rng))
668 }
669
670 fn fallback_round_leader_index(&self, round: u32) -> Option<usize> {
673 let seed = u64::from(round)
674 .rotate_left(32)
675 .wrapping_add(*self.seed.get());
676 let mut rng = ChaCha8Rng::seed_from_u64(seed);
677 Some(self.fallback_distribution.get().as_ref()?.sample(&mut rng))
678 }
679
680 fn is_super(&self, owner: &AccountOwner) -> bool {
682 self.ownership.get().super_owners.contains(owner)
683 }
684
685 pub fn update_signed_proposal(&mut self, proposal: &BlockProposal) -> bool {
688 if proposal.content.round > Round::MultiLeader(u32::MAX) {
689 return false;
690 }
691 if let Some(old_proposal) = self.signed_proposal.get() {
692 if old_proposal.content.round >= proposal.content.round {
693 return false;
694 }
695 }
696 if let Some(old_proposal) = self.proposed.get() {
697 if old_proposal.content.round >= proposal.content.round {
698 return false;
699 }
700 }
701 self.signed_proposal.set(Some(proposal.clone()));
702 true
703 }
704
705 fn update_proposed(
707 &mut self,
708 proposal: BlockProposal,
709 blobs: BTreeMap<BlobId, Blob>,
710 ) -> Result<(), ViewError> {
711 if let Some(old_proposal) = self.proposed.get() {
712 if old_proposal.content.round >= proposal.content.round {
713 return Ok(());
714 }
715 }
716 if let Some(old_proposal) = self.signed_proposal.get() {
717 if old_proposal.content.round <= proposal.content.round {
718 self.signed_proposal.set(None);
719 }
720 }
721 self.proposed.set(Some(proposal));
722 self.proposed_blobs.clear();
723 for (blob_id, blob) in blobs {
724 self.proposed_blobs.insert(&blob_id, blob)?;
725 }
726 Ok(())
727 }
728
729 fn update_locking(
731 &mut self,
732 locking: LockingBlock,
733 blobs: BTreeMap<BlobId, Blob>,
734 ) -> Result<(), ViewError> {
735 if let Some(old_locked) = self.locking_block.get() {
736 if old_locked.round() >= locking.round() {
737 return Ok(());
738 }
739 }
740 self.locking_block.set(Some(locking));
741 self.locking_blobs.clear();
742 for (blob_id, blob) in blobs {
743 self.locking_blobs.insert(&blob_id, blob)?;
744 }
745 Ok(())
746 }
747}
748
/// A snapshot of a [`ChainManager`]'s state.
///
/// Converting from `&ChainManager` fills in the lightweight fields; the `requested_*`
/// fields stay `None` until `add_values` is called.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
pub struct ChainManagerInfo {
    /// The chain's ownership configuration.
    pub ownership: ChainOwnership,
    /// The manager's signed proposal. Only filled in by `add_values`.
    pub requested_signed_proposal: Option<Box<BlockProposal>>,
    /// The manager's stored proposal. Only filled in by `add_values`.
    #[debug(skip_if = Option::is_none)]
    pub requested_proposed: Option<Box<BlockProposal>>,
    /// The manager's locking block. Only filled in by `add_values`.
    #[debug(skip_if = Option::is_none)]
    pub requested_locking: Option<Box<LockingBlock>>,
    /// The timeout certificate held by the manager, if any.
    #[debug(skip_if = Option::is_none)]
    pub timeout: Option<Box<TimeoutCertificate>>,
    /// The manager's vote on a block, in lite form: the validation vote if it is from a
    /// later round than the confirmation vote, otherwise the confirmation vote if there is
    /// one, otherwise the validation vote.
    #[debug(skip_if = Option::is_none)]
    pub pending: Option<LiteVote>,
    /// The manager's timeout vote, in lite form.
    #[debug(skip_if = Option::is_none)]
    pub timeout_vote: Option<LiteVote>,
    /// The manager's fallback vote, in lite form.
    #[debug(skip_if = Option::is_none)]
    pub fallback_vote: Option<LiteVote>,
    /// The value of the manager's confirmation vote. Only filled in by `add_values`.
    #[debug(skip_if = Option::is_none)]
    pub requested_confirmed: Option<Box<ConfirmedBlock>>,
    /// The value of the manager's validation vote. Only filled in by `add_values`.
    #[debug(skip_if = Option::is_none)]
    pub requested_validated: Option<Box<ValidatedBlock>>,
    /// The manager's current round.
    pub current_round: Round,
    /// The leader of the current round, if that round has a single leader.
    #[debug(skip_if = Option::is_none)]
    pub leader: Option<AccountOwner>,
    /// The manager's timeout for the current round, if the round times out.
    #[debug(skip_if = Option::is_none)]
    pub round_timeout: Option<Timestamp>,
}
793
794impl<C> From<&ChainManager<C>> for ChainManagerInfo
795where
796 C: Context + Clone + Send + Sync + 'static,
797{
798 fn from(manager: &ChainManager<C>) -> Self {
799 let current_round = manager.current_round();
800 let pending = match (manager.confirmed_vote.get(), manager.validated_vote.get()) {
801 (None, None) => None,
802 (Some(confirmed_vote), Some(validated_vote))
803 if validated_vote.round > confirmed_vote.round =>
804 {
805 Some(validated_vote.lite())
806 }
807 (Some(vote), _) => Some(vote.lite()),
808 (None, Some(vote)) => Some(vote.lite()),
809 };
810 ChainManagerInfo {
811 ownership: manager.ownership.get().clone(),
812 requested_signed_proposal: None,
813 requested_proposed: None,
814 requested_locking: None,
815 timeout: manager.timeout.get().clone().map(Box::new),
816 pending,
817 timeout_vote: manager.timeout_vote.get().as_ref().map(Vote::lite),
818 fallback_vote: manager.fallback_vote.get().as_ref().map(Vote::lite),
819 requested_confirmed: None,
820 requested_validated: None,
821 current_round,
822 leader: manager.round_leader(current_round).copied(),
823 round_timeout: *manager.round_timeout.get(),
824 }
825 }
826}
827
828impl ChainManagerInfo {
829 pub fn add_values<C>(&mut self, manager: &ChainManager<C>)
831 where
832 C: Context + Clone + Send + Sync + 'static,
833 C::Extra: ExecutionRuntimeContext,
834 {
835 self.requested_signed_proposal = manager.signed_proposal.get().clone().map(Box::new);
836 self.requested_proposed = manager.proposed.get().clone().map(Box::new);
837 self.requested_locking = manager.locking_block.get().clone().map(Box::new);
838 self.requested_confirmed = manager
839 .confirmed_vote
840 .get()
841 .as_ref()
842 .map(|vote| Box::new(vote.value.clone()));
843 self.requested_validated = manager
844 .validated_vote
845 .get()
846 .as_ref()
847 .map(|vote| Box::new(vote.value.clone()));
848 }
849
850 pub fn can_propose(&self, identity: &AccountOwner, round: Round) -> bool {
853 match round {
854 Round::Fast => self.ownership.super_owners.contains(identity),
855 Round::MultiLeader(_) => true,
856 Round::SingleLeader(_) | Round::Validator(_) => self.leader.as_ref() == Some(identity),
857 }
858 }
859
860 pub fn already_handled_proposal(&self, round: Round, proposed_block: &ProposedBlock) -> bool {
862 self.requested_proposed.as_ref().is_some_and(|proposal| {
863 proposal.content.round == round && *proposed_block == proposal.content.block
864 })
865 }
866
867 pub fn has_locking_block_in_current_round(&self) -> bool {
869 self.requested_locking
870 .as_ref()
871 .is_some_and(|locking| locking.round() == self.current_round)
872 }
873}