1mod state;
5use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7 convert::Infallible,
8 iter,
9 sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{self, Either, FusedFuture, Future},
15 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20 abi::Abi,
21 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
22 data_types::{
23 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24 ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
25 },
26 ensure,
27 identifiers::{
28 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29 ModuleId, StreamId,
30 },
31 ownership::{ChainOwnership, TimeoutConfig},
32 time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37 data_types::{
38 BlockProposal, BundleExecutionPolicy, ChainAndHeight, IncomingBundle, ProposedBlock,
39 Transaction,
40 },
41 manager::LockingBlock,
42 types::{
43 Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
44 ValidatedBlock,
45 },
46 ChainError, ChainExecutionContext, ChainStateView,
47};
48use linera_execution::{
49 committee::Committee,
50 system::{
51 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52 REMOVED_EPOCH_STREAM_NAME,
53 },
54 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
55};
56use linera_storage::{Clock as _, Storage as _};
57use linera_views::ViewError;
58use rand::seq::SliceRandom;
59use serde::Serialize;
60pub use state::State;
61use thiserror::Error;
62use tokio::sync::{mpsc, OwnedRwLockReadGuard};
63use tokio_stream::wrappers::UnboundedReceiverStream;
64use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
65
66use super::{
67 received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
68 ListeningMode, PendingProposal, ReceiveCertificateMode, TimingType,
69};
70use crate::{
71 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
72 environment::Environment,
73 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74 node::{
75 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76 ValidatorNodeProvider as _,
77 },
78 remote_node::RemoteNode,
79 updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
80 worker::{Notification, Reason, WorkerError},
81};
82
83#[derive(Debug, Clone)]
84pub struct Options {
85 pub max_pending_message_bundles: usize,
87 pub max_block_limit_errors: u32,
92 pub message_policy: MessagePolicy,
94 pub cross_chain_message_delivery: CrossChainMessageDelivery,
96 pub quorum_grace_period: f64,
99 pub blob_download_timeout: Duration,
101 pub certificate_batch_download_timeout: Duration,
103 pub certificate_download_batch_size: u64,
106 pub sender_certificate_download_batch_size: usize,
109 pub max_joined_tasks: usize,
111 pub allow_fast_blocks: bool,
114}
115
116#[cfg(with_testing)]
117impl Options {
118 pub fn test_default() -> Self {
119 use super::{
120 DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
121 };
122 use crate::DEFAULT_QUORUM_GRACE_PERIOD;
123
124 Options {
125 max_pending_message_bundles: 10,
126 max_block_limit_errors: 3,
127 message_policy: MessagePolicy::new_accept_all(),
128 cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
129 quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
130 blob_download_timeout: Duration::from_secs(1),
131 certificate_batch_download_timeout: Duration::from_secs(1),
132 certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
133 sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
134 max_joined_tasks: 100,
135 allow_fast_blocks: false,
136 }
137 }
138}
139
140#[derive(Debug)]
146pub struct ChainClient<Env: Environment> {
147 #[debug(skip)]
149 pub(crate) client: Arc<Client<Env>>,
150 chain_id: ChainId,
152 #[debug(skip)]
154 options: Options,
155 preferred_owner: Option<AccountOwner>,
158 initial_next_block_height: BlockHeight,
160 initial_block_hash: Option<CryptoHash>,
162 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
164}
165
166impl<Env: Environment> Clone for ChainClient<Env> {
167 fn clone(&self) -> Self {
168 Self {
169 client: self.client.clone(),
170 chain_id: self.chain_id,
171 options: self.options.clone(),
172 preferred_owner: self.preferred_owner,
173 initial_next_block_height: self.initial_next_block_height,
174 initial_block_hash: self.initial_block_hash,
175 timing_sender: self.timing_sender.clone(),
176 }
177 }
178}
179
180#[derive(Debug, Error)]
182pub enum Error {
183 #[error("Local node operation failed: {0}")]
184 LocalNodeError(#[from] LocalNodeError),
185
186 #[error("Remote node operation failed: {0}")]
187 RemoteNodeError(#[from] NodeError),
188
189 #[error(transparent)]
190 ArithmeticError(#[from] ArithmeticError),
191
192 #[error("Missing certificates: {0:?}")]
193 ReadCertificatesError(Vec<CryptoHash>),
194
195 #[error("Missing confirmed block: {0:?}")]
196 MissingConfirmedBlock(CryptoHash),
197
198 #[error("JSON (de)serialization error: {0}")]
199 JsonError(#[from] serde_json::Error),
200
201 #[error("Chain operation failed: {0}")]
202 ChainError(#[from] ChainError),
203
204 #[error(transparent)]
205 CommunicationError(#[from] CommunicationError<NodeError>),
206
207 #[error("Internal error within chain client: {0}")]
208 InternalError(&'static str),
209
210 #[error(
211 "Cannot accept a certificate from an unknown committee in the future. \
212 Please synchronize the local view of the admin chain"
213 )]
214 CommitteeSynchronizationError,
215
216 #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
217 WalletSynchronizationError,
218
219 #[error("The state of the client is incompatible with the proposed block: {0}")]
220 BlockProposalError(&'static str),
221
222 #[error(
223 "Cannot accept a certificate from a committee that was retired. \
224 Try a newer certificate from the same origin"
225 )]
226 CommitteeDeprecationError,
227
228 #[error("Protocol error within chain client: {0}")]
229 ProtocolError(&'static str),
230
231 #[error("Signer doesn't have key to sign for chain {0}")]
232 CannotFindKeyForChain(ChainId),
233
234 #[error("client is not configured to propose on chain {0}")]
235 NoAccountKeyConfigured(ChainId),
236
237 #[error("The chain client isn't owner on chain {0}")]
238 NotAnOwner(ChainId),
239
240 #[error(transparent)]
241 ViewError(#[from] ViewError),
242
243 #[error(
244 "Failed to download certificates and update local node to the next height \
245 {target_next_block_height} of chain {chain_id}"
246 )]
247 CannotDownloadCertificates {
248 chain_id: ChainId,
249 target_next_block_height: BlockHeight,
250 },
251
252 #[error(transparent)]
253 BcsError(#[from] bcs::Error),
254
255 #[error(
256 "Unexpected quorum: validators voted for block hash {hash} in {round}, \
257 expected block hash {expected_hash} in {expected_round}"
258 )]
259 UnexpectedQuorum {
260 hash: CryptoHash,
261 round: Round,
262 expected_hash: CryptoHash,
263 expected_round: Round,
264 },
265
266 #[error("signer error: {0:?}")]
267 Signer(#[source] Box<dyn signer::Error>),
268
269 #[error("Cannot revoke the current epoch {0}")]
270 CannotRevokeCurrentEpoch(Epoch),
271
272 #[error("Epoch is already revoked")]
273 EpochAlreadyRevoked,
274
275 #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
276 CannotDownloadMissingSenderBlock {
277 chain_id: ChainId,
278 height: BlockHeight,
279 },
280
281 #[error(
282 "A different block was already committed at this height. \
283 The committed certificate hash is {0}"
284 )]
285 Conflict(CryptoHash),
286}
287
288impl From<Infallible> for Error {
289 fn from(infallible: Infallible) -> Self {
290 match infallible {}
291 }
292}
293
294impl Error {
295 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
296 Self::Signer(Box::new(err))
297 }
298}
299
300impl<Env: Environment> ChainClient<Env> {
301 pub fn new(
302 client: Arc<Client<Env>>,
303 chain_id: ChainId,
304 options: Options,
305 initial_block_hash: Option<CryptoHash>,
306 initial_next_block_height: BlockHeight,
307 preferred_owner: Option<AccountOwner>,
308 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
309 ) -> Self {
310 ChainClient {
311 client,
312 chain_id,
313 options,
314 preferred_owner,
315 initial_block_hash,
316 initial_next_block_height,
317 timing_sender,
318 }
319 }
320
321 pub fn is_follow_only(&self) -> bool {
323 self.client.is_chain_follow_only(self.chain_id)
324 }
325
326 #[instrument(level = "trace", skip(self))]
328 fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
329 self.client
330 .chains
331 .pin()
332 .get(&self.chain_id)
333 .expect("Chain client constructed for invalid chain")
334 .client_mutex()
335 }
336
337 #[instrument(level = "trace", skip(self))]
339 pub fn pending_proposal(&self) -> Option<PendingProposal> {
340 self.client
341 .chains
342 .pin()
343 .get(&self.chain_id)
344 .expect("Chain client constructed for invalid chain")
345 .pending_proposal()
346 .clone()
347 }
348
349 #[instrument(level = "trace", skip(self, f))]
351 fn update_state<F>(&self, f: F)
352 where
353 F: Fn(&mut State),
354 {
355 let chains = self.client.chains.pin();
356 chains
357 .update(self.chain_id, |state| {
358 let mut state = state.clone_for_update_unchecked();
359 f(&mut state);
360 state
361 })
362 .expect("Chain client constructed for invalid chain");
363 }
364
365 #[instrument(level = "trace", skip(self))]
367 pub fn signer(&self) -> &impl Signer {
368 self.client.signer()
369 }
370
371 pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
373 self.client.has_key_for(owner).await
374 }
375
376 #[instrument(level = "trace", skip(self))]
378 pub fn options_mut(&mut self) -> &mut Options {
379 &mut self.options
380 }
381
382 #[instrument(level = "trace", skip(self))]
384 pub fn options(&self) -> &Options {
385 &self.options
386 }
387
388 #[instrument(level = "trace", skip(self))]
390 pub fn chain_id(&self) -> ChainId {
391 self.chain_id
392 }
393
394 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
396 self.timing_sender.clone()
397 }
398
399 #[instrument(level = "trace", skip(self))]
401 pub fn admin_chain_id(&self) -> ChainId {
402 self.client.admin_chain_id
403 }
404
405 #[instrument(level = "trace", skip(self))]
407 pub fn preferred_owner(&self) -> Option<AccountOwner> {
408 self.preferred_owner
409 }
410
411 #[instrument(level = "trace", skip(self))]
413 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
414 self.preferred_owner = Some(preferred_owner);
415 }
416
417 #[instrument(level = "trace", skip(self))]
419 pub fn unset_preferred_owner(&mut self) {
420 self.preferred_owner = None;
421 }
422
423 #[instrument(level = "trace")]
425 pub async fn chain_state_view(
426 &self,
427 ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
428 self.client.local_node.chain_state_view(self.chain_id).await
429 }
430
431 #[instrument(level = "trace", skip(self))]
433 pub async fn event_stream_publishers(
434 &self,
435 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
436 let subscriptions = self
437 .client
438 .local_node
439 .get_event_subscriptions(self.chain_id)
440 .await?;
441 let mut publishers = subscriptions.into_iter().fold(
442 BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
443 |mut map, ((chain_id, stream_id), _)| {
444 map.entry(chain_id).or_default().insert(stream_id);
445 map
446 },
447 );
448 if self.chain_id != self.client.admin_chain_id {
449 publishers.insert(
450 self.client.admin_chain_id,
451 vec![
452 StreamId::system(EPOCH_STREAM_NAME),
453 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
454 ]
455 .into_iter()
456 .collect(),
457 );
458 }
459 Ok(publishers)
460 }
461
462 #[instrument(level = "trace")]
464 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
465 self.subscribe_to(self.chain_id)
466 }
467
468 #[instrument(level = "trace")]
470 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
471 Ok(Box::pin(UnboundedReceiverStream::new(
472 self.client.notifier.subscribe(vec![chain_id]),
473 )))
474 }
475
476 #[instrument(level = "trace")]
478 pub fn storage_client(&self) -> &Env::Storage {
479 self.client.storage_client()
480 }
481
482 #[instrument(level = "trace")]
484 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
485 let query = ChainInfoQuery::new(self.chain_id);
486 let response = self
487 .client
488 .local_node
489 .handle_chain_info_query(query)
490 .await?;
491 self.client.update_from_info(&response.info);
492 Ok(response.info)
493 }
494
495 #[instrument(level = "trace")]
497 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
498 let query = ChainInfoQuery::new(self.chain_id)
499 .with_manager_values()
500 .with_committees();
501 let response = self
502 .client
503 .local_node
504 .handle_chain_info_query(query)
505 .await?;
506 self.client.update_from_info(&response.info);
507 Ok(response.info)
508 }
509
510 pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
512 self.client.get_chain_description(self.chain_id).await
513 }
514
515 #[instrument(level = "trace")]
518 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
519 if self.options.message_policy.is_ignore() {
520 return Ok(Vec::new());
522 }
523
524 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
525 let info = self
526 .client
527 .local_node
528 .handle_chain_info_query(query)
529 .await?
530 .info;
531 if self.preferred_owner.is_some_and(|owner| {
532 info.manager
533 .ownership
534 .is_super_owner_no_regular_owners(&owner)
535 }) {
536 ensure!(
538 info.next_block_height >= self.initial_next_block_height,
539 Error::WalletSynchronizationError
540 );
541 }
542
543 Ok(info
544 .requested_pending_message_bundles
545 .into_iter()
546 .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
547 .take(self.options.max_pending_message_bundles)
548 .collect())
549 }
550
551 #[instrument(level = "trace")]
555 async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
556 let subscription_map = self
558 .client
559 .local_node
560 .get_event_subscriptions(self.chain_id)
561 .await?;
562 let futures = subscription_map
564 .into_iter()
565 .filter(|((chain_id, _), _)| {
566 self.options
567 .message_policy
568 .restrict_chain_ids_to
569 .as_ref()
570 .is_none_or(|chain_set| chain_set.contains(chain_id))
571 })
572 .map(|((chain_id, stream_id), subscriptions)| {
573 let client = self.client.clone();
574 async move {
575 let next_expected_index = client
576 .local_node
577 .get_next_expected_event(chain_id, stream_id.clone())
578 .await?;
579 if let Some(next_index) = next_expected_index
580 .filter(|next_index| *next_index > subscriptions.next_index)
581 {
582 Ok(Some((chain_id, stream_id, next_index)))
583 } else {
584 Ok::<_, Error>(None)
585 }
586 }
587 });
588 let updates = futures::stream::iter(futures)
589 .buffer_unordered(self.options.max_joined_tasks)
590 .try_collect::<Vec<_>>()
591 .await?
592 .into_iter()
593 .flatten()
594 .collect::<Vec<_>>();
595 if updates.is_empty() {
596 return Ok(None);
597 }
598 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
599 }
600
601 #[instrument(level = "trace")]
602 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
603 self.client.chain_info_with_committees(self.chain_id).await
604 }
605
606 #[instrument(level = "trace")]
608 async fn epoch_and_committees(
609 &self,
610 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
611 let info = self.chain_info_with_committees().await?;
612 let epoch = info.epoch;
613 let committees = info.into_committees()?;
614 Ok((epoch, committees))
615 }
616
617 #[instrument(level = "trace")]
619 pub async fn local_committee(&self) -> Result<Committee, Error> {
620 let info = match self.chain_info_with_committees().await {
621 Ok(info) => info,
622 Err(LocalNodeError::BlobsNotFound(_)) => {
623 self.synchronize_chain_state(self.chain_id).await?;
624 self.chain_info_with_committees().await?
625 }
626 Err(err) => return Err(err.into()),
627 };
628 Ok(info.into_current_committee()?)
629 }
630
631 #[instrument(level = "trace")]
633 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
634 self.client.admin_committee().await
635 }
636
637 #[instrument(level = "trace")]
641 pub async fn identity(&self) -> Result<AccountOwner, Error> {
642 let Some(preferred_owner) = self.preferred_owner else {
643 return Err(Error::NoAccountKeyConfigured(self.chain_id));
644 };
645 let manager = self.chain_info().await?.manager;
646 ensure!(
647 manager.ownership.is_active(),
648 LocalNodeError::InactiveChain(self.chain_id)
649 );
650 let fallback_owners = if manager.ownership.has_fallback() {
651 self.local_committee()
652 .await?
653 .account_keys_and_weights()
654 .map(|(key, _)| AccountOwner::from(key))
655 .collect()
656 } else {
657 BTreeSet::new()
658 };
659
660 let is_owner = manager
661 .ownership
662 .can_propose_in_multi_leader_round(&preferred_owner)
663 || fallback_owners.contains(&preferred_owner);
664
665 if !is_owner {
666 warn!(
667 chain_id = %self.chain_id,
668 ownership = ?manager.ownership,
669 ?fallback_owners,
670 ?preferred_owner,
671 "The preferred owner is not configured as an owner of this chain",
672 );
673 return Err(Error::NotAnOwner(self.chain_id));
674 }
675
676 let has_signer = self.has_key_for(&preferred_owner).await?;
677
678 if !has_signer {
679 warn!(%self.chain_id, ?preferred_owner,
680 "Chain is one of the owners but its Signer instance doesn't contain the key",
681 );
682 return Err(Error::CannotFindKeyForChain(self.chain_id));
683 }
684
685 Ok(preferred_owner)
686 }
687
688 #[instrument(level = "trace")]
696 pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
697 ensure!(
698 self.has_key_for(&owner).await?,
699 Error::CannotFindKeyForChain(self.chain_id)
700 );
701 self.client
703 .get_chain_description_blob(self.chain_id)
704 .await?;
705
706 let info = self.chain_info().await?;
708
709 ensure!(
711 info.manager
712 .ownership
713 .can_propose_in_multi_leader_round(&owner),
714 Error::NotAnOwner(self.chain_id)
715 );
716
717 Ok(info)
718 }
719
720 #[instrument(level = "trace")]
723 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
724 #[cfg(with_metrics)]
725 let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
726
727 let mut info = self.synchronize_to_known_height().await?;
728
729 if self.preferred_owner.is_none_or(|owner| {
730 !info
731 .manager
732 .ownership
733 .is_super_owner_no_regular_owners(&owner)
734 }) {
735 info = self.client.synchronize_chain_state(self.chain_id).await?;
739 }
740
741 if info.epoch > self.client.admin_committees().await?.0 {
742 self.client
743 .synchronize_chain_state(self.client.admin_chain_id)
744 .await?;
745 }
746
747 self.client.update_from_info(&info);
748 Ok(info)
749 }
750
751 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
756 let info = self
757 .client
758 .download_certificates(self.chain_id, self.initial_next_block_height)
759 .await?;
760 if info.next_block_height == self.initial_next_block_height {
761 ensure!(
763 self.initial_block_hash == info.block_hash,
764 Error::InternalError("Invalid chain of blocks in local node")
765 );
766 }
767 Ok(info)
768 }
769
770 #[instrument(level = "trace", skip(old_committee, latest_certificate))]
772 pub async fn update_validators(
773 &self,
774 old_committee: Option<&Committee>,
775 latest_certificate: Option<ConfirmedBlockCertificate>,
776 ) -> Result<(), Error> {
777 let update_validators_start = linera_base::time::Instant::now();
778 if let Some(old_committee) = old_committee {
780 self.communicate_chain_updates(old_committee, latest_certificate.clone())
781 .await?
782 };
783 if let Ok(new_committee) = self.local_committee().await {
784 if Some(&new_committee) != old_committee {
785 self.communicate_chain_updates(&new_committee, latest_certificate)
788 .await?;
789 }
790 }
791 self.send_timing(update_validators_start, TimingType::UpdateValidators);
792 Ok(())
793 }
794
795 #[instrument(level = "trace", skip(committee))]
797 pub async fn communicate_chain_updates(
798 &self,
799 committee: &Committee,
800 latest_certificate: Option<ConfirmedBlockCertificate>,
801 ) -> Result<(), Error> {
802 let delivery = self.options.cross_chain_message_delivery;
803 let height = self.chain_info().await?.next_block_height;
804 self.client
805 .communicate_chain_updates(
806 committee,
807 self.chain_id,
808 height,
809 delivery,
810 latest_certificate,
811 )
812 .await
813 }
814
815 async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
818 let subscriptions = self
819 .client
820 .local_node
821 .get_event_subscriptions(self.chain_id)
822 .await?;
823 let chain_ids = subscriptions
824 .iter()
825 .map(|((chain_id, _), _)| *chain_id)
826 .chain(iter::once(self.client.admin_chain_id))
827 .filter(|chain_id| *chain_id != self.chain_id)
828 .collect::<BTreeSet<_>>();
829 stream::iter(
830 chain_ids
831 .into_iter()
832 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
833 )
834 .buffer_unordered(self.options.max_joined_tasks)
835 .collect::<Vec<_>>()
836 .await
837 .into_iter()
838 .collect::<Result<Vec<_>, _>>()?;
839 Ok(())
840 }
841
842 #[instrument(level = "trace")]
851 pub async fn find_received_certificates(&self) -> Result<(), Error> {
852 debug!(chain_id = %self.chain_id, "starting find_received_certificates");
853 #[cfg(with_metrics)]
854 let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
855 let chain_id = self.chain_id;
857 let (_, committee) = self.admin_committee().await?;
858 let nodes = self.client.make_nodes(&committee)?;
859
860 let trackers = self
861 .client
862 .local_node
863 .get_received_certificate_trackers(chain_id)
864 .await?;
865
866 trace!("find_received_certificates: read trackers");
867
868 let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
869 let result = communicate_with_quorum(
871 &nodes,
872 &committee,
873 |_| (),
874 |remote_node| {
875 let client = &self.client;
876 let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
877 let received_log_batches = Arc::clone(&received_log_batches);
878 Box::pin(async move {
879 let batch = client
880 .get_received_log_from_validator(chain_id, &remote_node, tracker)
881 .await?;
882 let mut batches = received_log_batches.lock().unwrap();
883 batches.push((remote_node.public_key, batch));
884 Ok(())
885 })
886 },
887 self.options.quorum_grace_period,
888 )
889 .await;
890
891 if let Err(error) = result {
892 error!(
893 %error,
894 "Failed to synchronize received_logs from at least a quorum of validators",
895 );
896 }
897
898 let received_logs: Vec<_> = {
899 let mut received_log_batches = received_log_batches.lock().unwrap();
900 std::mem::take(received_log_batches.as_mut())
901 };
902
903 debug!(
904 received_logs_len = %received_logs.len(),
905 received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
906 "collected received logs"
907 );
908
909 let (received_logs, mut validator_trackers) = {
910 (
911 ReceivedLogs::from_received_result(received_logs.clone()),
912 ValidatorTrackers::new(received_logs, &trackers),
913 )
914 };
915
916 debug!(
917 num_chains = %received_logs.num_chains(),
918 num_certs = %received_logs.num_certs(),
919 "find_received_certificates: total number of chains and certificates to sync",
920 );
921
922 let max_blocks_per_chain =
923 self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
924 for received_log in received_logs.into_batches(
925 self.options.sender_certificate_download_batch_size,
926 max_blocks_per_chain,
927 ) {
928 validator_trackers = self
929 .receive_sender_certificates(received_log, validator_trackers, &nodes)
930 .await?;
931
932 self.update_received_certificate_trackers(&validator_trackers)
933 .await;
934 }
935
936 info!("find_received_certificates finished");
937
938 Ok(())
939 }
940
941 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
942 let updated_trackers = trackers.to_map();
943 trace!(?updated_trackers, "updated tracker values");
944
945 if let Err(error) = self
947 .client
948 .local_node
949 .update_received_certificate_trackers(self.chain_id, updated_trackers)
950 .await
951 {
952 error!(
953 chain_id = %self.chain_id,
954 %error,
955 "Failed to update the certificate trackers for chain",
956 );
957 }
958 }
959
960 async fn receive_sender_certificates(
963 &self,
964 mut received_logs: ReceivedLogs,
965 mut validator_trackers: ValidatorTrackers,
966 nodes: &[RemoteNode<Env::ValidatorNode>],
967 ) -> Result<ValidatorTrackers, Error> {
968 debug!(
969 num_chains = %received_logs.num_chains(),
970 num_certs = %received_logs.num_certs(),
971 "receive_sender_certificates: number of chains and certificates to sync",
972 );
973
974 let local_next_heights = self
976 .client
977 .local_node
978 .next_outbox_heights(received_logs.chains(), self.chain_id)
979 .await?;
980
981 validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
982
983 debug!(
984 remaining_total_certificates = %received_logs.num_certs(),
985 "receive_sender_certificates: computed remote_heights"
986 );
987
988 let mut other_sender_chains = Vec::new();
989 let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
990
991 let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
992 let received_logs = &received_logs;
993 let other_sender_chains = &mut other_sender_chains;
994
995 move |(sender_chain_id, remote_heights)| {
996 if remote_heights.is_empty() {
997 other_sender_chains.push(sender_chain_id);
1001 return None;
1002 };
1003 let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
1004 let sender = sender.clone();
1005 let client = self.client.clone();
1006 let mut nodes = nodes.to_vec();
1007 nodes.shuffle(&mut rand::thread_rng());
1008 Some(async move {
1009 client
1010 .download_and_process_sender_chain(
1011 sender_chain_id,
1012 &nodes,
1013 received_logs,
1014 remote_heights,
1015 sender,
1016 )
1017 .await
1018 })
1019 }
1020 });
1021
1022 future::join(
1023 stream::iter(cert_futures)
1024 .buffer_unordered(self.options.max_joined_tasks)
1025 .collect::<()>(),
1026 async {
1027 while let Some(chain_and_height) = receiver.recv().await {
1028 validator_trackers.downloaded_cert(chain_and_height);
1029 }
1030 },
1031 )
1032 .await;
1033
1034 debug!(
1035 num_other_chains = %other_sender_chains.len(),
1036 "receive_sender_certificates: processing certificates finished"
1037 );
1038
1039 self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
1043 .await;
1044
1045 debug!("receive_sender_certificates: finished processing other_sender_chains");
1046
1047 Ok(validator_trackers)
1048 }
1049
1050 async fn retry_pending_cross_chain_requests(
1053 &self,
1054 nodes: &[RemoteNode<Env::ValidatorNode>],
1055 other_sender_chains: Vec<ChainId>,
1056 ) {
1057 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
1058 let local_node = self.client.local_node.clone();
1059 async move {
1060 if let Err(error) = match local_node
1061 .retry_pending_cross_chain_requests(chain_id)
1062 .await
1063 {
1064 Ok(()) => Ok(()),
1065 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1066 if let Err(error) = self
1067 .client
1068 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1069 .await
1070 {
1071 error!(
1072 ?blob_ids,
1073 %error,
1074 "Error while attempting to download blobs during retrying outgoing \
1075 messages"
1076 );
1077 }
1078 local_node
1079 .retry_pending_cross_chain_requests(chain_id)
1080 .await
1081 }
1082 err => err,
1083 } {
1084 error!(
1085 %chain_id,
1086 %error,
1087 "Failed to retry outgoing messages from chain"
1088 );
1089 }
1090 }
1091 }));
1092 stream.for_each(future::ready).await;
1093 }
1094
1095 #[instrument(level = "trace")]
1097 pub async fn transfer(
1098 &self,
1099 owner: AccountOwner,
1100 amount: Amount,
1101 recipient: Account,
1102 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1103 self.execute_operation(SystemOperation::Transfer {
1105 owner,
1106 recipient,
1107 amount,
1108 })
1109 .await
1110 }
1111
1112 #[instrument(level = "trace")]
1115 pub async fn read_data_blob(
1116 &self,
1117 hash: CryptoHash,
1118 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1119 let blob_id = BlobId {
1120 hash,
1121 blob_type: BlobType::Data,
1122 };
1123 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1124 .await
1125 }
1126
1127 #[instrument(level = "trace")]
1129 pub async fn claim(
1130 &self,
1131 owner: AccountOwner,
1132 target_id: ChainId,
1133 recipient: Account,
1134 amount: Amount,
1135 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1136 self.execute_operation(SystemOperation::Claim {
1137 owner,
1138 target_id,
1139 recipient,
1140 amount,
1141 })
1142 .await
1143 }
1144
1145 #[instrument(level = "trace")]
1148 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1149 let chain_id = self.chain_id;
1150 let info = self.chain_info_with_committees().await?;
1151 let committee = info.current_committee()?;
1152 let height = info.next_block_height;
1153 let round = info.manager.current_round;
1154 let action = CommunicateAction::RequestTimeout {
1155 height,
1156 round,
1157 chain_id,
1158 };
1159 let value = Timeout::new(chain_id, height, info.epoch);
1160 let certificate = Box::new(
1161 self.client
1162 .communicate_chain_action(committee, action, value)
1163 .await?,
1164 );
1165 self.client.process_certificate(certificate.clone()).await?;
1166 self.client
1168 .communicate_chain_updates(
1169 committee,
1170 chain_id,
1171 height,
1172 CrossChainMessageDelivery::NonBlocking,
1173 None,
1174 )
1175 .await?;
1176 Ok(*certificate)
1177 }
1178
1179 #[instrument(level = "trace", skip_all)]
1181 pub async fn synchronize_chain_state(
1182 &self,
1183 chain_id: ChainId,
1184 ) -> Result<Box<ChainInfo>, Error> {
1185 self.client.synchronize_chain_state(chain_id).await
1186 }
1187
1188 #[instrument(level = "trace", skip_all)]
1191 pub async fn synchronize_chain_state_from_committee(
1192 &self,
1193 committee: Committee,
1194 ) -> Result<Box<ChainInfo>, Error> {
1195 self.client
1196 .synchronize_chain_from_committee(self.chain_id, committee)
1197 .await
1198 }
1199
1200 #[instrument(level = "trace", skip(operations, blobs))]
1202 pub async fn execute_operations(
1203 &self,
1204 operations: Vec<Operation>,
1205 blobs: Vec<Blob>,
1206 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1207 let timing_start = linera_base::time::Instant::now();
1208
1209 let result = loop {
1210 let execute_block_start = linera_base::time::Instant::now();
1211 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1213 Ok(ClientOutcome::Committed(certificate)) => {
1214 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1215 break Ok(ClientOutcome::Committed(certificate));
1216 }
1217 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
1218 break Ok(ClientOutcome::WaitForTimeout(timeout));
1219 }
1220 Ok(ClientOutcome::Conflict(certificate)) => {
1221 info!(
1222 height = %certificate.block().header.height,
1223 "Another block was committed."
1224 );
1225 break Ok(ClientOutcome::Conflict(certificate));
1226 }
1227 Err(Error::CommunicationError(CommunicationError::Trusted(
1228 NodeError::UnexpectedBlockHeight {
1229 expected_block_height,
1230 found_block_height,
1231 },
1232 ))) if expected_block_height > found_block_height => {
1233 tracing::info!(
1234 "Local state is outdated; synchronizing chain {:.8}",
1235 self.chain_id
1236 );
1237 self.synchronize_chain_state(self.chain_id).await?;
1238 }
1239 Err(err) => return Err(err),
1240 };
1241 };
1242
1243 self.send_timing(timing_start, TimingType::ExecuteOperations);
1244
1245 result
1246 }
1247
1248 pub async fn execute_operation(
1250 &self,
1251 operation: impl Into<Operation>,
1252 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1253 self.execute_operations(vec![operation.into()], vec![])
1254 .await
1255 }
1256
1257 #[instrument(level = "trace", skip(operations, blobs))]
1261 async fn execute_block(
1262 &self,
1263 operations: Vec<Operation>,
1264 blobs: Vec<Blob>,
1265 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1266 #[cfg(with_metrics)]
1267 let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1268
1269 let mutex = self.client_mutex();
1270 let _guard = mutex.lock_owned().await;
1271 match self.process_pending_block_without_prepare().await? {
1273 ClientOutcome::Committed(Some(certificate)) => {
1274 return Ok(ClientOutcome::Conflict(Box::new(certificate)))
1275 }
1276 ClientOutcome::WaitForTimeout(timeout) => {
1277 return Ok(ClientOutcome::WaitForTimeout(timeout))
1278 }
1279 ClientOutcome::Conflict(certificate) => {
1280 return Ok(ClientOutcome::Conflict(certificate))
1281 }
1282 ClientOutcome::Committed(None) => {}
1283 }
1284
1285 let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1289
1290 if transactions.is_empty() {
1291 return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1292 WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1293 )));
1294 }
1295
1296 let block = self.new_pending_block(transactions, blobs).await?;
1297
1298 match self.process_pending_block_without_prepare().await? {
1299 ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
1300 Ok(ClientOutcome::Committed(certificate))
1301 }
1302 ClientOutcome::Committed(Some(certificate)) => {
1303 Ok(ClientOutcome::Conflict(Box::new(certificate)))
1304 }
1305 ClientOutcome::Committed(None) => {
1307 Err(Error::BlockProposalError("Unexpected block proposal error"))
1308 }
1309 ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
1310 ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
1311 }
1312 }
1313
1314 #[instrument(level = "trace", skip(operations))]
1320 async fn prepend_epochs_messages_and_events(
1321 &self,
1322 operations: Vec<Operation>,
1323 ) -> Result<Vec<Transaction>, Error> {
1324 let incoming_bundles = self.pending_message_bundles().await?;
1325 let stream_updates = self.collect_stream_updates().await?;
1326 Ok(self
1327 .collect_epoch_changes()
1328 .await?
1329 .into_iter()
1330 .map(Transaction::ExecuteOperation)
1331 .chain(
1332 incoming_bundles
1333 .into_iter()
1334 .map(Transaction::ReceiveMessages),
1335 )
1336 .chain(
1337 stream_updates
1338 .into_iter()
1339 .map(Transaction::ExecuteOperation),
1340 )
1341 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1342 .collect::<Vec<_>>())
1343 }
1344
1345 #[instrument(level = "trace", skip(transactions, blobs))]
1349 async fn new_pending_block(
1350 &self,
1351 transactions: Vec<Transaction>,
1352 blobs: Vec<Blob>,
1353 ) -> Result<Block, Error> {
1354 let identity = self.identity().await?;
1355
1356 ensure!(
1357 self.pending_proposal().is_none(),
1358 Error::BlockProposalError(
1359 "Client state already has a pending block; \
1360 use the `linera retry-pending-block` command to commit that first"
1361 )
1362 );
1363 let info = self.chain_info_with_committees().await?;
1364 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1365 let proposed_block = ProposedBlock {
1366 epoch: info.epoch,
1367 chain_id: self.chain_id,
1368 transactions,
1369 previous_block_hash: info.block_hash,
1370 height: info.next_block_height,
1371 authenticated_owner: Some(identity),
1372 timestamp,
1373 };
1374
1375 let round = self.round_for_oracle(&info, &identity).await?;
1376 let (block, _) = self
1379 .client
1380 .stage_block_execution_with_policy(
1381 proposed_block,
1382 round,
1383 blobs.clone(),
1384 BundleExecutionPolicy::AutoRetry {
1385 max_failures: self.options.max_block_limit_errors,
1386 },
1387 )
1388 .await?;
1389 let (proposed_block, _) = block.clone().into_proposal();
1390 self.update_state(|state| {
1391 state.set_pending_proposal(proposed_block.clone(), blobs.clone())
1392 });
1393 Ok(block)
1394 }
1395
1396 #[instrument(level = "trace", skip(transactions))]
1401 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1402 let local_time = self.storage_client().clock().current_time();
1403 transactions
1404 .iter()
1405 .filter_map(Transaction::incoming_bundle)
1406 .map(|msg| msg.bundle.timestamp)
1407 .max()
1408 .map_or(local_time, |timestamp| timestamp.max(local_time))
1409 .max(block_time)
1410 }
1411
1412 #[instrument(level = "trace", skip(query))]
1414 pub async fn query_application(
1415 &self,
1416 query: Query,
1417 block_hash: Option<CryptoHash>,
1418 ) -> Result<QueryOutcome, Error> {
1419 loop {
1420 let result = self
1421 .client
1422 .local_node
1423 .query_application(self.chain_id, query.clone(), block_hash)
1424 .await;
1425 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1426 let validators = self.client.validator_nodes().await?;
1427 self.client
1428 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1429 .await?;
1430 continue; }
1432 return Ok(result?);
1433 }
1434 }
1435
1436 #[instrument(level = "trace", skip(query))]
1438 pub async fn query_system_application(
1439 &self,
1440 query: SystemQuery,
1441 ) -> Result<QueryOutcome<SystemResponse>, Error> {
1442 let QueryOutcome {
1443 response,
1444 operations,
1445 } = self.query_application(Query::System(query), None).await?;
1446 match response {
1447 QueryResponse::System(response) => Ok(QueryOutcome {
1448 response,
1449 operations,
1450 }),
1451 _ => Err(Error::InternalError("Unexpected response for system query")),
1452 }
1453 }
1454
1455 #[instrument(level = "trace", skip(application_id, query))]
1457 #[cfg(with_testing)]
1458 pub async fn query_user_application<A: Abi>(
1459 &self,
1460 application_id: ApplicationId<A>,
1461 query: &A::Query,
1462 ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1463 let query = Query::user(application_id, query)?;
1464 let QueryOutcome {
1465 response,
1466 operations,
1467 } = self.query_application(query, None).await?;
1468 match response {
1469 QueryResponse::User(response_bytes) => {
1470 let response = serde_json::from_slice(&response_bytes)?;
1471 Ok(QueryOutcome {
1472 response,
1473 operations,
1474 })
1475 }
1476 _ => Err(Error::InternalError("Unexpected response for user query")),
1477 }
1478 }
1479
1480 #[instrument(level = "trace")]
1487 pub async fn query_balance(&self) -> Result<Amount, Error> {
1488 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1489 Ok(balance)
1490 }
1491
1492 #[instrument(level = "trace", skip(owner))]
1499 pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1500 if owner.is_chain() {
1501 self.query_balance().await
1502 } else {
1503 Ok(self
1504 .query_balances_with_owner(owner)
1505 .await?
1506 .1
1507 .unwrap_or(Amount::ZERO))
1508 }
1509 }
1510
1511 #[instrument(level = "trace", skip(owner))]
1518 pub(crate) async fn query_balances_with_owner(
1519 &self,
1520 owner: AccountOwner,
1521 ) -> Result<(Amount, Option<Amount>), Error> {
1522 let incoming_bundles = self.pending_message_bundles().await?;
1523 if incoming_bundles.is_empty() {
1526 let chain_balance = self.local_balance().await?;
1527 let owner_balance = self.local_owner_balance(owner).await?;
1528 return Ok((chain_balance, Some(owner_balance)));
1529 }
1530 let info = self.chain_info().await?;
1531 let transactions = incoming_bundles
1532 .into_iter()
1533 .map(Transaction::ReceiveMessages)
1534 .collect::<Vec<_>>();
1535 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1536 let block = ProposedBlock {
1537 epoch: info.epoch,
1538 chain_id: self.chain_id,
1539 transactions,
1540 previous_block_hash: info.block_hash,
1541 height: info.next_block_height,
1542 authenticated_owner: if owner == AccountOwner::CHAIN {
1543 None
1544 } else {
1545 Some(owner)
1546 },
1547 timestamp,
1548 };
1549 match self
1550 .client
1551 .stage_block_execution_with_policy(
1552 block,
1553 None,
1554 Vec::new(),
1555 BundleExecutionPolicy::AutoRetry {
1556 max_failures: self.options.max_block_limit_errors,
1557 },
1558 )
1559 .await
1560 {
1561 Ok((_, response)) => Ok((
1562 response.info.chain_balance,
1563 response.info.requested_owner_balance,
1564 )),
1565 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1566 error,
1567 )))) if matches!(
1568 &*error,
1569 ChainError::ExecutionError(
1570 execution_error,
1571 ChainExecutionContext::Block
1572 ) if matches!(
1573 **execution_error,
1574 ExecutionError::FeesExceedFunding { .. }
1575 )
1576 ) =>
1577 {
1578 Ok((Amount::ZERO, Some(Amount::ZERO)))
1580 }
1581 Err(error) => Err(error),
1582 }
1583 }
1584
1585 #[instrument(level = "trace")]
1589 pub async fn local_balance(&self) -> Result<Amount, Error> {
1590 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1591 Ok(balance)
1592 }
1593
1594 #[instrument(level = "trace", skip(owner))]
1598 pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1599 if owner.is_chain() {
1600 self.local_balance().await
1601 } else {
1602 Ok(self
1603 .local_balances_with_owner(owner)
1604 .await?
1605 .1
1606 .unwrap_or(Amount::ZERO))
1607 }
1608 }
1609
1610 #[instrument(level = "trace", skip(owner))]
1614 pub(crate) async fn local_balances_with_owner(
1615 &self,
1616 owner: AccountOwner,
1617 ) -> Result<(Amount, Option<Amount>), Error> {
1618 ensure!(
1619 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1620 Error::WalletSynchronizationError
1621 );
1622 let mut query = ChainInfoQuery::new(self.chain_id);
1623 query.request_owner_balance = owner;
1624 let response = self
1625 .client
1626 .local_node
1627 .handle_chain_info_query(query)
1628 .await?;
1629 Ok((
1630 response.info.chain_balance,
1631 response.info.requested_owner_balance,
1632 ))
1633 }
1634
1635 #[instrument(level = "trace")]
1637 pub async fn transfer_to_account(
1638 &self,
1639 from: AccountOwner,
1640 amount: Amount,
1641 account: Account,
1642 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1643 self.transfer(from, amount, account).await
1644 }
1645
1646 #[cfg(with_testing)]
1648 #[instrument(level = "trace")]
1649 pub async fn burn(
1650 &self,
1651 owner: AccountOwner,
1652 amount: Amount,
1653 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1654 let recipient = Account::burn_address(self.chain_id);
1655 self.transfer(owner, amount, recipient).await
1656 }
1657
1658 #[instrument(level = "trace")]
1659 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1660 let validators = self.client.validator_nodes().await?;
1661 self.client
1662 .fetch_chain_info(self.chain_id, &validators)
1663 .await
1664 }
1665
1666 #[instrument(level = "trace")]
1675 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1676 if self.is_follow_only() {
1677 return self.client.synchronize_chain_state(self.chain_id).await;
1678 }
1679 let info = self.prepare_chain().await?;
1680 self.synchronize_publisher_chains().await?;
1681 self.find_received_certificates().await?;
1682 Ok(info)
1683 }
1684
1685 #[instrument(level = "trace")]
1687 pub async fn process_pending_block(
1688 &self,
1689 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1690 self.prepare_chain().await?;
1691 self.process_pending_block_without_prepare().await
1692 }
1693
1694 #[instrument(level = "trace")]
1696 async fn process_pending_block_without_prepare(
1697 &self,
1698 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1699 let info = self.request_leader_timeout_if_needed().await?;
1700
1701 if info.manager.has_locking_block_in_current_round()
1703 && !info.manager.current_round.is_fast()
1704 {
1705 return self.finalize_locking_block(info).await;
1706 }
1707 let owner = self.identity().await?;
1708
1709 let local_node = &self.client.local_node;
1710 let pending_proposal = self.pending_proposal();
1712 let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1713 match &**locking {
1714 LockingBlock::Regular(certificate) => {
1715 let blob_ids = certificate.block().required_blob_ids();
1716 let blobs = local_node
1717 .get_locking_blobs(&blob_ids, self.chain_id)
1718 .await?
1719 .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1720 debug!("Retrying locking block from round {}", certificate.round);
1721 (certificate.block().clone(), blobs)
1722 }
1723 LockingBlock::Fast(proposal) => {
1724 let proposed_block = proposal.content.block.clone();
1725 let blob_ids = proposed_block.published_blob_ids();
1726 let blobs = local_node
1727 .get_locking_blobs(&blob_ids, self.chain_id)
1728 .await?
1729 .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1730 let block = self
1731 .client
1732 .stage_block_execution(proposed_block, None, blobs.clone())
1733 .await?
1734 .0;
1735 debug!("Retrying locking block from fast round.");
1736 (block, blobs)
1737 }
1738 }
1739 } else if let Some(pending_proposal) = pending_proposal {
1740 let proposed_block = pending_proposal.block;
1742 let round = self.round_for_oracle(&info, &owner).await?;
1743 let (block, _) = self
1744 .client
1745 .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
1746 .await?;
1747 debug!("Proposing the local pending block.");
1748 (block, pending_proposal.blobs)
1749 } else {
1750 return Ok(ClientOutcome::Committed(None)); };
1752
1753 let has_oracle_responses = block.has_oracle_responses();
1754 let (proposed_block, outcome) = block.into_proposal();
1755 let round = match self
1756 .round_for_new_proposal(&info, &owner, has_oracle_responses)
1757 .await?
1758 {
1759 Either::Left(round) => round,
1760 Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1761 };
1762 debug!("Proposing block for round {}", round);
1763
1764 let already_handled_locally = info
1765 .manager
1766 .already_handled_proposal(round, &proposed_block);
1767 let proposal = if let Some(locking) = info.manager.requested_locking {
1769 Box::new(match *locking {
1770 LockingBlock::Regular(cert) => {
1771 BlockProposal::new_retry_regular(owner, round, cert, self.signer())
1772 .await
1773 .map_err(Error::signer_failure)?
1774 }
1775 LockingBlock::Fast(proposal) => {
1776 BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
1777 .await
1778 .map_err(Error::signer_failure)?
1779 }
1780 })
1781 } else {
1782 Box::new(
1783 BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
1784 .await
1785 .map_err(Error::signer_failure)?,
1786 )
1787 };
1788 if !already_handled_locally {
1789 if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
1791 match err {
1792 LocalNodeError::BlobsNotFound(_) => {
1793 local_node
1794 .handle_pending_blobs(self.chain_id, blobs)
1795 .await?;
1796 local_node.handle_block_proposal(*proposal.clone()).await?;
1797 }
1798 err => return Err(err.into()),
1799 }
1800 }
1801 }
1802 let committee = self.local_committee().await?;
1803 let block = Block::new(proposed_block, outcome);
1804 let submit_block_proposal_start = linera_base::time::Instant::now();
1806 let certificate = if round.is_fast() {
1807 let hashed_value = ConfirmedBlock::new(block);
1808 self.client
1809 .submit_block_proposal(&committee, proposal, hashed_value)
1810 .await?
1811 } else {
1812 let hashed_value = ValidatedBlock::new(block);
1813 let certificate = self
1814 .client
1815 .submit_block_proposal(&committee, proposal, hashed_value.clone())
1816 .await?;
1817 self.client.finalize_block(&committee, certificate).await?
1818 };
1819 self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
1820 debug!(round = %certificate.round, "Sending confirmed block to validators");
1821 self.update_validators(Some(&committee), Some(certificate.clone()))
1822 .await?;
1823 Ok(ClientOutcome::Committed(Some(certificate)))
1824 }
1825
1826 fn send_timing(&self, start: Instant, timing_type: TimingType) {
1827 let Some(sender) = &self.timing_sender else {
1828 return;
1829 };
1830 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
1831 tracing::warn!(%err, "Failed to send timing info");
1832 }
1833 }
1834
1835 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
1838 let mut info = self.chain_info_with_manager_values().await?;
1839 if let Some(round_timeout) = info.manager.round_timeout {
1842 if round_timeout <= self.storage_client().clock().current_time() {
1843 if let Err(e) = self.request_leader_timeout().await {
1844 info!("Failed to obtain a timeout certificate: {}", e);
1845 } else {
1846 info = self.chain_info_with_manager_values().await?;
1847 }
1848 }
1849 }
1850 Ok(info)
1851 }
1852
1853 async fn finalize_locking_block(
1857 &self,
1858 info: Box<ChainInfo>,
1859 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1860 let locking = info
1861 .manager
1862 .requested_locking
1863 .expect("Should have a locking block");
1864 let LockingBlock::Regular(certificate) = *locking else {
1865 panic!("Should have a locking validated block");
1866 };
1867 debug!(
1868 round = %certificate.round,
1869 "Finalizing locking block"
1870 );
1871 let committee = self.local_committee().await?;
1872 let certificate = self
1873 .client
1874 .finalize_block(&committee, certificate.clone())
1875 .await?;
1876 self.update_validators(Some(&committee), Some(certificate.clone()))
1877 .await?;
1878 Ok(ClientOutcome::Committed(Some(certificate)))
1879 }
1880
1881 async fn round_for_oracle(
1883 &self,
1884 info: &ChainInfo,
1885 identity: &AccountOwner,
1886 ) -> Result<Option<u32>, Error> {
1887 match self.round_for_new_proposal(info, identity, true).await {
1889 Ok(Either::Left(round)) => Ok(round.multi_leader()),
1891 Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
1895 Err(err) => Err(err),
1896 }
1897 }
1898
1899 async fn round_for_new_proposal(
1901 &self,
1902 info: &ChainInfo,
1903 identity: &AccountOwner,
1904 has_oracle_responses: bool,
1905 ) -> Result<Either<Round, RoundTimeout>, Error> {
1906 let manager = &info.manager;
1907 let seed = manager.seed;
1908 let skip_fast = manager.current_round.is_fast()
1913 && (has_oracle_responses || !self.options.allow_fast_blocks);
1914 let conflict = manager
1915 .requested_signed_proposal
1916 .as_ref()
1917 .into_iter()
1918 .chain(&manager.requested_proposed)
1919 .any(|proposal| proposal.content.round == manager.current_round)
1920 || skip_fast;
1921 let round = if !conflict {
1922 manager.current_round
1923 } else if let Some(round) = manager
1924 .ownership
1925 .next_round(manager.current_round)
1926 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
1927 {
1928 round
1929 } else if let Some(timeout) = info.round_timeout() {
1930 return Ok(Either::Right(timeout));
1931 } else {
1932 return Err(Error::BlockProposalError(
1933 "Conflicting proposal in the current round",
1934 ));
1935 };
1936 let current_committee = info
1937 .current_committee()?
1938 .validators
1939 .values()
1940 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
1941 .collect();
1942 if manager.should_propose(identity, round, seed, ¤t_committee) {
1943 return Ok(Either::Left(round));
1944 }
1945 if let Some(timeout) = info.round_timeout() {
1946 return Ok(Either::Right(timeout));
1947 }
1948 Err(Error::BlockProposalError(
1949 "Not a leader in the current round",
1950 ))
1951 }
1952
1953 #[cfg(with_testing)]
1955 #[instrument(level = "trace")]
1956 pub fn clear_pending_proposal(&self) {
1957 self.update_state(|state| state.clear_pending_proposal());
1958 }
1959
1960 #[instrument(level = "trace")]
1964 pub async fn rotate_key_pair(
1965 &self,
1966 public_key: AccountPublicKey,
1967 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1968 self.transfer_ownership(public_key.into()).await
1969 }
1970
1971 #[instrument(level = "trace")]
1973 pub async fn transfer_ownership(
1974 &self,
1975 new_owner: AccountOwner,
1976 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1977 self.execute_operation(SystemOperation::ChangeOwnership {
1978 super_owners: vec![new_owner],
1979 owners: Vec::new(),
1980 first_leader: None,
1981 multi_leader_rounds: 2,
1982 open_multi_leader_rounds: false,
1983 timeout_config: TimeoutConfig::default(),
1984 })
1985 .await
1986 }
1987
1988 #[instrument(level = "trace")]
1990 pub async fn share_ownership(
1991 &self,
1992 new_owner: AccountOwner,
1993 new_weight: u64,
1994 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1995 let ownership = self.prepare_chain().await?.manager.ownership;
1996 ensure!(
1997 ownership.is_active(),
1998 ChainError::InactiveChain(self.chain_id)
1999 );
2000 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2001 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2002 owners.push((new_owner, new_weight));
2003 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2004 super_owners: Vec::new(),
2005 owners,
2006 first_leader: ownership.first_leader,
2007 multi_leader_rounds: ownership.multi_leader_rounds,
2008 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2009 timeout_config: ownership.timeout_config,
2010 })];
2011 self.execute_block(operations, vec![]).await
2012 }
2013
2014 #[instrument(level = "trace")]
2016 pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2017 Ok(self
2018 .client
2019 .local_node
2020 .chain_state_view(self.chain_id)
2021 .await?
2022 .execution_state
2023 .system
2024 .ownership
2025 .get()
2026 .clone())
2027 }
2028
2029 #[instrument(level = "trace")]
2032 pub async fn change_ownership(
2033 &self,
2034 ownership: ChainOwnership,
2035 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2036 self.execute_operation(SystemOperation::ChangeOwnership {
2037 super_owners: ownership.super_owners.into_iter().collect(),
2038 owners: ownership.owners.into_iter().collect(),
2039 first_leader: ownership.first_leader,
2040 multi_leader_rounds: ownership.multi_leader_rounds,
2041 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2042 timeout_config: ownership.timeout_config.clone(),
2043 })
2044 .await
2045 }
2046
2047 #[instrument(level = "trace")]
2049 pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2050 Ok(self
2051 .client
2052 .local_node
2053 .chain_state_view(self.chain_id)
2054 .await?
2055 .execution_state
2056 .system
2057 .application_permissions
2058 .get()
2059 .clone())
2060 }
2061
2062 #[instrument(level = "trace", skip(application_permissions))]
2064 pub async fn change_application_permissions(
2065 &self,
2066 application_permissions: ApplicationPermissions,
2067 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2068 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2069 application_permissions,
2070 ))
2071 .await
2072 }
2073
2074 #[instrument(level = "trace", skip(self))]
2076 pub async fn open_chain(
2077 &self,
2078 ownership: ChainOwnership,
2079 application_permissions: ApplicationPermissions,
2080 balance: Amount,
2081 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2082 let mut has_key = false;
2084 for owner in ownership.all_owners() {
2085 if self.has_key_for(owner).await? {
2086 has_key = true;
2087 break;
2088 }
2089 }
2090 let config = OpenChainConfig {
2091 ownership,
2092 balance,
2093 application_permissions,
2094 };
2095 let operation = Operation::system(SystemOperation::OpenChain(config));
2096 let certificate = match self.execute_block(vec![operation], vec![]).await? {
2097 ClientOutcome::Committed(certificate) => certificate,
2098 ClientOutcome::Conflict(certificate) => {
2099 return Ok(ClientOutcome::Conflict(certificate));
2100 }
2101 ClientOutcome::WaitForTimeout(timeout) => {
2102 return Ok(ClientOutcome::WaitForTimeout(timeout));
2103 }
2104 };
2105 let chain_blob = certificate
2107 .block()
2108 .body
2109 .blobs
2110 .last()
2111 .and_then(|blobs| blobs.last())
2112 .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2113 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2114 if has_key {
2116 self.client
2117 .extend_chain_mode(description.id(), ListeningMode::FullChain);
2118 self.client
2119 .local_node
2120 .retry_pending_cross_chain_requests(self.chain_id)
2121 .await?;
2122 }
2123 Ok(ClientOutcome::Committed((description, certificate)))
2124 }
2125
2126 #[instrument(level = "trace")]
2129 pub async fn close_chain(
2130 &self,
2131 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2132 match self.execute_operation(SystemOperation::CloseChain).await {
2133 Ok(outcome) => Ok(outcome.map(Some)),
2134 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2135 chain_error,
2136 )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2137 Ok(ClientOutcome::Committed(None)) }
2139 Err(error) => Err(error),
2140 }
2141 }
2142
2143 #[cfg(not(target_arch = "wasm32"))]
2145 #[instrument(level = "trace", skip(contract, service))]
2146 pub async fn publish_module(
2147 &self,
2148 contract: Bytecode,
2149 service: Bytecode,
2150 vm_runtime: VmRuntime,
2151 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2152 let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2153 self.publish_module_blobs(blobs, module_id).await
2154 }
2155
2156 #[cfg(not(target_arch = "wasm32"))]
2158 #[instrument(level = "trace", skip(blobs, module_id))]
2159 pub async fn publish_module_blobs(
2160 &self,
2161 blobs: Vec<Blob>,
2162 module_id: ModuleId,
2163 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2164 self.execute_operations(
2165 vec![Operation::system(SystemOperation::PublishModule {
2166 module_id,
2167 })],
2168 blobs,
2169 )
2170 .await?
2171 .try_map(|certificate| Ok((module_id, certificate)))
2172 }
2173
2174 #[instrument(level = "trace", skip(bytes))]
2176 pub async fn publish_data_blobs(
2177 &self,
2178 bytes: Vec<Vec<u8>>,
2179 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2180 let blobs = bytes.into_iter().map(Blob::new_data);
2181 let publish_blob_operations = blobs
2182 .clone()
2183 .map(|blob| {
2184 Operation::system(SystemOperation::PublishDataBlob {
2185 blob_hash: blob.id().hash,
2186 })
2187 })
2188 .collect();
2189 self.execute_operations(publish_blob_operations, blobs.collect())
2190 .await
2191 }
2192
2193 #[instrument(level = "trace", skip(bytes))]
2195 pub async fn publish_data_blob(
2196 &self,
2197 bytes: Vec<u8>,
2198 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2199 self.publish_data_blobs(vec![bytes]).await
2200 }
2201
2202 #[instrument(
2204 level = "trace",
2205 skip(self, parameters, instantiation_argument, required_application_ids)
2206 )]
2207 pub async fn create_application<
2208 A: Abi,
2209 Parameters: Serialize,
2210 InstantiationArgument: Serialize,
2211 >(
2212 &self,
2213 module_id: ModuleId<A, Parameters, InstantiationArgument>,
2214 parameters: &Parameters,
2215 instantiation_argument: &InstantiationArgument,
2216 required_application_ids: Vec<ApplicationId>,
2217 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2218 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2219 let parameters = serde_json::to_vec(parameters)?;
2220 Ok(self
2221 .create_application_untyped(
2222 module_id.forget_abi(),
2223 parameters,
2224 instantiation_argument,
2225 required_application_ids,
2226 )
2227 .await?
2228 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2229 }
2230
2231 #[instrument(
2233 level = "trace",
2234 skip(
2235 self,
2236 module_id,
2237 parameters,
2238 instantiation_argument,
2239 required_application_ids
2240 )
2241 )]
2242 pub async fn create_application_untyped(
2243 &self,
2244 module_id: ModuleId,
2245 parameters: Vec<u8>,
2246 instantiation_argument: Vec<u8>,
2247 required_application_ids: Vec<ApplicationId>,
2248 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2249 self.execute_operation(SystemOperation::CreateApplication {
2250 module_id,
2251 parameters,
2252 instantiation_argument,
2253 required_application_ids,
2254 })
2255 .await?
2256 .try_map(|certificate| {
2257 let mut creation: Vec<_> = certificate
2259 .block()
2260 .created_blob_ids()
2261 .into_iter()
2262 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2263 .collect();
2264 if creation.len() > 1 {
2265 return Err(Error::InternalError(
2266 "Unexpected number of application descriptions published",
2267 ));
2268 }
2269 let blob_id = creation.pop().ok_or(Error::InternalError(
2270 "ApplicationDescription blob not found.",
2271 ))?;
2272 let id = ApplicationId::new(blob_id.hash);
2273 Ok((id, certificate))
2274 })
2275 }
2276
2277 #[instrument(level = "trace", skip(committee))]
2279 pub async fn stage_new_committee(
2280 &self,
2281 committee: Committee,
2282 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2283 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2284 let blob_hash = blob.id().hash;
2285 match self
2286 .execute_operations(
2287 vec![Operation::system(SystemOperation::Admin(
2288 AdminOperation::PublishCommitteeBlob { blob_hash },
2289 ))],
2290 vec![blob],
2291 )
2292 .await?
2293 {
2294 ClientOutcome::Committed(_) => {}
2295 outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2296 return Ok(outcome)
2297 }
2298 }
2299 let epoch = self.chain_info().await?.epoch.try_add_one()?;
2300 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2301 epoch,
2302 blob_hash,
2303 }))
2304 .await
2305 }
2306
2307 #[instrument(level = "trace")]
2313 pub async fn process_inbox(
2314 &self,
2315 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2316 self.prepare_chain().await?;
2317 self.process_inbox_without_prepare().await
2318 }
2319
2320 #[instrument(level = "trace")]
2326 pub async fn process_inbox_without_prepare(
2327 &self,
2328 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2329 #[cfg(with_metrics)]
2330 let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2331
2332 let mut certificates = Vec::new();
2333 loop {
2334 match self.execute_block(vec![], vec![]).await {
2338 Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2339 Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2340 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2341 return Ok((certificates, Some(timeout)));
2342 }
2343 Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2345 WorkerError::ChainError(chain_error),
2346 ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2347 return Ok((certificates, None));
2348 }
2349 Err(error) => return Err(error),
2350 };
2351 }
2352 }
2353
2354 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2357 let (mut min_epoch, mut next_epoch) = {
2358 let (epoch, committees) = self.epoch_and_committees().await?;
2359 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2360 (min_epoch, epoch.try_add_one()?)
2361 };
2362 let mut epoch_change_ops = Vec::new();
2363 while self
2364 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2365 .await?
2366 {
2367 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2368 next_epoch,
2369 )));
2370 next_epoch.try_add_assign_one()?;
2371 }
2372 while self
2373 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2374 .await?
2375 {
2376 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2377 min_epoch,
2378 )));
2379 min_epoch.try_add_assign_one()?;
2380 }
2381 Ok(epoch_change_ops)
2382 }
2383
2384 async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2387 let event_id = EventId {
2388 chain_id: self.client.admin_chain_id,
2389 stream_id: StreamId::system(stream_name),
2390 index,
2391 };
2392 Ok(self
2393 .client
2394 .storage_client()
2395 .read_event(event_id)
2396 .await?
2397 .is_some())
2398 }
2399
2400 pub async fn events_from_index(
2402 &self,
2403 stream_id: StreamId,
2404 start_index: u32,
2405 ) -> Result<Vec<IndexAndEvent>, Error> {
2406 Ok(self
2407 .client
2408 .storage_client()
2409 .read_events_from_index(&self.chain_id, &stream_id, start_index)
2410 .await?)
2411 }
2412
2413 #[instrument(level = "trace")]
2418 pub async fn revoke_epochs(
2419 &self,
2420 revoked_epoch: Epoch,
2421 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2422 self.prepare_chain().await?;
2423 let (current_epoch, committees) = self.epoch_and_committees().await?;
2424 ensure!(
2425 revoked_epoch < current_epoch,
2426 Error::CannotRevokeCurrentEpoch(current_epoch)
2427 );
2428 ensure!(
2429 committees.contains_key(&revoked_epoch),
2430 Error::EpochAlreadyRevoked
2431 );
2432 let operations = committees
2433 .keys()
2434 .filter_map(|epoch| {
2435 if *epoch <= revoked_epoch {
2436 Some(Operation::system(SystemOperation::Admin(
2437 AdminOperation::RemoveCommittee { epoch: *epoch },
2438 )))
2439 } else {
2440 None
2441 }
2442 })
2443 .collect();
2444 self.execute_operations(operations, vec![]).await
2445 }
2446
2447 #[instrument(level = "trace")]
2451 pub async fn transfer_to_account_unsafe_unconfirmed(
2452 &self,
2453 owner: AccountOwner,
2454 amount: Amount,
2455 recipient: Account,
2456 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2457 self.execute_operation(SystemOperation::Transfer {
2458 owner,
2459 recipient,
2460 amount,
2461 })
2462 .await
2463 }
2464
2465 #[instrument(level = "trace", skip(hash))]
2466 pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2467 let block = self
2468 .client
2469 .storage_client()
2470 .read_confirmed_block(hash)
2471 .await?;
2472 block.ok_or(Error::MissingConfirmedBlock(hash))
2473 }
2474
2475 #[instrument(level = "trace", skip(hash))]
2476 pub async fn read_certificate(
2477 &self,
2478 hash: CryptoHash,
2479 ) -> Result<ConfirmedBlockCertificate, Error> {
2480 let certificate = self.client.storage_client().read_certificate(hash).await?;
2481 certificate.ok_or(Error::ReadCertificatesError(vec![hash]))
2482 }
2483
2484 #[instrument(level = "trace")]
2486 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2487 self.client
2488 .local_node
2489 .retry_pending_cross_chain_requests(self.chain_id)
2490 .await?;
2491 Ok(())
2492 }
2493
2494 #[instrument(level = "trace", skip(local_node))]
2495 async fn local_chain_info(
2496 &self,
2497 chain_id: ChainId,
2498 local_node: &mut LocalNodeClient<Env::Storage>,
2499 ) -> Result<Option<Box<ChainInfo>>, Error> {
2500 match local_node.chain_info(chain_id).await {
2501 Ok(info) => {
2502 self.client.update_from_info(&info);
2504 Ok(Some(info))
2505 }
2506 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2507 Err(err) => Err(err.into()),
2508 }
2509 }
2510
2511 #[instrument(level = "trace", skip(chain_id, local_node))]
2512 async fn local_next_block_height(
2513 &self,
2514 chain_id: ChainId,
2515 local_node: &mut LocalNodeClient<Env::Storage>,
2516 ) -> Result<BlockHeight, Error> {
2517 Ok(self
2518 .local_chain_info(chain_id, local_node)
2519 .await?
2520 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2521 }
2522
2523 #[instrument(level = "trace")]
2526 async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2527 Ok(self
2528 .client
2529 .local_node
2530 .get_inbox_next_height(self.chain_id, origin)
2531 .await?)
2532 }
2533
2534 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2535 async fn process_notification(
2536 &self,
2537 remote_node: RemoteNode<Env::ValidatorNode>,
2538 mut local_node: LocalNodeClient<Env::Storage>,
2539 notification: Notification,
2540 ) -> Result<(), Error> {
2541 let listening_mode = self.listening_mode();
2542 let relevant = listening_mode
2543 .as_ref()
2544 .is_some_and(|mode| mode.is_relevant(¬ification.reason));
2545 if !relevant {
2546 debug!(
2547 chain_id = %self.chain_id,
2548 reason = ?notification.reason,
2549 ?listening_mode,
2550 "Ignoring notification due to listening mode"
2551 );
2552 return Ok(());
2553 }
2554 match notification.reason {
2555 Reason::NewIncomingBundle { origin, height } => {
2556 if self.local_next_height_to_receive(origin).await? > height {
2557 debug!(
2558 chain_id = %self.chain_id,
2559 "Accepting redundant notification for new message"
2560 );
2561 return Ok(());
2562 }
2563 self.client
2564 .download_sender_block_with_sending_ancestors(
2565 self.chain_id,
2566 origin,
2567 height,
2568 &remote_node,
2569 )
2570 .await?;
2571 if self.local_next_height_to_receive(origin).await? <= height {
2572 info!(
2573 chain_id = %self.chain_id,
2574 "NewIncomingBundle: Fail to synchronize new message after notification"
2575 );
2576 }
2577 }
2578 Reason::NewBlock { height, .. } => {
2579 let chain_id = notification.chain_id;
2580 if self
2581 .local_next_block_height(chain_id, &mut local_node)
2582 .await?
2583 > height
2584 {
2585 debug!(
2586 chain_id = %self.chain_id,
2587 "Accepting redundant notification for new block"
2588 );
2589 return Ok(());
2590 }
2591 self.client
2592 .synchronize_chain_state_from(&remote_node, chain_id)
2593 .await?;
2594 if self
2595 .local_next_block_height(chain_id, &mut local_node)
2596 .await?
2597 <= height
2598 {
2599 info!("NewBlock: Fail to synchronize new block after notification");
2600 }
2601 trace!(
2602 chain_id = %self.chain_id,
2603 %height,
2604 "NewBlock: processed notification",
2605 );
2606 }
2607 Reason::NewEvents { height, hash, .. } => {
2608 if self
2609 .local_next_block_height(notification.chain_id, &mut local_node)
2610 .await?
2611 > height
2612 {
2613 debug!(
2614 chain_id = %self.chain_id,
2615 "Accepting redundant notification for new block"
2616 );
2617 return Ok(());
2618 }
2619 trace!(
2620 chain_id = %self.chain_id,
2621 %height,
2622 "NewEvents: processing notification"
2623 );
2624 let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
2625 let certificate = certificates
2628 .pop()
2629 .expect("download_certificates should have returned one certificate");
2630 self.client
2631 .receive_sender_certificate(
2632 certificate,
2633 ReceiveCertificateMode::NeedsCheck,
2634 None,
2635 )
2636 .await?;
2637 }
2638 Reason::NewRound { height, round } => {
2639 let chain_id = notification.chain_id;
2640 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2641 if (info.next_block_height, info.manager.current_round) >= (height, round) {
2642 debug!(
2643 chain_id = %self.chain_id,
2644 "Accepting redundant notification for new round"
2645 );
2646 return Ok(());
2647 }
2648 }
2649 self.client
2650 .synchronize_chain_state_from(&remote_node, chain_id)
2651 .await?;
2652 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2653 error!(
2654 chain_id = %self.chain_id,
2655 "NewRound: Fail to read local chain info for {chain_id}"
2656 );
2657 return Ok(());
2658 };
2659 if (info.next_block_height, info.manager.current_round) < (height, round) {
2660 error!(
2661 chain_id = %self.chain_id,
2662 "NewRound: Fail to synchronize new block after notification"
2663 );
2664 }
2665 }
2666 Reason::BlockExecuted { .. } => {
2667 }
2669 }
2670 Ok(())
2671 }
2672
2673 pub fn is_tracked(&self) -> bool {
2675 self.client.is_tracked(self.chain_id)
2676 }
2677
2678 pub fn listening_mode(&self) -> Option<ListeningMode> {
2680 self.client.chain_mode(self.chain_id)
2681 }
2682
2683 #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
2688 pub async fn listen(
2689 &self,
2690 ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
2691 use future::FutureExt as _;
2692
2693 async fn await_while_polling<F: FusedFuture>(
2694 future: F,
2695 background_work: impl FusedStream<Item = ()>,
2696 ) -> F::Output {
2697 tokio::pin!(future);
2698 tokio::pin!(background_work);
2699 loop {
2700 futures::select! {
2701 _ = background_work.next() => (),
2702 result = future => return result,
2703 }
2704 }
2705 }
2706
2707 let mut senders = HashMap::new(); let notifications = self.subscribe()?;
2709 let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
2710
2711 let mut process_notifications = FuturesUnordered::new();
2718
2719 match self.update_notification_streams(&mut senders).await {
2720 Ok(handler) => process_notifications.push(handler),
2721 Err(error) => error!("Failed to update committee: {error}"),
2722 };
2723
2724 let this = self.clone();
2725 let update_streams = async move {
2726 let mut abortable_notifications = abortable_notifications.fuse();
2727
2728 while let Some(notification) =
2729 await_while_polling(abortable_notifications.next(), &mut process_notifications)
2730 .await
2731 {
2732 if let Reason::NewBlock { .. } = notification.reason {
2733 match Box::pin(await_while_polling(
2734 this.update_notification_streams(&mut senders).fuse(),
2735 &mut process_notifications,
2736 ))
2737 .await
2738 {
2739 Ok(handler) => process_notifications.push(handler),
2740 Err(error) => error!("Failed to update committee: {error}"),
2741 }
2742 }
2743 }
2744
2745 for abort in senders.into_values() {
2746 abort.abort();
2747 }
2748
2749 let () = process_notifications.collect().await;
2750 }
2751 .in_current_span();
2752
2753 Ok((update_streams, AbortOnDrop(abort), notifications))
2754 }
2755
2756 #[instrument(level = "trace", skip(senders))]
2757 async fn update_notification_streams(
2758 &self,
2759 senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
2760 ) -> Result<impl Future<Output = ()>, Error> {
2761 let (nodes, local_node) = {
2762 let committee = self.local_committee().await?;
2763 let nodes: HashMap<_, _> = self
2764 .client
2765 .validator_node_provider()
2766 .make_nodes(&committee)?
2767 .collect();
2768 (nodes, self.client.local_node.clone())
2769 };
2770 senders.retain(|validator, abort| {
2772 if !nodes.contains_key(validator) {
2773 abort.abort();
2774 }
2775 !abort.is_aborted()
2776 });
2777 let validator_tasks = FuturesUnordered::new();
2779 for (public_key, node) in nodes {
2780 let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
2781 continue;
2782 };
2783 let address = node.address();
2784 let this = self.clone();
2785 let stream = stream::once({
2786 let node = node.clone();
2787 async move {
2788 let stream = node.subscribe(vec![this.chain_id]).await?;
2789 let remote_node = RemoteNode { public_key, node };
2792 this.client
2793 .synchronize_chain_state_from(&remote_node, this.chain_id)
2794 .await?;
2795 Ok::<_, Error>(stream)
2796 }
2797 })
2798 .filter_map(move |result| {
2799 let address = address.clone();
2800 async move {
2801 if let Err(error) = &result {
2802 info!(?error, address, "could not connect to validator");
2803 } else {
2804 debug!(address, "connected to validator");
2805 }
2806 result.ok()
2807 }
2808 })
2809 .flatten();
2810 let (stream, abort) = stream::abortable(stream);
2811 let mut stream = Box::pin(stream);
2812 let this = self.clone();
2813 let local_node = local_node.clone();
2814 let remote_node = RemoteNode { public_key, node };
2815 validator_tasks.push(async move {
2816 while let Some(notification) = stream.next().await {
2817 if let Err(error) = this
2818 .process_notification(
2819 remote_node.clone(),
2820 local_node.clone(),
2821 notification.clone(),
2822 )
2823 .await
2824 {
2825 tracing::info!(
2826 chain_id = %this.chain_id,
2827 address = remote_node.address(),
2828 ?notification,
2829 %error,
2830 "failed to process notification",
2831 );
2832 }
2833 }
2834 });
2835 entry.insert(abort);
2836 }
2837 Ok(validator_tasks.collect())
2838 }
2839
2840 #[instrument(level = "trace", skip(remote_node))]
2842 pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
2843 let validator_next_block_height = match remote_node
2844 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
2845 .await
2846 {
2847 Ok(info) => info.info.next_block_height,
2848 Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
2849 Err(err) => return Err(err.into()),
2850 };
2851 let local_next_block_height = self.chain_info().await?.next_block_height;
2852
2853 if validator_next_block_height >= local_next_block_height {
2854 debug!("Validator is up-to-date with local state");
2855 return Ok(());
2856 }
2857
2858 let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
2859 .map(BlockHeight)
2860 .collect();
2861
2862 let certificates = self
2863 .client
2864 .storage_client()
2865 .read_certificates_by_heights(self.chain_id, &heights)
2866 .await?
2867 .into_iter()
2868 .flatten()
2869 .collect::<Vec<_>>();
2870
2871 for certificate in certificates {
2872 match remote_node
2873 .handle_confirmed_certificate(
2874 certificate.clone(),
2875 CrossChainMessageDelivery::NonBlocking,
2876 )
2877 .await
2878 {
2879 Ok(_) => (),
2880 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
2881 let missing_blobs: Vec<_> = self
2883 .client
2884 .storage_client()
2885 .read_blobs(&missing_blob_ids)
2886 .await?
2887 .into_iter()
2888 .flatten()
2889 .collect();
2890 remote_node.upload_blobs(missing_blobs).await?;
2891 remote_node
2892 .handle_confirmed_certificate(
2893 certificate,
2894 CrossChainMessageDelivery::NonBlocking,
2895 )
2896 .await?;
2897 }
2898 Err(err) => return Err(err.into()),
2899 }
2900 }
2901
2902 Ok(())
2903 }
2904}
2905
2906#[cfg(with_testing)]
2907impl<Env: Environment> ChainClient<Env> {
2908 pub async fn process_notification_from(
2909 &self,
2910 notification: Notification,
2911 validator: (ValidatorPublicKey, &str),
2912 ) {
2913 let mut node_list = self
2914 .client
2915 .validator_node_provider()
2916 .make_nodes_from_list(vec![validator])
2917 .unwrap();
2918 let (public_key, node) = node_list.next().unwrap();
2919 let remote_node = RemoteNode { node, public_key };
2920 let local_node = self.client.local_node.clone();
2921 self.process_notification(remote_node, local_node, notification)
2922 .await
2923 .unwrap();
2924 }
2925}