use std::{
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
convert::Infallible,
iter,
num::NonZeroUsize,
ops::{Deref, DerefMut},
sync::{Arc, RwLock},
time::Duration,
};
use chain_client_state::ChainClientState;
use custom_debug_derive::Debug;
use dashmap::{
mapref::one::{MappedRef as DashMapMappedRef, Ref as DashMapRef, RefMut as DashMapRefMut},
DashMap,
};
use futures::{
future::{self, try_join_all, Either, FusedFuture, Future},
stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
};
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
abi::Abi,
crypto::{AccountPublicKey, AccountSecretKey, CryptoHash, ValidatorPublicKey},
data_types::{
Amount, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Round, Timestamp,
},
ensure,
hashed::Hashed,
identifiers::{
Account, AccountOwner, ApplicationId, BlobId, BlobType, BytecodeId, ChainId, MessageId,
Owner, UserApplicationId,
},
ownership::{ChainOwnership, TimeoutConfig},
};
#[cfg(not(target_arch = "wasm32"))]
use linera_base::{data_types::Bytecode, vm::VmRuntime};
use linera_chain::{
data_types::{
BlockProposal, ChainAndHeight, ExecutedBlock, IncomingBundle, LiteVote, MessageAction,
ProposedBlock,
},
manager::LockingBlock,
types::{
CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
},
ChainError, ChainExecutionContext, ChainStateView, ExecutionResultExt as _,
};
use linera_execution::{
committee::{Committee, Epoch},
system::{
AdminOperation, OpenChainConfig, Recipient, SystemChannel, SystemOperation,
CREATE_APPLICATION_MESSAGE_INDEX, OPEN_CHAIN_MESSAGE_INDEX,
},
ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemExecutionError,
SystemQuery, SystemResponse,
};
use linera_storage::{Clock as _, Storage};
use linera_views::views::ViewError;
use rand::prelude::SliceRandom as _;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::OwnedRwLockReadGuard;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info, instrument, warn, Instrument as _};
use crate::{
data_types::{
BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout,
},
local_node::{LocalNodeClient, LocalNodeError},
node::{
CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
ValidatorNodeProvider,
},
notifier::ChannelNotifier,
remote_node::RemoteNode,
updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
};
mod chain_client_state;
#[cfg(test)]
#[path = "../unit_tests/client_tests.rs"]
mod client_tests;
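/// Prometheus histograms measuring the latency of the main chain client operations.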
#[cfg(with_metrics)]
mod metrics {
use std::sync::LazyLock;
use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
use prometheus::HistogramVec;
pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
LazyLock::new(|| {
register_histogram_vec(
"process_inbox_latency",
"process_inbox latency",
&[],
exponential_bucket_latencies(500.0),
)
});
pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec(
"prepare_chain_latency",
"prepare_chain latency",
&[],
exponential_bucket_latencies(500.0),
)
});
pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec(
"synchronize_chain_state_latency",
"synchronize_chain_state latency",
&[],
exponential_bucket_latencies(500.0),
)
});
pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec(
"execute_block_latency",
"execute_block latency",
&[],
exponential_bucket_latencies(500.0),
)
});
pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec(
"find_received_certificates_latency",
"find_received_certificates latency",
&[],
exponential_bucket_latencies(500.0),
)
});
}
/// A builder for [`ChainClient`]s: all clients created by the same `Client` share the
/// local node, the cache of chain states and the notifier.
pub struct Client<ValidatorNodeProvider, Storage>
where
    Storage: linera_storage::Storage,
{
    /// How to connect to the validators.
    validator_node_provider: ValidatorNodeProvider,
    /// The local node that maintains the execution state of the tracked chains.
    local_node: LocalNodeClient<Storage>,
    /// The maximum number of incoming message bundles to include in a single block.
    max_pending_message_bundles: usize,
    /// The policy for automatically handling incoming messages.
    message_policy: MessagePolicy,
    /// Whether to block on cross-chain message delivery.
    cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator
    /// signatures, as a fraction of the time taken to reach the quorum.
    grace_period: f64,
    /// The chains that the client is tracking.
    tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
    /// Notifies subscribers when new chain events are processed.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// A copy of the storage client, so that it can be accessed without going through
    /// the local node.
    storage: Storage,
    /// The client state of every chain that a `ChainClient` was created for.
    chains: DashMap<ChainId, ChainClientState>,
    /// The maximum number of chain workers that can be active at the same time.
    max_loaded_chains: NonZeroUsize,
    /// The timeout for downloading blobs from validators.
    blob_download_timeout: Duration,
}
impl<P, S: Storage + Clone> Client<P, S> {
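    /// Creates a new `Client` backed by a new local node that tracks the given chains.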
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", skip_all)]
pub fn new(
validator_node_provider: P,
storage: S,
max_pending_message_bundles: usize,
cross_chain_message_delivery: CrossChainMessageDelivery,
long_lived_services: bool,
tracked_chains: impl IntoIterator<Item = ChainId>,
name: impl Into<String>,
max_loaded_chains: NonZeroUsize,
grace_period: f64,
blob_download_timeout: Duration,
) -> Self {
let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
let state = WorkerState::new_for_client(
name.into(),
storage.clone(),
tracked_chains.clone(),
max_loaded_chains,
)
.with_long_lived_services(long_lived_services)
.with_allow_inactive_chains(true)
.with_allow_messages_from_deprecated_epochs(true);
let local_node = LocalNodeClient::new(state);
Self {
validator_node_provider,
local_node,
chains: DashMap::new(),
max_pending_message_bundles,
message_policy: MessagePolicy::new(BlanketMessagePolicy::Accept, None),
cross_chain_message_delivery,
grace_period,
tracked_chains,
notifier: Arc::new(ChannelNotifier::default()),
storage,
max_loaded_chains,
blob_download_timeout,
}
}
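    /// Creates a new `Client` that shares this client's storage and options, but uses
    /// its own local node, validator node provider and set of tracked chains.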
pub fn clone_with(
&self,
validator_node_provider: P,
name: impl Into<String>,
tracked_chains: impl IntoIterator<Item = ChainId>,
long_lived_services: bool,
) -> Self {
let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
let state = WorkerState::new_for_client(
name.into(),
self.storage.clone(),
tracked_chains.clone(),
self.max_loaded_chains,
)
.with_long_lived_services(long_lived_services)
.with_allow_inactive_chains(true)
.with_allow_messages_from_deprecated_epochs(true);
let local_node = LocalNodeClient::new(state);
Self {
validator_node_provider,
local_node,
chains: DashMap::new(),
max_pending_message_bundles: self.max_pending_message_bundles,
message_policy: MessagePolicy::new(BlanketMessagePolicy::Accept, None),
cross_chain_message_delivery: self.cross_chain_message_delivery,
grace_period: self.grace_period,
tracked_chains,
notifier: Arc::new(ChannelNotifier::default()),
storage: self.storage.clone(),
max_loaded_chains: self.max_loaded_chains,
blob_download_timeout: self.blob_download_timeout,
}
}
#[instrument(level = "trace", skip(self))]
pub fn storage_client(&self) -> &S {
&self.storage
}
#[instrument(level = "trace", skip(self))]
pub fn local_node(&self) -> &LocalNodeClient<S> {
&self.local_node
}
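    /// Adds a chain to the set of chains tracked by the local node.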
#[instrument(level = "trace", skip(self))]
pub fn track_chain(&self, chain_id: ChainId) {
self.tracked_chains
.write()
.expect("Panics should not happen while holding a lock to `tracked_chains`")
.insert(chain_id);
}
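    /// Creates a `ChainClient` for the given chain, initializing the chain's client
    /// state with the given values unless it is already present.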
#[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
#[expect(clippy::too_many_arguments)]
pub fn create_chain_client(
self: &Arc<Self>,
chain_id: ChainId,
known_key_pairs: Vec<AccountSecretKey>,
admin_id: ChainId,
block_hash: Option<CryptoHash>,
timestamp: Timestamp,
next_block_height: BlockHeight,
pending_proposal: Option<PendingProposal>,
) -> ChainClient<P, S> {
if let dashmap::mapref::entry::Entry::Vacant(e) = self.chains.entry(chain_id) {
e.insert(ChainClientState::new(
known_key_pairs,
block_hash,
timestamp,
next_block_height,
pending_proposal,
));
}
ChainClient {
client: self.clone(),
chain_id,
admin_id,
options: ChainClientOptions {
max_pending_message_bundles: self.max_pending_message_bundles,
message_policy: self.message_policy.clone(),
cross_chain_message_delivery: self.cross_chain_message_delivery,
grace_period: self.grace_period,
blob_download_timeout: self.blob_download_timeout,
},
}
}
}
impl<P, S> Client<P, S>
where
P: ValidatorNodeProvider + Sync + 'static,
S: Storage + Sync + Send + Clone + 'static,
{
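    /// Downloads certificates from randomly ordered validators and processes them until
    /// the local chain reaches `target_next_block_height`; fails if no validator can
    /// provide the missing certificates.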
#[instrument(level = "trace", skip(self, validators))]
pub async fn download_certificates(
&self,
validators: &[RemoteNode<impl ValidatorNode>],
chain_id: ChainId,
target_next_block_height: BlockHeight,
) -> Result<Box<ChainInfo>, ChainClientError> {
let mut validators = validators.iter().collect::<Vec<_>>();
validators.shuffle(&mut rand::thread_rng());
for remote_node in validators {
let info = self.local_node.chain_info(chain_id).await?;
if target_next_block_height <= info.next_block_height {
return Ok(info);
}
self.try_download_certificates_from(
remote_node,
chain_id,
info.next_block_height,
target_next_block_height,
)
.await?;
}
let info = self.local_node.chain_info(chain_id).await?;
if target_next_block_height <= info.next_block_height {
Ok(info)
} else {
Err(ChainClientError::CannotDownloadCertificates {
chain_id,
target_next_block_height,
})
}
}
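    /// Downloads certificates in batches of up to 1000 from the given validator and
    /// processes them, stopping early if the validator or the local node fails.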
#[instrument(level = "trace", skip_all)]
async fn try_download_certificates_from(
&self,
remote_node: &RemoteNode<impl ValidatorNode>,
chain_id: ChainId,
mut start: BlockHeight,
stop: BlockHeight,
) -> Result<(), ChainClientError> {
while start < stop {
let limit = u64::from(stop)
.checked_sub(u64::from(start))
.ok_or(ArithmeticError::Overflow)?
.min(1000);
let Some(certificates) = remote_node
.try_query_certificates_from(chain_id, start, limit)
.await?
else {
break;
};
let Some(info) = self
.try_process_certificates(remote_node, chain_id, certificates)
.await
else {
break;
};
assert!(info.next_block_height > start);
start = info.next_block_height;
}
Ok(())
}
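    /// Tries to process the given certificates in order, downloading missing blobs from
    /// the remote node as needed. Returns the chain info from the last certificate that
    /// was processed successfully, or `None` if the first one already failed.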
#[instrument(level = "trace", skip_all)]
pub async fn try_process_certificates(
&self,
remote_node: &RemoteNode<impl ValidatorNode>,
chain_id: ChainId,
certificates: Vec<ConfirmedBlockCertificate>,
) -> Option<Box<ChainInfo>> {
let mut info = None;
for certificate in certificates {
let hash = certificate.hash();
if certificate.block().header.chain_id != chain_id {
warn!("Failed to process network certificate {}", hash);
return info;
}
let mut result = self.handle_certificate(certificate.clone()).await;
if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
if let Some(blobs) = remote_node.try_download_blobs(blob_ids).await {
let _ = self.local_node.store_blobs(&blobs).await;
result = self.handle_certificate(certificate).await;
}
}
match result {
Ok(response) => info = Some(response.info),
Err(error) => {
warn!("Failed to process network certificate {}: {}", hash, error);
return info;
}
};
}
info
}
async fn handle_certificate<T: ProcessableCertificate>(
&self,
certificate: GenericCertificate<T>,
) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(certificate, &self.notifier)
            .await
}
}
/// The policy the client applies to decide whether to include incoming message bundles
/// in its blocks.
#[derive(Clone, Debug)]
pub struct MessagePolicy {
    /// The blanket policy applied to all incoming messages.
    blanket: BlanketMessagePolicy,
    /// If set, only messages from these sender chains are handled; messages from all
    /// other chains are left untouched.
    restrict_chain_ids_to: Option<HashSet<ChainId>>,
}
#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum BlanketMessagePolicy {
    /// Automatically accept all incoming messages.
    Accept,
    /// Skip the messages that may be skipped and reject the tracked ones; everything
    /// else is still accepted.
    Reject,
    /// Don't include any messages in blocks, and don't decide whether to accept or
    /// reject them.
    Ignore,
}
impl MessagePolicy {
pub fn new(
blanket: BlanketMessagePolicy,
restrict_chain_ids_to: Option<HashSet<ChainId>>,
) -> Self {
Self {
blanket,
restrict_chain_ids_to,
}
}
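    /// Decides whether the bundle should be included in the next block, possibly
    /// changing its action to `Reject` according to this policy.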
#[instrument(level = "trace", skip(self))]
fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
if self.is_reject() {
if bundle.bundle.is_skippable() {
return false;
} else if bundle.bundle.is_tracked() {
bundle.action = MessageAction::Reject;
}
}
let sender = bundle.origin.sender;
match &self.restrict_chain_ids_to {
None => true,
Some(chains) => chains.contains(&sender),
}
}
#[instrument(level = "trace", skip(self))]
fn is_ignore(&self) -> bool {
matches!(self.blanket, BlanketMessagePolicy::Ignore)
}
#[instrument(level = "trace", skip(self))]
fn is_reject(&self) -> bool {
matches!(self.blanket, BlanketMessagePolicy::Reject)
}
}
/// The runtime options of a [`ChainClient`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ChainClientOptions {
    /// The maximum number of incoming message bundles to include in a single block.
    pub max_pending_message_bundles: usize,
    /// The policy for automatically handling incoming messages.
    pub message_policy: MessagePolicy,
    /// Whether to block on cross-chain message delivery.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator
    /// signatures, as a fraction of the time taken to reach the quorum.
    pub grace_period: f64,
    /// The timeout for downloading blobs from validators.
    pub blob_download_timeout: Duration,
}
/// A client for a specific chain: it can read the chain's state and propose, finalize
/// and synchronize blocks, coordinating between the local node and the validators.
#[derive(Debug)]
pub struct ChainClient<ValidatorNodeProvider, Storage>
where
    Storage: linera_storage::Storage,
{
    /// The shared client that owns the local node and the per-chain states.
    #[debug(skip)]
    client: Arc<Client<ValidatorNodeProvider, Storage>>,
    /// The ID of the chain this client operates on.
    chain_id: ChainId,
    /// The ID of the admin chain.
    #[debug(skip)]
    admin_id: ChainId,
    /// The runtime options of this client.
    #[debug(skip)]
    options: ChainClientOptions,
}
impl<P, S> Clone for ChainClient<P, S>
where
S: linera_storage::Storage,
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
chain_id: self.chain_id,
admin_id: self.admin_id,
options: self.options.clone(),
}
}
}
#[derive(Debug, Error)]
pub enum ChainClientError {
#[error("Local node operation failed: {0}")]
LocalNodeError(#[from] LocalNodeError),
#[error("Remote node operation failed: {0}")]
RemoteNodeError(#[from] NodeError),
#[error(transparent)]
ArithmeticError(#[from] ArithmeticError),
#[error("JSON (de)serialization error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Chain operation failed: {0}")]
ChainError(#[from] ChainError),
#[error(transparent)]
CommunicationError(#[from] CommunicationError<NodeError>),
#[error("Internal error within chain client: {0}")]
InternalError(&'static str),
#[error(
"Cannot accept a certificate from an unknown committee in the future. \
Please synchronize the local view of the admin chain"
)]
CommitteeSynchronizationError,
#[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
WalletSynchronizationError,
#[error("The state of the client is incompatible with the proposed block: {0}")]
BlockProposalError(&'static str),
#[error(
"Cannot accept a certificate from a committee that was retired. \
Try a newer certificate from the same origin"
)]
CommitteeDeprecationError,
#[error("Protocol error within chain client: {0}")]
ProtocolError(&'static str),
#[error("No key available to interact with chain {0}")]
CannotFindKeyForChain(ChainId),
#[error("Found several possible identities to interact with chain {0}")]
FoundMultipleKeysForChain(ChainId),
#[error(transparent)]
ViewError(#[from] ViewError),
#[error(
"Failed to download certificates and update local node to the next height \
{target_next_block_height} of chain {chain_id:?}"
)]
CannotDownloadCertificates {
chain_id: ChainId,
target_next_block_height: BlockHeight,
},
}
impl From<Infallible> for ChainClientError {
fn from(infallible: Infallible) -> Self {
match infallible {}
}
}
/// A wrapper that makes the inner value `!Send`. This prevents guards into the chain
/// map from being held across thread boundaries, e.g. across `await` points in a `Send`
/// future.
pub struct Unsend<T> {
    inner: T,
    /// Raw pointers are neither `Send` nor `Sync`, which makes the whole struct `!Send`.
    _phantom: std::marker::PhantomData<*mut u8>,
}
impl<T> Unsend<T> {
fn new(inner: T) -> Self {
Self {
inner,
_phantom: Default::default(),
}
}
}
impl<T: Deref> Deref for Unsend<T> {
type Target = T::Target;
fn deref(&self) -> &T::Target {
self.inner.deref()
}
}
impl<T: DerefMut> DerefMut for Unsend<T> {
fn deref_mut(&mut self) -> &mut T::Target {
self.inner.deref_mut()
}
}
/// A read-only guard on a chain's client state.
pub type ChainGuard<'a, T> = Unsend<DashMapRef<'a, ChainId, T>>;
/// An exclusive guard on a chain's client state.
pub type ChainGuardMut<'a, T> = Unsend<DashMapRefMut<'a, ChainId, T>>;
/// A read-only guard mapped to a component of a chain's client state.
pub type ChainGuardMapped<'a, T> = Unsend<DashMapMappedRef<'a, ChainId, ChainClientState, T>>;
impl<P: 'static, S: Storage> ChainClient<P, S> {
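    /// Gets a shared reference to the chain's client state.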
#[instrument(level = "trace", skip(self))]
pub fn state(&self) -> ChainGuard<ChainClientState> {
Unsend::new(
self.client
.chains
.get(&self.chain_id)
.expect("Chain client constructed for invalid chain"),
)
}
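    /// Gets an exclusive reference to the chain's client state.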
#[instrument(level = "trace", skip(self))]
fn state_mut(&self) -> ChainGuardMut<ChainClientState> {
Unsend::new(
self.client
.chains
.get_mut(&self.chain_id)
.expect("Chain client constructed for invalid chain"),
)
}
#[instrument(level = "trace", skip(self))]
pub fn options_mut(&mut self) -> &mut ChainClientOptions {
&mut self.options
}
#[instrument(level = "trace", skip(self))]
pub fn options(&self) -> &ChainClientOptions {
&self.options
}
#[instrument(level = "trace", skip(self))]
pub fn chain_id(&self) -> ChainId {
self.chain_id
}
#[instrument(level = "trace", skip(self))]
pub fn block_hash(&self) -> Option<CryptoHash> {
self.state().block_hash()
}
#[instrument(level = "trace", skip(self))]
pub fn timestamp(&self) -> Timestamp {
self.state().timestamp()
}
#[instrument(level = "trace", skip(self))]
pub fn next_block_height(&self) -> BlockHeight {
self.state().next_block_height()
}
#[instrument(level = "trace", skip(self))]
pub fn pending_proposal(&self) -> ChainGuardMapped<Option<PendingProposal>> {
Unsend::new(self.state().inner.map(|state| state.pending_proposal()))
}
}
/// Whether a certificate still needs to be checked against a committee before it is
/// processed.
enum ReceiveCertificateMode {
    NeedsCheck,
    AlreadyChecked,
}
/// The result of checking an incoming certificate against the known committees.
enum CheckCertificateResult {
    /// There is no known committee for the certificate's epoch anymore: the certificate
    /// is outdated.
    OldEpoch,
    /// The certificate was signed by a known committee and can be processed.
    New,
    /// The certificate is from an epoch newer than any known committee; the admin chain
    /// should be synchronized before retrying.
    FutureEpoch,
}
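/// Compresses the contract and service bytecode on a blocking task and returns the
/// resulting blobs together with the bytecode ID they define.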
#[cfg(not(target_arch = "wasm32"))]
pub async fn create_bytecode_blobs(
contract: Bytecode,
service: Bytecode,
vm_runtime: VmRuntime,
) -> (Blob, Blob, BytecodeId) {
let (compressed_contract, compressed_service) =
tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
.await
.expect("Compression should not panic");
let contract_blob = Blob::new_contract_bytecode(compressed_contract);
let service_blob = Blob::new_service_bytecode(compressed_service);
let bytecode_id = BytecodeId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
(contract_blob, service_blob, bytecode_id)
}
impl<P, S> ChainClient<P, S>
where
P: ValidatorNodeProvider + Sync + 'static,
S: Storage + Clone + Send + Sync + 'static,
{
#[instrument(level = "trace")]
pub async fn chain_state_view(
&self,
) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
self.client.local_node.chain_state_view(self.chain_id).await
}
#[instrument(level = "trace")]
pub async fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
Ok(Box::pin(UnboundedReceiverStream::new(
self.client.notifier.subscribe(vec![self.chain_id]),
)))
}
#[instrument(level = "trace")]
pub fn storage_client(&self) -> S {
self.client.storage_client().clone()
}
#[instrument(level = "trace")]
pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
let query = ChainInfoQuery::new(self.chain_id);
let response = self
.client
.local_node
.handle_chain_info_query(query)
.await?;
self.update_from_info(&response.info);
Ok(response.info)
}
#[instrument(level = "trace")]
pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
let response = self
.client
.local_node
.handle_chain_info_query(query)
.await?;
self.update_from_info(&response.info);
Ok(response.info)
}
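    /// Obtains up to `max_pending_message_bundles` incoming message bundles that the
    /// chain should handle next, after applying the client's message policy. For a
    /// newly created child chain, the `OpenChain` message is moved to the front.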
#[instrument(level = "trace")]
async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
let info = self
.client
.local_node
.handle_chain_info_query(query)
.await?
.info;
{
let state = self.state();
ensure!(
state.has_other_owners(&info.manager.ownership)
|| info.next_block_height == state.next_block_height(),
ChainClientError::WalletSynchronizationError
);
}
        if info.next_block_height != BlockHeight::ZERO && self.options.message_policy.is_ignore() {
            // The chain is already open and messages are ignored: there is nothing to do.
            return Ok(Vec::new());
        }
let mut rearranged = false;
let mut pending_message_bundles = info.requested_pending_message_bundles;
if info.next_block_height == BlockHeight::ZERO
&& info
.description
.ok_or_else(|| LocalNodeError::InactiveChain(self.chain_id))?
.is_child()
{
rearranged = IncomingBundle::put_openchain_at_front(&mut pending_message_bundles);
ensure!(rearranged, LocalNodeError::InactiveChain(self.chain_id));
}
if self.options.message_policy.is_ignore() {
if rearranged {
return Ok(pending_message_bundles[0..1].to_vec());
} else {
return Ok(Vec::new());
}
}
Ok(pending_message_bundles
.into_iter()
.filter_map(|mut bundle| {
self.options
.message_policy
.must_handle(&mut bundle)
.then_some(bundle)
})
.take(self.options.max_pending_message_bundles)
.collect())
}
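    /// Obtains the current epoch of the given chain, together with all the committees
    /// it still recognizes.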
#[instrument(level = "trace")]
pub async fn epoch_and_committees(
&self,
chain_id: ChainId,
) -> Result<(Option<Epoch>, BTreeMap<Epoch, Committee>), LocalNodeError> {
let query = ChainInfoQuery::new(chain_id).with_committees();
let info = self
.client
.local_node
.handle_chain_info_query(query)
.await?
.info;
let epoch = info.epoch;
let committees = info
.requested_committees
.ok_or(LocalNodeError::InvalidChainInfoResponse)?;
Ok((epoch, committees))
}
#[instrument(level = "trace")]
pub async fn epochs(&self) -> Result<Vec<Epoch>, LocalNodeError> {
let (_epoch, committees) = self.epoch_and_committees(self.chain_id).await?;
Ok(committees.into_keys().collect())
}
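    /// Obtains the committee for the chain's current epoch.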
#[instrument(level = "trace")]
pub async fn local_committee(&self) -> Result<Committee, LocalNodeError> {
let (epoch, mut committees) = self.epoch_and_committees(self.chain_id).await?;
committees
.remove(
epoch
.as_ref()
.ok_or(LocalNodeError::InactiveChain(self.chain_id))?,
)
.ok_or(LocalNodeError::InactiveChain(self.chain_id))
}
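    /// Obtains the committees recognized by either this chain or the admin chain, and
    /// the highest of their two current epochs.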
#[instrument(level = "trace")]
async fn known_committees(
&self,
) -> Result<(BTreeMap<Epoch, Committee>, Epoch), LocalNodeError> {
let (epoch, mut committees) = self.epoch_and_committees(self.chain_id).await?;
let (admin_epoch, admin_committees) = self.epoch_and_committees(self.admin_id).await?;
committees.extend(admin_committees);
let epoch = std::cmp::max(epoch.unwrap_or_default(), admin_epoch.unwrap_or_default());
Ok((committees, epoch))
}
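    /// Creates one `RemoteNode` per validator in the given committee.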
#[instrument(level = "trace")]
fn make_nodes(&self, committee: &Committee) -> Result<Vec<RemoteNode<P::Node>>, NodeError> {
Ok(self
.client
.validator_node_provider
.make_nodes(committee)?
.map(|(public_key, node)| RemoteNode { public_key, node })
.collect())
}
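    /// Returns the remote nodes of the current committee, or an empty list if the chain
    /// is inactive.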
#[instrument(level = "trace")]
async fn validator_nodes(&self) -> Result<Vec<RemoteNode<P::Node>>, ChainClientError> {
match self.local_committee().await {
Ok(committee) => Ok(self.make_nodes(&committee)?),
Err(LocalNodeError::InactiveChain(_)) => Ok(Vec::new()),
Err(LocalNodeError::WorkerError(WorkerError::ChainError(error)))
if matches!(*error, ChainError::InactiveChain(_)) =>
{
Ok(Vec::new())
}
Err(e) => Err(e.into()),
}
}
#[instrument(level = "trace")]
async fn epoch(&self) -> Result<Epoch, LocalNodeError> {
self.chain_info()
.await?
.epoch
.ok_or(LocalNodeError::InactiveChain(self.chain_id))
}
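    /// Returns the single identity for which this client has the key to act on this
    /// chain; fails if there is no such key, or if there are several of them.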
#[instrument(level = "trace")]
pub async fn identity(&self) -> Result<Owner, ChainClientError> {
let manager = self.chain_info().await?.manager;
ensure!(
manager.ownership.is_active(),
LocalNodeError::InactiveChain(self.chain_id)
);
let state = self.state();
let mut our_identities = manager
.ownership
.all_owners()
.chain(&manager.leader)
.filter(|owner| state.known_key_pairs().contains_key(owner));
let Some(identity) = our_identities.next() else {
return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
};
ensure!(
our_identities.all(|id| id == identity),
ChainClientError::FoundMultipleKeysForChain(self.chain_id)
);
Ok(*identity)
}
#[instrument(level = "trace")]
pub async fn key_pair(&self) -> Result<AccountSecretKey, ChainClientError> {
let id = self.identity().await?;
Ok(self
.state()
.known_key_pairs()
.get(&id)
.expect("key should be known at this point")
.copy())
}
#[instrument(level = "trace")]
pub async fn public_key(&self) -> Result<AccountPublicKey, ChainClientError> {
Ok(self.key_pair().await?.public())
}
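    /// Prepares the chain for the next block: synchronizes the local node with the
    /// validators up to the wallet's next block height, and triggers a download of
    /// received certificates if incoming bundles are missing cross-chain updates.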
#[instrument(level = "trace")]
pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
let mut info = self.synchronize_until(self.next_block_height()).await?;
if self.state().has_other_owners(&info.manager.ownership) {
info = self.synchronize_chain_state(self.chain_id).await?;
}
let result = self
.chain_state_view()
.await?
.validate_incoming_bundles()
.await;
if matches!(result, Err(ChainError::MissingCrossChainUpdate { .. })) {
self.find_received_certificates().await?;
}
self.update_from_info(&info);
Ok(info)
}
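    /// Downloads certificates from the validators until the local chain reaches the
    /// given height, and checks that the local block hash then matches the wallet's.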
pub async fn synchronize_until(
&self,
next_block_height: BlockHeight,
) -> Result<Box<ChainInfo>, ChainClientError> {
let nodes = self.validator_nodes().await?;
let info = self
.client
.download_certificates(&nodes, self.chain_id, next_block_height)
.await?;
if info.next_block_height == next_block_height {
ensure!(
self.block_hash() == info.block_hash,
ChainClientError::InternalError("Invalid chain of blocks in local node")
);
}
Ok(info)
}
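    /// Broadcasts the finalization of a validated block to the committee, turning it
    /// into a confirmed block certificate, and processes that certificate locally.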
#[instrument(level = "trace", skip(committee, certificate))]
async fn finalize_block(
&self,
committee: &Committee,
certificate: ValidatedBlockCertificate,
) -> Result<ConfirmedBlockCertificate, ChainClientError> {
let hashed_value = Hashed::new(ConfirmedBlock::new(
certificate.inner().block().clone().into(),
));
let finalize_action = CommunicateAction::FinalizeBlock {
certificate,
delivery: self.options.cross_chain_message_delivery,
};
let certificate = self
.communicate_chain_action(committee, finalize_action, hashed_value)
.await?;
self.receive_certificate_and_update_validators_internal(
certificate.clone(),
ReceiveCertificateMode::AlreadyChecked,
)
.await?;
Ok(certificate)
}
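    /// Submits a block proposal to the committee, collects a quorum of votes on the
    /// expected value, and processes the resulting certificate locally.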
#[instrument(level = "trace", skip(committee, proposal, value))]
pub async fn submit_block_proposal<T: ProcessableCertificate>(
&self,
committee: &Committee,
proposal: Box<BlockProposal>,
value: Hashed<T>,
) -> Result<GenericCertificate<T>, ChainClientError> {
let submit_action = CommunicateAction::SubmitBlock {
proposal,
blob_ids: value.inner().required_blob_ids().into_iter().collect(),
};
let certificate = self
.communicate_chain_action(committee, submit_action, value)
.await?;
self.process_certificate(certificate.clone()).await?;
Ok(certificate)
}
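    /// Brings the validators up to date with the local chain: first the old committee
    /// (if given), then the current one if it differs.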
#[instrument(level = "trace", skip(old_committee))]
pub async fn update_validators(
&self,
old_committee: Option<&Committee>,
) -> Result<(), ChainClientError> {
let next_block_height = self.next_block_height();
if let Some(old_committee) = old_committee {
self.communicate_chain_updates(
old_committee,
self.chain_id,
next_block_height,
self.options.cross_chain_message_delivery,
)
.await?
};
if let Ok(new_committee) = self.local_committee().await {
if Some(&new_committee) != old_committee {
let next_block_height = self.next_block_height();
self.communicate_chain_updates(
&new_committee,
self.chain_id,
next_block_height,
self.options.cross_chain_message_delivery,
)
.await?;
}
}
Ok(())
}
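    /// Sends the chain's information up to the given height to a quorum of validators
    /// of the given committee.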
#[instrument(level = "trace", skip(committee, delivery))]
pub async fn communicate_chain_updates(
&self,
committee: &Committee,
chain_id: ChainId,
height: BlockHeight,
delivery: CrossChainMessageDelivery,
) -> Result<(), ChainClientError> {
let local_node = self.client.local_node.clone();
let nodes = self.make_nodes(committee)?;
let n_validators = nodes.len();
let chain_worker_count =
std::cmp::max(1, self.client.max_loaded_chains.get() / n_validators);
communicate_with_quorum(
&nodes,
committee,
|_: &()| (),
|remote_node| {
let mut updater = ValidatorUpdater {
chain_worker_count,
remote_node,
local_node: local_node.clone(),
};
Box::pin(async move {
updater
.send_chain_information(chain_id, height, delivery)
.await
})
},
self.options.grace_period,
)
.await?;
Ok(())
}
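    /// Broadcasts the given action to a quorum of validators and assembles their votes
    /// into a certificate for the expected value and round.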
#[instrument(level = "trace", skip(committee, action, value))]
async fn communicate_chain_action<T: CertificateValue>(
&self,
committee: &Committee,
action: CommunicateAction,
value: Hashed<T>,
) -> Result<GenericCertificate<T>, ChainClientError> {
let local_node = self.client.local_node.clone();
let nodes = self.make_nodes(committee)?;
let n_validators = nodes.len();
let chain_worker_count =
std::cmp::max(1, self.client.max_loaded_chains.get() / n_validators);
let ((votes_hash, votes_round), votes) = communicate_with_quorum(
&nodes,
committee,
|vote: &LiteVote| (vote.value.value_hash, vote.round),
|remote_node| {
let mut updater = ValidatorUpdater {
chain_worker_count,
remote_node,
local_node: local_node.clone(),
};
let action = action.clone();
Box::pin(async move { updater.send_chain_update(action).await })
},
self.options.grace_period,
)
.await?;
ensure!(
(votes_hash, votes_round) == (value.hash(), action.round()),
ChainClientError::ProtocolError("Unexpected response from validators")
);
let certificate = LiteCertificate::try_from_votes(votes)
.ok_or_else(|| {
ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
})?
.with_value(value)
.ok_or_else(|| {
ChainClientError::ProtocolError("A quorum voted for an unexpected value")
})?;
Ok(certificate)
}
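    /// Processes the confirmed block certificate and then sends the chain's new
    /// information to the validators.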
#[instrument(level = "trace", skip(certificate, mode))]
async fn receive_certificate_and_update_validators_internal(
&self,
certificate: ConfirmedBlockCertificate,
mode: ReceiveCertificateMode,
) -> Result<(), ChainClientError> {
let block_chain_id = certificate.block().header.chain_id;
let block_height = certificate.block().header.height;
self.receive_certificate_internal(certificate, mode, None)
.await?;
let local_committee = self.local_committee().await?;
self.communicate_chain_updates(
&local_committee,
block_chain_id,
block_height.try_add_one()?,
CrossChainMessageDelivery::Blocking,
)
.await?;
Ok(())
}
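    /// Verifies the certificate against the corresponding committee (unless it was
    /// already checked), downloads any missing ancestor certificates and blobs, and
    /// processes it in the local node.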
#[instrument(level = "trace", skip(certificate, mode))]
async fn receive_certificate_internal(
&self,
certificate: ConfirmedBlockCertificate,
mode: ReceiveCertificateMode,
nodes: Option<Vec<RemoteNode<P::Node>>>,
) -> Result<(), ChainClientError> {
let block = certificate.block();
let (committees, max_epoch) = self.known_committees().await?;
ensure!(
block.header.epoch <= max_epoch,
ChainClientError::CommitteeSynchronizationError
);
let remote_committee = committees
.get(&block.header.epoch)
.ok_or_else(|| ChainClientError::CommitteeDeprecationError)?;
if let ReceiveCertificateMode::NeedsCheck = mode {
certificate.check(remote_committee)?;
}
let nodes = if let Some(nodes) = nodes {
nodes
} else {
self.make_nodes(remote_committee)?
};
self.client
.download_certificates(&nodes, block.header.chain_id, block.header.height)
.await?;
if let Err(err) = self.process_certificate(certificate.clone()).await {
match &err {
LocalNodeError::BlobsNotFound(blob_ids) => {
let blobs = RemoteNode::download_blobs(
blob_ids,
&nodes,
self.client.blob_download_timeout,
)
.await
.ok_or(err)?;
self.client.local_node.store_blobs(&blobs).await?;
self.process_certificate(certificate).await?;
}
_ => {
warn!("Failed to process network hashed certificate value");
return Err(err.into());
}
}
}
Ok(())
}
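    /// Asks one validator for the certificates of blocks that sent messages to this
    /// chain and that the local node may not have seen yet, starting from the
    /// per-validator tracker, and returns the downloaded batch together with the new
    /// tracker value.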
#[instrument(level = "trace")]
async fn synchronize_received_certificates_from_validator(
&self,
chain_id: ChainId,
remote_node: &RemoteNode<P::Node>,
chain_worker_limit: usize,
) -> Result<ReceivedCertificatesFromValidator, ChainClientError> {
let mut tracker = self
.chain_state_view()
.await?
.received_certificate_trackers
.get()
.get(&remote_node.public_key)
.copied()
.unwrap_or(0);
let (committees, max_epoch) = self.known_committees().await?;
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
let info = remote_node.handle_chain_info_query(query).await?;
let remote_log = info.requested_received_log;
let remote_max_heights = Self::max_height_per_chain(&remote_log);
let local_next_heights = self
.client
.local_node
.next_block_heights(remote_max_heights.keys(), chain_worker_limit)
.await?;
let mut downloaded_heights = BTreeMap::new();
let mut other_sender_chains = Vec::new();
let certificate_hashes = future::try_join_all(remote_max_heights.into_iter().filter_map(
|(sender_chain_id, remote_height)| {
let local_next = *local_next_heights.get(&sender_chain_id)?;
if let Ok(height) = local_next.try_sub_one() {
downloaded_heights.insert(sender_chain_id, height);
}
let Some(diff) = remote_height.0.checked_sub(local_next.0) else {
other_sender_chains.push(sender_chain_id);
return None;
};
let range = BlockHeightRange::multi(local_next, diff.saturating_add(1));
Some(remote_node.fetch_sent_certificate_hashes(sender_chain_id, range))
},
))
.await?
.into_iter()
.flatten()
.collect();
let remote_certificates = remote_node
.download_certificates(certificate_hashes)
.await?;
let mut certificates = Vec::new();
for confirmed_block_certificate in remote_certificates {
let block_header = &confirmed_block_certificate.inner().block().header;
let sender_chain_id = block_header.chain_id;
let height = block_header.height;
let epoch = block_header.epoch;
match self.check_certificate(max_epoch, &committees, &confirmed_block_certificate)? {
CheckCertificateResult::FutureEpoch => {
warn!(
"Postponing received certificate from {sender_chain_id:.8} at height \
{height} from future epoch {epoch}"
);
}
CheckCertificateResult::OldEpoch => {
warn!("Skipping received certificate from past epoch {epoch:?}");
}
CheckCertificateResult::New => {
downloaded_heights
.entry(sender_chain_id)
.and_modify(|h| *h = height.max(*h))
.or_insert(height);
certificates.push(confirmed_block_certificate);
}
}
}
for entry in remote_log {
if downloaded_heights
.get(&entry.chain_id)
.is_some_and(|h| *h >= entry.height)
{
tracker += 1;
} else {
break;
}
}
Ok(ReceivedCertificatesFromValidator {
public_key: remote_node.public_key,
tracker,
certificates,
other_sender_chains,
})
}
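    /// Checks an incoming certificate against the known committees.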
#[instrument(
level = "trace", skip_all,
fields(certificate_hash = ?incoming_certificate.hash()),
)]
fn check_certificate(
&self,
highest_known_epoch: Epoch,
committees: &BTreeMap<Epoch, Committee>,
incoming_certificate: &ConfirmedBlockCertificate,
) -> Result<CheckCertificateResult, NodeError> {
let block = incoming_certificate.block();
if block.header.epoch > highest_known_epoch {
return Ok(CheckCertificateResult::FutureEpoch);
}
if let Some(known_committee) = committees.get(&block.header.epoch) {
incoming_certificate.check(known_committee)?;
Ok(CheckCertificateResult::New)
} else {
Ok(CheckCertificateResult::OldEpoch)
}
}
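    /// Processes the certificate batches received from all validators, retries the
    /// outgoing messages of the other sender chains, and stores the updated trackers.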
#[tracing::instrument(level = "trace", skip(received_certificates_batches))]
async fn receive_certificates_from_validators(
&self,
received_certificates_batches: Vec<ReceivedCertificatesFromValidator>,
) {
let validator_count = received_certificates_batches.len();
let mut other_sender_chains = BTreeSet::new();
let mut certificates =
BTreeMap::<ChainId, BTreeMap<BlockHeight, ConfirmedBlockCertificate>>::new();
let mut new_trackers = BTreeMap::new();
for response in received_certificates_batches {
other_sender_chains.extend(response.other_sender_chains);
new_trackers.insert(response.public_key, response.tracker);
for certificate in response.certificates {
certificates
.entry(certificate.block().header.chain_id)
.or_default()
.insert(certificate.block().header.height, certificate);
}
}
let certificate_count = certificates.values().map(BTreeMap::len).sum::<usize>();
tracing::info!(
"Received {certificate_count} certificates from {validator_count} validator(s)."
);
let chain_worker_limit = (self.client.max_loaded_chains.get() / 2).max(1);
let stream = stream::iter(certificates.into_values().map(|certificates| {
let client = self.clone();
async move {
for certificate in certificates.into_values() {
let hash = certificate.hash();
let mode = ReceiveCertificateMode::AlreadyChecked;
if let Err(e) = client
.receive_certificate_internal(certificate, mode, None)
.await
{
warn!("Received invalid certificate {hash}: {e}");
}
}
}
}))
.buffer_unordered(chain_worker_limit);
stream.for_each(future::ready).await;
let stream = stream::iter(other_sender_chains.into_iter().map(|chain_id| {
let local_node = self.client.local_node.clone();
async move {
if let Err(error) = local_node
.retry_pending_cross_chain_requests(chain_id)
.await
{
error!("Failed to retry outgoing messages from {chain_id}: {error}");
}
}
}))
.buffer_unordered(chain_worker_limit);
stream.for_each(future::ready).await;
if let Err(error) = self
.client
.local_node
.update_received_certificate_trackers(self.chain_id, new_trackers)
.await
{
error!(
"Failed to update the certificate trackers for chain {:.8}: {error}",
self.chain_id
);
}
}
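    /// Downloads new received certificates from a quorum of validators and processes
    /// them, so that the inboxes of the local chain are up to date.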
#[instrument(level = "trace")]
async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
let chain_id = self.chain_id;
let local_committee = self.local_committee().await?;
let nodes = self.make_nodes(&local_committee)?;
let client = self.clone();
let chain_worker_limit =
(self.client.max_loaded_chains.get() / local_committee.validators().len()).max(1);
let result = communicate_with_quorum(
&nodes,
&local_committee,
|_| (),
|remote_node| {
let client = client.clone();
Box::pin(async move {
client
.synchronize_received_certificates_from_validator(
chain_id,
&remote_node,
chain_worker_limit,
)
.await
})
},
self.options.grace_period,
)
.await;
let received_certificate_batches = match result {
Ok(((), received_certificate_batches)) => received_certificate_batches,
Err(CommunicationError::Trusted(NodeError::InactiveChain(id))) if id == chain_id => {
return Ok(());
}
Err(error) => {
return Err(error.into());
}
};
self.receive_certificates_from_validators(received_certificate_batches)
.await;
Ok(())
}
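    /// Transfers the given amount from the given owner (or from the chain balance if
    /// `None`) to the recipient, by executing a system `Transfer` operation.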
#[instrument(level = "trace")]
pub async fn transfer(
&self,
owner: Option<Owner>,
amount: Amount,
recipient: Recipient,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.execute_operation(Operation::System(SystemOperation::Transfer {
owner,
recipient,
amount,
}))
.await
}
#[instrument(level = "trace")]
pub async fn read_data_blob(
&self,
hash: CryptoHash,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let blob_id = BlobId {
hash,
blob_type: BlobType::Data,
};
self.execute_operation(Operation::System(SystemOperation::ReadBlob { blob_id }))
.await
}
#[instrument(level = "trace")]
pub async fn claim(
&self,
owner: Owner,
target_id: ChainId,
recipient: Recipient,
amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.execute_operation(Operation::System(SystemOperation::Claim {
owner,
target_id,
recipient,
amount,
}))
.await
}
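    /// Processes the certificate in the local node and updates the client state from
    /// the new chain info.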
#[instrument(level = "trace", skip(certificate))]
pub async fn process_certificate<T: ProcessableCertificate>(
&self,
certificate: GenericCertificate<T>,
) -> Result<(), LocalNodeError> {
let info = self.client.handle_certificate(certificate).await?.info;
self.update_from_info(&info);
Ok(())
}
#[instrument(level = "trace", skip(info))]
fn update_from_info(&self, info: &ChainInfo) {
if info.chain_id == self.chain_id {
self.state_mut().update_from_info(info);
}
}
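    /// Requests a timeout certificate for the current round from the committee, so that
    /// the chain can move on to the next round, and broadcasts it.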
#[instrument(level = "trace")]
pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
let chain_id = self.chain_id;
let query = ChainInfoQuery::new(chain_id).with_committees();
let info = self
.client
.local_node
.handle_chain_info_query(query)
.await?
.info;
let epoch = info.epoch.ok_or(LocalNodeError::InactiveChain(chain_id))?;
let committee = info
.requested_committees
.ok_or(LocalNodeError::InvalidChainInfoResponse)?
.remove(&epoch)
.ok_or(LocalNodeError::InactiveChain(chain_id))?;
let height = info.next_block_height;
let round = info.manager.current_round;
let action = CommunicateAction::RequestTimeout {
height,
round,
chain_id,
};
let value: Hashed<Timeout> = Hashed::new(Timeout::new(chain_id, height, epoch));
let certificate = self
.communicate_chain_action(&committee, action, value)
.await?;
self.process_certificate(certificate.clone()).await?;
self.communicate_chain_updates(
&committee,
chain_id,
height,
CrossChainMessageDelivery::NonBlocking,
)
.await?;
Ok(certificate)
}
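    /// Synchronizes the local node with a quorum of validators for the given chain and
    /// returns the resulting local chain info.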
#[instrument(level = "trace", skip_all)]
async fn synchronize_chain_state(
&self,
chain_id: ChainId,
) -> Result<Box<ChainInfo>, ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
let (epoch, mut committees) = self.epoch_and_committees(chain_id).await?;
let committee = committees
.remove(&epoch.ok_or(LocalNodeError::InvalidChainInfoResponse)?)
.ok_or(LocalNodeError::InvalidChainInfoResponse)?;
let validators = self.make_nodes(&committee)?;
communicate_with_quorum(
&validators,
&committee,
|_: &()| (),
|remote_node| {
let client = self.clone();
async move {
client
.try_synchronize_chain_state_from(&remote_node, chain_id)
.await
}
},
self.options.grace_period,
)
.await?;
self.client
.local_node
.chain_info(chain_id)
.await
.map_err(Into::into)
}
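    /// Downloads from one validator all certificates, pending proposals and locking
    /// blocks for the given chain that the local node is missing, and processes them,
    /// fetching any required blobs along the way.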
#[instrument(level = "trace", skip(self, remote_node, chain_id))]
async fn try_synchronize_chain_state_from(
&self,
remote_node: &RemoteNode<P::Node>,
chain_id: ChainId,
) -> Result<(), ChainClientError> {
let local_info = self.client.local_node.chain_info(chain_id).await?;
let range = BlockHeightRange {
start: local_info.next_block_height,
limit: None,
};
let query = ChainInfoQuery::new(chain_id)
.with_sent_certificate_hashes_in_range(range)
.with_manager_values();
let info = remote_node.handle_chain_info_query(query).await?;
if info.next_block_height < local_info.next_block_height {
return Ok(());
}
let certificates: Vec<ConfirmedBlockCertificate> = remote_node
.download_certificates(info.requested_sent_certificate_hashes)
.await?;
if !certificates.is_empty()
&& self
.client
.try_process_certificates(remote_node, chain_id, certificates)
.await
.is_none()
{
return Ok(());
};
let mut proposals = Vec::new();
if let Some(proposal) = info.manager.requested_proposed {
proposals.push(*proposal);
}
if let Some(locking) = info.manager.requested_locking {
match *locking {
LockingBlock::Fast(proposal) => {
proposals.push(proposal);
}
LockingBlock::Regular(cert) => {
let hash = cert.hash();
if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
warn!(
"Skipping certificate {hash} from validator {}: {err}",
remote_node.public_key
);
}
}
}
}
for proposal in proposals {
let owner: Owner = proposal.public_key.into();
if let Err(mut err) = self
.client
.local_node
.handle_block_proposal(proposal.clone())
.await
{
if let LocalNodeError::BlobsNotFound(_) = &err {
let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
if !required_blob_ids.is_empty() {
let mut blobs = Vec::new();
for blob_id in required_blob_ids {
let blob_content = match remote_node
.node
.download_pending_blob(chain_id, blob_id)
.await
{
Ok(content) => content,
Err(err) => {
let public_key = &remote_node.public_key;
warn!("Skipping proposal from {owner} and validator {public_key}: {err}");
continue;
}
};
blobs.push(Blob::new(blob_content));
}
self.client
.local_node
.handle_pending_blobs(chain_id, blobs)
.await?;
if let Err(new_err) = self
.client
.local_node
.handle_block_proposal(proposal.clone())
.await
{
err = new_err;
} else {
continue;
}
}
if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
self.update_local_node_with_blobs_from(blob_ids.clone(), remote_node)
.await?;
if let Err(new_err) = self
.client
.local_node
.handle_block_proposal(proposal.clone())
.await
{
err = new_err;
} else {
continue;
}
}
}
let public_key = &remote_node.public_key;
warn!("Skipping proposal from {owner} and validator {public_key}: {err}");
}
}
Ok(())
}
async fn try_process_locking_block_from(
&self,
remote_node: &RemoteNode<P::Node>,
certificate: GenericCertificate<ValidatedBlock>,
) -> Result<(), ChainClientError> {
let chain_id = certificate.inner().chain_id();
match self.process_certificate(certificate.clone()).await {
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
let mut blobs = Vec::new();
for blob_id in blob_ids {
let blob_content = remote_node
.node
.download_pending_blob(chain_id, blob_id)
.await?;
blobs.push(Blob::new(blob_content));
}
self.client
.local_node
.handle_pending_blobs(chain_id, blobs)
.await?;
self.process_certificate(certificate).await?;
Ok(())
}
Err(err) => Err(err.into()),
Ok(()) => Ok(()),
}
}
async fn update_local_node_with_blobs_from(
&self,
blob_ids: Vec<BlobId>,
remote_node: &RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
try_join_all(blob_ids.into_iter().map(|blob_id| async move {
let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
self.receive_certificate(certificate).await
}))
.await?;
Ok(())
}
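    /// Downloads and processes a certificate that created the given blob, from any
    /// validator that has one.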
pub async fn receive_certificate_for_blob(
&self,
blob_id: BlobId,
) -> Result<(), ChainClientError> {
self.receive_certificates_for_blobs(vec![blob_id]).await
}
pub async fn receive_certificates_for_blobs(
&self,
blob_ids: Vec<BlobId>,
) -> Result<(), ChainClientError> {
let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
let validators = self.validator_nodes().await?;
let mut missing_blobs = Vec::new();
for blob_id in blob_ids {
let mut certificate_stream = validators
.iter()
.map(|remote_node| async move {
let cert = remote_node.download_certificate_for_blob(blob_id).await?;
Ok::<_, NodeError>((remote_node.clone(), cert))
})
.collect::<FuturesUnordered<_>>();
loop {
let Some(result) = certificate_stream.next().await else {
missing_blobs.push(blob_id);
break;
};
if let Ok((remote_node, cert)) = result {
if self
.receive_certificate_internal(
cert,
ReceiveCertificateMode::NeedsCheck,
Some(vec![remote_node]),
)
.await
.is_ok()
{
break;
}
}
}
}
if missing_blobs.is_empty() {
Ok(())
} else {
Err(NodeError::BlobsNotFound(missing_blobs).into())
}
}
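    /// Executes the block locally, rejecting any incoming messages that fail to execute
    /// (unless they are protected) and retrying until execution succeeds or fails for
    /// another reason.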
#[tracing::instrument(level = "trace", skip(block))]
async fn stage_block_execution_and_discard_failing_messages(
&self,
mut block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), ChainClientError> {
loop {
let result = self.stage_block_execution(block.clone(), round).await;
if let Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(chain_error),
))) = &result
{
if let ChainError::ExecutionError(
error,
ChainExecutionContext::IncomingBundle(index),
) = &**chain_error
{
let message = block
.incoming_bundles
.get_mut(*index as usize)
.expect("Message at given index should exist");
if message.bundle.is_protected() {
error!("Protected incoming message failed to execute locally: {message:?}");
} else {
info!(
%error, origin = ?message.origin,
"Message failed to execute locally and will be rejected."
);
message.action = MessageAction::Reject;
continue;
}
}
}
return result;
}
}
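    /// Executes the block locally, downloading any missing blobs from the validators
    /// first, and returns the executed block and the local node's response.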
#[instrument(level = "trace", skip(block))]
async fn stage_block_execution(
&self,
block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), ChainClientError> {
loop {
let result = self
.client
.local_node
.stage_block_execution(block.clone(), round)
.await;
if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
self.receive_certificates_for_blobs(blob_ids.clone())
.await?;
                continue;
            }
return Ok(result?);
}
}
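    /// Executes the given operations, together with the pending incoming messages, in a
    /// new block, retrying whenever another block gets committed in the meantime.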
#[instrument(level = "trace", skip(operations, blobs))]
pub async fn execute_operations(
&self,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
loop {
match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await? {
ExecuteBlockOutcome::Executed(certificate) => {
return Ok(ClientOutcome::Committed(certificate));
}
ExecuteBlockOutcome::WaitForTimeout(timeout) => {
return Ok(ClientOutcome::WaitForTimeout(timeout));
}
ExecuteBlockOutcome::Conflict(certificate) => {
info!(
height = %certificate.block().header.height,
"Another block was committed; retrying."
);
}
};
}
}
#[instrument(level = "trace", skip(operation))]
pub async fn execute_operation(
&self,
operation: Operation,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.execute_operations(vec![operation], vec![]).await
}
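    /// Attempts once to propose and commit a new block containing the given operations:
    /// any already pending block is committed first, in which case the outcome is a
    /// conflict.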
#[instrument(level = "trace", skip(operations, blobs))]
async fn execute_block(
&self,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<ExecuteBlockOutcome, ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
let mutex = self.state().client_mutex();
let _guard = mutex.lock_owned().await;
match self.process_pending_block_without_prepare().await? {
ClientOutcome::Committed(Some(certificate)) => {
return Ok(ExecuteBlockOutcome::Conflict(certificate))
}
ClientOutcome::WaitForTimeout(timeout) => {
return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
}
ClientOutcome::Committed(None) => {}
}
let incoming_bundles = self.pending_message_bundles().await?;
let identity = self.identity().await?;
let confirmed_value = self
.new_pending_block(incoming_bundles, operations, blobs, identity)
.await?;
match self.process_pending_block_without_prepare().await? {
ClientOutcome::Committed(Some(certificate))
if certificate.block() == confirmed_value.inner().block() =>
{
Ok(ExecuteBlockOutcome::Executed(certificate))
}
ClientOutcome::Committed(Some(certificate)) => {
Ok(ExecuteBlockOutcome::Conflict(certificate))
}
ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
"Unexpected block proposal error",
)),
ClientOutcome::WaitForTimeout(timeout) => {
Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
}
}
}
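    /// Stages a new block with the given messages, operations and blobs, records it as
    /// the pending proposal in the client state, and returns the confirmed block that
    /// is expected to result from it.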
#[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
async fn new_pending_block(
&self,
incoming_bundles: Vec<IncomingBundle>,
operations: Vec<Operation>,
blobs: Vec<Blob>,
identity: Owner,
) -> Result<Hashed<ConfirmedBlock>, ChainClientError> {
let (previous_block_hash, height, timestamp) = {
let state = self.state();
ensure!(
state.pending_proposal().is_none(),
ChainClientError::BlockProposalError(
"Client state already has a pending block; \
use the `linera retry-pending-block` command to commit that first"
)
);
(
state.block_hash(),
state.next_block_height(),
self.next_timestamp(&incoming_bundles, state.timestamp()),
)
};
let block = ProposedBlock {
epoch: self.epoch().await?,
chain_id: self.chain_id,
incoming_bundles,
operations,
previous_block_hash,
height,
authenticated_signer: Some(identity),
timestamp,
};
let info = self.chain_info().await?;
let published_blob_ids = block.published_blob_ids();
let round = match Self::round_for_new_proposal(&info, &identity, &block, true)? {
Either::Left(round) => round.multi_leader(),
Either::Right(_) => None,
};
let (executed_block, _) = self
.stage_block_execution_and_discard_failing_messages(block, round)
.await?;
let block = &executed_block.block;
let committee = self.local_committee().await?;
let max_size = committee.policy().maximum_block_proposal_size;
block.check_proposal_size(max_size)?;
for blob in &blobs {
if published_blob_ids.contains(&blob.id()) {
committee
.policy()
.check_blob_size(blob.content())
.with_execution_context(ChainExecutionContext::Block)?;
}
}
self.state_mut().set_pending_proposal(block.clone(), blobs);
Ok(Hashed::new(ConfirmedBlock::new(executed_block)))
}
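    /// Returns a timestamp for the next block that is no earlier than the local time,
    /// the previous block's time, or the time of any incoming bundle.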
#[instrument(level = "trace", skip(incoming_bundles))]
fn next_timestamp(
&self,
incoming_bundles: &[IncomingBundle],
block_time: Timestamp,
) -> Timestamp {
let local_time = self.storage_client().clock().current_time();
incoming_bundles
.iter()
.map(|msg| msg.bundle.timestamp)
.max()
.map_or(local_time, |timestamp| timestamp.max(local_time))
.max(block_time)
}
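    /// Queries an application on this chain through the local node.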
#[instrument(level = "trace", skip(query))]
pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, ChainClientError> {
let outcome = self
.client
.local_node
.query_application(self.chain_id, query)
.await?;
Ok(outcome)
}
#[instrument(level = "trace", skip(query))]
pub async fn query_system_application(
&self,
query: SystemQuery,
) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
let QueryOutcome {
response,
operations,
} = self
.client
.local_node
.query_application(self.chain_id, Query::System(query))
.await?;
match response {
QueryResponse::System(response) => Ok(QueryOutcome {
response,
operations,
}),
_ => Err(ChainClientError::InternalError(
"Unexpected response for system query",
)),
}
}
#[instrument(level = "trace", skip(application_id, query))]
pub async fn query_user_application<A: Abi>(
&self,
application_id: UserApplicationId<A>,
query: &A::Query,
) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
let query = Query::user(application_id, query)?;
let QueryOutcome {
response,
operations,
} = self
.client
.local_node
.query_application(self.chain_id, query)
.await?;
match response {
QueryResponse::User(response_bytes) => {
let response = serde_json::from_slice(&response_bytes)?;
Ok(QueryOutcome {
response,
operations,
})
}
_ => Err(ChainClientError::InternalError(
"Unexpected response for user query",
)),
}
}
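    /// Returns the chain balance as it would be after executing the pending incoming
    /// messages in a new block; returns zero if the balance cannot even cover the fees.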
#[instrument(level = "trace")]
pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
let (balance, _) = self.query_balances_with_owner(None).await?;
Ok(balance)
}
#[instrument(level = "trace", skip(owner))]
pub async fn query_owner_balance(
&self,
owner: AccountOwner,
) -> Result<Amount, ChainClientError> {
Ok(self
.query_balances_with_owner(Some(owner))
.await?
.1
.unwrap_or(Amount::ZERO))
}
#[instrument(level = "trace", skip(owner))]
async fn query_balances_with_owner(
&self,
owner: Option<AccountOwner>,
) -> Result<(Amount, Option<Amount>), ChainClientError> {
let incoming_bundles = self.pending_message_bundles().await?;
let (previous_block_hash, height, timestamp) = {
let state = self.state();
(
state.block_hash(),
state.next_block_height(),
self.next_timestamp(&incoming_bundles, state.timestamp()),
)
};
let block = ProposedBlock {
epoch: self.epoch().await?,
chain_id: self.chain_id,
incoming_bundles,
operations: Vec::new(),
previous_block_hash,
height,
authenticated_signer: owner.and_then(|owner| match owner {
AccountOwner::User(user) => Some(user),
AccountOwner::Application(_) => None,
}),
timestamp,
};
match self
.stage_block_execution_and_discard_failing_messages(block, None)
.await
{
Ok((_, response)) => Ok((
response.info.chain_balance,
response.info.requested_owner_balance,
)),
Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(error),
))) if matches!(
&*error,
ChainError::ExecutionError(
execution_error,
ChainExecutionContext::Block
) if matches!(**execution_error, ExecutionError::SystemError(
SystemExecutionError::InsufficientFundingForFees { .. }
))
) =>
{
Ok((Amount::ZERO, Some(Amount::ZERO)))
}
Err(error) => Err(error),
}
}
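    /// Reads the chain balance from the local node, without staging a block; fails if
    /// the wallet is not in sync with the local node.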
#[instrument(level = "trace")]
pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
let (balance, _) = self.local_balances_with_owner(None).await?;
Ok(balance)
}
#[instrument(level = "trace", skip(owner))]
pub async fn local_owner_balance(
&self,
owner: AccountOwner,
) -> Result<Amount, ChainClientError> {
Ok(self
.local_balances_with_owner(Some(owner))
.await?
.1
.unwrap_or(Amount::ZERO))
}
#[instrument(level = "trace", skip(owner))]
async fn local_balances_with_owner(
&self,
owner: Option<AccountOwner>,
) -> Result<(Amount, Option<Amount>), ChainClientError> {
let next_block_height = self.next_block_height();
ensure!(
self.chain_info().await?.next_block_height == next_block_height,
ChainClientError::WalletSynchronizationError
);
let mut query = ChainInfoQuery::new(self.chain_id);
query.request_owner_balance = owner;
let response = self
.client
.local_node
.handle_chain_info_query(query)
.await?;
Ok((
response.info.chain_balance,
response.info.requested_owner_balance,
))
}
#[instrument(level = "trace")]
pub async fn request_application(
&self,
application_id: UserApplicationId,
chain_id: Option<ChainId>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let chain_id = chain_id.unwrap_or(application_id.creation.chain_id);
self.execute_operation(Operation::System(SystemOperation::RequestApplication {
application_id,
chain_id,
}))
.await
}
#[instrument(level = "trace")]
pub async fn transfer_to_account(
&self,
owner: Option<Owner>,
amount: Amount,
account: Account,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.transfer(owner, amount, Recipient::Account(account))
.await
}
#[instrument(level = "trace")]
pub async fn burn(
&self,
owner: Option<Owner>,
amount: Amount,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.transfer(owner, amount, Recipient::Burn).await
}
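    /// Synchronizes the admin chain (if it is a different chain) and this chain with
    /// the validators, and downloads all new received certificates.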
#[instrument(level = "trace")]
pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
if self.chain_id != self.admin_id {
self.synchronize_chain_state(self.admin_id).await?;
}
let info = self.prepare_chain().await?;
self.find_received_certificates().await?;
Ok(info)
}
#[instrument(level = "trace")]
pub async fn process_pending_block(
&self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
self.synchronize_from_validators().await?;
self.process_pending_block_without_prepare().await
}
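    /// Attempts to commit the pending block, or to finalize or retry a locking block,
    /// without synchronizing with the validators first.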
#[instrument(level = "trace")]
async fn process_pending_block_without_prepare(
&self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
let info = self.request_leader_timeout_if_needed().await?;
if info.manager.has_locking_block_in_current_round()
&& !info.manager.current_round.is_fast()
{
return self.finalize_locking_block(info).await;
}
let identity = self.identity().await?;
let local_node = &self.client.local_node;
let pending_proposal = self.state().pending_proposal().clone();
let (executed_block, blobs) = if let Some(locking) = &info.manager.requested_locking {
let (executed_block, blob_ids) = match &**locking {
LockingBlock::Regular(certificate) => (
certificate.block().clone().into(),
certificate.block().required_blob_ids(),
),
LockingBlock::Fast(proposal) => {
let block = proposal.content.block.clone();
let blob_ids = block.published_blob_ids();
(self.stage_block_execution(block, None).await?.0, blob_ids)
}
};
let blobs = local_node
.get_locking_blobs(&blob_ids, self.chain_id)
.await?
.ok_or_else(|| ChainClientError::InternalError("Missing local locking blobs"))?;
(executed_block, blobs)
} else if let Some(pending_proposal) = pending_proposal {
let block = pending_proposal.block;
let round = match Self::round_for_new_proposal(&info, &identity, &block, true)? {
Either::Left(round) => round.multi_leader(),
Either::Right(_) => None,
};
let executed_block = self.stage_block_execution(block, round).await?.0;
(executed_block, pending_proposal.blobs)
} else {
            return Ok(ClientOutcome::Committed(None));
        };
let round = match Self::round_for_new_proposal(
&info,
&identity,
&executed_block.block,
executed_block.outcome.has_oracle_responses(),
)? {
Either::Left(round) => round,
Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
};
let already_handled_locally = info
.manager
.already_handled_proposal(round, &executed_block.block);
let key_pair = self.key_pair().await?;
let proposal = if let Some(locking) = info.manager.requested_locking {
Box::new(match *locking {
LockingBlock::Regular(cert) => BlockProposal::new_retry(round, cert, &key_pair),
LockingBlock::Fast(proposal) => {
BlockProposal::new_initial(round, proposal.content.block, &key_pair)
}
})
} else {
let block = executed_block.block.clone();
Box::new(BlockProposal::new_initial(round, block, &key_pair))
};
if !already_handled_locally {
if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
match err {
LocalNodeError::BlobsNotFound(_) => {
local_node
.handle_pending_blobs(self.chain_id, blobs)
.await?;
local_node.handle_block_proposal(*proposal.clone()).await?;
}
err => return Err(err.into()),
}
}
}
let committee = self.local_committee().await?;
let certificate = if round.is_fast() {
let hashed_value = Hashed::new(ConfirmedBlock::new(executed_block));
self.submit_block_proposal(&committee, proposal, hashed_value)
.await?
} else {
let hashed_value = Hashed::new(ValidatedBlock::new(executed_block));
let certificate = self
.submit_block_proposal(&committee, proposal, hashed_value.clone())
.await?;
self.finalize_block(&committee, certificate).await?
};
self.update_validators(Some(&committee)).await?;
Ok(ClientOutcome::Committed(Some(certificate)))
}
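    /// Requests a leader timeout certificate if the current round has timed out, and
    /// returns up-to-date chain information.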
async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
let mut info = self.chain_info_with_manager_values().await?;
self.state().check_info_is_up_to_date(&info)?;
if let Some(round_timeout) = info.manager.round_timeout {
if round_timeout <= self.storage_client().clock().current_time() {
self.request_leader_timeout().await?;
info = self.chain_info_with_manager_values().await?;
}
}
Ok(info)
}
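    /// Finalizes the locking block, which must be a validated block certificate. If the
    /// validators cannot be reached, returns a round timeout to wait for instead.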
async fn finalize_locking_block(
&self,
info: Box<ChainInfo>,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
let locking = info
.manager
.requested_locking
.expect("Should have a locking block");
let LockingBlock::Regular(certificate) = *locking else {
panic!("Should have a locking validated block");
};
let committee = self.local_committee().await?;
match self.finalize_block(&committee, certificate.clone()).await {
Ok(certificate) => Ok(ClientOutcome::Committed(Some(certificate))),
Err(ChainClientError::CommunicationError(error)) => {
let timestamp = info.manager.round_timeout.ok_or(error)?;
Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
timestamp,
current_round: info.manager.current_round,
next_block_height: info.next_block_height,
}))
}
Err(error) => Err(error),
}
}
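    /// Picks the round in which `block` can be proposed: the current round if there is no
    /// conflict, otherwise the next possible one. Returns a round timeout if the given
    /// identity cannot propose at all right now.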
fn round_for_new_proposal(
info: &ChainInfo,
identity: &Owner,
block: &ProposedBlock,
has_oracle_responses: bool,
) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
let manager = &info.manager;
let conflict = manager.requested_proposed.as_ref().is_some_and(|proposal| {
proposal.content.round == manager.current_round && proposal.content.block != *block
}) || (manager.current_round.is_fast() && has_oracle_responses);
let round = if !conflict {
manager.current_round
} else if let Some(round) = manager
.ownership
.next_round(manager.current_round)
.filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
{
round
} else if let Some(timeout) = info.round_timeout() {
return Ok(Either::Right(timeout));
} else {
return Err(ChainClientError::BlockProposalError(
"Conflicting proposal in the current round",
));
};
if manager.can_propose(identity, round) {
return Ok(Either::Left(round));
}
if let Some(timeout) = info.round_timeout() {
return Ok(Either::Right(timeout));
}
Err(ChainClientError::BlockProposalError(
"Not a leader in the current round",
))
}
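    /// Clears the information on any block proposal that previously failed.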
#[instrument(level = "trace")]
pub fn clear_pending_proposal(&self) {
self.state_mut().clear_pending_proposal();
}
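    /// Processes a confirmed block certificate for which this chain is a recipient, and
    /// updates lagging validators about this chain.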
#[instrument(
level = "trace",
skip(certificate),
fields(certificate_hash = ?certificate.hash()),
)]
pub async fn receive_certificate_and_update_validators(
&self,
certificate: ConfirmedBlockCertificate,
) -> Result<(), ChainClientError> {
self.receive_certificate_and_update_validators_internal(
certificate,
ReceiveCertificateMode::NeedsCheck,
)
.await
}
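    /// Processes a confirmed block certificate for which this chain is a recipient.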
#[instrument(
level = "trace",
skip(certificate),
fields(certificate_hash = ?certificate.hash()),
)]
pub async fn receive_certificate(
&self,
certificate: ConfirmedBlockCertificate,
) -> Result<(), ChainClientError> {
self.receive_certificate_internal(certificate, ReceiveCertificateMode::NeedsCheck, None)
.await
}
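    /// Rotates the key pair of the chain: registers the new key and transfers ownership
    /// to the corresponding owner.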
#[instrument(level = "trace", skip(key_pair))]
pub async fn rotate_key_pair(
&self,
key_pair: AccountSecretKey,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let new_public_key = self.state_mut().insert_known_key_pair(key_pair);
self.transfer_ownership(new_public_key.into()).await
}
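    /// Transfers ownership of the chain to a single super owner.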
#[instrument(level = "trace")]
pub async fn transfer_ownership(
&self,
new_owner: Owner,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.execute_operation(Operation::System(SystemOperation::ChangeOwnership {
super_owners: vec![new_owner],
owners: Vec::new(),
multi_leader_rounds: 2,
open_multi_leader_rounds: false,
timeout_config: TimeoutConfig::default(),
}))
.await
}
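    /// Adds another owner to the chain, turning existing super owners into regular owners
    /// with a weight of 100. Retries if a concurrent block gets committed first.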
#[instrument(level = "trace")]
pub async fn share_ownership(
&self,
new_owner: Owner,
new_weight: u64,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
loop {
let ownership = self.prepare_chain().await?.manager.ownership;
ensure!(
ownership.is_active(),
ChainError::InactiveChain(self.chain_id)
);
let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
owners.push((new_owner, new_weight));
let operations = vec![Operation::System(SystemOperation::ChangeOwnership {
super_owners: Vec::new(),
owners,
multi_leader_rounds: ownership.multi_leader_rounds,
open_multi_leader_rounds: ownership.open_multi_leader_rounds,
timeout_config: ownership.timeout_config,
})];
match self.execute_block(operations, vec![]).await? {
ExecuteBlockOutcome::Executed(certificate) => {
return Ok(ClientOutcome::Committed(certificate));
}
ExecuteBlockOutcome::Conflict(certificate) => {
info!(
height = %certificate.block().header.height,
"Another block was committed; retrying."
);
}
ExecuteBlockOutcome::WaitForTimeout(timeout) => {
return Ok(ClientOutcome::WaitForTimeout(timeout));
}
};
}
}
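    /// Changes the ownership of this chain to the given configuration.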
#[instrument(level = "trace")]
pub async fn change_ownership(
&self,
ownership: ChainOwnership,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.execute_operation(Operation::System(SystemOperation::ChangeOwnership {
super_owners: ownership.super_owners.into_iter().collect(),
owners: ownership.owners.into_iter().collect(),
multi_leader_rounds: ownership.multi_leader_rounds,
open_multi_leader_rounds: ownership.open_multi_leader_rounds,
timeout_config: ownership.timeout_config.clone(),
}))
.await
}
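    /// Changes the application permissions configuration of this chain.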
#[instrument(level = "trace", skip(application_permissions))]
pub async fn change_application_permissions(
&self,
application_permissions: ApplicationPermissions,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let operation = SystemOperation::ChangeApplicationPermissions(application_permissions);
self.execute_operation(operation.into()).await
}
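    /// Opens a new chain with the given ownership, permissions and initial balance, funded
    /// by this chain. Returns the ID of the message that created the chain, together with
    /// the certificate.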
#[instrument(level = "trace", skip(self))]
pub async fn open_chain(
&self,
ownership: ChainOwnership,
application_permissions: ApplicationPermissions,
balance: Amount,
) -> Result<ClientOutcome<(MessageId, ConfirmedBlockCertificate)>, ChainClientError> {
loop {
let (epoch, committees) = self.epoch_and_committees(self.chain_id).await?;
let epoch = epoch.ok_or(LocalNodeError::InactiveChain(self.chain_id))?;
let config = OpenChainConfig {
ownership: ownership.clone(),
committees,
admin_id: self.admin_id,
epoch,
balance,
application_permissions: application_permissions.clone(),
};
let operation = Operation::System(SystemOperation::OpenChain(config));
let certificate = match self.execute_block(vec![operation], vec![]).await? {
ExecuteBlockOutcome::Executed(certificate) => certificate,
ExecuteBlockOutcome::Conflict(_) => continue,
ExecuteBlockOutcome::WaitForTimeout(timeout) => {
return Ok(ClientOutcome::WaitForTimeout(timeout));
}
};
let message_id = certificate
.block()
.message_id_for_operation(0, OPEN_CHAIN_MESSAGE_INDEX)
.ok_or_else(|| ChainClientError::InternalError("Failed to create new chain"))?;
self.client.track_chain(ChainId::child(message_id));
self.client
.local_node
.retry_pending_cross_chain_requests(self.chain_id)
.await?;
return Ok(ClientOutcome::Committed((message_id, certificate)));
}
}
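    /// Closes the chain. Returns `ClientOutcome::Committed(None)` if the chain was
    /// already closed.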
#[instrument(level = "trace")]
pub async fn close_chain(
&self,
) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
let operation = Operation::System(SystemOperation::CloseChain);
match self.execute_operation(operation).await {
Ok(outcome) => Ok(outcome.map(Some)),
Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(chain_error),
))) if matches!(*chain_error, ChainError::ClosedChain) => {
                Ok(ClientOutcome::Committed(None))
            }
Err(error) => Err(error),
}
}
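    /// Publishes contract and service bytecode as blobs and registers them under a new
    /// bytecode ID.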
#[cfg(not(target_arch = "wasm32"))]
#[instrument(level = "trace", skip(contract, service))]
pub async fn publish_bytecode(
&self,
contract: Bytecode,
service: Bytecode,
vm_runtime: VmRuntime,
) -> Result<ClientOutcome<(BytecodeId, ConfirmedBlockCertificate)>, ChainClientError> {
let (contract_blob, service_blob, bytecode_id) =
create_bytecode_blobs(contract, service, vm_runtime).await;
self.publish_bytecode_blobs(contract_blob, service_blob, bytecode_id)
.await
}
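    /// Publishes already-created contract and service bytecode blobs under the given
    /// bytecode ID.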
#[cfg(not(target_arch = "wasm32"))]
#[instrument(level = "trace", skip(contract_blob, service_blob, bytecode_id))]
pub async fn publish_bytecode_blobs(
&self,
contract_blob: Blob,
service_blob: Blob,
bytecode_id: BytecodeId,
) -> Result<ClientOutcome<(BytecodeId, ConfirmedBlockCertificate)>, ChainClientError> {
self.execute_operations(
vec![Operation::System(SystemOperation::PublishBytecode {
bytecode_id,
})],
vec![contract_blob, service_blob],
)
.await?
.try_map(|certificate| Ok((bytecode_id, certificate)))
}
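    /// Publishes the given bytes as data blobs, all in a single block.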
#[instrument(level = "trace", skip(bytes))]
pub async fn publish_data_blobs(
&self,
bytes: Vec<Vec<u8>>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let blobs = bytes.into_iter().map(Blob::new_data);
let publish_blob_operations = blobs
.clone()
.map(|blob| {
Operation::System(SystemOperation::PublishDataBlob {
blob_hash: blob.id().hash,
})
})
.collect();
self.execute_operations(publish_blob_operations, blobs.collect())
.await
}
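    /// Publishes a single data blob.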
#[instrument(level = "trace", skip(bytes))]
pub async fn publish_data_blob(
&self,
bytes: Vec<u8>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.publish_data_blobs(vec![bytes]).await
}
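    /// Creates an application by instantiating the given bytecode. The typed parameters
    /// and instantiation argument are serialized as JSON.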
#[instrument(
level = "trace",
skip(self, parameters, instantiation_argument, required_application_ids)
)]
pub async fn create_application<
A: Abi,
Parameters: Serialize,
InstantiationArgument: Serialize,
>(
&self,
bytecode_id: BytecodeId<A, Parameters, InstantiationArgument>,
parameters: &Parameters,
instantiation_argument: &InstantiationArgument,
required_application_ids: Vec<UserApplicationId>,
) -> Result<ClientOutcome<(UserApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
{
let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
let parameters = serde_json::to_vec(parameters)?;
Ok(self
.create_application_untyped(
bytecode_id.forget_abi(),
parameters,
instantiation_argument,
required_application_ids,
)
.await?
.map(|(app_id, cert)| (app_id.with_abi(), cert)))
}
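    /// Creates an application by instantiating the given bytecode, using parameters and
    /// an instantiation argument that are already serialized.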
#[instrument(
level = "trace",
skip(
self,
bytecode_id,
parameters,
instantiation_argument,
required_application_ids
)
)]
pub async fn create_application_untyped(
&self,
bytecode_id: BytecodeId,
parameters: Vec<u8>,
instantiation_argument: Vec<u8>,
required_application_ids: Vec<UserApplicationId>,
) -> Result<ClientOutcome<(UserApplicationId, ConfirmedBlockCertificate)>, ChainClientError>
{
self.execute_operation(Operation::System(SystemOperation::CreateApplication {
bytecode_id,
parameters,
instantiation_argument,
required_application_ids,
}))
.await?
.try_map(|certificate| {
let creation = certificate
.block()
.message_id_for_operation(0, CREATE_APPLICATION_MESSAGE_INDEX)
.ok_or_else(|| ChainClientError::InternalError("Failed to create application"))?;
let id = ApplicationId {
creation,
bytecode_id,
};
Ok((id, certificate))
})
}
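    /// Creates a committee for the next epoch and starts using it (admin chain only).
    /// Retries if a concurrent block gets committed first.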
#[instrument(level = "trace", skip(committee))]
pub async fn stage_new_committee(
&self,
committee: Committee,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
loop {
let epoch = self.epoch().await?;
match self
.execute_block(
vec![Operation::System(SystemOperation::Admin(
AdminOperation::CreateCommittee {
epoch: epoch.try_add_one()?,
committee: committee.clone(),
},
))],
vec![],
)
.await?
{
ExecuteBlockOutcome::Executed(certificate) => {
return Ok(ClientOutcome::Committed(certificate))
}
ExecuteBlockOutcome::Conflict(_) => continue,
ExecuteBlockOutcome::WaitForTimeout(timeout) => {
return Ok(ClientOutcome::WaitForTimeout(timeout));
}
};
}
}
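    /// Synchronizes the chain with the validators, then creates blocks without any
    /// operations until all incoming message bundles are processed. This may take several
    /// blocks.
    ///
    /// # Example
    ///
    /// A sketch of typical usage (marked `ignore`, so it is not compiled as a doc test;
    /// it assumes an initialized `client`):
    ///
    /// ```rust,ignore
    /// let (certificates, maybe_timeout) = client.process_inbox().await?;
    /// println!("Processed the inbox in {} block(s)", certificates.len());
    /// if let Some(timeout) = maybe_timeout {
    ///     // Another owner leads the current round; retry after `timeout.timestamp`.
    /// }
    /// ```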
#[instrument(level = "trace")]
pub async fn process_inbox(
&self,
) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
self.prepare_chain().await?;
self.process_inbox_without_prepare().await
}
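    /// Creates blocks without any operations until all incoming message bundles are
    /// processed, without synchronizing with the validators first. This may take several
    /// blocks.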
#[instrument(level = "trace")]
pub async fn process_inbox_without_prepare(
&self,
) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
let mut certificates = Vec::new();
loop {
let incoming_bundles = self.pending_message_bundles().await?;
if incoming_bundles.is_empty() {
return Ok((certificates, None));
}
match self.execute_block(vec![], vec![]).await {
Ok(ExecuteBlockOutcome::Executed(certificate))
| Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
return Ok((certificates, Some(timeout)));
}
Err(error) => return Err(error),
};
}
}
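    /// Starts listening to the admin chain for new committees.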
#[instrument(level = "trace")]
pub async fn subscribe_to_new_committees(
&self,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let operation = SystemOperation::Subscribe {
chain_id: self.admin_id,
channel: SystemChannel::Admin,
};
self.execute_operation(Operation::System(operation)).await
}
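    /// Stops listening to the admin chain for new committees.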
#[instrument(level = "trace")]
pub async fn unsubscribe_from_new_committees(
&self,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
let operation = SystemOperation::Unsubscribe {
chain_id: self.admin_id,
channel: SystemChannel::Admin,
};
self.execute_operation(Operation::System(operation)).await
}
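    /// Deprecates all committees except the current one (admin chain only).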
#[instrument(level = "trace")]
pub async fn finalize_committee(
&self,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.prepare_chain().await?;
let (current_epoch, committees) = self.epoch_and_committees(self.chain_id).await?;
let current_epoch = current_epoch.ok_or(LocalNodeError::InactiveChain(self.chain_id))?;
let operations = committees
.keys()
.filter_map(|epoch| {
if *epoch != current_epoch {
Some(Operation::System(SystemOperation::Admin(
AdminOperation::RemoveCommittee { epoch: *epoch },
)))
} else {
None
}
})
.collect();
self.execute_operations(operations, vec![]).await
}
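    /// Sends tokens to an account without the usual safeguards: the local balance is not
    /// checked first, and delivery to the recipient chain is not awaited.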
#[instrument(level = "trace")]
pub async fn transfer_to_account_unsafe_unconfirmed(
&self,
owner: Option<Owner>,
amount: Amount,
account: Account,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
self.execute_operation(Operation::System(SystemOperation::Transfer {
owner,
recipient: Recipient::Account(account),
amount,
}))
.await
}
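    /// Reads the confirmed block with the given hash from storage.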
#[instrument(level = "trace", skip(hash))]
pub async fn read_hashed_confirmed_block(
&self,
hash: CryptoHash,
) -> Result<Hashed<ConfirmedBlock>, ViewError> {
self.client
.storage_client()
.read_hashed_confirmed_block(hash)
.await
}
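    /// Retries any pending cross-chain requests carrying this chain's outgoing messages.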
#[instrument(level = "trace")]
pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
self.client
.local_node
.retry_pending_cross_chain_requests(self.chain_id)
.await?;
Ok(())
}
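    /// Reads up to `limit` confirmed blocks from storage, walking backward through the
    /// ancestors of the block with hash `from`.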
#[instrument(level = "trace", skip(from, limit))]
pub async fn read_hashed_confirmed_blocks_downward(
&self,
from: CryptoHash,
limit: u32,
) -> Result<Vec<Hashed<ConfirmedBlock>>, ViewError> {
self.client
.storage_client()
.read_hashed_confirmed_blocks_downward(from, limit)
.await
}
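    /// Returns the chain info from the local node and updates the client state with it,
    /// or `None` if the info could not be read.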
#[instrument(level = "trace", skip(local_node))]
async fn local_chain_info(
&self,
chain_id: ChainId,
local_node: &mut LocalNodeClient<S>,
) -> Option<Box<ChainInfo>> {
let Ok(info) = local_node.chain_info(chain_id).await else {
error!("Fail to read local chain info for {chain_id}");
return None;
};
self.update_from_info(&info);
Some(info)
}
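    /// Returns the next expected block height of the given chain, according to the local
    /// node.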
#[instrument(level = "trace", skip(chain_id, local_node))]
async fn local_next_block_height(
&self,
chain_id: ChainId,
local_node: &mut LocalNodeClient<S>,
) -> Option<BlockHeight> {
let info = self.local_chain_info(chain_id, local_node).await?;
Some(info.next_block_height)
}
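    /// Handles a validator notification by downloading the relevant certificates or chain
    /// state from that validator, unless the local node is already up to date.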
#[instrument(level = "trace", skip(remote_node, local_node, notification))]
async fn process_notification(
&self,
remote_node: RemoteNode<P::Node>,
mut local_node: LocalNodeClient<S>,
notification: Notification,
) {
match notification.reason {
Reason::NewIncomingBundle { origin, height } => {
if self
.local_next_block_height(origin.sender, &mut local_node)
.await
> Some(height)
{
debug!("Accepting redundant notification for new message");
return;
}
if let Err(error) = self
.find_received_certificates_from_validator(remote_node)
.await
{
error!("Fail to process notification: {error}");
return;
}
if self
.local_next_block_height(origin.sender, &mut local_node)
.await
<= Some(height)
{
error!("Fail to synchronize new message after notification");
}
}
Reason::NewBlock { height, .. } => {
let chain_id = notification.chain_id;
if self
.local_next_block_height(chain_id, &mut local_node)
.await
> Some(height)
{
debug!("Accepting redundant notification for new block");
return;
}
if let Err(error) = self
.try_synchronize_chain_state_from(&remote_node, chain_id)
.await
{
error!("Fail to process notification: {error}");
return;
}
let local_height = self
.local_next_block_height(chain_id, &mut local_node)
.await;
if local_height <= Some(height) {
error!("Fail to synchronize new block after notification");
}
}
Reason::NewRound { height, round } => {
let chain_id = notification.chain_id;
if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await {
if (info.next_block_height, info.manager.current_round) >= (height, round) {
debug!("Accepting redundant notification for new round");
return;
}
}
if let Err(error) = self
.try_synchronize_chain_state_from(&remote_node, chain_id)
.await
{
error!("Fail to process notification: {error}");
return;
}
let Some(info) = self.local_chain_info(chain_id, &mut local_node).await else {
error!("Fail to read local chain info for {chain_id}");
return;
};
if (info.next_block_height, info.manager.current_round) < (height, round) {
error!("Fail to synchronize new block after notification");
}
}
}
}
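    /// Spawns a task that listens to notifications about this chain from all validators
    /// and keeps the local node synchronized. Returns the listener future, a handle that
    /// aborts the validator streams when dropped, and the stream of notifications.
    ///
    /// # Example
    ///
    /// A sketch of how the pieces fit together (marked `ignore`, so it is not compiled as
    /// a doc test; it assumes an initialized `client`, a Tokio runtime, and
    /// `futures::StreamExt` in scope):
    ///
    /// ```rust,ignore
    /// let (listener, _abort_on_drop, mut notifications) = client.listen().await?;
    /// tokio::spawn(listener);
    /// while let Some(notification) = notifications.next().await {
    ///     println!("Chain {} event: {:?}", notification.chain_id, notification.reason);
    /// }
    /// ```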
#[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
pub async fn listen(
&self,
) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
use future::FutureExt as _;
async fn await_while_polling<F: FusedFuture>(
future: F,
background_work: impl FusedStream<Item = ()>,
) -> F::Output {
tokio::pin!(future);
tokio::pin!(background_work);
loop {
futures::select! {
_ = background_work.next() => (),
result = future => return result,
}
}
}
        let mut senders = HashMap::new();
        let notifications = self.subscribe().await?;
let (abortable_notifications, abort) = stream::abortable(self.subscribe().await?);
if let Err(error) = self.synchronize_from_validators().await {
error!("Failed to synchronize from validators: {}", error);
}
let mut process_notifications = FuturesUnordered::new();
match self.update_streams(&mut senders).await {
Ok(handler) => process_notifications.push(handler),
Err(error) => error!("Failed to update committee: {error}"),
};
let this = self.clone();
let update_streams = async move {
let mut abortable_notifications = abortable_notifications.fuse();
while let Some(notification) =
await_while_polling(abortable_notifications.next(), &mut process_notifications)
.await
{
if let Reason::NewBlock { .. } = notification.reason {
match await_while_polling(
this.update_streams(&mut senders).fuse(),
&mut process_notifications,
)
.await
{
Ok(handler) => process_notifications.push(handler),
Err(error) => error!("Failed to update committee: {error}"),
}
}
}
for abort in senders.into_values() {
abort.abort();
}
let () = process_notifications.collect().await;
}
.in_current_span();
Ok((update_streams, AbortOnDrop(abort), notifications))
}
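    /// Subscribes to notifications from all validators of the current committee, aborting
    /// the streams of validators that are no longer members. Returns a future driving the
    /// per-validator notification handlers.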
#[instrument(level = "trace", skip(senders))]
async fn update_streams(
&self,
senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
) -> Result<impl Future<Output = ()>, ChainClientError> {
let (chain_id, nodes, local_node) = {
let committee = self.local_committee().await?;
let nodes: HashMap<_, _> = self
.client
.validator_node_provider
.make_nodes(&committee)?
.collect();
(self.chain_id, nodes, self.client.local_node.clone())
};
senders.retain(|validator, abort| {
if !nodes.contains_key(validator) {
abort.abort();
}
!abort.is_aborted()
});
let validator_tasks = FuturesUnordered::new();
for (public_key, node) in nodes {
let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
continue;
};
let stream = stream::once({
let node = node.clone();
async move { node.subscribe(vec![chain_id]).await }
})
.filter_map(move |result| async move {
if let Err(error) = &result {
warn!(?error, "Could not connect to validator {public_key}");
} else {
info!("Connected to validator {public_key}");
}
result.ok()
})
.flatten();
let (stream, abort) = stream::abortable(stream);
let mut stream = Box::pin(stream);
let this = self.clone();
let local_node = local_node.clone();
let remote_node = RemoteNode { public_key, node };
validator_tasks.push(async move {
while let Some(notification) = stream.next().await {
this.process_notification(
remote_node.clone(),
local_node.clone(),
notification,
)
.await;
}
});
entry.insert(abort);
}
Ok(validator_tasks.collect())
}
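    /// Downloads and processes the certificates this chain has received, as known to the
    /// given validator.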
#[instrument(level = "trace")]
async fn find_received_certificates_from_validator(
&self,
remote_node: RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
let chain_id = self.chain_id;
let received_certificates = self
.synchronize_received_certificates_from_validator(
chain_id,
&remote_node,
self.client.max_loaded_chains.into(),
)
.await?;
self.receive_certificates_from_validators(vec![received_certificates])
.await;
Ok(())
}
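    /// Returns the highest block height recorded for each chain in the given remote log.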
fn max_height_per_chain(remote_log: &[ChainAndHeight]) -> BTreeMap<ChainId, BlockHeight> {
remote_log.iter().fold(
BTreeMap::<ChainId, BlockHeight>::new(),
|mut chain_to_info, entry| {
chain_to_info
.entry(entry.chain_id)
.and_modify(|h| *h = entry.height.max(*h))
.or_insert(entry.height);
chain_to_info
},
)
}
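    /// Sends the validator every confirmed block certificate it is missing for this
    /// chain, bringing it up to date with the local node.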
#[instrument(level = "trace", skip(remote_node))]
pub async fn sync_validator(&self, remote_node: P::Node) -> Result<(), ChainClientError> {
let validator_chain_state = remote_node
.handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
.await?;
let local_chain_state = self.client.local_node.chain_info(self.chain_id).await?;
let Some(missing_certificate_count) = local_chain_state
.next_block_height
.0
.checked_sub(validator_chain_state.info.next_block_height.0)
.filter(|count| *count > 0)
else {
debug!("Validator is up-to-date with local state");
return Ok(());
};
        let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
            .expect("`u64` should fit in a `usize`");
        let missing_certificates_start = missing_certificates_end
            - usize::try_from(missing_certificate_count).expect("`u64` should fit in a `usize`");
let missing_certificate_hashes = self
.client
.local_node
.chain_state_view(self.chain_id)
.await?
.confirmed_log
.read(missing_certificates_start..missing_certificates_end)
.await?;
let certificates = self
.client
.storage
.read_certificates(missing_certificate_hashes)
.await?;
for certificate in certificates {
remote_node
.handle_confirmed_certificate(certificate, CrossChainMessageDelivery::NonBlocking)
.await?;
}
Ok(())
}
}
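/// The outcome of trying to commit a block: the block was executed, a conflicting block
/// was committed instead, or the caller must wait for the current round to time out.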
#[derive(Debug)]
enum ExecuteBlockOutcome {
Executed(ConfirmedBlockCertificate),
Conflict(ConfirmedBlockCertificate),
WaitForTimeout(RoundTimeout),
}
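/// A wrapper around [`AbortHandle`] that aborts the underlying task when dropped.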
#[must_use]
pub struct AbortOnDrop(AbortHandle);
impl Drop for AbortOnDrop {
#[instrument(level = "trace", skip(self))]
fn drop(&mut self) {
self.0.abort();
}
}
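/// The result of synchronizing received certificates from one validator: the validator's
/// public key, the updated tracker value, the downloaded certificates, and any other
/// chains that sent messages and still need to be synchronized.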
struct ReceivedCertificatesFromValidator {
public_key: ValidatorPublicKey,
tracker: u64,
certificates: Vec<ConfirmedBlockCertificate>,
other_sender_chains: Vec<ChainId>,
}
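/// A proposed block that is pending, together with the blobs required to execute it.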
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
pub block: ProposedBlock,
pub blobs: Vec<Blob>,
}