use std::{
collections::{BTreeMap, BTreeSet, HashMap},
fmt,
hash::Hash,
mem,
ops::Range,
};
use futures::{stream, stream::TryStreamExt, Future, StreamExt};
use linera_base::{
data_types::{BlockHeight, Round},
identifiers::{BlobId, ChainId},
time::{timer::timeout, Duration, Instant},
};
use linera_chain::{
data_types::{BlockProposal, LiteVote},
types::{ConfirmedBlock, GenericCertificate, ValidatedBlock, ValidatedBlockCertificate},
};
use linera_execution::committee::Committee;
use linera_storage::Storage;
use thiserror::Error;
use crate::{
client::ChainClientError,
data_types::{ChainInfo, ChainInfoQuery},
local_node::LocalNodeClient,
node::{CrossChainMessageDelivery, NodeError, ValidatorNode},
remote_node::RemoteNode,
};
/// Default grace period, expressed as a fraction (0.2, i.e. 20%).
///
/// NOTE(review): this constant is not referenced in the visible part of the file; presumably
/// callers pass it as the `grace_period` argument of `communicate_with_quorum` — confirm.
pub const DEFAULT_GRACE_PERIOD: f64 = 0.2;
/// Longest time (24 hours) that `communicate_with_quorum` waits for the next validator
/// response while no grace-period deadline has been set yet.
const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); #[derive(Clone)]
/// The kind of update that `ValidatorUpdater::send_chain_update` delivers to one validator.
pub enum CommunicateAction {
/// Submit a block proposal; the vote is read from `manager.pending` of the response.
SubmitBlock {
/// The proposal to send.
proposal: Box<BlockProposal>,
/// Blob IDs consulted by `send_block_proposal` when the validator answers
/// `BlobsNotFound`.
blob_ids: Vec<BlobId>,
},
/// Send a validated-block certificate; the vote is read from `manager.pending` of the
/// response.
FinalizeBlock {
/// The certificate to send.
certificate: ValidatedBlockCertificate,
/// Delivery mode passed along with the certificate.
delivery: CrossChainMessageDelivery,
},
/// Query the chain info with the timeout flag set; the vote is read from
/// `manager.timeout_vote` of the response.
RequestTimeout {
/// The chain the request is about.
chain_id: ChainId,
/// The block height the validator is brought up to before the query is made.
height: BlockHeight,
/// The round reported by `CommunicateAction::round` for this action.
round: Round,
},
}
impl CommunicateAction {
pub fn round(&self) -> Round {
match self {
CommunicateAction::SubmitBlock { proposal, .. } => proposal.content.round,
CommunicateAction::FinalizeBlock { certificate, .. } => certificate.round,
CommunicateAction::RequestTimeout { round, .. } => *round,
}
}
}
/// Sends chain information, certificates, proposals and blobs from the local node to a single
/// remote validator.
#[derive(Clone)]
pub struct ValidatorUpdater<A, S>
where
S: Storage,
{
/// Concurrency limit used by `send_chain_info_up_to_heights` when updating several chains
/// at once.
pub chain_worker_count: usize,
/// The validator that receives the updates.
pub remote_node: RemoteNode<A>,
/// The local node whose chain state, certificates and blobs are forwarded.
pub local_node: LocalNodeClient<S>,
}
/// The reasons why `communicate_with_quorum` can fail to produce a quorum of matching
/// responses.
#[derive(Error, Debug)]
pub enum CommunicationError<E: fmt::Debug> {
/// No error was recorded, yet no response group reached the quorum threshold.
///
/// Carries the quorum threshold and, for every response group, its accumulated weight and
/// its number of responses.
#[error(
"No error but failed to find a consensus block. Consensus threshold: {0}, Proposals: {1:?}"
)]
NoConsensus(u64, Vec<(u64, usize)>),
/// One and the same error was reported by validators whose combined weight reached the
/// committee's validity threshold.
#[error("Failed to communicate with a quorum of validators: {0}")]
Trusted(E),
/// No quorum was reached and errors were reported; carries up to four errors with the
/// highest accumulated weights, each paired with that weight.
#[error("Failed to communicate with a quorum of validators:\n{:#?}", .0)]
Sample(Vec<(E, u64)>),
}
/// Runs `execute` concurrently against every validator that has a non-zero weight in
/// `committee` and waits until a quorum of them returned values that `group_by` maps to the
/// same key.
///
/// Once a key reaches the quorum threshold, the function keeps collecting responses for a
/// grace period equal to `grace_period` times the time it took to reach the quorum, then
/// returns the key together with all values collected for it.
///
/// # Errors
///
/// * `CommunicationError::Trusted` as soon as one error has been reported by validators whose
///   combined weight reaches the validity threshold.
/// * `CommunicationError::NoConsensus` if no key reached a quorum and no error was recorded.
/// * `CommunicationError::Sample` if no key reached a quorum; it contains up to four of the
///   most heavily weighted errors.
pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
    validator_clients: &'a [RemoteNode<A>],
    committee: &Committee,
    group_by: G,
    execute: F,
    grace_period: f64,
) -> Result<(K, Vec<V>), CommunicationError<NodeError>>
where
    A: ValidatorNode + Clone + 'static,
    F: Clone + Fn(RemoteNode<A>) -> R,
    R: Future<Output = Result<V, ChainClientError>> + 'a,
    G: Fn(&V) -> K,
    K: Hash + PartialEq + Eq + Clone + 'static,
    V: 'static,
{
    // One in-flight request per validator that actually has voting power.
    let mut pending: futures::stream::FuturesUnordered<_> = validator_clients
        .iter()
        .filter(|node| committee.weight(&node.public_key) != 0)
        .map(|node| {
            let run = execute.clone();
            let node = node.clone();
            async move { (node.public_key, run(node).await) }
        })
        .collect();

    let started = Instant::now();
    let quorum = committee.quorum_threshold();
    // Set once a quorum is reached; marks the end of the grace period.
    let mut deadline: Option<Instant> = None;
    // Total weight of the validators that have not answered yet.
    let mut unheard_weight = committee.total_votes();
    let mut best_score = 0;
    let mut by_key = HashMap::new();
    let mut by_error = HashMap::new();

    loop {
        let wait = match deadline {
            Some(at) => at.saturating_duration_since(Instant::now()),
            None => MAX_TIMEOUT,
        };
        let (name, outcome) = match timeout(wait, pending.next()).await {
            Ok(Some(response)) => response,
            // Either the wait timed out or every validator has answered.
            _ => break,
        };

        let weight = committee.weight(&name);
        unheard_weight -= weight;
        match outcome {
            Ok(value) => {
                let key = group_by(&value);
                let group = by_key.entry(key.clone()).or_insert((0, Vec::new()));
                group.0 += weight;
                group.1.push(value);
                best_score = best_score.max(group.0);
            }
            Err(failure) => {
                let node_error = match failure {
                    ChainClientError::RemoteNodeError(inner) => inner,
                    other => NodeError::ResponseHandlingError {
                        error: other.to_string(),
                    },
                };
                let tally = by_error.entry(node_error.clone()).or_insert(0);
                *tally += weight;
                if *tally >= committee.validity_threshold() {
                    // Enough validators agree on this error to trust it.
                    return Err(CommunicationError::Trusted(node_error));
                }
            }
        }

        // Even if all remaining validators joined the best group, it could not reach a quorum.
        if best_score + unheard_weight < quorum {
            break;
        }
        // First time a quorum is reached: start the grace period.
        if deadline.is_none() && best_score >= quorum {
            deadline = Some(Instant::now() + started.elapsed().mul_f64(grace_period));
        }
    }

    // Must be gathered before `by_key` is consumed below.
    let scores = by_key
        .values()
        .map(|(weight, values)| (*weight, values.len()))
        .collect();
    let winner = by_key
        .into_iter()
        .find(|(_, (score, _))| *score >= quorum);
    if let Some((key, (_, values))) = winner {
        return Ok((key, values));
    }

    if by_error.is_empty() {
        return Err(CommunicationError::NoConsensus(quorum, scores));
    }

    // Report only the most heavily weighted errors.
    let mut sample: Vec<_> = by_error.into_iter().collect();
    sample.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
    sample.truncate(4);
    Err(CommunicationError::Sample(sample))
}
impl<A, S> ValidatorUpdater<A, S>
where
A: ValidatorNode + Clone + 'static,
S: Storage + Clone + Send + Sync + 'static,
{
/// Sends a confirmed-block certificate to the validator, uploading missing blobs on demand.
///
/// The certificate is first sent with `handle_optimized_confirmed_certificate`. If the
/// validator answers `NodeError::BlobsNotFound`, the reported blob IDs are passed to
/// `check_blobs_not_found`, the blobs are read from local storage and uploaded, and the
/// certificate is sent again with `handle_confirmed_certificate`.
///
/// # Errors
///
/// Returns the validator's error for any failure other than `BlobsNotFound`. In the
/// `BlobsNotFound` case, returns the original error if local storage cannot provide the
/// blobs, or any error raised while checking, reading, uploading or re-sending.
async fn send_confirmed_certificate(
    &mut self,
    certificate: GenericCertificate<ConfirmedBlock>,
    delivery: CrossChainMessageDelivery,
) -> Result<Box<ChainInfo>, ChainClientError> {
    let result = self
        .remote_node
        .handle_optimized_confirmed_certificate(&certificate, delivery)
        .await;
    match &result {
        Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
            self.remote_node
                .check_blobs_not_found(&certificate, blob_ids)?;
            let maybe_blobs = self.local_node.read_blobs_from_storage(blob_ids).await?;
            let blobs = maybe_blobs.ok_or_else(|| original_err.clone())?;
            // `blobs` is not needed afterwards, so hand it over instead of cloning it.
            self.remote_node.upload_blobs(blobs).await?;
            let info = self
                .remote_node
                .handle_confirmed_certificate(certificate, delivery)
                .await?;
            Ok(info)
        }
        _ => Ok(result?),
    }
}
async fn send_validated_certificate(
&mut self,
certificate: GenericCertificate<ValidatedBlock>,
delivery: CrossChainMessageDelivery,
) -> Result<Box<ChainInfo>, ChainClientError> {
let result = self
.remote_node
.handle_optimized_validated_certificate(&certificate, delivery)
.await;
Ok(match &result {
Err(original_err @ NodeError::BlobsNotFound(blob_ids)) => {
self.remote_node
.check_blobs_not_found(&certificate, blob_ids)?;
let chain_id = certificate.inner().chain_id();
let blobs = self
.local_node
.get_locking_blobs(blob_ids, chain_id)
.await?
.ok_or_else(|| original_err.clone())?;
self.remote_node.send_pending_blobs(chain_id, blobs).await?;
self.remote_node
.handle_validated_certificate(certificate)
.await
}
_ => result,
}?)
}
/// Sends a block proposal to the validator, retrying after supplying whatever it reports as
/// missing.
///
/// * On `MissingCrossChainUpdate` or `InactiveChain` (at most once), the chain information of
///   the sender chains is sent first.
/// * On `BlobsNotFound` while `blob_ids` is non-empty, the blobs published by the proposed
///   block are sent as pending blobs; for the remaining blob IDs that the validator reports
///   as missing, the chains recorded in the local blob states are sent up to the height
///   following the recorded block height. `blob_ids` is emptied by this step, so it runs at
///   most once.
/// * Any other error is returned to the caller.
async fn send_block_proposal(
    &mut self,
    proposal: Box<BlockProposal>,
    mut blob_ids: Vec<BlobId>,
) -> Result<Box<ChainInfo>, ChainClientError> {
    let chain_id = proposal.content.block.chain_id;
    let mut senders_updated = false;
    loop {
        let outcome = self
            .remote_node
            .handle_block_proposal(proposal.clone())
            .await;
        match outcome {
            Ok(info) => return Ok(info),
            Err(NodeError::MissingCrossChainUpdate { .. } | NodeError::InactiveChain(_))
                if !senders_updated =>
            {
                senders_updated = true;
                self.send_chain_information_for_senders(chain_id).await?;
            }
            Err(NodeError::BlobsNotFound(_)) if !blob_ids.is_empty() => {
                // Blobs published by this very block are sent directly as pending blobs.
                let published_ids =
                    BTreeSet::from_iter(proposal.content.block.published_blob_ids());
                blob_ids.retain(|id| !published_ids.contains(id));
                let mut published_blobs = Vec::new();
                {
                    // Keep the chain state view scoped to this block so it is dropped
                    // before talking to the validator.
                    let chain = self.local_node.chain_state_view(chain_id).await?;
                    for id in published_ids {
                        if let Some(blob) = chain.manager.proposed_blobs.get(&id).await? {
                            published_blobs.push(blob);
                        }
                    }
                }
                self.remote_node
                    .send_pending_blobs(chain_id, published_blobs)
                    .await?;

                // Taking `blob_ids` leaves it empty, so this arm cannot match again.
                let still_missing = self
                    .remote_node
                    .node
                    .missing_blob_ids(mem::take(&mut blob_ids))
                    .await?;
                let storage = self.local_node.storage_client();
                let blob_states = storage.read_blob_states(&still_missing).await?;

                // For every chain, the highest height that has to be sent.
                let mut heights = BTreeMap::new();
                for state in blob_states {
                    let origin_chain = state.chain_id;
                    let needed = state.block_height.try_add_one()?;
                    let slot = heights.entry(origin_chain).or_insert(needed);
                    *slot = needed.max(*slot);
                }
                self.send_chain_info_up_to_heights(
                    heights,
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await?;
            }
            Err(other) => return Err(other.into()),
        }
    }
}
/// Brings the validator's view of `chain_id` up to `target_block_height`.
///
/// Asks the validator for its next block height, reads the entries of the local confirmed
/// log from that height up to (excluding) `target_block_height`, loads the corresponding
/// certificates from storage and sends them one after another. Afterwards, if the local chain
/// manager holds a timeout certificate for this chain, that certificate is sent as well.
///
/// # Errors
///
/// Fails if a height does not fit into `usize`, or if any local read or remote call fails.
pub async fn send_chain_information(
    &mut self,
    chain_id: ChainId,
    target_block_height: BlockHeight,
    delivery: CrossChainMessageDelivery,
) -> Result<(), ChainClientError> {
    let remote_info = self
        .remote_node
        .handle_chain_info_query(ChainInfoQuery::new(chain_id))
        .await?;
    let first_missing: usize = remote_info.next_block_height.try_into()?;
    let target: usize = target_block_height.try_into()?;

    // Read everything needed from the chain state in one scope so the view is dropped
    // before any certificate is sent.
    let (cert_keys, timeout_cert) = {
        let chain = self.local_node.chain_state_view(chain_id).await?;
        let cert_keys = chain.confirmed_log.read(first_missing..target).await?;
        (cert_keys, chain.manager.timeout.get().clone())
    };

    if !cert_keys.is_empty() {
        let storage = self.local_node.storage_client();
        let certificates = storage.read_certificates(cert_keys.into_iter()).await?;
        for certificate in certificates {
            self.send_confirmed_certificate(certificate, delivery)
                .await?;
        }
    }

    match timeout_cert {
        Some(cert) if cert.inner().chain_id == chain_id => {
            self.remote_node.handle_timeout_certificate(cert).await?;
        }
        _ => {}
    }
    Ok(())
}
/// Sends the chain information of several chains concurrently, each up to the height given
/// in `chain_heights`.
///
/// Every chain is handled by its own clone of this updater through
/// `send_chain_information`; at most `chain_worker_count` chains (but always at least one)
/// are processed at the same time.
///
/// # Errors
///
/// Returns the first error produced by any of the per-chain updates.
async fn send_chain_info_up_to_heights(
    &mut self,
    chain_heights: BTreeMap<ChainId, BlockHeight>,
    delivery: CrossChainMessageDelivery,
) -> Result<(), ChainClientError> {
    // A limit of zero would never start any future and the stream would stall forever,
    // so always allow at least one concurrent update.
    let concurrency = self.chain_worker_count.max(1);
    let stream = stream::iter(chain_heights)
        .map(|(chain_id, height)| {
            let mut updater = self.clone();
            async move {
                updater
                    .send_chain_information(chain_id, height, delivery)
                    .await
            }
        })
        .buffer_unordered(concurrency);
    stream.try_collect::<Vec<_>>().await?;
    Ok(())
}
/// Sends the validator the chain information of every chain that has an inbox on `chain_id`.
///
/// For each inbox of the local chain state, the height returned by
/// `next_block_height_to_receive` is recorded for the inbox's sender chain (keeping the
/// maximum per sender). The senders are then sent up to those heights with blocking
/// cross-chain message delivery.
async fn send_chain_information_for_senders(
    &mut self,
    chain_id: ChainId,
) -> Result<(), ChainClientError> {
    let mut sender_heights = BTreeMap::new();
    {
        // The chain state view is only needed while collecting the heights.
        let chain = self.local_node.chain_state_view(chain_id).await?;
        let inboxes = chain.inboxes.try_load_all_entries().await?;
        for (origin, inbox) in inboxes {
            let next_height = inbox.next_block_height_to_receive()?;
            let slot = sender_heights.entry(origin.sender).or_insert(next_height);
            *slot = next_height.max(*slot);
        }
    }
    self.send_chain_info_up_to_heights(sender_heights, CrossChainMessageDelivery::Blocking)
        .await
}
/// Brings the validator up to date with the chain targeted by `action`, performs the action
/// and returns the validator's vote.
///
/// The chain information up to the action's block height is sent first (non-blocking
/// delivery). Then, depending on the action, the block proposal or the validated certificate
/// is sent, or the chain info is queried with the timeout flag set. The vote is taken from
/// `manager.pending` for the first two actions and from `manager.timeout_vote` for a timeout
/// request.
///
/// # Errors
///
/// Returns `NodeError::MissingVoteInValidatorResponse` if the response carries no vote or a
/// vote with a public key other than this validator's, and propagates any failure of the
/// vote's `check` or of the preceding communication steps.
pub async fn send_chain_update(
    &mut self,
    action: CommunicateAction,
) -> Result<LiteVote, ChainClientError> {
    let (chain_id, target_block_height) = match &action {
        CommunicateAction::RequestTimeout {
            chain_id, height, ..
        } => (*chain_id, *height),
        CommunicateAction::FinalizeBlock { certificate, .. } => {
            let header = &certificate.inner().block().header;
            (header.chain_id, header.height)
        }
        CommunicateAction::SubmitBlock { proposal, .. } => {
            let block = &proposal.content.block;
            (block.chain_id, block.height)
        }
    };

    // Make sure the validator knows the chain up to the relevant height first.
    self.send_chain_information(
        chain_id,
        target_block_height,
        CrossChainMessageDelivery::NonBlocking,
    )
    .await?;

    let maybe_vote = match action {
        CommunicateAction::RequestTimeout { .. } => {
            let query = ChainInfoQuery::new(chain_id).with_timeout();
            let info = self.remote_node.handle_chain_info_query(query).await?;
            info.manager.timeout_vote
        }
        CommunicateAction::FinalizeBlock {
            certificate,
            delivery,
        } => {
            let info = self
                .send_validated_certificate(certificate, delivery)
                .await?;
            info.manager.pending
        }
        CommunicateAction::SubmitBlock { proposal, blob_ids } => {
            let info = self.send_block_proposal(proposal, blob_ids).await?;
            info.manager.pending
        }
    };

    // Only a vote issued by this very validator is acceptable.
    let vote = maybe_vote
        .filter(|vote| vote.public_key == self.remote_node.public_key)
        .ok_or(NodeError::MissingVoteInValidatorResponse)?;
    vote.check()?;
    Ok(vote)
}
}