1use std::collections::BTreeMap;
72
73use custom_debug_derive::Debug;
74use futures::future::Either;
75use linera_base::{
76 crypto::{AccountPublicKey, ValidatorSecretKey},
77 data_types::{Blob, BlockHeight, Epoch, Round, Timestamp},
78 ensure,
79 identifiers::{AccountOwner, BlobId, ChainId},
80 ownership::ChainOwnership,
81};
82use linera_execution::ExecutionRuntimeContext;
83use linera_views::{
84 context::Context,
85 map_view::MapView,
86 register_view::RegisterView,
87 views::{ClonableView, View},
88 ViewError,
89};
90use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
91use rand_distr::{Distribution, WeightedAliasIndex};
92use serde::{Deserialize, Serialize};
93
94use crate::{
95 block::{Block, ConfirmedBlock, Timeout, ValidatedBlock},
96 data_types::{BlockProposal, LiteVote, OriginalProposal, ProposedBlock, Vote},
97 types::{TimeoutCertificate, ValidatedBlockCertificate},
98 ChainError,
99};
100
/// The result of a successful check of an incoming proposal or validated block certificate.
#[derive(Eq, PartialEq)]
pub enum Outcome {
    /// The checks passed and the item has not been handled yet.
    Accept,
    /// The item was already handled earlier, so there is nothing left to do.
    Skip,
}
107
/// A borrowed vote that is either for a validated block (`Left`) or for a confirmed block
/// (`Right`).
pub type ValidatedOrConfirmedVote<'a> = Either<&'a Vote<ValidatedBlock>, &'a Vote<ConfirmedBlock>>;
109
/// The block the chain manager is locked on. New proposals are only accepted in a round that
/// is strictly higher than the round of the locking block.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
pub enum LockingBlock {
    /// A block proposal; this variant always reports `Round::Fast` as its round.
    Fast(BlockProposal),
    /// A certificate for a validated block; its round is the certificate's round.
    Regular(ValidatedBlockCertificate),
}
121
122impl LockingBlock {
123 pub fn round(&self) -> Round {
126 match self {
127 Self::Fast(_) => Round::Fast,
128 Self::Regular(certificate) => certificate.round,
129 }
130 }
131
132 pub fn chain_id(&self) -> ChainId {
133 match self {
134 Self::Fast(proposal) => proposal.content.block.chain_id,
135 Self::Regular(certificate) => certificate.value().chain_id(),
136 }
137 }
138}
139
/// The persistent state used to track proposals, votes, timeouts and rounds for the next
/// block of a chain.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject), graphql(complex))]
#[derive(Debug, View, ClonableView)]
pub struct ChainManager<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// The chain's ownership configuration (super owners, regular owners, round settings).
    pub ownership: RegisterView<C, ChainOwnership>,
    /// The seed mixed into the pseudo-random leader selection; `reset` sets it to the block
    /// height.
    pub seed: RegisterView<C, u64>,
    /// The weighted distribution used to sample the leader of single-leader rounds from the
    /// regular owners; `None` if there are no regular owners.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
    /// The weighted distribution used to sample the leader of validator (fallback) rounds from
    /// `fallback_owners`; `None` if there are no fallback owners.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub fallback_distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
    /// The recorded block proposal with the highest round, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub proposed: RegisterView<C, Option<BlockProposal>>,
    /// The blobs that were stored together with `proposed`.
    pub proposed_blobs: MapView<C, BlobId, Blob>,
    /// The locking block with the highest round, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub locking_block: RegisterView<C, Option<LockingBlock>>,
    /// The blobs that were stored together with `locking_block`.
    pub locking_blobs: MapView<C, BlobId, Blob>,
    /// The timeout certificate with the highest round received so far, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub timeout: RegisterView<C, Option<TimeoutCertificate>>,
    /// The latest vote we signed to confirm a block, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub confirmed_vote: RegisterView<C, Option<Vote<ConfirmedBlock>>>,
    /// The latest vote we signed to validate a block, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub validated_vote: RegisterView<C, Option<Vote<ValidatedBlock>>>,
    /// The latest vote we signed to time out a round, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub timeout_vote: RegisterView<C, Option<Vote<Timeout>>>,
    /// The vote we signed to switch to fallback mode, if any.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub fallback_vote: RegisterView<C, Option<Vote<Timeout>>>,
    /// The time at which the current round times out; `None` if it has no timeout.
    pub round_timeout: RegisterView<C, Option<Timestamp>>,
    /// The current round. Skipped in the derived GraphQL object; when `with_graphql` is set
    /// it is exposed through the complex-object resolver instead.
    #[cfg_attr(with_graphql, graphql(skip))]
    pub current_round: RegisterView<C, Round>,
    /// The owners, with their weights, that can be selected as leaders of validator rounds.
    pub fallback_owners: RegisterView<C, BTreeMap<AccountOwner, u64>>,
}
197
#[cfg(with_graphql)]
#[async_graphql::ComplexObject]
impl<C> ChainManager<C>
where
    C: Context + Clone + Send + Sync + 'static,
{
    /// Returns the current round, exposed as the derived GraphQL field `current_round`.
    #[graphql(derived(name = "current_round"))]
    async fn _current_round(&self) -> Round {
        let round = self.current_round.get();
        *round
    }
}
215
216impl<C> ChainManager<C>
217where
218 C: Context + Clone + Send + Sync + 'static,
219{
220 pub fn reset<'a>(
222 &mut self,
223 ownership: ChainOwnership,
224 height: BlockHeight,
225 local_time: Timestamp,
226 fallback_owners: impl Iterator<Item = (AccountPublicKey, u64)> + 'a,
227 ) -> Result<(), ChainError> {
228 let distribution = if !ownership.owners.is_empty() {
229 let weights = ownership.owners.values().copied().collect();
230 Some(WeightedAliasIndex::new(weights)?)
231 } else {
232 None
233 };
234 let fallback_owners = fallback_owners
235 .map(|(pub_key, weight)| (AccountOwner::from(pub_key), weight))
236 .collect::<BTreeMap<_, _>>();
237 let fallback_distribution = if !fallback_owners.is_empty() {
238 let weights = fallback_owners.values().copied().collect();
239 Some(WeightedAliasIndex::new(weights)?)
240 } else {
241 None
242 };
243
244 let current_round = ownership.first_round();
245 let round_duration = ownership.round_timeout(current_round);
246 let round_timeout = round_duration.map(|rd| local_time.saturating_add(rd));
247
248 self.clear();
249 self.seed.set(height.0);
250 self.ownership.set(ownership);
251 self.distribution.set(distribution);
252 self.fallback_distribution.set(fallback_distribution);
253 self.fallback_owners.set(fallback_owners);
254 self.current_round.set(current_round);
255 self.round_timeout.set(round_timeout);
256 Ok(())
257 }
258
259 pub fn confirmed_vote(&self) -> Option<&Vote<ConfirmedBlock>> {
261 self.confirmed_vote.get().as_ref()
262 }
263
264 pub fn validated_vote(&self) -> Option<&Vote<ValidatedBlock>> {
266 self.validated_vote.get().as_ref()
267 }
268
269 pub fn timeout_vote(&self) -> Option<&Vote<Timeout>> {
271 self.timeout_vote.get().as_ref()
272 }
273
274 pub fn fallback_vote(&self) -> Option<&Vote<Timeout>> {
276 self.fallback_vote.get().as_ref()
277 }
278
279 pub fn current_round(&self) -> Round {
286 *self.current_round.get()
287 }
288
289 pub fn check_proposed_block(&self, proposal: &BlockProposal) -> Result<Outcome, ChainError> {
291 let new_block = &proposal.content.block;
292 let new_round = proposal.content.round;
293 if let Some(old_proposal) = self.proposed.get() {
294 if old_proposal.content == proposal.content {
295 return Ok(Outcome::Skip); }
297 }
298 ensure!(
300 new_block.height < BlockHeight::MAX,
301 ChainError::InvalidBlockHeight
302 );
303 let current_round = self.current_round();
304 match new_round {
305 Round::Fast => {}
308 Round::MultiLeader(_) | Round::SingleLeader(0) => {
309 ensure!(
312 self.is_super(&proposal.public_key.into()) || !current_round.is_fast(),
313 ChainError::WrongRound(current_round)
314 );
315 ensure!(
317 new_round >= current_round,
318 ChainError::InsufficientRound(new_round)
319 );
320 }
321 Round::SingleLeader(_) | Round::Validator(_) => {
322 ensure!(
324 new_round == current_round,
325 ChainError::WrongRound(current_round)
326 );
327 }
328 }
329 if let Some(vote) = self.validated_vote() {
331 ensure!(
332 new_round > vote.round,
333 ChainError::InsufficientRoundStrict(vote.round)
334 );
335 }
336 if let Some(locking_block) = self.locking_block.get() {
338 ensure!(
339 locking_block.round() < new_round,
340 ChainError::MustBeNewerThanLockingBlock(new_block.height, locking_block.round())
341 );
342 }
343 if let Some(vote) = self.confirmed_vote() {
346 ensure!(
347 match proposal.original_proposal.as_ref() {
348 None => false,
349 Some(OriginalProposal::Regular { certificate }) =>
350 vote.round <= certificate.round,
351 Some(OriginalProposal::Fast { .. }) => {
352 vote.round.is_fast() && vote.value().matches_proposed_block(new_block)
353 }
354 },
355 ChainError::HasIncompatibleConfirmedVote(new_block.height, vote.round)
356 );
357 }
358 Ok(Outcome::Accept)
359 }
360
361 pub fn vote_timeout(
363 &mut self,
364 chain_id: ChainId,
365 height: BlockHeight,
366 epoch: Epoch,
367 key_pair: Option<&ValidatorSecretKey>,
368 local_time: Timestamp,
369 ) -> bool {
370 let Some(key_pair) = key_pair else {
371 return false; };
373 let Some(round_timeout) = *self.round_timeout.get() else {
374 return false; };
376 if local_time < round_timeout || self.ownership.get().owners.is_empty() {
377 return false; }
379 let current_round = self.current_round();
380 if let Some(vote) = self.timeout_vote.get() {
381 if vote.round == current_round {
382 return false; }
384 }
385 let value = Timeout::new(chain_id, height, epoch);
386 self.timeout_vote
387 .set(Some(Vote::new(value, current_round, key_pair)));
388 true
389 }
390
391 pub fn vote_fallback(
396 &mut self,
397 chain_id: ChainId,
398 height: BlockHeight,
399 epoch: Epoch,
400 key_pair: Option<&ValidatorSecretKey>,
401 ) -> bool {
402 let Some(key_pair) = key_pair else {
403 return false; };
405 if self.fallback_vote.get().is_some() || self.current_round() >= Round::Validator(0) {
406 return false; }
408 let value = Timeout::new(chain_id, height, epoch);
409 let last_regular_round = Round::SingleLeader(u32::MAX);
410 self.fallback_vote
411 .set(Some(Vote::new(value, last_regular_round, key_pair)));
412 true
413 }
414
415 pub fn check_validated_block(
417 &self,
418 certificate: &ValidatedBlockCertificate,
419 ) -> Result<Outcome, ChainError> {
420 let new_block = certificate.block();
421 let new_round = certificate.round;
422 if let Some(Vote { value, round, .. }) = self.confirmed_vote.get() {
423 if value.block() == new_block && *round == new_round {
424 return Ok(Outcome::Skip); }
426 }
427
428 if let Some(Vote { round, .. }) = self.validated_vote.get() {
430 ensure!(new_round >= *round, ChainError::InsufficientRound(*round))
431 }
432
433 if let Some(locking) = self.locking_block.get() {
434 if let LockingBlock::Regular(locking_cert) = locking {
435 if locking_cert.hash() == certificate.hash() && locking.round() == new_round {
436 return Ok(Outcome::Skip); }
438 }
439 ensure!(
440 new_round > locking.round(),
441 ChainError::InsufficientRoundStrict(locking.round())
442 );
443 }
444 Ok(Outcome::Accept)
445 }
446
447 pub fn create_vote(
449 &mut self,
450 proposal: BlockProposal,
451 block: Block,
452 key_pair: Option<&ValidatorSecretKey>,
453 local_time: Timestamp,
454 blobs: BTreeMap<BlobId, Blob>,
455 ) -> Result<Option<ValidatedOrConfirmedVote>, ChainError> {
456 let round = proposal.content.round;
457
458 match &proposal.original_proposal {
459 Some(OriginalProposal::Regular { certificate }) => {
461 if self
462 .locking_block
463 .get()
464 .as_ref()
465 .is_none_or(|locking| locking.round() < certificate.round)
466 {
467 let value = ValidatedBlock::new(block.clone());
468 if let Some(certificate) = certificate.clone().with_value(value) {
469 self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?;
470 }
471 }
472 }
473 Some(OriginalProposal::Fast {
476 public_key,
477 signature,
478 }) => {
479 if self.locking_block.get().is_none() {
480 let original_proposal = BlockProposal {
481 public_key: *public_key,
482 signature: *signature,
483 ..proposal.clone()
484 };
485 self.update_locking(LockingBlock::Fast(original_proposal), blobs.clone())?;
486 }
487 }
488 None => {
491 if round.is_fast() && self.locking_block.get().is_none() {
492 self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?;
494 }
495 }
496 }
497
498 self.update_proposed(proposal.clone(), blobs)?;
500 self.update_current_round(local_time);
501
502 let Some(key_pair) = key_pair else {
503 return Ok(None);
505 };
506
507 if round.is_fast() {
509 self.validated_vote.set(None);
510 let value = ConfirmedBlock::new(block);
511 let vote = Vote::new(value, round, key_pair);
512 Ok(Some(Either::Right(
513 self.confirmed_vote.get_mut().insert(vote),
514 )))
515 } else {
516 let value = ValidatedBlock::new(block);
517 let vote = Vote::new(value, round, key_pair);
518 Ok(Some(Either::Left(
519 self.validated_vote.get_mut().insert(vote),
520 )))
521 }
522 }
523
524 pub fn create_final_vote(
526 &mut self,
527 validated: ValidatedBlockCertificate,
528 key_pair: Option<&ValidatorSecretKey>,
529 local_time: Timestamp,
530 blobs: BTreeMap<BlobId, Blob>,
531 ) -> Result<(), ViewError> {
532 let round = validated.round;
533 let confirmed_block = ConfirmedBlock::new(validated.inner().block().clone());
534 self.update_locking(LockingBlock::Regular(validated), blobs)?;
535 self.update_current_round(local_time);
536 if let Some(key_pair) = key_pair {
537 if self.current_round() != round {
538 return Ok(()); }
540 let vote = Vote::new(confirmed_block, round, key_pair);
542 self.confirmed_vote.set(Some(vote));
544 self.validated_vote.set(None);
545 }
546 Ok(())
547 }
548
549 pub async fn pending_blob(&self, blob_id: &BlobId) -> Result<Option<Blob>, ViewError> {
551 if let Some(blob) = self.proposed_blobs.get(blob_id).await? {
552 return Ok(Some(blob));
553 }
554 self.locking_blobs.get(blob_id).await
555 }
556
557 fn update_current_round(&mut self, local_time: Timestamp) {
561 let current_round = self
562 .timeout
563 .get()
564 .iter()
565 .map(|certificate| {
566 self.ownership
567 .get()
568 .next_round(certificate.round)
569 .unwrap_or(Round::Validator(u32::MAX))
570 })
571 .chain(self.locking_block.get().as_ref().map(LockingBlock::round))
572 .chain(
573 self.proposed
574 .get()
575 .iter()
576 .map(|proposal| proposal.content.round),
577 )
578 .max()
579 .unwrap_or_default()
580 .max(self.ownership.get().first_round());
581 if current_round <= self.current_round() {
582 return;
583 }
584 let round_duration = self.ownership.get().round_timeout(current_round);
585 self.round_timeout
586 .set(round_duration.map(|rd| local_time.saturating_add(rd)));
587 self.current_round.set(current_round);
588 }
589
590 pub fn handle_timeout_certificate(
593 &mut self,
594 certificate: TimeoutCertificate,
595 local_time: Timestamp,
596 ) {
597 let round = certificate.round;
598 if let Some(known_certificate) = self.timeout.get() {
599 if known_certificate.round >= round {
600 return;
601 }
602 }
603 self.timeout.set(Some(certificate));
604 self.update_current_round(local_time);
605 }
606
607 pub fn verify_owner(&self, proposal: &BlockProposal) -> bool {
610 let owner = &proposal.public_key.into();
611 if self.ownership.get().super_owners.contains(owner) {
612 return true;
613 }
614 match proposal.content.round {
615 Round::Fast => {
616 false }
618 Round::MultiLeader(_) => {
619 let ownership = self.ownership.get();
620 ownership.open_multi_leader_rounds || ownership.owners.contains_key(owner)
622 }
623 Round::SingleLeader(r) => {
624 let Some(index) = self.round_leader_index(r) else {
625 return false;
626 };
627 self.ownership.get().owners.keys().nth(index) == Some(owner)
628 }
629 Round::Validator(r) => {
630 let Some(index) = self.fallback_round_leader_index(r) else {
631 return false;
632 };
633 self.fallback_owners.get().keys().nth(index) == Some(owner)
634 }
635 }
636 }
637
638 fn round_leader(&self, round: Round) -> Option<&AccountOwner> {
641 match round {
642 Round::SingleLeader(r) => {
643 let index = self.round_leader_index(r)?;
644 self.ownership.get().owners.keys().nth(index)
645 }
646 Round::Validator(r) => {
647 let index = self.fallback_round_leader_index(r)?;
648 self.fallback_owners.get().keys().nth(index)
649 }
650 Round::Fast | Round::MultiLeader(_) => None,
651 }
652 }
653
654 fn round_leader_index(&self, round: u32) -> Option<usize> {
656 let seed = u64::from(round)
657 .rotate_left(32)
658 .wrapping_add(*self.seed.get());
659 let mut rng = ChaCha8Rng::seed_from_u64(seed);
660 Some(self.distribution.get().as_ref()?.sample(&mut rng))
661 }
662
663 fn fallback_round_leader_index(&self, round: u32) -> Option<usize> {
666 let seed = u64::from(round)
667 .rotate_left(32)
668 .wrapping_add(*self.seed.get());
669 let mut rng = ChaCha8Rng::seed_from_u64(seed);
670 Some(self.fallback_distribution.get().as_ref()?.sample(&mut rng))
671 }
672
673 fn is_super(&self, owner: &AccountOwner) -> bool {
675 self.ownership.get().super_owners.contains(owner)
676 }
677
678 fn update_proposed(
680 &mut self,
681 proposal: BlockProposal,
682 blobs: BTreeMap<BlobId, Blob>,
683 ) -> Result<(), ViewError> {
684 if let Some(old_proposal) = self.proposed.get() {
685 if old_proposal.content.round >= proposal.content.round {
686 return Ok(());
687 }
688 }
689 self.proposed.set(Some(proposal));
690 self.proposed_blobs.clear();
691 for (blob_id, blob) in blobs {
692 self.proposed_blobs.insert(&blob_id, blob)?;
693 }
694 Ok(())
695 }
696
697 fn update_locking(
699 &mut self,
700 locking: LockingBlock,
701 blobs: BTreeMap<BlobId, Blob>,
702 ) -> Result<(), ViewError> {
703 if let Some(old_locked) = self.locking_block.get() {
704 if old_locked.round() >= locking.round() {
705 return Ok(());
706 }
707 }
708 self.locking_block.set(Some(locking));
709 self.locking_blobs.clear();
710 for (blob_id, blob) in blobs {
711 self.locking_blobs.insert(&blob_id, blob)?;
712 }
713 Ok(())
714 }
715}
716
/// A serializable summary of a `ChainManager`'s state.
///
/// The `requested_*` fields are `None` when the summary is built with `From` and are only
/// filled in by `add_values`.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
pub struct ChainManagerInfo {
    /// The chain's ownership configuration.
    pub ownership: ChainOwnership,
    /// The recorded block proposal, if requested and present.
    #[debug(skip_if = Option::is_none)]
    pub requested_proposed: Option<Box<BlockProposal>>,
    /// The locking block, if requested and present.
    #[debug(skip_if = Option::is_none)]
    pub requested_locking: Option<Box<LockingBlock>>,
    /// The latest known timeout certificate, if any.
    #[debug(skip_if = Option::is_none)]
    pub timeout: Option<Box<TimeoutCertificate>>,
    /// The lite form of our confirmation or validation vote. If both exist, the validation
    /// vote is used only when its round is strictly higher than the confirmation vote's.
    #[debug(skip_if = Option::is_none)]
    pub pending: Option<LiteVote>,
    /// The lite form of our timeout vote, if any.
    #[debug(skip_if = Option::is_none)]
    pub timeout_vote: Option<LiteVote>,
    /// The lite form of our fallback vote, if any.
    #[debug(skip_if = Option::is_none)]
    pub fallback_vote: Option<LiteVote>,
    /// The value of our confirmation vote, if requested and present.
    #[debug(skip_if = Option::is_none)]
    pub requested_confirmed: Option<Box<ConfirmedBlock>>,
    /// The value of our validation vote, if requested and present.
    #[debug(skip_if = Option::is_none)]
    pub requested_validated: Option<Box<ValidatedBlock>>,
    /// The manager's current round.
    pub current_round: Round,
    /// The leader of the current round, if that round has one.
    #[debug(skip_if = Option::is_none)]
    pub leader: Option<AccountOwner>,
    /// The time at which the current round times out, if it does.
    #[debug(skip_if = Option::is_none)]
    pub round_timeout: Option<Timestamp>,
}
758
759impl<C> From<&ChainManager<C>> for ChainManagerInfo
760where
761 C: Context + Clone + Send + Sync + 'static,
762{
763 fn from(manager: &ChainManager<C>) -> Self {
764 let current_round = manager.current_round();
765 let pending = match (manager.confirmed_vote.get(), manager.validated_vote.get()) {
766 (None, None) => None,
767 (Some(confirmed_vote), Some(validated_vote))
768 if validated_vote.round > confirmed_vote.round =>
769 {
770 Some(validated_vote.lite())
771 }
772 (Some(vote), _) => Some(vote.lite()),
773 (None, Some(vote)) => Some(vote.lite()),
774 };
775 ChainManagerInfo {
776 ownership: manager.ownership.get().clone(),
777 requested_proposed: None,
778 requested_locking: None,
779 timeout: manager.timeout.get().clone().map(Box::new),
780 pending,
781 timeout_vote: manager.timeout_vote.get().as_ref().map(Vote::lite),
782 fallback_vote: manager.fallback_vote.get().as_ref().map(Vote::lite),
783 requested_confirmed: None,
784 requested_validated: None,
785 current_round,
786 leader: manager.round_leader(current_round).cloned(),
787 round_timeout: *manager.round_timeout.get(),
788 }
789 }
790}
791
792impl ChainManagerInfo {
793 pub fn add_values<C>(&mut self, manager: &ChainManager<C>)
795 where
796 C: Context + Clone + Send + Sync + 'static,
797 C::Extra: ExecutionRuntimeContext,
798 {
799 self.requested_proposed = manager.proposed.get().clone().map(Box::new);
800 self.requested_locking = manager.locking_block.get().clone().map(Box::new);
801 self.requested_confirmed = manager
802 .confirmed_vote
803 .get()
804 .as_ref()
805 .map(|vote| Box::new(vote.value.clone()));
806 self.requested_validated = manager
807 .validated_vote
808 .get()
809 .as_ref()
810 .map(|vote| Box::new(vote.value.clone()));
811 }
812
813 pub fn can_propose(&self, identity: &AccountOwner, round: Round) -> bool {
816 match round {
817 Round::Fast => self.ownership.super_owners.contains(identity),
818 Round::MultiLeader(_) => true,
819 Round::SingleLeader(_) | Round::Validator(_) => self.leader.as_ref() == Some(identity),
820 }
821 }
822
823 pub fn already_handled_proposal(&self, round: Round, proposed_block: &ProposedBlock) -> bool {
825 self.requested_proposed.as_ref().is_some_and(|proposal| {
826 proposal.content.round == round && *proposed_block == proposal.content.block
827 })
828 }
829
830 pub fn has_locking_block_in_current_round(&self) -> bool {
832 self.requested_locking
833 .as_ref()
834 .is_some_and(|locking| locking.round() == self.current_round)
835 }
836}