use std::{
collections::{BTreeSet, HashSet},
fmt,
};
use async_graphql::SimpleObject;
use custom_debug_derive::Debug;
use linera_base::{
bcs,
crypto::{
AccountPublicKey, AccountSecretKey, AccountSignature, BcsHashable, BcsSignable,
CryptoError, CryptoHash, ValidatorPublicKey, ValidatorSecretKey, ValidatorSignature,
},
data_types::{Amount, BlockHeight, Event, OracleResponse, Round, Timestamp},
doc_scalar, ensure,
hashed::Hashed,
identifiers::{
Account, BlobId, BlobType, ChainId, ChannelFullName, Destination, MessageId, Owner,
},
};
use linera_execution::{
committee::{Committee, Epoch},
system::OpenChainConfig,
Message, MessageKind, Operation, SystemMessage, SystemOperation,
};
use serde::{Deserialize, Serialize};
use crate::{
block::ValidatedBlock,
types::{
CertificateKind, CertificateValue, GenericCertificate, LiteCertificate,
ValidatedBlockCertificate,
},
ChainError,
};
#[cfg(test)]
#[path = "unit_tests/data_types_tests.rs"]
mod data_types_tests;
/// A block as proposed for a chain: the incoming message bundles to process and the
/// operations to run, together with the header data that places it in the chain.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct ProposedBlock {
/// The chain this block is proposed for.
pub chain_id: ChainId,
/// The epoch recorded for this block (presumably selects the committee; confirm).
pub epoch: Epoch,
/// Incoming message bundles; `transactions()` yields these before the operations.
/// Omitted from `Debug` output when empty.
#[debug(skip_if = Vec::is_empty)]
pub incoming_bundles: Vec<IncomingBundle>,
/// Operations to execute; `transactions()` yields these after the incoming bundles.
/// Omitted from `Debug` output when empty.
#[debug(skip_if = Vec::is_empty)]
pub operations: Vec<Operation>,
/// The height of this block.
pub height: BlockHeight,
/// The timestamp of this block.
pub timestamp: Timestamp,
/// Optional owner recorded as the authenticated signer.
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub authenticated_signer: Option<Owner>,
/// Hash of the preceding block, if any (presumably `None` only for the first block of
/// a chain; confirm).
pub previous_block_hash: Option<CryptoHash>,
}
impl ProposedBlock {
    /// Collects the IDs of every blob published by this block's operations.
    ///
    /// A `PublishDataBlob` operation contributes one data blob ID. A `PublishBytecode`
    /// operation contributes two: the contract bytecode and the service bytecode.
    pub fn published_blob_ids(&self) -> BTreeSet<BlobId> {
        let mut published = BTreeSet::new();
        for operation in &self.operations {
            match operation {
                Operation::System(SystemOperation::PublishDataBlob { blob_hash }) => {
                    published.insert(BlobId::new(*blob_hash, BlobType::Data));
                }
                Operation::System(SystemOperation::PublishBytecode { bytecode_id }) => {
                    published.insert(BlobId::new(
                        bytecode_id.contract_blob_hash,
                        BlobType::ContractBytecode,
                    ));
                    published.insert(BlobId::new(
                        bytecode_id.service_blob_hash,
                        BlobType::ServiceBytecode,
                    ));
                }
                // No other operation publishes blobs.
                _ => {}
            }
        }
        published
    }

    /// Returns `true` if the block has no operations and every incoming bundle is
    /// rejected. A block with no bundles and no operations also returns `true`.
    pub fn has_only_rejected_messages(&self) -> bool {
        if !self.operations.is_empty() {
            return false;
        }
        self.incoming_bundles
            .iter()
            .all(|incoming| incoming.action == MessageAction::Reject)
    }

    /// Iterates over all posted messages of all incoming bundles, in bundle order.
    pub fn incoming_messages(&self) -> impl Iterator<Item = &PostedMessage> {
        self.incoming_bundles
            .iter()
            .flat_map(|incoming| incoming.bundle.messages.iter())
    }

    /// Returns the total number of posted messages across all incoming bundles.
    pub fn message_count(&self) -> usize {
        self.incoming_bundles
            .iter()
            .fold(0, |total, incoming| total + incoming.bundle.messages.len())
    }

    /// Iterates over the block's transactions paired with their index, starting at 0:
    /// first every incoming bundle, then every operation.
    pub fn transactions(&self) -> impl Iterator<Item = (u32, Transaction<'_>)> {
        let received = self
            .incoming_bundles
            .iter()
            .map(Transaction::ReceiveMessages);
        let executed = self.operations.iter().map(Transaction::ExecuteOperation);
        let all_transactions = received.chain(executed);
        (0u32..).zip(all_transactions)
    }

    /// If the first incoming bundle is accepted and its first message matches an
    /// open-chain message, returns that bundle, that message and its configuration.
    /// Returns `None` in every other case.
    pub fn starts_with_open_chain_message(
        &self,
    ) -> Option<(&IncomingBundle, &PostedMessage, &OpenChainConfig)> {
        let first_bundle = self.incoming_bundles.first()?;
        match first_bundle.action {
            MessageAction::Reject => None,
            MessageAction::Accept => {
                let first_message = first_bundle.bundle.messages.first()?;
                first_message
                    .message
                    .matches_open_chain()
                    .map(|config| (first_bundle, first_message, config))
            }
        }
    }

    /// Checks that the BCS-serialized size of this block does not exceed
    /// `maximum_block_proposal_size`.
    ///
    /// # Errors
    ///
    /// Returns `ChainError::BlockProposalTooLarge` if the block is too large, or the
    /// converted serialization error if the size cannot be computed.
    pub fn check_proposal_size(&self, maximum_block_proposal_size: u64) -> Result<(), ChainError> {
        // A limit that does not fit in `usize` cannot be exceeded by an in-memory size.
        let limit = usize::try_from(maximum_block_proposal_size).unwrap_or(usize::MAX);
        let serialized_size = bcs::serialized_size(self)?;
        ensure!(serialized_size <= limit, ChainError::BlockProposalTooLarge);
        Ok(())
    }
}
/// A borrowed view of one transaction in a block: either an incoming message bundle to
/// receive or an operation to execute.
#[derive(Debug, Clone)]
pub enum Transaction<'a> {
/// Receive the messages of the referenced incoming bundle.
ReceiveMessages(&'a IncomingBundle),
/// Execute the referenced operation.
ExecuteOperation(&'a Operation),
}
/// A chain ID together with a block height.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
pub struct ChainAndHeight {
/// The chain.
pub chain_id: ChainId,
/// The block height on that chain.
pub height: BlockHeight,
}
impl ChainAndHeight {
    /// Builds the `MessageId` for the message with the given `index` at this chain and
    /// height.
    pub fn to_message_id(&self, index: u32) -> MessageId {
        let Self { chain_id, height } = *self;
        MessageId {
            chain_id,
            height,
            index,
        }
    }
}
/// A bundle of messages received from another chain, together with the decision whether
/// to accept or reject it.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct IncomingBundle {
/// Where the bundle comes from: the sender chain and the medium.
pub origin: Origin,
/// The messages themselves, with the metadata of the block that sent them.
pub bundle: MessageBundle,
/// Whether the bundle is accepted or rejected.
pub action: MessageAction,
}
impl IncomingBundle {
pub fn messages_and_ids(&self) -> impl Iterator<Item = (MessageId, &PostedMessage)> {
let chain_and_height = ChainAndHeight {
chain_id: self.origin.sender,
height: self.bundle.height,
};
let messages = self.bundle.messages.iter();
messages.map(move |posted_message| {
let message_id = chain_and_height.to_message_id(posted_message.index);
(message_id, posted_message)
})
}
pub fn put_openchain_at_front(bundles: &mut [IncomingBundle]) -> bool {
let Some(index) = bundles.iter().position(|msg| {
matches!(
msg.bundle.messages.first(),
Some(PostedMessage {
message: Message::System(SystemMessage::OpenChain(_)),
..
})
)
}) else {
return false;
};
bundles[0..=index].rotate_right(1);
true
}
}
// Marker impl: opts `IncomingBundle` into `BcsHashable` without overriding anything.
impl<'de> BcsHashable<'de> for IncomingBundle {}
/// Whether an incoming message is accepted or rejected.
#[derive(Copy, Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum MessageAction {
/// The message is accepted.
Accept,
/// The message is rejected.
Reject,
}
/// The origin of a message: the sending chain and the medium it was sent through.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub struct Origin {
/// The chain that sent the message.
pub sender: ChainId,
/// The medium: direct or via a channel.
pub medium: Medium,
}
/// The target of a message: the receiving chain and the medium it is sent through.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub struct Target {
/// The chain that receives the message.
pub recipient: ChainId,
/// The medium: direct or via a channel.
pub medium: Medium,
}
/// The messages produced by one transaction of a block for one destination, together
/// with metadata about that block (see `ExecutedBlock::message_bundles_for`).
#[derive(Debug, Eq, PartialEq, Clone, Hash, Serialize, Deserialize, SimpleObject)]
pub struct MessageBundle {
/// The height of the block that produced the messages.
pub height: BlockHeight,
/// The timestamp of the block that produced the messages.
pub timestamp: Timestamp,
/// The certificate hash supplied when the bundle was built (presumably the hash of
/// the certificate confirming the sending block; confirm).
pub certificate_hash: CryptoHash,
/// The index, within the sending block, of the transaction that produced the messages.
pub transaction_index: u32,
/// The messages, each carrying its index within the sending block.
pub messages: Vec<PostedMessage>,
}
/// The medium through which a message travels between chains.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub enum Medium {
/// The message is addressed directly to a recipient chain.
Direct,
/// The message is sent to the subscribers of the named channel.
Channel(ChannelFullName),
}
/// A signed proposal of a block: the content, the signer's public key and the signature
/// over the content.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
pub struct BlockProposal {
/// The signed content: block, round and optional execution outcome.
pub content: ProposalContent,
/// The public key matching the secret key that produced `signature`.
pub public_key: AccountPublicKey,
/// The signature over `content`.
pub signature: AccountSignature,
/// Lite certificate of a previously validated block. `check_invariants` requires it
/// to be present exactly when `content.outcome` is present.
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}
/// A message produced by a block, addressed to a destination.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct OutgoingMessage {
/// Where the message goes: a recipient chain or the subscribers of a channel.
pub destination: Destination,
/// Optional owner recorded as the authenticated signer.
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub authenticated_signer: Option<Owner>,
/// An amount granted along with the message (presumably to pay for its execution;
/// confirm). Omitted from `Debug` output when zero.
#[debug(skip_if = Amount::is_zero)]
pub grant: Amount,
/// Optional account to refund the grant to (presumably the unused part; confirm).
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub refund_grant_to: Option<Account>,
/// The kind of message: simple, protected, tracked or bouncing.
pub kind: MessageKind,
/// The message payload.
pub message: Message,
}
// Marker impl: opts `OutgoingMessage` into `BcsHashable` without overriding anything.
impl<'de> BcsHashable<'de> for OutgoingMessage {}
/// A message as delivered in a bundle: an `OutgoingMessage` without its destination but
/// with its index (see `OutgoingMessage::into_posted`).
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct PostedMessage {
/// Optional owner recorded as the authenticated signer.
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub authenticated_signer: Option<Owner>,
/// An amount granted along with the message (presumably to pay for its execution;
/// confirm). Omitted from `Debug` output when zero.
#[debug(skip_if = Amount::is_zero)]
pub grant: Amount,
/// Optional account to refund the grant to (presumably the unused part; confirm).
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub refund_grant_to: Option<Account>,
/// The kind of message: simple, protected, tracked or bouncing.
pub kind: MessageKind,
/// The index of the message; combined with chain ID and height it forms a `MessageId`.
pub index: u32,
/// The message payload.
pub message: Message,
}
impl OutgoingMessage {
    /// Returns whether this message is addressed to `recipient` through `medium`.
    ///
    /// A direct destination matches only the `Direct` medium with the same recipient
    /// chain. A subscribers destination matches only a `Channel` medium whose
    /// application ID equals that of the message and whose name equals the destination
    /// channel name; `recipient` is not consulted in that case.
    pub fn has_destination(&self, medium: &Medium, recipient: ChainId) -> bool {
        match &self.destination {
            Destination::Recipient(id) => match medium {
                Medium::Direct => *id == recipient,
                Medium::Channel(_) => false,
            },
            Destination::Subscribers(dest_name) => match medium {
                Medium::Direct => false,
                Medium::Channel(ChannelFullName {
                    application_id,
                    name,
                }) => *application_id == self.message.application_id() && name == dest_name,
            },
        }
    }

    /// Converts this message into a `PostedMessage` with the given `index`, dropping the
    /// destination and moving every other field across unchanged.
    pub fn into_posted(self, index: u32) -> PostedMessage {
        PostedMessage {
            authenticated_signer: self.authenticated_signer,
            grant: self.grant,
            refund_grant_to: self.refund_grant_to,
            kind: self.kind,
            index,
            message: self.message,
        }
    }
}
/// A proposed block together with the outcome of executing it.
#[derive(Debug, PartialEq, Eq, Hash, Clone, SimpleObject)]
pub struct ExecutedBlock {
/// The block that was executed.
pub block: ProposedBlock,
/// The outcome of the execution.
pub outcome: BlockExecutionOutcome,
}
/// The outcome of executing a block. The nested vectors are indexed by transaction
/// (`ExecutedBlock::message_id_for_operation` indexes `messages` that way; the other
/// lists presumably follow the same layout; confirm).
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
#[cfg_attr(with_testing, derive(Default))]
pub struct BlockExecutionOutcome {
/// The outgoing messages, one list per transaction.
pub messages: Vec<Vec<OutgoingMessage>>,
/// A hash of the state (presumably the chain's execution state after this block;
/// confirm).
pub state_hash: CryptoHash,
/// The recorded oracle responses, one list per entry of the outer vector.
pub oracle_responses: Vec<Vec<OracleResponse>>,
/// The emitted events, one list per entry of the outer vector.
pub events: Vec<Vec<Event>>,
}
/// A compact description of a certificate value: its hash, its chain and its kind.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct LiteValue {
/// The hash of the full value.
pub value_hash: CryptoHash,
/// The chain the value belongs to.
pub chain_id: ChainId,
/// The kind of certificate value.
pub kind: CertificateKind,
}
impl LiteValue {
    /// Builds the lite description of a hashed certificate value from its hash, its
    /// chain ID and the kind constant of its type.
    pub fn new<T: CertificateValue>(value: &Hashed<T>) -> Self {
        let value_hash = value.hash();
        let chain_id = value.inner().chain_id();
        Self {
            value_hash,
            chain_id,
            kind: T::KIND,
        }
    }
}
/// The payload that validators sign when voting: value hash, round and certificate kind.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
struct VoteValue(CryptoHash, Round, CertificateKind);
/// A validator's vote on a value in a given round, carrying the full value.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "T: BcsHashable<'de>"))]
pub struct Vote<T> {
/// The value being voted on, with its hash.
pub value: Hashed<T>,
/// The round in which the vote is cast.
pub round: Round,
/// The public key of the voting validator.
pub public_key: ValidatorPublicKey,
/// The validator's signature over the value hash, round and certificate kind.
pub signature: ValidatorSignature,
}
impl<T> Vote<T> {
    /// Creates a vote for `value` in `round`, signing the value hash, the round and the
    /// certificate kind of `T` with `key_pair`.
    pub fn new(value: Hashed<T>, round: Round, key_pair: &ValidatorSecretKey) -> Self
    where
        T: CertificateValue,
    {
        let signed_payload = VoteValue(value.hash(), round, T::KIND);
        Self {
            signature: ValidatorSignature::new(&signed_payload, key_pair),
            public_key: key_pair.public(),
            value,
            round,
        }
    }

    /// Returns the vote with the value replaced by its lite description.
    pub fn lite(&self) -> LiteVote
    where
        T: CertificateValue,
    {
        let Self {
            value,
            round,
            public_key,
            signature,
        } = self;
        LiteVote {
            value: LiteValue::new(value),
            round: *round,
            public_key: *public_key,
            signature: *signature,
        }
    }

    /// Returns the value this vote is for.
    pub fn value(&self) -> &Hashed<T> {
        &self.value
    }
}
/// A validator's vote that carries only the lite description of the value.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
pub struct LiteVote {
/// The lite description (hash, chain, kind) of the value being voted on.
pub value: LiteValue,
/// The round in which the vote is cast.
pub round: Round,
/// The public key of the voting validator.
pub public_key: ValidatorPublicKey,
/// The validator's signature over the value hash, round and certificate kind.
pub signature: ValidatorSignature,
}
impl LiteVote {
    /// Returns the full vote carrying `value`, or `None` if the hash of `value` differs
    /// from the hash recorded in this lite vote.
    #[cfg(any(feature = "benchmark", with_testing))]
    pub fn with_value<T>(self, value: Hashed<T>) -> Option<Vote<T>> {
        let Self {
            value: lite_value,
            round,
            public_key,
            signature,
        } = self;
        let hashes_match = lite_value.value_hash == value.hash();
        hashes_match.then(|| Vote {
            value,
            round,
            public_key,
            signature,
        })
    }

    /// Returns the kind of certificate value this vote is for.
    pub fn kind(&self) -> CertificateKind {
        let lite_value = &self.value;
        lite_value.kind
    }
}
impl fmt::Display for Origin {
    /// Writes the sender followed by either `(direct)` or `via <channel name>`.
    ///
    /// Both the sender and the channel name are formatted with precision 8 (presumably
    /// shortening them, depending on their `Display` impls; confirm).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { sender, medium } = self;
        match medium {
            Medium::Channel(full_name) => write!(f, "{:.8} via {full_name:.8}", sender),
            Medium::Direct => write!(f, "{:.8} (direct)", sender),
        }
    }
}
impl Origin {
    /// Creates the origin for messages sent directly by the chain `sender`.
    pub fn chain(sender: ChainId) -> Self {
        let medium = Medium::Direct;
        Self { sender, medium }
    }

    /// Creates the origin for messages sent by `sender` through the channel `name`.
    pub fn channel(sender: ChainId, name: ChannelFullName) -> Self {
        let medium = Medium::Channel(name);
        Self { sender, medium }
    }
}
impl Target {
    /// Creates the target for messages sent directly to the chain `recipient`.
    pub fn chain(recipient: ChainId) -> Self {
        let medium = Medium::Direct;
        Self { recipient, medium }
    }

    /// Creates the target for messages sent to `recipient` through the channel `name`.
    pub fn channel(recipient: ChainId, name: ChannelFullName) -> Self {
        let medium = Medium::Channel(name);
        Self { recipient, medium }
    }
}
impl MessageBundle {
    /// Returns `true` if every message in the bundle is skippable. An empty bundle is
    /// skippable.
    pub fn is_skippable(&self) -> bool {
        !self.messages.iter().any(|posted| !posted.is_skippable())
    }

    /// Returns `true` if the bundle contains at least one tracked message and no
    /// protected message.
    pub fn is_tracked(&self) -> bool {
        // `None` aborts the fold as soon as a protected message is seen.
        self.messages
            .iter()
            .try_fold(false, |tracked, posted| match posted.kind {
                MessageKind::Protected => None,
                MessageKind::Tracked => Some(true),
                MessageKind::Simple | MessageKind::Bouncing => Some(tracked),
            })
            .unwrap_or(false)
    }

    /// Returns `true` if at least one message in the bundle is protected.
    pub fn is_protected(&self) -> bool {
        self.messages.iter().any(|posted| posted.is_protected())
    }

    /// Returns `true` if at least one message in the bundle reports that it goes to the
    /// inbox.
    pub fn goes_to_inbox(&self) -> bool {
        self.messages
            .iter()
            .map(|posted| &posted.message)
            .any(|message| message.goes_to_inbox())
    }
}
impl PostedMessage {
    /// Returns `true` if the message is simple or bouncing and carries a zero grant.
    /// Protected and tracked messages are never skippable.
    pub fn is_skippable(&self) -> bool {
        match self.kind {
            MessageKind::Simple | MessageKind::Bouncing => self.grant == Amount::ZERO,
            MessageKind::Protected | MessageKind::Tracked => false,
        }
    }

    /// Returns `true` if the message kind is `Protected`.
    pub fn is_protected(&self) -> bool {
        self.kind == MessageKind::Protected
    }

    /// Returns `true` if the message kind is `Tracked`.
    pub fn is_tracked(&self) -> bool {
        self.kind == MessageKind::Tracked
    }

    /// Returns `true` if the message kind is `Bouncing`.
    pub fn is_bouncing(&self) -> bool {
        self.kind == MessageKind::Bouncing
    }
}
impl ExecutedBlock {
pub fn messages(&self) -> &Vec<Vec<OutgoingMessage>> {
&self.outcome.messages
}
pub fn message_bundles_for<'a>(
&'a self,
medium: &'a Medium,
recipient: ChainId,
certificate_hash: CryptoHash,
) -> impl Iterator<Item = (Epoch, MessageBundle)> + 'a {
let mut index = 0u32;
let block_height = self.block.height;
let block_timestamp = self.block.timestamp;
let block_epoch = self.block.epoch;
(0u32..)
.zip(self.messages())
.filter_map(move |(transaction_index, txn_messages)| {
let messages = (index..)
.zip(txn_messages)
.filter(|(_, message)| message.has_destination(medium, recipient))
.map(|(idx, message)| message.clone().into_posted(idx))
.collect::<Vec<_>>();
index += txn_messages.len() as u32;
(!messages.is_empty()).then(|| {
let bundle = MessageBundle {
height: block_height,
timestamp: block_timestamp,
certificate_hash,
transaction_index,
messages,
};
(block_epoch, bundle)
})
})
}
pub fn message_id_for_operation(
&self,
operation_index: usize,
message_index: u32,
) -> Option<MessageId> {
let block = &self.block;
let transaction_index = block.incoming_bundles.len().checked_add(operation_index)?;
if message_index
>= u32::try_from(self.outcome.messages.get(transaction_index)?.len()).ok()?
{
return None;
}
let first_message_index = u32::try_from(
self.outcome
.messages
.iter()
.take(transaction_index)
.map(Vec::len)
.sum::<usize>(),
)
.ok()?;
let index = first_message_index.checked_add(message_index)?;
Some(self.message_id(index))
}
pub fn message_by_id(&self, message_id: &MessageId) -> Option<&OutgoingMessage> {
let MessageId {
chain_id,
height,
index,
} = message_id;
if self.block.chain_id != *chain_id || self.block.height != *height {
return None;
}
let mut index = usize::try_from(*index).ok()?;
for messages in self.messages() {
if let Some(message) = messages.get(index) {
return Some(message);
}
index -= messages.len();
}
None
}
pub fn message_id(&self, index: u32) -> MessageId {
MessageId {
chain_id: self.block.chain_id,
height: self.block.height,
index,
}
}
pub fn required_blob_ids(&self) -> HashSet<BlobId> {
let mut blob_ids = self.outcome.oracle_blob_ids();
blob_ids.extend(self.block.published_blob_ids());
blob_ids
}
pub fn requires_blob(&self, blob_id: &BlobId) -> bool {
self.outcome.oracle_blob_ids().contains(blob_id)
|| self.block.published_blob_ids().contains(blob_id)
}
}
impl BlockExecutionOutcome {
    /// Pairs this outcome with the `block` it belongs to.
    pub fn with(self, block: ProposedBlock) -> ExecutedBlock {
        let outcome = self;
        ExecutedBlock { block, outcome }
    }

    /// Returns the IDs of all blobs named by `OracleResponse::Blob` entries.
    pub fn oracle_blob_ids(&self) -> HashSet<BlobId> {
        self.oracle_responses
            .iter()
            .flatten()
            .filter_map(|response| match response {
                OracleResponse::Blob(blob_id) => Some(*blob_id),
                _ => None,
            })
            .collect()
    }

    /// Returns `true` if at least one oracle response was recorded.
    pub fn has_oracle_responses(&self) -> bool {
        !self.oracle_responses.iter().all(Vec::is_empty)
    }
}
/// The part of a block proposal that is signed by the proposer.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProposalContent {
/// The proposed block.
pub block: ProposedBlock,
/// The round in which the block is proposed.
pub round: Round,
/// The execution outcome; `BlockProposal::new_retry` sets it from a validated block,
/// and `BlockProposal::new_initial` leaves it `None`.
/// Omitted from `Debug` output when `None`.
#[debug(skip_if = Option::is_none)]
pub outcome: Option<BlockExecutionOutcome>,
}
impl BlockProposal {
pub fn new_initial(round: Round, block: ProposedBlock, secret: &AccountSecretKey) -> Self {
let content = ProposalContent {
round,
block,
outcome: None,
};
let signature = AccountSignature::new(&content, secret);
Self {
content,
public_key: secret.public(),
signature,
validated_block_certificate: None,
}
}
pub fn new_retry(
round: Round,
validated_block_certificate: ValidatedBlockCertificate,
secret: &AccountSecretKey,
) -> Self {
let lite_cert = validated_block_certificate.lite_certificate().cloned();
let block = validated_block_certificate.into_inner().into_inner();
let executed_block: ExecutedBlock = block.into();
let content = ProposalContent {
block: executed_block.block,
round,
outcome: Some(executed_block.outcome),
};
let signature = AccountSignature::new(&content, secret);
Self {
content,
public_key: secret.public(),
signature,
validated_block_certificate: Some(lite_cert),
}
}
pub fn check_signature(&self) -> Result<(), CryptoError> {
self.signature.verify(&self.content, self.public_key)
}
pub fn required_blob_ids(&self) -> impl Iterator<Item = BlobId> + '_ {
self.content.block.published_blob_ids().into_iter().chain(
self.content
.outcome
.iter()
.flat_map(|outcome| outcome.oracle_blob_ids()),
)
}
pub fn check_invariants(&self) -> Result<(), &'static str> {
match (&self.validated_block_certificate, &self.content.outcome) {
(None, None) => {}
(None, Some(_)) | (Some(_), None) => {
return Err("Must contain a validation certificate if and only if \
it contains the execution outcome from a previous round");
}
(Some(lite_certificate), Some(outcome)) => {
let executed_block = outcome.clone().with(self.content.block.clone());
let value = Hashed::new(ValidatedBlock::new(executed_block));
ensure!(
lite_certificate.check_value(&value),
"Lite certificate must match the given block and execution outcome"
);
}
}
Ok(())
}
}
impl LiteVote {
pub fn new(value: LiteValue, round: Round, secret_key: &ValidatorSecretKey) -> Self {
let hash_and_round = VoteValue(value.value_hash, round, value.kind);
let signature = ValidatorSignature::new(&hash_and_round, secret_key);
Self {
value,
round,
public_key: secret_key.public(),
signature,
}
}
pub fn check(&self) -> Result<(), ChainError> {
let hash_and_round = VoteValue(self.value.value_hash, self.round, self.value.kind);
Ok(self.signature.check(&hash_and_round, &self.public_key)?)
}
}
/// Accumulates validator signatures on a value until they reach a quorum of the committee.
pub struct SignatureAggregator<'a, T> {
// The committee whose weights and quorum threshold are used.
committee: &'a Committee,
// The voting weight accumulated so far; `append` resets it to 0 once a quorum is reached.
weight: u64,
// The validators whose signature has already been recorded (or attempted; see `append`).
used_validators: HashSet<ValidatorPublicKey>,
// The certificate being built, holding the signatures added so far.
partial: GenericCertificate<T>,
}
impl<'a, T> SignatureAggregator<'a, T> {
    /// Starts aggregating signatures on `value` for `round`, with no signatures yet.
    pub fn new(value: Hashed<T>, round: Round, committee: &'a Committee) -> Self {
        let partial = GenericCertificate::new(value, round, Vec::new());
        Self {
            committee,
            weight: 0,
            used_validators: HashSet::new(),
            partial,
        }
    }

    /// Verifies `signature` and adds it to the certificate being built.
    ///
    /// Returns `Ok(Some(certificate))` when this signature brings the accumulated weight
    /// to the quorum threshold, and `Ok(None)` otherwise. After a quorum is reached the
    /// accumulated weight is reset to zero, so later calls return `Ok(None)` until
    /// another quorum's worth of weight has been added.
    ///
    /// # Errors
    ///
    /// * The signature check's error if the signature is invalid.
    /// * `ChainError::CertificateValidatorReuse` if `public_key` was seen before.
    /// * `ChainError::InvalidSigner` if `public_key` has no voting rights. The key is
    ///   still remembered as used in that case.
    pub fn append(
        &mut self,
        public_key: ValidatorPublicKey,
        signature: ValidatorSignature,
    ) -> Result<Option<GenericCertificate<T>>, ChainError>
    where
        T: CertificateValue,
    {
        let signed_payload = VoteValue(self.partial.hash(), self.partial.round, T::KIND);
        signature.check(&signed_payload, &public_key)?;
        // `insert` returns `false`, without modifying the set, if the key is already there.
        ensure!(
            self.used_validators.insert(public_key),
            ChainError::CertificateValidatorReuse
        );
        let voting_rights = self.committee.weight(&public_key);
        ensure!(voting_rights > 0, ChainError::InvalidSigner);
        self.weight += voting_rights;
        self.partial.add_signature((public_key, signature));
        if self.weight < self.committee.quorum_threshold() {
            return Ok(None);
        }
        // Reset so that the certificate is not handed out again on the next signature.
        self.weight = 0;
        Ok(Some(self.partial.clone()))
    }
}
/// Returns `true` if the public keys in `values` are in strictly increasing order, which
/// also means there are no duplicates. Empty and single-element slices are ordered.
pub(crate) fn is_strictly_ordered(values: &[(ValidatorPublicKey, ValidatorSignature)]) -> bool {
    values
        .iter()
        .zip(values.iter().skip(1))
        .all(|((current, _), (next, _))| current < next)
}
/// Verifies that `signatures` form a valid quorum of `committee` on the given value hash,
/// certificate kind and round.
///
/// The cheap checks run first: every validator appears at most once, every validator
/// has voting rights, and the total weight reaches the quorum threshold. Only then are
/// the signatures themselves verified, as a batch.
///
/// # Errors
///
/// * `ChainError::CertificateValidatorReuse` if a validator appears twice.
/// * `ChainError::InvalidSigner` if a validator has no voting rights.
/// * `ChainError::CertificateRequiresQuorum` if the total weight is below the threshold.
/// * The batch verification's error, converted into `ChainError`, if a signature is
///   invalid.
pub(crate) fn check_signatures(
    value_hash: CryptoHash,
    certificate_kind: CertificateKind,
    round: Round,
    signatures: &[(ValidatorPublicKey, ValidatorSignature)],
    committee: &Committee,
) -> Result<(), ChainError> {
    let mut seen_validators = HashSet::new();
    let mut total_weight = 0;
    for (validator, _) in signatures {
        // `insert` returns `false` if the validator was already in the set.
        ensure!(
            seen_validators.insert(*validator),
            ChainError::CertificateValidatorReuse
        );
        let voting_rights = committee.weight(validator);
        ensure!(voting_rights > 0, ChainError::InvalidSigner);
        total_weight += voting_rights;
    }
    ensure!(
        total_weight >= committee.quorum_threshold(),
        ChainError::CertificateRequiresQuorum
    );
    let signed_payload = VoteValue(value_hash, round, certificate_kind);
    ValidatorSignature::verify_batch(&signed_payload, signatures.iter())?;
    Ok(())
}
// Marker impls: `ProposalContent` and `VoteValue` are the payloads passed to
// `AccountSignature::new` and `ValidatorSignature::new` respectively in this file.
impl<'de> BcsSignable<'de> for ProposalContent {}
impl<'de> BcsSignable<'de> for VoteValue {}
// `doc_scalar!` invocations attaching a description string to each of the types below
// (presumably registering them as documented GraphQL scalars; confirm against the macro
// definition in `linera_base`).
doc_scalar!(
MessageAction,
"Whether an incoming message is accepted or rejected."
);
doc_scalar!(
Medium,
"The origin of a message coming from a particular chain. Used to identify each inbox."
);
doc_scalar!(
Origin,
"The origin of a message, relative to a particular application. Used to identify each inbox."
);
doc_scalar!(
Target,
"The target of a message, relative to a particular application. Used to identify each outbox."
);