1mod state;
5use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7 convert::Infallible,
8 iter,
9 sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{self, Either, FusedFuture, Future, FutureExt},
15 select,
16 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
17};
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{
21 abi::Abi,
22 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
23 data_types::{
24 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
25 ChainDescription, Epoch, Round, Timestamp,
26 },
27 ensure,
28 identifiers::{
29 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
30 ModuleId, StreamId,
31 },
32 ownership::{ChainOwnership, TimeoutConfig},
33 time::{Duration, Instant},
34};
35#[cfg(not(target_arch = "wasm32"))]
36use linera_base::{data_types::Bytecode, vm::VmRuntime};
37use linera_chain::{
38 data_types::{BlockProposal, ChainAndHeight, IncomingBundle, ProposedBlock, Transaction},
39 manager::LockingBlock,
40 types::{
41 Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
42 ValidatedBlock,
43 },
44 ChainError, ChainExecutionContext, ChainStateView,
45};
46use linera_execution::{
47 committee::Committee,
48 system::{
49 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
50 REMOVED_EPOCH_STREAM_NAME,
51 },
52 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
53};
54use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
55use linera_views::ViewError;
56use rand::seq::SliceRandom;
57use serde::Serialize;
58pub use state::State;
59use thiserror::Error;
60use tokio::sync::{mpsc, OwnedRwLockReadGuard};
61use tokio_stream::wrappers::UnboundedReceiverStream;
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66 received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67 ExecuteBlockOutcome, ListeningMode, MessagePolicy, PendingProposal, ReceiveCertificateMode,
68 TimingType,
69};
70use crate::{
71 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
72 environment::Environment,
73 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74 node::{
75 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76 ValidatorNodeProvider as _,
77 },
78 remote_node::RemoteNode,
79 updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
80 worker::{Notification, Reason, WorkerError},
81};
82
83#[derive(Debug, Clone)]
84pub struct Options {
85 pub max_pending_message_bundles: usize,
87 pub message_policy: MessagePolicy,
89 pub cross_chain_message_delivery: CrossChainMessageDelivery,
91 pub quorum_grace_period: f64,
94 pub blob_download_timeout: Duration,
96 pub certificate_batch_download_timeout: Duration,
98 pub certificate_download_batch_size: u64,
101 pub sender_certificate_download_batch_size: usize,
104 pub max_joined_tasks: usize,
106}
107
108#[cfg(with_testing)]
109impl Options {
110 pub fn test_default() -> Self {
111 use super::{
112 DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
113 };
114 use crate::DEFAULT_QUORUM_GRACE_PERIOD;
115
116 Options {
117 max_pending_message_bundles: 10,
118 message_policy: MessagePolicy::new_accept_all(),
119 cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
120 quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
121 blob_download_timeout: Duration::from_secs(1),
122 certificate_batch_download_timeout: Duration::from_secs(1),
123 certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
124 sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
125 max_joined_tasks: 100,
126 }
127 }
128}
129
130#[derive(Debug)]
136pub struct ChainClient<Env: Environment> {
137 #[debug(skip)]
139 pub(crate) client: Arc<Client<Env>>,
140 chain_id: ChainId,
142 #[debug(skip)]
144 options: Options,
145 preferred_owner: Option<AccountOwner>,
148 initial_next_block_height: BlockHeight,
150 initial_block_hash: Option<CryptoHash>,
152 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
154}
155
156impl<Env: Environment> Clone for ChainClient<Env> {
157 fn clone(&self) -> Self {
158 Self {
159 client: self.client.clone(),
160 chain_id: self.chain_id,
161 options: self.options.clone(),
162 preferred_owner: self.preferred_owner,
163 initial_next_block_height: self.initial_next_block_height,
164 initial_block_hash: self.initial_block_hash,
165 timing_sender: self.timing_sender.clone(),
166 }
167 }
168}
169
170#[derive(Debug, Error)]
172pub enum Error {
173 #[error("Local node operation failed: {0}")]
174 LocalNodeError(#[from] LocalNodeError),
175
176 #[error("Remote node operation failed: {0}")]
177 RemoteNodeError(#[from] NodeError),
178
179 #[error(transparent)]
180 ArithmeticError(#[from] ArithmeticError),
181
182 #[error("Missing certificates: {0:?}")]
183 ReadCertificatesError(Vec<CryptoHash>),
184
185 #[error("Missing confirmed block: {0:?}")]
186 MissingConfirmedBlock(CryptoHash),
187
188 #[error("JSON (de)serialization error: {0}")]
189 JsonError(#[from] serde_json::Error),
190
191 #[error("Chain operation failed: {0}")]
192 ChainError(#[from] ChainError),
193
194 #[error(transparent)]
195 CommunicationError(#[from] CommunicationError<NodeError>),
196
197 #[error("Internal error within chain client: {0}")]
198 InternalError(&'static str),
199
200 #[error(
201 "Cannot accept a certificate from an unknown committee in the future. \
202 Please synchronize the local view of the admin chain"
203 )]
204 CommitteeSynchronizationError,
205
206 #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
207 WalletSynchronizationError,
208
209 #[error("The state of the client is incompatible with the proposed block: {0}")]
210 BlockProposalError(&'static str),
211
212 #[error(
213 "Cannot accept a certificate from a committee that was retired. \
214 Try a newer certificate from the same origin"
215 )]
216 CommitteeDeprecationError,
217
218 #[error("Protocol error within chain client: {0}")]
219 ProtocolError(&'static str),
220
221 #[error("Signer doesn't have key to sign for chain {0}")]
222 CannotFindKeyForChain(ChainId),
223
224 #[error("client is not configured to propose on chain {0}")]
225 NoAccountKeyConfigured(ChainId),
226
227 #[error("The chain client isn't owner on chain {0}")]
228 NotAnOwner(ChainId),
229
230 #[error(transparent)]
231 ViewError(#[from] ViewError),
232
233 #[error(
234 "Failed to download certificates and update local node to the next height \
235 {target_next_block_height} of chain {chain_id}"
236 )]
237 CannotDownloadCertificates {
238 chain_id: ChainId,
239 target_next_block_height: BlockHeight,
240 },
241
242 #[error(transparent)]
243 BcsError(#[from] bcs::Error),
244
245 #[error(
246 "Unexpected quorum: validators voted for block hash {hash} in {round}, \
247 expected block hash {expected_hash} in {expected_round}"
248 )]
249 UnexpectedQuorum {
250 hash: CryptoHash,
251 round: Round,
252 expected_hash: CryptoHash,
253 expected_round: Round,
254 },
255
256 #[error("signer error: {0:?}")]
257 Signer(#[source] Box<dyn signer::Error>),
258
259 #[error("Cannot revoke the current epoch {0}")]
260 CannotRevokeCurrentEpoch(Epoch),
261
262 #[error("Epoch is already revoked")]
263 EpochAlreadyRevoked,
264
265 #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
266 CannotDownloadMissingSenderBlock {
267 chain_id: ChainId,
268 height: BlockHeight,
269 },
270}
271
272impl From<Infallible> for Error {
273 fn from(infallible: Infallible) -> Self {
274 match infallible {}
275 }
276}
277
278impl Error {
279 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
280 Self::Signer(Box::new(err))
281 }
282}
283
284impl<Env: Environment> ChainClient<Env> {
285 pub fn new(
286 client: Arc<Client<Env>>,
287 chain_id: ChainId,
288 options: Options,
289 initial_block_hash: Option<CryptoHash>,
290 initial_next_block_height: BlockHeight,
291 preferred_owner: Option<AccountOwner>,
292 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
293 ) -> Self {
294 ChainClient {
295 client,
296 chain_id,
297 options,
298 preferred_owner,
299 initial_block_hash,
300 initial_next_block_height,
301 timing_sender,
302 }
303 }
304
305 pub fn is_follow_only(&self) -> bool {
307 self.client.is_chain_follow_only(self.chain_id)
308 }
309
310 #[instrument(level = "trace", skip(self))]
312 fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
313 self.client
314 .chains
315 .pin()
316 .get(&self.chain_id)
317 .expect("Chain client constructed for invalid chain")
318 .client_mutex()
319 }
320
321 #[instrument(level = "trace", skip(self))]
323 pub fn pending_proposal(&self) -> Option<PendingProposal> {
324 self.client
325 .chains
326 .pin()
327 .get(&self.chain_id)
328 .expect("Chain client constructed for invalid chain")
329 .pending_proposal()
330 .clone()
331 }
332
333 #[instrument(level = "trace", skip(self, f))]
335 fn update_state<F>(&self, f: F)
336 where
337 F: Fn(&mut State),
338 {
339 let chains = self.client.chains.pin();
340 chains
341 .update(self.chain_id, |state| {
342 let mut state = state.clone_for_update_unchecked();
343 f(&mut state);
344 state
345 })
346 .expect("Chain client constructed for invalid chain");
347 }
348
349 #[instrument(level = "trace", skip(self))]
351 pub fn signer(&self) -> &impl Signer {
352 self.client.signer()
353 }
354
355 #[instrument(level = "trace", skip(self))]
357 pub fn options_mut(&mut self) -> &mut Options {
358 &mut self.options
359 }
360
361 #[instrument(level = "trace", skip(self))]
363 pub fn options(&self) -> &Options {
364 &self.options
365 }
366
367 #[instrument(level = "trace", skip(self))]
369 pub fn chain_id(&self) -> ChainId {
370 self.chain_id
371 }
372
373 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
375 self.timing_sender.clone()
376 }
377
378 #[instrument(level = "trace", skip(self))]
380 pub fn admin_id(&self) -> ChainId {
381 self.client.admin_id
382 }
383
384 #[instrument(level = "trace", skip(self))]
386 pub fn preferred_owner(&self) -> Option<AccountOwner> {
387 self.preferred_owner
388 }
389
390 #[instrument(level = "trace", skip(self))]
392 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
393 self.preferred_owner = Some(preferred_owner);
394 }
395
396 #[instrument(level = "trace", skip(self))]
398 pub fn unset_preferred_owner(&mut self) {
399 self.preferred_owner = None;
400 }
401
402 #[instrument(level = "trace")]
404 pub async fn chain_state_view(
405 &self,
406 ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
407 self.client.local_node.chain_state_view(self.chain_id).await
408 }
409
410 #[instrument(level = "trace", skip(self))]
412 pub async fn event_stream_publishers(
413 &self,
414 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
415 let subscriptions = self
416 .client
417 .local_node
418 .get_event_subscriptions(self.chain_id)
419 .await?;
420 let mut publishers = subscriptions.into_iter().fold(
421 BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
422 |mut map, ((chain_id, stream_id), _)| {
423 map.entry(chain_id).or_default().insert(stream_id);
424 map
425 },
426 );
427 if self.chain_id != self.client.admin_id {
428 publishers.insert(
429 self.client.admin_id,
430 vec![
431 StreamId::system(EPOCH_STREAM_NAME),
432 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
433 ]
434 .into_iter()
435 .collect(),
436 );
437 }
438 Ok(publishers)
439 }
440
441 #[instrument(level = "trace")]
443 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
444 self.subscribe_to(self.chain_id)
445 }
446
447 #[instrument(level = "trace")]
449 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
450 Ok(Box::pin(UnboundedReceiverStream::new(
451 self.client.notifier.subscribe(vec![chain_id]),
452 )))
453 }
454
455 #[instrument(level = "trace")]
457 pub fn storage_client(&self) -> &Env::Storage {
458 self.client.storage_client()
459 }
460
461 #[instrument(level = "trace")]
463 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
464 let query = ChainInfoQuery::new(self.chain_id);
465 let response = self
466 .client
467 .local_node
468 .handle_chain_info_query(query)
469 .await?;
470 self.client.update_from_info(&response.info);
471 Ok(response.info)
472 }
473
474 #[instrument(level = "trace")]
476 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
477 let query = ChainInfoQuery::new(self.chain_id)
478 .with_manager_values()
479 .with_committees();
480 let response = self
481 .client
482 .local_node
483 .handle_chain_info_query(query)
484 .await?;
485 self.client.update_from_info(&response.info);
486 Ok(response.info)
487 }
488
489 pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
491 self.client.get_chain_description(self.chain_id).await
492 }
493
494 #[instrument(level = "trace")]
497 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
498 if self.options.message_policy.is_ignore() {
499 return Ok(Vec::new());
501 }
502
503 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
504 let info = self
505 .client
506 .local_node
507 .handle_chain_info_query(query)
508 .await?
509 .info;
510 if self.preferred_owner.is_some_and(|owner| {
511 info.manager
512 .ownership
513 .is_super_owner_no_regular_owners(&owner)
514 }) {
515 ensure!(
517 info.next_block_height >= self.initial_next_block_height,
518 Error::WalletSynchronizationError
519 );
520 }
521
522 Ok(info
523 .requested_pending_message_bundles
524 .into_iter()
525 .filter_map(|bundle| self.options.message_policy.apply(bundle))
526 .take(self.options.max_pending_message_bundles)
527 .collect())
528 }
529
530 #[instrument(level = "trace")]
534 async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
535 let subscription_map = self
537 .client
538 .local_node
539 .get_event_subscriptions(self.chain_id)
540 .await?;
541 let futures = subscription_map
543 .into_iter()
544 .filter(|((chain_id, _), _)| {
545 self.options
546 .message_policy
547 .restrict_chain_ids_to
548 .as_ref()
549 .is_none_or(|chain_set| chain_set.contains(chain_id))
550 })
551 .map(|((chain_id, stream_id), subscriptions)| {
552 let client = self.client.clone();
553 async move {
554 let next_expected_index = client
555 .local_node
556 .get_next_expected_event(chain_id, stream_id.clone())
557 .await?;
558 if let Some(next_index) = next_expected_index
559 .filter(|next_index| *next_index > subscriptions.next_index)
560 {
561 Ok(Some((chain_id, stream_id, next_index)))
562 } else {
563 Ok::<_, Error>(None)
564 }
565 }
566 });
567 let updates = futures::stream::iter(futures)
568 .buffer_unordered(self.options.max_joined_tasks)
569 .try_collect::<Vec<_>>()
570 .await?
571 .into_iter()
572 .flatten()
573 .collect::<Vec<_>>();
574 if updates.is_empty() {
575 return Ok(None);
576 }
577 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
578 }
579
580 #[instrument(level = "trace")]
581 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
582 self.client.chain_info_with_committees(self.chain_id).await
583 }
584
585 #[instrument(level = "trace")]
587 async fn epoch_and_committees(
588 &self,
589 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
590 let info = self.chain_info_with_committees().await?;
591 let epoch = info.epoch;
592 let committees = info.into_committees()?;
593 Ok((epoch, committees))
594 }
595
596 #[instrument(level = "trace")]
598 pub async fn local_committee(&self) -> Result<Committee, Error> {
599 let info = match self.chain_info_with_committees().await {
600 Ok(info) => info,
601 Err(LocalNodeError::BlobsNotFound(_)) => {
602 self.synchronize_chain_state(self.chain_id).await?;
603 self.chain_info_with_committees().await?
604 }
605 Err(err) => return Err(err.into()),
606 };
607 Ok(info.into_current_committee()?)
608 }
609
610 #[instrument(level = "trace")]
612 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
613 self.client.admin_committee().await
614 }
615
616 #[instrument(level = "trace")]
620 pub async fn identity(&self) -> Result<AccountOwner, Error> {
621 let Some(preferred_owner) = self.preferred_owner else {
622 return Err(Error::NoAccountKeyConfigured(self.chain_id));
623 };
624 let manager = self.chain_info().await?.manager;
625 ensure!(
626 manager.ownership.is_active(),
627 LocalNodeError::InactiveChain(self.chain_id)
628 );
629 let fallback_owners = if manager.ownership.has_fallback() {
630 self.local_committee()
631 .await?
632 .account_keys_and_weights()
633 .map(|(key, _)| AccountOwner::from(key))
634 .collect()
635 } else {
636 BTreeSet::new()
637 };
638
639 let is_owner = manager.ownership.is_owner(&preferred_owner)
640 || fallback_owners.contains(&preferred_owner);
641
642 if !is_owner {
643 warn!(
644 chain_id = %self.chain_id,
645 ownership = ?manager.ownership,
646 ?fallback_owners,
647 ?preferred_owner,
648 "The preferred owner is not configured as an owner of this chain",
649 );
650 return Err(Error::NotAnOwner(self.chain_id));
651 }
652
653 let has_signer = self
654 .signer()
655 .contains_key(&preferred_owner)
656 .await
657 .map_err(Error::signer_failure)?;
658
659 if !has_signer {
660 warn!(%self.chain_id, ?preferred_owner,
661 "Chain is one of the owners but its Signer instance doesn't contain the key",
662 );
663 return Err(Error::CannotFindKeyForChain(self.chain_id));
664 }
665
666 Ok(preferred_owner)
667 }
668
669 #[instrument(level = "trace")]
672 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
673 #[cfg(with_metrics)]
674 let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
675
676 let mut info = self.synchronize_to_known_height().await?;
677
678 if self.preferred_owner.is_none_or(|owner| {
679 !info
680 .manager
681 .ownership
682 .is_super_owner_no_regular_owners(&owner)
683 }) {
684 info = self.client.synchronize_chain_state(self.chain_id).await?;
688 }
689
690 if info.epoch > self.client.admin_committees().await?.0 {
691 self.client
692 .synchronize_chain_state(self.client.admin_id)
693 .await?;
694 }
695
696 self.client.update_from_info(&info);
697 Ok(info)
698 }
699
700 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
705 let info = self
706 .client
707 .download_certificates(self.chain_id, self.initial_next_block_height)
708 .await?;
709 if info.next_block_height == self.initial_next_block_height {
710 ensure!(
712 self.initial_block_hash == info.block_hash,
713 Error::InternalError("Invalid chain of blocks in local node")
714 );
715 }
716 Ok(info)
717 }
718
719 #[instrument(level = "trace", skip(old_committee, latest_certificate))]
721 pub async fn update_validators(
722 &self,
723 old_committee: Option<&Committee>,
724 latest_certificate: Option<ConfirmedBlockCertificate>,
725 ) -> Result<(), Error> {
726 let update_validators_start = linera_base::time::Instant::now();
727 if let Some(old_committee) = old_committee {
729 self.communicate_chain_updates(old_committee, latest_certificate.clone())
730 .await?
731 };
732 if let Ok(new_committee) = self.local_committee().await {
733 if Some(&new_committee) != old_committee {
734 self.communicate_chain_updates(&new_committee, latest_certificate)
737 .await?;
738 }
739 }
740 self.send_timing(update_validators_start, TimingType::UpdateValidators);
741 Ok(())
742 }
743
744 #[instrument(level = "trace", skip(committee))]
746 pub async fn communicate_chain_updates(
747 &self,
748 committee: &Committee,
749 latest_certificate: Option<ConfirmedBlockCertificate>,
750 ) -> Result<(), Error> {
751 let delivery = self.options.cross_chain_message_delivery;
752 let height = self.chain_info().await?.next_block_height;
753 self.client
754 .communicate_chain_updates(
755 committee,
756 self.chain_id,
757 height,
758 delivery,
759 latest_certificate,
760 )
761 .await
762 }
763
764 async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
767 let subscriptions = self
768 .client
769 .local_node
770 .get_event_subscriptions(self.chain_id)
771 .await?;
772 let chain_ids = subscriptions
773 .iter()
774 .map(|((chain_id, _), _)| *chain_id)
775 .chain(iter::once(self.client.admin_id))
776 .filter(|chain_id| *chain_id != self.chain_id)
777 .collect::<BTreeSet<_>>();
778 stream::iter(
779 chain_ids
780 .into_iter()
781 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
782 )
783 .buffer_unordered(self.options.max_joined_tasks)
784 .collect::<Vec<_>>()
785 .await
786 .into_iter()
787 .collect::<Result<Vec<_>, _>>()?;
788 Ok(())
789 }
790
791 #[instrument(level = "trace")]
800 pub async fn find_received_certificates(
801 &self,
802 cancellation_token: Option<CancellationToken>,
803 ) -> Result<(), Error> {
804 debug!(chain_id = %self.chain_id, "starting find_received_certificates");
805 #[cfg(with_metrics)]
806 let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
807 let chain_id = self.chain_id;
809 let (_, committee) = self.admin_committee().await?;
810 let nodes = self.client.make_nodes(&committee)?;
811
812 let trackers = self
813 .client
814 .local_node
815 .get_received_certificate_trackers(chain_id)
816 .await?;
817
818 trace!("find_received_certificates: read trackers");
819
820 let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
821 let result = communicate_with_quorum(
823 &nodes,
824 &committee,
825 |_| (),
826 |remote_node| {
827 let client = &self.client;
828 let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
829 let received_log_batches = Arc::clone(&received_log_batches);
830 Box::pin(async move {
831 let batch = client
832 .get_received_log_from_validator(chain_id, &remote_node, tracker)
833 .await?;
834 let mut batches = received_log_batches.lock().unwrap();
835 batches.push((remote_node.public_key, batch));
836 Ok(())
837 })
838 },
839 self.options.quorum_grace_period,
840 )
841 .await;
842
843 if let Err(error) = result {
844 error!(
845 %error,
846 "Failed to synchronize received_logs from at least a quorum of validators",
847 );
848 }
849
850 let received_logs: Vec<_> = {
851 let mut received_log_batches = received_log_batches.lock().unwrap();
852 std::mem::take(received_log_batches.as_mut())
853 };
854
855 debug!(
856 received_logs_len = %received_logs.len(),
857 received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
858 "collected received logs"
859 );
860
861 let (received_logs, mut validator_trackers) = {
862 (
863 ReceivedLogs::from_received_result(received_logs.clone()),
864 ValidatorTrackers::new(received_logs, &trackers),
865 )
866 };
867
868 debug!(
869 num_chains = %received_logs.num_chains(),
870 num_certs = %received_logs.num_certs(),
871 "find_received_certificates: total number of chains and certificates to sync",
872 );
873
874 let max_blocks_per_chain =
875 self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
876 for received_log in received_logs.into_batches(
877 self.options.sender_certificate_download_batch_size,
878 max_blocks_per_chain,
879 ) {
880 validator_trackers = self
881 .receive_sender_certificates(
882 received_log,
883 validator_trackers,
884 &nodes,
885 cancellation_token.clone(),
886 )
887 .await?;
888
889 self.update_received_certificate_trackers(&validator_trackers)
890 .await;
891 }
892
893 info!("find_received_certificates finished");
894
895 Ok(())
896 }
897
898 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
899 let updated_trackers = trackers.to_map();
900 trace!(?updated_trackers, "updated tracker values");
901
902 if let Err(error) = self
904 .client
905 .local_node
906 .update_received_certificate_trackers(self.chain_id, updated_trackers)
907 .await
908 {
909 error!(
910 chain_id = %self.chain_id,
911 %error,
912 "Failed to update the certificate trackers for chain",
913 );
914 }
915 }
916
917 async fn receive_sender_certificates(
920 &self,
921 mut received_logs: ReceivedLogs,
922 mut validator_trackers: ValidatorTrackers,
923 nodes: &[RemoteNode<Env::ValidatorNode>],
924 cancellation_token: Option<CancellationToken>,
925 ) -> Result<ValidatorTrackers, Error> {
926 debug!(
927 num_chains = %received_logs.num_chains(),
928 num_certs = %received_logs.num_certs(),
929 "receive_sender_certificates: number of chains and certificates to sync",
930 );
931
932 let local_next_heights = self
934 .client
935 .local_node
936 .next_outbox_heights(received_logs.chains(), self.chain_id)
937 .await?;
938
939 validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
940
941 debug!(
942 remaining_total_certificates = %received_logs.num_certs(),
943 "receive_sender_certificates: computed remote_heights"
944 );
945
946 let mut other_sender_chains = Vec::new();
947 let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
948
949 let cert_futures = received_logs.heights_per_chain().into_iter().filter_map(
950 |(sender_chain_id, remote_heights)| {
951 if remote_heights.is_empty() {
952 other_sender_chains.push(sender_chain_id);
956 return None;
957 };
958 let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
959 let sender = sender.clone();
960 let client = self.client.clone();
961 let mut nodes = nodes.to_vec();
962 nodes.shuffle(&mut rand::thread_rng());
963 let received_logs_ref = &received_logs;
964 Some(async move {
965 client
966 .download_and_process_sender_chain(
967 sender_chain_id,
968 &nodes,
969 received_logs_ref,
970 remote_heights,
971 sender,
972 )
973 .await
974 })
975 },
976 );
977
978 let update_trackers = linera_base::task::spawn(async move {
979 while let Some(chain_and_height) = receiver.recv().await {
980 validator_trackers.downloaded_cert(chain_and_height);
981 }
982 validator_trackers
983 });
984
985 let mut cancellation_future = Box::pin(
986 async move {
987 if let Some(token) = cancellation_token {
988 token.cancelled().await
989 } else {
990 future::pending().await
991 }
992 }
993 .fuse(),
994 );
995
996 select! {
997 _ = stream::iter(cert_futures)
998 .buffer_unordered(self.options.max_joined_tasks)
999 .for_each(future::ready)
1000 => (),
1001 _ = cancellation_future => ()
1002 };
1003
1004 drop(sender);
1005
1006 let validator_trackers = update_trackers.await;
1007
1008 debug!(
1009 num_other_chains = %other_sender_chains.len(),
1010 "receive_sender_certificates: processing certificates finished"
1011 );
1012
1013 self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
1017 .await;
1018
1019 debug!("receive_sender_certificates: finished processing other_sender_chains");
1020
1021 Ok(validator_trackers)
1022 }
1023
1024 async fn retry_pending_cross_chain_requests(
1027 &self,
1028 nodes: &[RemoteNode<Env::ValidatorNode>],
1029 other_sender_chains: Vec<ChainId>,
1030 ) {
1031 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
1032 let local_node = self.client.local_node.clone();
1033 async move {
1034 if let Err(error) = match local_node
1035 .retry_pending_cross_chain_requests(chain_id)
1036 .await
1037 {
1038 Ok(()) => Ok(()),
1039 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1040 if let Err(error) = self
1041 .client
1042 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1043 .await
1044 {
1045 error!(
1046 ?blob_ids,
1047 %error,
1048 "Error while attempting to download blobs during retrying outgoing \
1049 messages"
1050 );
1051 }
1052 local_node
1053 .retry_pending_cross_chain_requests(chain_id)
1054 .await
1055 }
1056 err => err,
1057 } {
1058 error!(
1059 %chain_id,
1060 %error,
1061 "Failed to retry outgoing messages from chain"
1062 );
1063 }
1064 }
1065 }));
1066 stream.for_each(future::ready).await;
1067 }
1068
1069 #[instrument(level = "trace")]
1071 pub async fn transfer(
1072 &self,
1073 owner: AccountOwner,
1074 amount: Amount,
1075 recipient: Account,
1076 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1077 self.execute_operation(SystemOperation::Transfer {
1079 owner,
1080 recipient,
1081 amount,
1082 })
1083 .await
1084 }
1085
1086 #[instrument(level = "trace")]
1089 pub async fn read_data_blob(
1090 &self,
1091 hash: CryptoHash,
1092 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1093 let blob_id = BlobId {
1094 hash,
1095 blob_type: BlobType::Data,
1096 };
1097 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1098 .await
1099 }
1100
1101 #[instrument(level = "trace")]
1103 pub async fn claim(
1104 &self,
1105 owner: AccountOwner,
1106 target_id: ChainId,
1107 recipient: Account,
1108 amount: Amount,
1109 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1110 self.execute_operation(SystemOperation::Claim {
1111 owner,
1112 target_id,
1113 recipient,
1114 amount,
1115 })
1116 .await
1117 }
1118
1119 #[instrument(level = "trace")]
1122 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1123 let chain_id = self.chain_id;
1124 let info = self.chain_info_with_committees().await?;
1125 let committee = info.current_committee()?;
1126 let height = info.next_block_height;
1127 let round = info.manager.current_round;
1128 let action = CommunicateAction::RequestTimeout {
1129 height,
1130 round,
1131 chain_id,
1132 };
1133 let value = Timeout::new(chain_id, height, info.epoch);
1134 let certificate = Box::new(
1135 self.client
1136 .communicate_chain_action(committee, action, value)
1137 .await?,
1138 );
1139 self.client.process_certificate(certificate.clone()).await?;
1140 self.client
1142 .communicate_chain_updates(
1143 committee,
1144 chain_id,
1145 height,
1146 CrossChainMessageDelivery::NonBlocking,
1147 None,
1148 )
1149 .await?;
1150 Ok(*certificate)
1151 }
1152
1153 #[instrument(level = "trace", skip_all)]
1155 pub async fn synchronize_chain_state(
1156 &self,
1157 chain_id: ChainId,
1158 ) -> Result<Box<ChainInfo>, Error> {
1159 self.client.synchronize_chain_state(chain_id).await
1160 }
1161
1162 #[instrument(level = "trace", skip_all)]
1165 pub async fn synchronize_chain_state_from_committee(
1166 &self,
1167 committee: Committee,
1168 ) -> Result<Box<ChainInfo>, Error> {
1169 self.client
1170 .synchronize_chain_from_committee(self.chain_id, committee)
1171 .await
1172 }
1173
1174 #[instrument(level = "trace", skip(operations, blobs))]
1176 pub async fn execute_operations(
1177 &self,
1178 operations: Vec<Operation>,
1179 blobs: Vec<Blob>,
1180 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1181 let timing_start = linera_base::time::Instant::now();
1182
1183 let result = loop {
1184 let execute_block_start = linera_base::time::Instant::now();
1185 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1187 Ok(ExecuteBlockOutcome::Executed(certificate)) => {
1188 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1189 break Ok(ClientOutcome::Committed(certificate));
1190 }
1191 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
1192 break Ok(ClientOutcome::WaitForTimeout(timeout));
1193 }
1194 Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
1195 info!(
1196 height = %certificate.block().header.height,
1197 "Another block was committed; retrying."
1198 );
1199 }
1200 Err(Error::CommunicationError(CommunicationError::Trusted(
1201 NodeError::UnexpectedBlockHeight {
1202 expected_block_height,
1203 found_block_height,
1204 },
1205 ))) if expected_block_height > found_block_height => {
1206 tracing::info!(
1207 "Local state is outdated; synchronizing chain {:.8}",
1208 self.chain_id
1209 );
1210 self.synchronize_chain_state(self.chain_id).await?;
1211 }
1212 Err(err) => return Err(err),
1213 };
1214 };
1215
1216 self.send_timing(timing_start, TimingType::ExecuteOperations);
1217
1218 result
1219 }
1220
1221 pub async fn execute_operation(
1223 &self,
1224 operation: impl Into<Operation>,
1225 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1226 self.execute_operations(vec![operation.into()], vec![])
1227 .await
1228 }
1229
1230 #[instrument(level = "trace", skip(operations, blobs))]
1234 async fn execute_block(
1235 &self,
1236 operations: Vec<Operation>,
1237 blobs: Vec<Blob>,
1238 ) -> Result<ExecuteBlockOutcome, Error> {
1239 #[cfg(with_metrics)]
1240 let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1241
1242 let mutex = self.client_mutex();
1243 let _guard = mutex.lock_owned().await;
1244 match self.process_pending_block_without_prepare().await? {
1246 ClientOutcome::Committed(Some(certificate)) => {
1247 return Ok(ExecuteBlockOutcome::Conflict(certificate))
1248 }
1249 ClientOutcome::WaitForTimeout(timeout) => {
1250 return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
1251 }
1252 ClientOutcome::Committed(None) => {}
1253 }
1254
1255 let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1259
1260 if transactions.is_empty() {
1261 return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1262 WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1263 )));
1264 }
1265
1266 let block = self.new_pending_block(transactions, blobs).await?;
1267
1268 match self.process_pending_block_without_prepare().await? {
1269 ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
1270 Ok(ExecuteBlockOutcome::Executed(certificate))
1271 }
1272 ClientOutcome::Committed(Some(certificate)) => {
1273 Ok(ExecuteBlockOutcome::Conflict(certificate))
1274 }
1275 ClientOutcome::Committed(None) => {
1277 Err(Error::BlockProposalError("Unexpected block proposal error"))
1278 }
1279 ClientOutcome::WaitForTimeout(timeout) => {
1280 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
1281 }
1282 }
1283 }
1284
1285 #[instrument(level = "trace", skip(operations))]
1291 async fn prepend_epochs_messages_and_events(
1292 &self,
1293 operations: Vec<Operation>,
1294 ) -> Result<Vec<Transaction>, Error> {
1295 let incoming_bundles = self.pending_message_bundles().await?;
1296 let stream_updates = self.collect_stream_updates().await?;
1297 Ok(self
1298 .collect_epoch_changes()
1299 .await?
1300 .into_iter()
1301 .map(Transaction::ExecuteOperation)
1302 .chain(
1303 incoming_bundles
1304 .into_iter()
1305 .map(Transaction::ReceiveMessages),
1306 )
1307 .chain(
1308 stream_updates
1309 .into_iter()
1310 .map(Transaction::ExecuteOperation),
1311 )
1312 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1313 .collect::<Vec<_>>())
1314 }
1315
1316 #[instrument(level = "trace", skip(transactions, blobs))]
1320 async fn new_pending_block(
1321 &self,
1322 transactions: Vec<Transaction>,
1323 blobs: Vec<Blob>,
1324 ) -> Result<Block, Error> {
1325 let identity = self.identity().await?;
1326
1327 ensure!(
1328 self.pending_proposal().is_none(),
1329 Error::BlockProposalError(
1330 "Client state already has a pending block; \
1331 use the `linera retry-pending-block` command to commit that first"
1332 )
1333 );
1334 let info = self.chain_info_with_committees().await?;
1335 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1336 let proposed_block = ProposedBlock {
1337 epoch: info.epoch,
1338 chain_id: self.chain_id,
1339 transactions,
1340 previous_block_hash: info.block_hash,
1341 height: info.next_block_height,
1342 authenticated_owner: Some(identity),
1343 timestamp,
1344 };
1345
1346 let round = self.round_for_oracle(&info, &identity).await?;
1347 let (block, _) = self
1350 .client
1351 .stage_block_execution_and_discard_failing_messages(
1352 proposed_block,
1353 round,
1354 blobs.clone(),
1355 )
1356 .await?;
1357 let (proposed_block, _) = block.clone().into_proposal();
1358 self.update_state(|state| {
1359 state.set_pending_proposal(proposed_block.clone(), blobs.clone())
1360 });
1361 Ok(block)
1362 }
1363
1364 #[instrument(level = "trace", skip(transactions))]
1369 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1370 let local_time = self.storage_client().clock().current_time();
1371 transactions
1372 .iter()
1373 .filter_map(Transaction::incoming_bundle)
1374 .map(|msg| msg.bundle.timestamp)
1375 .max()
1376 .map_or(local_time, |timestamp| timestamp.max(local_time))
1377 .max(block_time)
1378 }
1379
1380 #[instrument(level = "trace", skip(query))]
1382 pub async fn query_application(
1383 &self,
1384 query: Query,
1385 block_hash: Option<CryptoHash>,
1386 ) -> Result<QueryOutcome, Error> {
1387 loop {
1388 let result = self
1389 .client
1390 .local_node
1391 .query_application(self.chain_id, query.clone(), block_hash)
1392 .await;
1393 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1394 let validators = self.client.validator_nodes().await?;
1395 self.client
1396 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1397 .await?;
1398 continue; }
1400 return Ok(result?);
1401 }
1402 }
1403
1404 #[instrument(level = "trace", skip(query))]
1406 pub async fn query_system_application(
1407 &self,
1408 query: SystemQuery,
1409 ) -> Result<QueryOutcome<SystemResponse>, Error> {
1410 let QueryOutcome {
1411 response,
1412 operations,
1413 } = self.query_application(Query::System(query), None).await?;
1414 match response {
1415 QueryResponse::System(response) => Ok(QueryOutcome {
1416 response,
1417 operations,
1418 }),
1419 _ => Err(Error::InternalError("Unexpected response for system query")),
1420 }
1421 }
1422
1423 #[instrument(level = "trace", skip(application_id, query))]
1425 #[cfg(with_testing)]
1426 pub async fn query_user_application<A: Abi>(
1427 &self,
1428 application_id: ApplicationId<A>,
1429 query: &A::Query,
1430 ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1431 let query = Query::user(application_id, query)?;
1432 let QueryOutcome {
1433 response,
1434 operations,
1435 } = self.query_application(query, None).await?;
1436 match response {
1437 QueryResponse::User(response_bytes) => {
1438 let response = serde_json::from_slice(&response_bytes)?;
1439 Ok(QueryOutcome {
1440 response,
1441 operations,
1442 })
1443 }
1444 _ => Err(Error::InternalError("Unexpected response for user query")),
1445 }
1446 }
1447
1448 #[instrument(level = "trace")]
1455 pub async fn query_balance(&self) -> Result<Amount, Error> {
1456 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1457 Ok(balance)
1458 }
1459
1460 #[instrument(level = "trace", skip(owner))]
1467 pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1468 if owner.is_chain() {
1469 self.query_balance().await
1470 } else {
1471 Ok(self
1472 .query_balances_with_owner(owner)
1473 .await?
1474 .1
1475 .unwrap_or(Amount::ZERO))
1476 }
1477 }
1478
1479 #[instrument(level = "trace", skip(owner))]
1486 pub(crate) async fn query_balances_with_owner(
1487 &self,
1488 owner: AccountOwner,
1489 ) -> Result<(Amount, Option<Amount>), Error> {
1490 let incoming_bundles = self.pending_message_bundles().await?;
1491 if incoming_bundles.is_empty() {
1494 let chain_balance = self.local_balance().await?;
1495 let owner_balance = self.local_owner_balance(owner).await?;
1496 return Ok((chain_balance, Some(owner_balance)));
1497 }
1498 let info = self.chain_info().await?;
1499 let transactions = incoming_bundles
1500 .into_iter()
1501 .map(Transaction::ReceiveMessages)
1502 .collect::<Vec<_>>();
1503 let timestamp = self.next_timestamp(&transactions, info.timestamp);
1504 let block = ProposedBlock {
1505 epoch: info.epoch,
1506 chain_id: self.chain_id,
1507 transactions,
1508 previous_block_hash: info.block_hash,
1509 height: info.next_block_height,
1510 authenticated_owner: if owner == AccountOwner::CHAIN {
1511 None
1512 } else {
1513 Some(owner)
1514 },
1515 timestamp,
1516 };
1517 match self
1518 .client
1519 .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
1520 .await
1521 {
1522 Ok((_, response)) => Ok((
1523 response.info.chain_balance,
1524 response.info.requested_owner_balance,
1525 )),
1526 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1527 error,
1528 )))) if matches!(
1529 &*error,
1530 ChainError::ExecutionError(
1531 execution_error,
1532 ChainExecutionContext::Block
1533 ) if matches!(
1534 **execution_error,
1535 ExecutionError::FeesExceedFunding { .. }
1536 )
1537 ) =>
1538 {
1539 Ok((Amount::ZERO, Some(Amount::ZERO)))
1541 }
1542 Err(error) => Err(error),
1543 }
1544 }
1545
1546 #[instrument(level = "trace")]
1550 pub async fn local_balance(&self) -> Result<Amount, Error> {
1551 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1552 Ok(balance)
1553 }
1554
1555 #[instrument(level = "trace", skip(owner))]
1559 pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1560 if owner.is_chain() {
1561 self.local_balance().await
1562 } else {
1563 Ok(self
1564 .local_balances_with_owner(owner)
1565 .await?
1566 .1
1567 .unwrap_or(Amount::ZERO))
1568 }
1569 }
1570
1571 #[instrument(level = "trace", skip(owner))]
1575 pub(crate) async fn local_balances_with_owner(
1576 &self,
1577 owner: AccountOwner,
1578 ) -> Result<(Amount, Option<Amount>), Error> {
1579 ensure!(
1580 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1581 Error::WalletSynchronizationError
1582 );
1583 let mut query = ChainInfoQuery::new(self.chain_id);
1584 query.request_owner_balance = owner;
1585 let response = self
1586 .client
1587 .local_node
1588 .handle_chain_info_query(query)
1589 .await?;
1590 Ok((
1591 response.info.chain_balance,
1592 response.info.requested_owner_balance,
1593 ))
1594 }
1595
1596 #[instrument(level = "trace")]
1598 pub async fn transfer_to_account(
1599 &self,
1600 from: AccountOwner,
1601 amount: Amount,
1602 account: Account,
1603 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1604 self.transfer(from, amount, account).await
1605 }
1606
1607 #[cfg(with_testing)]
1609 #[instrument(level = "trace")]
1610 pub async fn burn(
1611 &self,
1612 owner: AccountOwner,
1613 amount: Amount,
1614 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1615 let recipient = Account::burn_address(self.chain_id);
1616 self.transfer(owner, amount, recipient).await
1617 }
1618
1619 #[instrument(level = "trace")]
1620 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1621 let validators = self.client.validator_nodes().await?;
1622 self.client
1623 .fetch_chain_info(self.chain_id, &validators)
1624 .await
1625 }
1626
1627 #[instrument(level = "trace")]
1636 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1637 if self.is_follow_only() {
1638 return self.client.synchronize_chain_state(self.chain_id).await;
1639 }
1640 let info = self.prepare_chain().await?;
1641 self.synchronize_publisher_chains().await?;
1642 self.find_received_certificates(None).await?;
1643 Ok(info)
1644 }
1645
1646 #[instrument(level = "trace")]
1648 pub async fn process_pending_block(
1649 &self,
1650 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1651 self.prepare_chain().await?;
1652 self.process_pending_block_without_prepare().await
1653 }
1654
1655 #[instrument(level = "trace")]
1657 async fn process_pending_block_without_prepare(
1658 &self,
1659 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1660 let info = self.request_leader_timeout_if_needed().await?;
1661
1662 if info.manager.has_locking_block_in_current_round()
1664 && !info.manager.current_round.is_fast()
1665 {
1666 return self.finalize_locking_block(info).await;
1667 }
1668 let owner = self.identity().await?;
1669
1670 let local_node = &self.client.local_node;
1671 let pending_proposal = self.pending_proposal();
1673 let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1674 match &**locking {
1675 LockingBlock::Regular(certificate) => {
1676 let blob_ids = certificate.block().required_blob_ids();
1677 let blobs = local_node
1678 .get_locking_blobs(&blob_ids, self.chain_id)
1679 .await?
1680 .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1681 debug!("Retrying locking block from round {}", certificate.round);
1682 (certificate.block().clone(), blobs)
1683 }
1684 LockingBlock::Fast(proposal) => {
1685 let proposed_block = proposal.content.block.clone();
1686 let blob_ids = proposed_block.published_blob_ids();
1687 let blobs = local_node
1688 .get_locking_blobs(&blob_ids, self.chain_id)
1689 .await?
1690 .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1691 let block = self
1692 .client
1693 .stage_block_execution(proposed_block, None, blobs.clone())
1694 .await?
1695 .0;
1696 debug!("Retrying locking block from fast round.");
1697 (block, blobs)
1698 }
1699 }
1700 } else if let Some(pending_proposal) = pending_proposal {
1701 let proposed_block = pending_proposal.block;
1703 let round = self.round_for_oracle(&info, &owner).await?;
1704 let (block, _) = self
1705 .client
1706 .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
1707 .await?;
1708 debug!("Proposing the local pending block.");
1709 (block, pending_proposal.blobs)
1710 } else {
1711 return Ok(ClientOutcome::Committed(None)); };
1713
1714 let has_oracle_responses = block.has_oracle_responses();
1715 let (proposed_block, outcome) = block.into_proposal();
1716 let round = match self
1717 .round_for_new_proposal(&info, &owner, has_oracle_responses)
1718 .await?
1719 {
1720 Either::Left(round) => round,
1721 Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1722 };
1723 debug!("Proposing block for round {}", round);
1724
1725 let already_handled_locally = info
1726 .manager
1727 .already_handled_proposal(round, &proposed_block);
1728 let proposal = if let Some(locking) = info.manager.requested_locking {
1730 Box::new(match *locking {
1731 LockingBlock::Regular(cert) => {
1732 BlockProposal::new_retry_regular(owner, round, cert, self.signer())
1733 .await
1734 .map_err(Error::signer_failure)?
1735 }
1736 LockingBlock::Fast(proposal) => {
1737 BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
1738 .await
1739 .map_err(Error::signer_failure)?
1740 }
1741 })
1742 } else {
1743 Box::new(
1744 BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
1745 .await
1746 .map_err(Error::signer_failure)?,
1747 )
1748 };
1749 if !already_handled_locally {
1750 if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
1752 match err {
1753 LocalNodeError::BlobsNotFound(_) => {
1754 local_node
1755 .handle_pending_blobs(self.chain_id, blobs)
1756 .await?;
1757 local_node.handle_block_proposal(*proposal.clone()).await?;
1758 }
1759 err => return Err(err.into()),
1760 }
1761 }
1762 }
1763 let committee = self.local_committee().await?;
1764 let block = Block::new(proposed_block, outcome);
1765 let submit_block_proposal_start = linera_base::time::Instant::now();
1767 let certificate = if round.is_fast() {
1768 let hashed_value = ConfirmedBlock::new(block);
1769 self.client
1770 .submit_block_proposal(&committee, proposal, hashed_value)
1771 .await?
1772 } else {
1773 let hashed_value = ValidatedBlock::new(block);
1774 let certificate = self
1775 .client
1776 .submit_block_proposal(&committee, proposal, hashed_value.clone())
1777 .await?;
1778 self.client.finalize_block(&committee, certificate).await?
1779 };
1780 self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
1781 debug!(round = %certificate.round, "Sending confirmed block to validators");
1782 self.update_validators(Some(&committee), Some(certificate.clone()))
1783 .await?;
1784 Ok(ClientOutcome::Committed(Some(certificate)))
1785 }
1786
1787 fn send_timing(&self, start: Instant, timing_type: TimingType) {
1788 let Some(sender) = &self.timing_sender else {
1789 return;
1790 };
1791 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
1792 tracing::warn!(%err, "Failed to send timing info");
1793 }
1794 }
1795
1796 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
1799 let mut info = self.chain_info_with_manager_values().await?;
1800 if let Some(round_timeout) = info.manager.round_timeout {
1803 if round_timeout <= self.storage_client().clock().current_time() {
1804 if let Err(e) = self.request_leader_timeout().await {
1805 info!("Failed to obtain a timeout certificate: {}", e);
1806 } else {
1807 info = self.chain_info_with_manager_values().await?;
1808 }
1809 }
1810 }
1811 Ok(info)
1812 }
1813
1814 async fn finalize_locking_block(
1818 &self,
1819 info: Box<ChainInfo>,
1820 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1821 let locking = info
1822 .manager
1823 .requested_locking
1824 .expect("Should have a locking block");
1825 let LockingBlock::Regular(certificate) = *locking else {
1826 panic!("Should have a locking validated block");
1827 };
1828 debug!(
1829 round = %certificate.round,
1830 "Finalizing locking block"
1831 );
1832 let committee = self.local_committee().await?;
1833 let certificate = self
1834 .client
1835 .finalize_block(&committee, certificate.clone())
1836 .await?;
1837 self.update_validators(Some(&committee), Some(certificate.clone()))
1838 .await?;
1839 Ok(ClientOutcome::Committed(Some(certificate)))
1840 }
1841
1842 async fn round_for_oracle(
1844 &self,
1845 info: &ChainInfo,
1846 identity: &AccountOwner,
1847 ) -> Result<Option<u32>, Error> {
1848 match self.round_for_new_proposal(info, identity, true).await {
1850 Ok(Either::Left(round)) => Ok(round.multi_leader()),
1852 Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
1856 Err(err) => Err(err),
1857 }
1858 }
1859
1860 async fn round_for_new_proposal(
1862 &self,
1863 info: &ChainInfo,
1864 identity: &AccountOwner,
1865 has_oracle_responses: bool,
1866 ) -> Result<Either<Round, RoundTimeout>, Error> {
1867 let manager = &info.manager;
1868 let seed = manager.seed;
1869 let conflict = manager
1873 .requested_signed_proposal
1874 .as_ref()
1875 .into_iter()
1876 .chain(&manager.requested_proposed)
1877 .any(|proposal| proposal.content.round == manager.current_round)
1878 || (manager.current_round.is_fast() && has_oracle_responses);
1879 let round = if !conflict {
1880 manager.current_round
1881 } else if let Some(round) = manager
1882 .ownership
1883 .next_round(manager.current_round)
1884 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
1885 {
1886 round
1887 } else if let Some(timeout) = info.round_timeout() {
1888 return Ok(Either::Right(timeout));
1889 } else {
1890 return Err(Error::BlockProposalError(
1891 "Conflicting proposal in the current round",
1892 ));
1893 };
1894 let current_committee = info
1895 .current_committee()?
1896 .validators
1897 .values()
1898 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
1899 .collect();
1900 if manager.should_propose(identity, round, seed, ¤t_committee) {
1901 return Ok(Either::Left(round));
1902 }
1903 if let Some(timeout) = info.round_timeout() {
1904 return Ok(Either::Right(timeout));
1905 }
1906 Err(Error::BlockProposalError(
1907 "Not a leader in the current round",
1908 ))
1909 }
1910
1911 #[cfg(with_testing)]
1913 #[instrument(level = "trace")]
1914 pub fn clear_pending_proposal(&self) {
1915 self.update_state(|state| state.clear_pending_proposal());
1916 }
1917
1918 #[instrument(level = "trace")]
1922 pub async fn rotate_key_pair(
1923 &self,
1924 public_key: AccountPublicKey,
1925 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1926 self.transfer_ownership(public_key.into()).await
1927 }
1928
1929 #[instrument(level = "trace")]
1931 pub async fn transfer_ownership(
1932 &self,
1933 new_owner: AccountOwner,
1934 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1935 self.execute_operation(SystemOperation::ChangeOwnership {
1936 super_owners: vec![new_owner],
1937 owners: Vec::new(),
1938 first_leader: None,
1939 multi_leader_rounds: 2,
1940 open_multi_leader_rounds: false,
1941 timeout_config: TimeoutConfig::default(),
1942 })
1943 .await
1944 }
1945
1946 #[instrument(level = "trace")]
1948 pub async fn share_ownership(
1949 &self,
1950 new_owner: AccountOwner,
1951 new_weight: u64,
1952 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1953 loop {
1954 let ownership = self.prepare_chain().await?.manager.ownership;
1955 ensure!(
1956 ownership.is_active(),
1957 ChainError::InactiveChain(self.chain_id)
1958 );
1959 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
1960 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
1961 owners.push((new_owner, new_weight));
1962 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
1963 super_owners: Vec::new(),
1964 owners,
1965 first_leader: ownership.first_leader,
1966 multi_leader_rounds: ownership.multi_leader_rounds,
1967 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
1968 timeout_config: ownership.timeout_config,
1969 })];
1970 match self.execute_block(operations, vec![]).await? {
1971 ExecuteBlockOutcome::Executed(certificate) => {
1972 return Ok(ClientOutcome::Committed(certificate));
1973 }
1974 ExecuteBlockOutcome::Conflict(certificate) => {
1975 info!(
1976 height = %certificate.block().header.height,
1977 "Another block was committed; retrying."
1978 );
1979 }
1980 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
1981 return Ok(ClientOutcome::WaitForTimeout(timeout));
1982 }
1983 };
1984 }
1985 }
1986
1987 #[instrument(level = "trace")]
1990 pub async fn change_ownership(
1991 &self,
1992 ownership: ChainOwnership,
1993 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1994 self.execute_operation(SystemOperation::ChangeOwnership {
1995 super_owners: ownership.super_owners.into_iter().collect(),
1996 owners: ownership.owners.into_iter().collect(),
1997 first_leader: ownership.first_leader,
1998 multi_leader_rounds: ownership.multi_leader_rounds,
1999 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2000 timeout_config: ownership.timeout_config.clone(),
2001 })
2002 .await
2003 }
2004
2005 #[instrument(level = "trace", skip(application_permissions))]
2007 pub async fn change_application_permissions(
2008 &self,
2009 application_permissions: ApplicationPermissions,
2010 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2011 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2012 application_permissions,
2013 ))
2014 .await
2015 }
2016
2017 #[instrument(level = "trace", skip(self))]
2019 pub async fn open_chain(
2020 &self,
2021 ownership: ChainOwnership,
2022 application_permissions: ApplicationPermissions,
2023 balance: Amount,
2024 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2025 loop {
2026 let config = OpenChainConfig {
2027 ownership: ownership.clone(),
2028 balance,
2029 application_permissions: application_permissions.clone(),
2030 };
2031 let operation = Operation::system(SystemOperation::OpenChain(config));
2032 let certificate = match self.execute_block(vec![operation], vec![]).await? {
2033 ExecuteBlockOutcome::Executed(certificate) => certificate,
2034 ExecuteBlockOutcome::Conflict(_) => continue,
2035 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
2036 return Ok(ClientOutcome::WaitForTimeout(timeout));
2037 }
2038 };
2039 let chain_blob = certificate
2041 .block()
2042 .body
2043 .blobs
2044 .last()
2045 .and_then(|blobs| blobs.last())
2046 .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2047 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2048 self.client.track_chain(description.id());
2050 self.client
2051 .local_node
2052 .retry_pending_cross_chain_requests(self.chain_id)
2053 .await?;
2054 return Ok(ClientOutcome::Committed((description, certificate)));
2055 }
2056 }
2057
2058 #[instrument(level = "trace")]
2061 pub async fn close_chain(
2062 &self,
2063 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2064 match self.execute_operation(SystemOperation::CloseChain).await {
2065 Ok(outcome) => Ok(outcome.map(Some)),
2066 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2067 chain_error,
2068 )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2069 Ok(ClientOutcome::Committed(None)) }
2071 Err(error) => Err(error),
2072 }
2073 }
2074
2075 #[cfg(not(target_arch = "wasm32"))]
2077 #[instrument(level = "trace", skip(contract, service))]
2078 pub async fn publish_module(
2079 &self,
2080 contract: Bytecode,
2081 service: Bytecode,
2082 vm_runtime: VmRuntime,
2083 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2084 let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2085 self.publish_module_blobs(blobs, module_id).await
2086 }
2087
2088 #[cfg(not(target_arch = "wasm32"))]
2090 #[instrument(level = "trace", skip(blobs, module_id))]
2091 pub async fn publish_module_blobs(
2092 &self,
2093 blobs: Vec<Blob>,
2094 module_id: ModuleId,
2095 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2096 self.execute_operations(
2097 vec![Operation::system(SystemOperation::PublishModule {
2098 module_id,
2099 })],
2100 blobs,
2101 )
2102 .await?
2103 .try_map(|certificate| Ok((module_id, certificate)))
2104 }
2105
2106 #[instrument(level = "trace", skip(bytes))]
2108 pub async fn publish_data_blobs(
2109 &self,
2110 bytes: Vec<Vec<u8>>,
2111 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2112 let blobs = bytes.into_iter().map(Blob::new_data);
2113 let publish_blob_operations = blobs
2114 .clone()
2115 .map(|blob| {
2116 Operation::system(SystemOperation::PublishDataBlob {
2117 blob_hash: blob.id().hash,
2118 })
2119 })
2120 .collect();
2121 self.execute_operations(publish_blob_operations, blobs.collect())
2122 .await
2123 }
2124
2125 #[instrument(level = "trace", skip(bytes))]
2127 pub async fn publish_data_blob(
2128 &self,
2129 bytes: Vec<u8>,
2130 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2131 self.publish_data_blobs(vec![bytes]).await
2132 }
2133
2134 #[instrument(
2136 level = "trace",
2137 skip(self, parameters, instantiation_argument, required_application_ids)
2138 )]
2139 pub async fn create_application<
2140 A: Abi,
2141 Parameters: Serialize,
2142 InstantiationArgument: Serialize,
2143 >(
2144 &self,
2145 module_id: ModuleId<A, Parameters, InstantiationArgument>,
2146 parameters: &Parameters,
2147 instantiation_argument: &InstantiationArgument,
2148 required_application_ids: Vec<ApplicationId>,
2149 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2150 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2151 let parameters = serde_json::to_vec(parameters)?;
2152 Ok(self
2153 .create_application_untyped(
2154 module_id.forget_abi(),
2155 parameters,
2156 instantiation_argument,
2157 required_application_ids,
2158 )
2159 .await?
2160 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2161 }
2162
2163 #[instrument(
2165 level = "trace",
2166 skip(
2167 self,
2168 module_id,
2169 parameters,
2170 instantiation_argument,
2171 required_application_ids
2172 )
2173 )]
2174 pub async fn create_application_untyped(
2175 &self,
2176 module_id: ModuleId,
2177 parameters: Vec<u8>,
2178 instantiation_argument: Vec<u8>,
2179 required_application_ids: Vec<ApplicationId>,
2180 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2181 self.execute_operation(SystemOperation::CreateApplication {
2182 module_id,
2183 parameters,
2184 instantiation_argument,
2185 required_application_ids,
2186 })
2187 .await?
2188 .try_map(|certificate| {
2189 let mut creation: Vec<_> = certificate
2191 .block()
2192 .created_blob_ids()
2193 .into_iter()
2194 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2195 .collect();
2196 if creation.len() > 1 {
2197 return Err(Error::InternalError(
2198 "Unexpected number of application descriptions published",
2199 ));
2200 }
2201 let blob_id = creation.pop().ok_or(Error::InternalError(
2202 "ApplicationDescription blob not found.",
2203 ))?;
2204 let id = ApplicationId::new(blob_id.hash);
2205 Ok((id, certificate))
2206 })
2207 }
2208
2209 #[instrument(level = "trace", skip(committee))]
2211 pub async fn stage_new_committee(
2212 &self,
2213 committee: Committee,
2214 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2215 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2216 let blob_hash = blob.id().hash;
2217 match self
2218 .execute_operations(
2219 vec![Operation::system(SystemOperation::Admin(
2220 AdminOperation::PublishCommitteeBlob { blob_hash },
2221 ))],
2222 vec![blob],
2223 )
2224 .await?
2225 {
2226 ClientOutcome::Committed(_) => {}
2227 outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
2228 }
2229 let epoch = self.chain_info().await?.epoch.try_add_one()?;
2230 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2231 epoch,
2232 blob_hash,
2233 }))
2234 .await
2235 }
2236
2237 #[instrument(level = "trace")]
2243 pub async fn process_inbox(
2244 &self,
2245 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2246 self.prepare_chain().await?;
2247 self.process_inbox_without_prepare().await
2248 }
2249
2250 #[instrument(level = "trace")]
2256 pub async fn process_inbox_without_prepare(
2257 &self,
2258 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2259 #[cfg(with_metrics)]
2260 let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2261
2262 let mut certificates = Vec::new();
2263 loop {
2264 match self.execute_block(vec![], vec![]).await {
2268 Ok(ExecuteBlockOutcome::Executed(certificate))
2269 | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
2270 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2271 return Ok((certificates, Some(timeout)));
2272 }
2273 Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2275 WorkerError::ChainError(chain_error),
2276 ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2277 return Ok((certificates, None));
2278 }
2279 Err(error) => return Err(error),
2280 };
2281 }
2282 }
2283
2284 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2287 let (mut min_epoch, mut next_epoch) = {
2288 let (epoch, committees) = self.epoch_and_committees().await?;
2289 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2290 (min_epoch, epoch.try_add_one()?)
2291 };
2292 let mut epoch_change_ops = Vec::new();
2293 while self
2294 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2295 .await?
2296 {
2297 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2298 next_epoch,
2299 )));
2300 next_epoch.try_add_assign_one()?;
2301 }
2302 while self
2303 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2304 .await?
2305 {
2306 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2307 min_epoch,
2308 )));
2309 min_epoch.try_add_assign_one()?;
2310 }
2311 Ok(epoch_change_ops)
2312 }
2313
2314 async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2317 let event_id = EventId {
2318 chain_id: self.client.admin_id,
2319 stream_id: StreamId::system(stream_name),
2320 index,
2321 };
2322 Ok(self
2323 .client
2324 .storage_client()
2325 .read_event(event_id)
2326 .await?
2327 .is_some())
2328 }
2329
2330 pub async fn events_from_index(
2332 &self,
2333 stream_id: StreamId,
2334 start_index: u32,
2335 ) -> Result<Vec<IndexAndEvent>, Error> {
2336 Ok(self
2337 .client
2338 .storage_client()
2339 .read_events_from_index(&self.chain_id, &stream_id, start_index)
2340 .await?)
2341 }
2342
2343 #[instrument(level = "trace")]
2348 pub async fn revoke_epochs(
2349 &self,
2350 revoked_epoch: Epoch,
2351 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2352 self.prepare_chain().await?;
2353 let (current_epoch, committees) = self.epoch_and_committees().await?;
2354 ensure!(
2355 revoked_epoch < current_epoch,
2356 Error::CannotRevokeCurrentEpoch(current_epoch)
2357 );
2358 ensure!(
2359 committees.contains_key(&revoked_epoch),
2360 Error::EpochAlreadyRevoked
2361 );
2362 let operations = committees
2363 .keys()
2364 .filter_map(|epoch| {
2365 if *epoch <= revoked_epoch {
2366 Some(Operation::system(SystemOperation::Admin(
2367 AdminOperation::RemoveCommittee { epoch: *epoch },
2368 )))
2369 } else {
2370 None
2371 }
2372 })
2373 .collect();
2374 self.execute_operations(operations, vec![]).await
2375 }
2376
2377 #[instrument(level = "trace")]
2381 pub async fn transfer_to_account_unsafe_unconfirmed(
2382 &self,
2383 owner: AccountOwner,
2384 amount: Amount,
2385 recipient: Account,
2386 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2387 self.execute_operation(SystemOperation::Transfer {
2388 owner,
2389 recipient,
2390 amount,
2391 })
2392 .await
2393 }
2394
2395 #[instrument(level = "trace", skip(hash))]
2396 pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2397 let block = self
2398 .client
2399 .storage_client()
2400 .read_confirmed_block(hash)
2401 .await?;
2402 block.ok_or(Error::MissingConfirmedBlock(hash))
2403 }
2404
2405 #[instrument(level = "trace", skip(hash))]
2406 pub async fn read_certificate(
2407 &self,
2408 hash: CryptoHash,
2409 ) -> Result<ConfirmedBlockCertificate, Error> {
2410 let certificate = self.client.storage_client().read_certificate(hash).await?;
2411 certificate.ok_or(Error::ReadCertificatesError(vec![hash]))
2412 }
2413
2414 #[instrument(level = "trace")]
2416 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2417 self.client
2418 .local_node
2419 .retry_pending_cross_chain_requests(self.chain_id)
2420 .await?;
2421 Ok(())
2422 }
2423
2424 #[instrument(level = "trace", skip(local_node))]
2425 async fn local_chain_info(
2426 &self,
2427 chain_id: ChainId,
2428 local_node: &mut LocalNodeClient<Env::Storage>,
2429 ) -> Result<Option<Box<ChainInfo>>, Error> {
2430 match local_node.chain_info(chain_id).await {
2431 Ok(info) => {
2432 self.client.update_from_info(&info);
2434 Ok(Some(info))
2435 }
2436 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2437 Err(err) => Err(err.into()),
2438 }
2439 }
2440
2441 #[instrument(level = "trace", skip(chain_id, local_node))]
2442 async fn local_next_block_height(
2443 &self,
2444 chain_id: ChainId,
2445 local_node: &mut LocalNodeClient<Env::Storage>,
2446 ) -> Result<BlockHeight, Error> {
2447 Ok(self
2448 .local_chain_info(chain_id, local_node)
2449 .await?
2450 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2451 }
2452
2453 #[instrument(level = "trace")]
2456 async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2457 Ok(self
2458 .client
2459 .local_node
2460 .get_inbox_next_height(self.chain_id, origin)
2461 .await?)
2462 }
2463
2464 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2465 async fn process_notification(
2466 &self,
2467 remote_node: RemoteNode<Env::ValidatorNode>,
2468 mut local_node: LocalNodeClient<Env::Storage>,
2469 notification: Notification,
2470 listening_mode: &ListeningMode,
2471 ) -> Result<(), Error> {
2472 if !listening_mode.is_relevant(¬ification.reason) {
2473 debug!(
2474 chain_id = %self.chain_id,
2475 reason = ?notification.reason,
2476 "Ignoring notification due to listening mode"
2477 );
2478 return Ok(());
2479 }
2480 match notification.reason {
2481 Reason::NewIncomingBundle { origin, height } => {
2482 if self.local_next_height_to_receive(origin).await? > height {
2483 debug!(
2484 chain_id = %self.chain_id,
2485 "Accepting redundant notification for new message"
2486 );
2487 return Ok(());
2488 }
2489 self.client
2490 .download_sender_block_with_sending_ancestors(
2491 self.chain_id,
2492 origin,
2493 height,
2494 &remote_node,
2495 )
2496 .await?;
2497 if self.local_next_height_to_receive(origin).await? <= height {
2498 info!(
2499 chain_id = %self.chain_id,
2500 "NewIncomingBundle: Fail to synchronize new message after notification"
2501 );
2502 }
2503 }
2504 Reason::NewBlock { height, .. } => {
2505 let chain_id = notification.chain_id;
2506 if self
2507 .local_next_block_height(chain_id, &mut local_node)
2508 .await?
2509 > height
2510 {
2511 debug!(
2512 chain_id = %self.chain_id,
2513 "Accepting redundant notification for new block"
2514 );
2515 return Ok(());
2516 }
2517 self.client
2518 .synchronize_chain_state_from(&remote_node, chain_id)
2519 .await?;
2520 if self
2521 .local_next_block_height(chain_id, &mut local_node)
2522 .await?
2523 <= height
2524 {
2525 info!("NewBlock: Fail to synchronize new block after notification");
2526 }
2527 trace!(
2528 chain_id = %self.chain_id,
2529 %height,
2530 "NewBlock: processed notification",
2531 );
2532 }
2533 Reason::NewEvents { height, hash, .. } => {
2534 if self
2535 .local_next_block_height(notification.chain_id, &mut local_node)
2536 .await?
2537 > height
2538 {
2539 debug!(
2540 chain_id = %self.chain_id,
2541 "Accepting redundant notification for new block"
2542 );
2543 return Ok(());
2544 }
2545 trace!(
2546 chain_id = %self.chain_id,
2547 %height,
2548 "NewEvents: processing notification"
2549 );
2550 let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
2551 let certificate = certificates
2554 .pop()
2555 .expect("download_certificates should have returned one certificate");
2556 self.client
2557 .receive_sender_certificate(
2558 certificate,
2559 ReceiveCertificateMode::NeedsCheck,
2560 None,
2561 )
2562 .await?;
2563 }
2564 Reason::NewRound { height, round } => {
2565 let chain_id = notification.chain_id;
2566 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2567 if (info.next_block_height, info.manager.current_round) >= (height, round) {
2568 debug!(
2569 chain_id = %self.chain_id,
2570 "Accepting redundant notification for new round"
2571 );
2572 return Ok(());
2573 }
2574 }
2575 self.client
2576 .synchronize_chain_state_from(&remote_node, chain_id)
2577 .await?;
2578 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2579 error!(
2580 chain_id = %self.chain_id,
2581 "NewRound: Fail to read local chain info for {chain_id}"
2582 );
2583 return Ok(());
2584 };
2585 if (info.next_block_height, info.manager.current_round) < (height, round) {
2586 error!(
2587 chain_id = %self.chain_id,
2588 "NewRound: Fail to synchronize new block after notification"
2589 );
2590 }
2591 }
2592 Reason::BlockExecuted { .. } => {
2593 }
2595 }
2596 Ok(())
2597 }
2598
2599 pub fn is_tracked(&self) -> bool {
2601 self.client
2602 .tracked_chains
2603 .read()
2604 .unwrap()
2605 .contains(&self.chain_id)
2606 }
2607
2608 #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
2611 pub async fn listen(
2612 &self,
2613 listening_mode: ListeningMode,
2614 ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
2615 use future::FutureExt as _;
2616
2617 async fn await_while_polling<F: FusedFuture>(
2618 future: F,
2619 background_work: impl FusedStream<Item = ()>,
2620 ) -> F::Output {
2621 tokio::pin!(future);
2622 tokio::pin!(background_work);
2623 loop {
2624 futures::select! {
2625 _ = background_work.next() => (),
2626 result = future => return result,
2627 }
2628 }
2629 }
2630
2631 let mut senders = HashMap::new(); let notifications = self.subscribe()?;
2633 let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
2634
2635 let mut process_notifications = FuturesUnordered::new();
2642
2643 match self
2644 .update_notification_streams(&mut senders, &listening_mode)
2645 .await
2646 {
2647 Ok(handler) => process_notifications.push(handler),
2648 Err(error) => error!("Failed to update committee: {error}"),
2649 };
2650
2651 let this = self.clone();
2652 let update_streams = async move {
2653 let mut abortable_notifications = abortable_notifications.fuse();
2654
2655 while let Some(notification) =
2656 await_while_polling(abortable_notifications.next(), &mut process_notifications)
2657 .await
2658 {
2659 if let Reason::NewBlock { .. } = notification.reason {
2660 match Box::pin(await_while_polling(
2661 this.update_notification_streams(&mut senders, &listening_mode)
2662 .fuse(),
2663 &mut process_notifications,
2664 ))
2665 .await
2666 {
2667 Ok(handler) => process_notifications.push(handler),
2668 Err(error) => error!("Failed to update committee: {error}"),
2669 }
2670 }
2671 }
2672
2673 for abort in senders.into_values() {
2674 abort.abort();
2675 }
2676
2677 let () = process_notifications.collect().await;
2678 }
2679 .in_current_span();
2680
2681 Ok((update_streams, AbortOnDrop(abort), notifications))
2682 }
2683
2684 #[instrument(level = "trace", skip(senders))]
2685 async fn update_notification_streams(
2686 &self,
2687 senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
2688 listening_mode: &ListeningMode,
2689 ) -> Result<impl Future<Output = ()>, Error> {
2690 let (nodes, local_node) = {
2691 let committee = self.local_committee().await?;
2692 let nodes: HashMap<_, _> = self
2693 .client
2694 .validator_node_provider()
2695 .make_nodes(&committee)?
2696 .collect();
2697 (nodes, self.client.local_node.clone())
2698 };
2699 senders.retain(|validator, abort| {
2701 if !nodes.contains_key(validator) {
2702 abort.abort();
2703 }
2704 !abort.is_aborted()
2705 });
2706 let validator_tasks = FuturesUnordered::new();
2708 for (public_key, node) in nodes {
2709 let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
2710 continue;
2711 };
2712 let address = node.address();
2713 let this = self.clone();
2714 let stream = stream::once({
2715 let node = node.clone();
2716 async move {
2717 let stream = node.subscribe(vec![this.chain_id]).await?;
2718 let remote_node = RemoteNode { public_key, node };
2721 this.client
2722 .synchronize_chain_state_from(&remote_node, this.chain_id)
2723 .await?;
2724 Ok::<_, Error>(stream)
2725 }
2726 })
2727 .filter_map(move |result| {
2728 let address = address.clone();
2729 async move {
2730 if let Err(error) = &result {
2731 info!(?error, address, "could not connect to validator");
2732 } else {
2733 debug!(address, "connected to validator");
2734 }
2735 result.ok()
2736 }
2737 })
2738 .flatten();
2739 let (stream, abort) = stream::abortable(stream);
2740 let mut stream = Box::pin(stream);
2741 let this = self.clone();
2742 let local_node = local_node.clone();
2743 let remote_node = RemoteNode { public_key, node };
2744 let listening_mode_cloned = listening_mode.clone();
2745 validator_tasks.push(async move {
2746 while let Some(notification) = stream.next().await {
2747 if let Err(error) = this
2748 .process_notification(
2749 remote_node.clone(),
2750 local_node.clone(),
2751 notification.clone(),
2752 &listening_mode_cloned,
2753 )
2754 .await
2755 {
2756 tracing::info!(
2757 chain_id = %this.chain_id,
2758 address = remote_node.address(),
2759 ?notification,
2760 %error,
2761 "failed to process notification",
2762 );
2763 }
2764 }
2765 });
2766 entry.insert(abort);
2767 }
2768 Ok(validator_tasks.collect())
2769 }
2770
2771 #[instrument(level = "trace", skip(remote_node))]
2773 pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
2774 let validator_next_block_height = match remote_node
2775 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
2776 .await
2777 {
2778 Ok(info) => info.info.next_block_height,
2779 Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
2780 Err(err) => return Err(err.into()),
2781 };
2782 let local_next_block_height = self.chain_info().await?.next_block_height;
2783
2784 if validator_next_block_height >= local_next_block_height {
2785 debug!("Validator is up-to-date with local state");
2786 return Ok(());
2787 }
2788
2789 let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
2790 .map(BlockHeight)
2791 .collect();
2792
2793 let missing_certificate_hashes = self
2794 .client
2795 .local_node
2796 .get_block_hashes(self.chain_id, heights)
2797 .await?;
2798
2799 let certificates = self
2800 .client
2801 .storage_client()
2802 .read_certificates(missing_certificate_hashes.clone())
2803 .await?;
2804 let certificates =
2805 match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
2806 ResultReadCertificates::Certificates(certificates) => certificates,
2807 ResultReadCertificates::InvalidHashes(hashes) => {
2808 return Err(Error::ReadCertificatesError(hashes))
2809 }
2810 };
2811 for certificate in certificates {
2812 match remote_node
2813 .handle_confirmed_certificate(
2814 certificate.clone(),
2815 CrossChainMessageDelivery::NonBlocking,
2816 )
2817 .await
2818 {
2819 Ok(_) => (),
2820 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
2821 let missing_blobs: Vec<_> = self
2823 .client
2824 .storage_client()
2825 .read_blobs(&missing_blob_ids)
2826 .await?
2827 .into_iter()
2828 .flatten()
2829 .collect();
2830 remote_node.upload_blobs(missing_blobs).await?;
2831 remote_node
2832 .handle_confirmed_certificate(
2833 certificate,
2834 CrossChainMessageDelivery::NonBlocking,
2835 )
2836 .await?;
2837 }
2838 Err(err) => return Err(err.into()),
2839 }
2840 }
2841
2842 Ok(())
2843 }
2844}
2845
2846#[cfg(with_testing)]
2847impl<Env: Environment> ChainClient<Env> {
2848 pub async fn process_notification_from(
2849 &self,
2850 notification: Notification,
2851 validator: (ValidatorPublicKey, &str),
2852 ) {
2853 let mut node_list = self
2854 .client
2855 .validator_node_provider()
2856 .make_nodes_from_list(vec![validator])
2857 .unwrap();
2858 let (public_key, node) = node_list.next().unwrap();
2859 let remote_node = RemoteNode { node, public_key };
2860 let local_node = self.client.local_node.clone();
2861 self.process_notification(
2862 remote_node,
2863 local_node,
2864 notification,
2865 &ListeningMode::FullChain,
2866 )
2867 .await
2868 .unwrap();
2869 }
2870}