1mod state;
5use std::{
6 collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7 convert::Infallible,
8 iter,
9 sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14 future::{self, Either, FusedFuture, Future, FutureExt},
15 select,
16 stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
17};
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{
21 abi::Abi,
22 crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
23 data_types::{
24 Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
25 ChainDescription, Epoch, Round, Timestamp,
26 },
27 ensure,
28 identifiers::{
29 Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
30 ModuleId, StreamId,
31 },
32 ownership::{ChainOwnership, TimeoutConfig},
33 time::{Duration, Instant},
34};
35#[cfg(not(target_arch = "wasm32"))]
36use linera_base::{data_types::Bytecode, vm::VmRuntime};
37use linera_chain::{
38 data_types::{BlockProposal, ChainAndHeight, IncomingBundle, ProposedBlock, Transaction},
39 manager::LockingBlock,
40 types::{
41 Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
42 ValidatedBlock,
43 },
44 ChainError, ChainExecutionContext, ChainStateView,
45};
46use linera_execution::{
47 committee::Committee,
48 system::{
49 AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
50 REMOVED_EPOCH_STREAM_NAME,
51 },
52 ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
53};
54use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
55use linera_views::ViewError;
56use rand::seq::SliceRandom;
57use serde::Serialize;
58pub use state::State;
59use thiserror::Error;
60use tokio::sync::{mpsc, OwnedRwLockReadGuard};
61use tokio_stream::wrappers::UnboundedReceiverStream;
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66 received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67 ExecuteBlockOutcome, ListeningMode, MessagePolicy, PendingProposal, ReceiveCertificateMode,
68 TimingType,
69};
70use crate::{
71 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
72 environment::Environment,
73 local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74 node::{
75 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76 ValidatorNodeProvider as _,
77 },
78 remote_node::RemoteNode,
79 updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
80 worker::{Notification, Reason, WorkerError},
81};
82
83#[derive(Debug, Clone)]
84pub struct Options {
85 pub max_pending_message_bundles: usize,
87 pub message_policy: MessagePolicy,
89 pub cross_chain_message_delivery: CrossChainMessageDelivery,
91 pub grace_period: f64,
94 pub blob_download_timeout: Duration,
96 pub certificate_batch_download_timeout: Duration,
98 pub certificate_download_batch_size: u64,
101 pub sender_certificate_download_batch_size: usize,
104 pub max_joined_tasks: usize,
106}
107
#[cfg(with_testing)]
impl Options {
    /// Builds the option set used by tests: accept-all message policy, non-blocking
    /// cross-chain delivery, one-second download timeouts and the crate's default batch
    /// sizes and grace period.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_GRACE_PERIOD;

        Self {
            max_pending_message_bundles: 10,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            grace_period: DEFAULT_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
            certificate_batch_download_timeout: Duration::from_secs(1),
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
        }
    }
}
129
130#[derive(Debug)]
136pub struct ChainClient<Env: Environment> {
137 #[debug(skip)]
139 pub(crate) client: Arc<Client<Env>>,
140 chain_id: ChainId,
142 #[debug(skip)]
144 options: Options,
145 preferred_owner: Option<AccountOwner>,
148 initial_next_block_height: BlockHeight,
150 initial_block_hash: Option<CryptoHash>,
152 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
154}
155
156impl<Env: Environment> Clone for ChainClient<Env> {
157 fn clone(&self) -> Self {
158 Self {
159 client: self.client.clone(),
160 chain_id: self.chain_id,
161 options: self.options.clone(),
162 preferred_owner: self.preferred_owner,
163 initial_next_block_height: self.initial_next_block_height,
164 initial_block_hash: self.initial_block_hash,
165 timing_sender: self.timing_sender.clone(),
166 }
167 }
168}
169
170#[derive(Debug, Error)]
172pub enum Error {
173 #[error("Local node operation failed: {0}")]
174 LocalNodeError(#[from] LocalNodeError),
175
176 #[error("Remote node operation failed: {0}")]
177 RemoteNodeError(#[from] NodeError),
178
179 #[error(transparent)]
180 ArithmeticError(#[from] ArithmeticError),
181
182 #[error("Missing certificates: {0:?}")]
183 ReadCertificatesError(Vec<CryptoHash>),
184
185 #[error("Missing confirmed block: {0:?}")]
186 MissingConfirmedBlock(CryptoHash),
187
188 #[error("JSON (de)serialization error: {0}")]
189 JsonError(#[from] serde_json::Error),
190
191 #[error("Chain operation failed: {0}")]
192 ChainError(#[from] ChainError),
193
194 #[error(transparent)]
195 CommunicationError(#[from] CommunicationError<NodeError>),
196
197 #[error("Internal error within chain client: {0}")]
198 InternalError(&'static str),
199
200 #[error(
201 "Cannot accept a certificate from an unknown committee in the future. \
202 Please synchronize the local view of the admin chain"
203 )]
204 CommitteeSynchronizationError,
205
206 #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
207 WalletSynchronizationError,
208
209 #[error("The state of the client is incompatible with the proposed block: {0}")]
210 BlockProposalError(&'static str),
211
212 #[error(
213 "Cannot accept a certificate from a committee that was retired. \
214 Try a newer certificate from the same origin"
215 )]
216 CommitteeDeprecationError,
217
218 #[error("Protocol error within chain client: {0}")]
219 ProtocolError(&'static str),
220
221 #[error("Signer doesn't have key to sign for chain {0}")]
222 CannotFindKeyForChain(ChainId),
223
224 #[error("client is not configured to propose on chain {0}")]
225 NoAccountKeyConfigured(ChainId),
226
227 #[error("The chain client isn't owner on chain {0}")]
228 NotAnOwner(ChainId),
229
230 #[error(transparent)]
231 ViewError(#[from] ViewError),
232
233 #[error(
234 "Failed to download certificates and update local node to the next height \
235 {target_next_block_height} of chain {chain_id}"
236 )]
237 CannotDownloadCertificates {
238 chain_id: ChainId,
239 target_next_block_height: BlockHeight,
240 },
241
242 #[error(transparent)]
243 BcsError(#[from] bcs::Error),
244
245 #[error(
246 "Unexpected quorum: validators voted for block {hash} in {round}, \
247 expected block {expected_hash} in {expected_round}"
248 )]
249 UnexpectedQuorum {
250 hash: CryptoHash,
251 round: Round,
252 expected_hash: CryptoHash,
253 expected_round: Round,
254 },
255
256 #[error("signer error: {0:?}")]
257 Signer(#[source] Box<dyn signer::Error>),
258
259 #[error("Cannot revoke the current epoch {0}")]
260 CannotRevokeCurrentEpoch(Epoch),
261
262 #[error("Epoch is already revoked")]
263 EpochAlreadyRevoked,
264
265 #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
266 CannotDownloadMissingSenderBlock {
267 chain_id: ChainId,
268 height: BlockHeight,
269 },
270}
271
272impl From<Infallible> for Error {
273 fn from(infallible: Infallible) -> Self {
274 match infallible {}
275 }
276}
277
278impl Error {
279 pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
280 Self::Signer(Box::new(err))
281 }
282}
283
284impl<Env: Environment> ChainClient<Env> {
285 pub fn new(
286 client: Arc<Client<Env>>,
287 chain_id: ChainId,
288 options: Options,
289 initial_block_hash: Option<CryptoHash>,
290 initial_next_block_height: BlockHeight,
291 preferred_owner: Option<AccountOwner>,
292 timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
293 ) -> Self {
294 ChainClient {
295 client,
296 chain_id,
297 options,
298 preferred_owner,
299 initial_block_hash,
300 initial_next_block_height,
301 timing_sender,
302 }
303 }
304
305 #[instrument(level = "trace", skip(self))]
307 fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
308 self.client
309 .chains
310 .pin()
311 .get(&self.chain_id)
312 .expect("Chain client constructed for invalid chain")
313 .client_mutex()
314 }
315
316 #[instrument(level = "trace", skip(self))]
318 pub fn pending_proposal(&self) -> Option<PendingProposal> {
319 self.client
320 .chains
321 .pin()
322 .get(&self.chain_id)
323 .expect("Chain client constructed for invalid chain")
324 .pending_proposal()
325 .clone()
326 }
327
328 #[instrument(level = "trace", skip(self, f))]
330 fn update_state<F>(&self, f: F)
331 where
332 F: Fn(&mut State),
333 {
334 let chains = self.client.chains.pin();
335 chains
336 .update(self.chain_id, |state| {
337 let mut state = state.clone_for_update_unchecked();
338 f(&mut state);
339 state
340 })
341 .expect("Chain client constructed for invalid chain");
342 }
343
344 #[instrument(level = "trace", skip(self))]
346 pub fn signer(&self) -> &impl Signer {
347 self.client.signer()
348 }
349
350 #[instrument(level = "trace", skip(self))]
352 pub fn options_mut(&mut self) -> &mut Options {
353 &mut self.options
354 }
355
356 #[instrument(level = "trace", skip(self))]
358 pub fn options(&self) -> &Options {
359 &self.options
360 }
361
362 #[instrument(level = "trace", skip(self))]
364 pub fn chain_id(&self) -> ChainId {
365 self.chain_id
366 }
367
368 pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
370 self.timing_sender.clone()
371 }
372
373 #[instrument(level = "trace", skip(self))]
375 pub fn admin_id(&self) -> ChainId {
376 self.client.admin_id
377 }
378
379 #[instrument(level = "trace", skip(self))]
381 pub fn preferred_owner(&self) -> Option<AccountOwner> {
382 self.preferred_owner
383 }
384
385 #[instrument(level = "trace", skip(self))]
387 pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
388 self.preferred_owner = Some(preferred_owner);
389 }
390
391 #[instrument(level = "trace", skip(self))]
393 pub fn unset_preferred_owner(&mut self) {
394 self.preferred_owner = None;
395 }
396
397 #[instrument(level = "trace")]
399 pub async fn chain_state_view(
400 &self,
401 ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
402 self.client.local_node.chain_state_view(self.chain_id).await
403 }
404
405 #[instrument(level = "trace", skip(self))]
407 pub async fn event_stream_publishers(
408 &self,
409 ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
410 let mut publishers = self
411 .chain_state_view()
412 .await?
413 .execution_state
414 .system
415 .event_subscriptions
416 .indices()
417 .await?
418 .into_iter()
419 .fold(
420 BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
421 |mut map, (chain_id, stream_id)| {
422 map.entry(chain_id).or_default().insert(stream_id);
423 map
424 },
425 );
426 if self.chain_id != self.client.admin_id {
427 publishers.insert(
428 self.client.admin_id,
429 vec![
430 StreamId::system(EPOCH_STREAM_NAME),
431 StreamId::system(REMOVED_EPOCH_STREAM_NAME),
432 ]
433 .into_iter()
434 .collect(),
435 );
436 }
437 Ok(publishers)
438 }
439
440 #[instrument(level = "trace")]
442 pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
443 self.subscribe_to(self.chain_id)
444 }
445
446 #[instrument(level = "trace")]
448 pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
449 Ok(Box::pin(UnboundedReceiverStream::new(
450 self.client.notifier.subscribe(vec![chain_id]),
451 )))
452 }
453
454 #[instrument(level = "trace")]
456 pub fn storage_client(&self) -> &Env::Storage {
457 self.client.storage_client()
458 }
459
460 #[instrument(level = "trace")]
462 pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
463 let query = ChainInfoQuery::new(self.chain_id);
464 let response = self
465 .client
466 .local_node
467 .handle_chain_info_query(query)
468 .await?;
469 self.client.update_from_info(&response.info);
470 Ok(response.info)
471 }
472
473 #[instrument(level = "trace")]
475 pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
476 let query = ChainInfoQuery::new(self.chain_id)
477 .with_manager_values()
478 .with_committees();
479 let response = self
480 .client
481 .local_node
482 .handle_chain_info_query(query)
483 .await?;
484 self.client.update_from_info(&response.info);
485 Ok(response.info)
486 }
487
488 pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
490 self.client.get_chain_description(self.chain_id).await
491 }
492
493 #[instrument(level = "trace")]
496 async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
497 if self.options.message_policy.is_ignore() {
498 return Ok(Vec::new());
500 }
501
502 let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
503 let info = self
504 .client
505 .local_node
506 .handle_chain_info_query(query)
507 .await?
508 .info;
509 if self.preferred_owner.is_some_and(|owner| {
510 info.manager
511 .ownership
512 .is_super_owner_no_regular_owners(&owner)
513 }) {
514 ensure!(
516 info.next_block_height >= self.initial_next_block_height,
517 Error::WalletSynchronizationError
518 );
519 }
520
521 Ok(info
522 .requested_pending_message_bundles
523 .into_iter()
524 .filter_map(|mut bundle| {
525 self.options
526 .message_policy
527 .must_handle(&mut bundle)
528 .then_some(bundle)
529 })
530 .take(self.options.max_pending_message_bundles)
531 .collect())
532 }
533
534 #[instrument(level = "trace")]
538 async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
539 let subscription_map = self
541 .chain_state_view()
542 .await?
543 .execution_state
544 .system
545 .event_subscriptions
546 .index_values()
547 .await?;
548 let futures = subscription_map
550 .into_iter()
551 .map(|((chain_id, stream_id), subscriptions)| {
552 let client = self.client.clone();
553 async move {
554 let chain = client.local_node.chain_state_view(chain_id).await?;
555 if let Some(next_expected_index) = chain
556 .next_expected_events
557 .get(&stream_id)
558 .await?
559 .filter(|next_index| *next_index > subscriptions.next_index)
560 {
561 Ok(Some((chain_id, stream_id, next_expected_index)))
562 } else {
563 Ok::<_, Error>(None)
564 }
565 }
566 });
567 let updates = futures::stream::iter(futures)
568 .buffer_unordered(self.options.max_joined_tasks)
569 .try_collect::<Vec<_>>()
570 .await?
571 .into_iter()
572 .flatten()
573 .collect::<Vec<_>>();
574 if updates.is_empty() {
575 return Ok(None);
576 }
577 Ok(Some(SystemOperation::UpdateStreams(updates).into()))
578 }
579
580 #[instrument(level = "trace")]
581 async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
582 self.client.chain_info_with_committees(self.chain_id).await
583 }
584
585 #[instrument(level = "trace")]
587 async fn epoch_and_committees(
588 &self,
589 ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
590 let info = self.chain_info_with_committees().await?;
591 let epoch = info.epoch;
592 let committees = info.into_committees()?;
593 Ok((epoch, committees))
594 }
595
596 #[instrument(level = "trace")]
598 pub async fn local_committee(&self) -> Result<Committee, Error> {
599 let info = match self.chain_info_with_committees().await {
600 Ok(info) => info,
601 Err(LocalNodeError::BlobsNotFound(_)) => {
602 self.synchronize_chain_state(self.chain_id).await?;
603 self.chain_info_with_committees().await?
604 }
605 Err(err) => return Err(err.into()),
606 };
607 Ok(info.into_current_committee()?)
608 }
609
610 #[instrument(level = "trace")]
612 pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
613 self.client.admin_committee().await
614 }
615
616 #[instrument(level = "trace")]
620 pub async fn identity(&self) -> Result<AccountOwner, Error> {
621 let Some(preferred_owner) = self.preferred_owner else {
622 return Err(Error::NoAccountKeyConfigured(self.chain_id));
623 };
624 let manager = self.chain_info().await?.manager;
625 ensure!(
626 manager.ownership.is_active(),
627 LocalNodeError::InactiveChain(self.chain_id)
628 );
629
630 let is_owner = manager
631 .ownership
632 .all_owners()
633 .chain(&manager.leader)
634 .any(|owner| *owner == preferred_owner);
635
636 if !is_owner {
637 let accepted_owners = manager
638 .ownership
639 .all_owners()
640 .chain(&manager.leader)
641 .collect::<Vec<_>>();
642 warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
643 "Chain has multiple owners configured but none is preferred owner",
644 );
645 return Err(Error::NotAnOwner(self.chain_id));
646 }
647
648 let has_signer = self
649 .signer()
650 .contains_key(&preferred_owner)
651 .await
652 .map_err(Error::signer_failure)?;
653
654 if !has_signer {
655 warn!(%self.chain_id, ?preferred_owner,
656 "Chain is one of the owners but its Signer instance doesn't contain the key",
657 );
658 return Err(Error::CannotFindKeyForChain(self.chain_id));
659 }
660
661 Ok(preferred_owner)
662 }
663
664 #[instrument(level = "trace")]
667 pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
668 #[cfg(with_metrics)]
669 let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
670
671 let mut info = self.synchronize_to_known_height().await?;
672
673 if self.preferred_owner.is_none_or(|owner| {
674 !info
675 .manager
676 .ownership
677 .is_super_owner_no_regular_owners(&owner)
678 }) {
679 info = self.client.synchronize_chain_state(self.chain_id).await?;
683 }
684
685 if info.epoch > self.client.admin_committees().await?.0 {
686 self.client
687 .synchronize_chain_state(self.client.admin_id)
688 .await?;
689 }
690
691 self.client.update_from_info(&info);
692 Ok(info)
693 }
694
695 async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
700 let info = self
701 .client
702 .download_certificates(self.chain_id, self.initial_next_block_height)
703 .await?;
704 if info.next_block_height == self.initial_next_block_height {
705 ensure!(
707 self.initial_block_hash == info.block_hash,
708 Error::InternalError("Invalid chain of blocks in local node")
709 );
710 }
711 Ok(info)
712 }
713
714 #[instrument(level = "trace", skip(old_committee))]
716 pub async fn update_validators(&self, old_committee: Option<&Committee>) -> Result<(), Error> {
717 let update_validators_start = linera_base::time::Instant::now();
718 if let Some(old_committee) = old_committee {
720 self.communicate_chain_updates(old_committee).await?
721 };
722 if let Ok(new_committee) = self.local_committee().await {
723 if Some(&new_committee) != old_committee {
724 self.communicate_chain_updates(&new_committee).await?;
727 }
728 }
729 self.send_timing(update_validators_start, TimingType::UpdateValidators);
730 Ok(())
731 }
732
733 #[instrument(level = "trace", skip(committee))]
735 pub async fn communicate_chain_updates(&self, committee: &Committee) -> Result<(), Error> {
736 let delivery = self.options.cross_chain_message_delivery;
737 let height = self.chain_info().await?.next_block_height;
738 self.client
739 .communicate_chain_updates(committee, self.chain_id, height, delivery)
740 .await
741 }
742
743 async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
746 let chain_ids = self
747 .chain_state_view()
748 .await?
749 .execution_state
750 .system
751 .event_subscriptions
752 .indices()
753 .await?
754 .iter()
755 .map(|(chain_id, _)| *chain_id)
756 .chain(iter::once(self.client.admin_id))
757 .filter(|chain_id| *chain_id != self.chain_id)
758 .collect::<BTreeSet<_>>();
759 stream::iter(
760 chain_ids
761 .into_iter()
762 .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
763 )
764 .buffer_unordered(self.options.max_joined_tasks)
765 .collect::<Vec<_>>()
766 .await
767 .into_iter()
768 .collect::<Result<Vec<_>, _>>()?;
769 Ok(())
770 }
771
772 #[instrument(level = "trace")]
781 pub async fn find_received_certificates(
782 &self,
783 cancellation_token: Option<CancellationToken>,
784 ) -> Result<(), Error> {
785 debug!(chain_id = %self.chain_id, "starting find_received_certificates");
786 #[cfg(with_metrics)]
787 let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
788 let chain_id = self.chain_id;
790 let (_, committee) = self.admin_committee().await?;
791 let nodes = self.client.make_nodes(&committee)?;
792
793 let trackers = self
794 .client
795 .local_node
796 .chain_state_view(chain_id)
797 .await?
798 .received_certificate_trackers
799 .get()
800 .clone();
801
802 trace!("find_received_certificates: read trackers");
803
804 let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
805 let result = communicate_with_quorum(
807 &nodes,
808 &committee,
809 |_| (),
810 |remote_node| {
811 let client = &self.client;
812 let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
813 let received_log_batches = Arc::clone(&received_log_batches);
814 Box::pin(async move {
815 let batch = client
816 .get_received_log_from_validator(chain_id, &remote_node, tracker)
817 .await?;
818 let mut batches = received_log_batches.lock().unwrap();
819 batches.push((remote_node.public_key, batch));
820 Ok(())
821 })
822 },
823 self.options.grace_period,
824 )
825 .await;
826
827 if let Err(error) = result {
828 error!(
829 %error,
830 "Failed to synchronize received_logs from at least a quorum of validators",
831 );
832 }
833
834 let received_logs: Vec<_> = {
835 let mut received_log_batches = received_log_batches.lock().unwrap();
836 std::mem::take(received_log_batches.as_mut())
837 };
838
839 debug!(
840 received_logs_len = %received_logs.len(),
841 received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
842 "collected received logs"
843 );
844
845 let (received_logs, mut validator_trackers) = {
846 (
847 ReceivedLogs::from_received_result(received_logs.clone()),
848 ValidatorTrackers::new(received_logs, &trackers),
849 )
850 };
851
852 debug!(
853 num_chains = %received_logs.num_chains(),
854 num_certs = %received_logs.num_certs(),
855 "find_received_certificates: total number of chains and certificates to sync",
856 );
857
858 let max_blocks_per_chain =
859 self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
860 for received_log in received_logs.into_batches(
861 self.options.sender_certificate_download_batch_size,
862 max_blocks_per_chain,
863 ) {
864 validator_trackers = self
865 .receive_sender_certificates(
866 received_log,
867 validator_trackers,
868 &nodes,
869 cancellation_token.clone(),
870 )
871 .await?;
872
873 self.update_received_certificate_trackers(&validator_trackers)
874 .await;
875 }
876
877 info!("find_received_certificates finished");
878
879 Ok(())
880 }
881
882 async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
883 let updated_trackers = trackers.to_map();
884 trace!(?updated_trackers, "updated tracker values");
885
886 if let Err(error) = self
888 .client
889 .local_node
890 .update_received_certificate_trackers(self.chain_id, updated_trackers)
891 .await
892 {
893 error!(
894 chain_id = %self.chain_id,
895 %error,
896 "Failed to update the certificate trackers for chain",
897 );
898 }
899 }
900
901 async fn receive_sender_certificates(
904 &self,
905 mut received_logs: ReceivedLogs,
906 mut validator_trackers: ValidatorTrackers,
907 nodes: &[RemoteNode<Env::ValidatorNode>],
908 cancellation_token: Option<CancellationToken>,
909 ) -> Result<ValidatorTrackers, Error> {
910 debug!(
911 num_chains = %received_logs.num_chains(),
912 num_certs = %received_logs.num_certs(),
913 "receive_sender_certificates: number of chains and certificates to sync",
914 );
915
916 let local_next_heights = self
918 .client
919 .local_node
920 .next_outbox_heights(received_logs.chains(), self.chain_id)
921 .await?;
922
923 validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
924
925 debug!(
926 remaining_total_certificates = %received_logs.num_certs(),
927 "receive_sender_certificates: computed remote_heights"
928 );
929
930 let mut other_sender_chains = Vec::new();
931 let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
932
933 let cert_futures = received_logs.heights_per_chain().into_iter().filter_map(
934 |(sender_chain_id, remote_heights)| {
935 if remote_heights.is_empty() {
936 other_sender_chains.push(sender_chain_id);
940 return None;
941 };
942 let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
943 let sender = sender.clone();
944 let client = self.client.clone();
945 let mut nodes = nodes.to_vec();
946 nodes.shuffle(&mut rand::thread_rng());
947 let received_logs_ref = &received_logs;
948 Some(async move {
949 client
950 .download_and_process_sender_chain(
951 sender_chain_id,
952 &nodes,
953 received_logs_ref,
954 remote_heights,
955 sender,
956 )
957 .await
958 })
959 },
960 );
961
962 let update_trackers = linera_base::task::spawn(async move {
963 while let Some(chain_and_height) = receiver.recv().await {
964 validator_trackers.downloaded_cert(chain_and_height);
965 }
966 validator_trackers
967 });
968
969 let mut cancellation_future = Box::pin(
970 async move {
971 if let Some(token) = cancellation_token {
972 token.cancelled().await
973 } else {
974 future::pending().await
975 }
976 }
977 .fuse(),
978 );
979
980 select! {
981 _ = stream::iter(cert_futures)
982 .buffer_unordered(self.options.max_joined_tasks)
983 .for_each(future::ready)
984 => (),
985 _ = cancellation_future => ()
986 };
987
988 drop(sender);
989
990 let validator_trackers = update_trackers
991 .await
992 .map_err(|_| Error::InternalError("could not join the update_trackers task"))?;
993
994 debug!(
995 num_other_chains = %other_sender_chains.len(),
996 "receive_sender_certificates: processing certificates finished"
997 );
998
999 self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
1003 .await;
1004
1005 debug!("receive_sender_certificates: finished processing other_sender_chains");
1006
1007 Ok(validator_trackers)
1008 }
1009
1010 async fn retry_pending_cross_chain_requests(
1013 &self,
1014 nodes: &[RemoteNode<Env::ValidatorNode>],
1015 other_sender_chains: Vec<ChainId>,
1016 ) {
1017 let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
1018 let local_node = self.client.local_node.clone();
1019 async move {
1020 if let Err(error) = match local_node
1021 .retry_pending_cross_chain_requests(chain_id)
1022 .await
1023 {
1024 Ok(()) => Ok(()),
1025 Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1026 if let Err(error) = self
1027 .client
1028 .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1029 .await
1030 {
1031 error!(
1032 ?blob_ids,
1033 %error,
1034 "Error while attempting to download blobs during retrying outgoing \
1035 messages"
1036 );
1037 }
1038 local_node
1039 .retry_pending_cross_chain_requests(chain_id)
1040 .await
1041 }
1042 err => err,
1043 } {
1044 error!(
1045 %chain_id,
1046 %error,
1047 "Failed to retry outgoing messages from chain"
1048 );
1049 }
1050 }
1051 }));
1052 stream.for_each(future::ready).await;
1053 }
1054
1055 #[instrument(level = "trace")]
1057 pub async fn transfer(
1058 &self,
1059 owner: AccountOwner,
1060 amount: Amount,
1061 recipient: Account,
1062 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1063 self.execute_operation(SystemOperation::Transfer {
1065 owner,
1066 recipient,
1067 amount,
1068 })
1069 .await
1070 }
1071
1072 #[instrument(level = "trace")]
1075 pub async fn read_data_blob(
1076 &self,
1077 hash: CryptoHash,
1078 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1079 let blob_id = BlobId {
1080 hash,
1081 blob_type: BlobType::Data,
1082 };
1083 self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1084 .await
1085 }
1086
1087 #[instrument(level = "trace")]
1089 pub async fn claim(
1090 &self,
1091 owner: AccountOwner,
1092 target_id: ChainId,
1093 recipient: Account,
1094 amount: Amount,
1095 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1096 self.execute_operation(SystemOperation::Claim {
1097 owner,
1098 target_id,
1099 recipient,
1100 amount,
1101 })
1102 .await
1103 }
1104
1105 #[instrument(level = "trace")]
1108 pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1109 let chain_id = self.chain_id;
1110 let info = self.chain_info_with_committees().await?;
1111 let committee = info.current_committee()?;
1112 let height = info.next_block_height;
1113 let round = info.manager.current_round;
1114 let action = CommunicateAction::RequestTimeout {
1115 height,
1116 round,
1117 chain_id,
1118 };
1119 let value = Timeout::new(chain_id, height, info.epoch);
1120 let certificate = Box::new(
1121 self.client
1122 .communicate_chain_action(committee, action, value)
1123 .await?,
1124 );
1125 self.client.process_certificate(certificate.clone()).await?;
1126 self.client
1128 .communicate_chain_updates(
1129 committee,
1130 chain_id,
1131 height,
1132 CrossChainMessageDelivery::NonBlocking,
1133 )
1134 .await?;
1135 Ok(*certificate)
1136 }
1137
1138 #[instrument(level = "trace", skip_all)]
1140 pub async fn synchronize_chain_state(
1141 &self,
1142 chain_id: ChainId,
1143 ) -> Result<Box<ChainInfo>, Error> {
1144 self.client.synchronize_chain_state(chain_id).await
1145 }
1146
1147 #[instrument(level = "trace", skip_all)]
1150 pub async fn synchronize_chain_state_from_committee(
1151 &self,
1152 committee: Committee,
1153 ) -> Result<Box<ChainInfo>, Error> {
1154 self.client
1155 .synchronize_chain_state_from_committee(self.chain_id, committee)
1156 .await
1157 }
1158
1159 #[instrument(level = "trace", skip(operations, blobs))]
1161 pub async fn execute_operations(
1162 &self,
1163 operations: Vec<Operation>,
1164 blobs: Vec<Blob>,
1165 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1166 let timing_start = linera_base::time::Instant::now();
1167
1168 let result = loop {
1169 let execute_block_start = linera_base::time::Instant::now();
1170 match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1172 Ok(ExecuteBlockOutcome::Executed(certificate)) => {
1173 self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1174 break Ok(ClientOutcome::Committed(certificate));
1175 }
1176 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
1177 break Ok(ClientOutcome::WaitForTimeout(timeout));
1178 }
1179 Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
1180 info!(
1181 height = %certificate.block().header.height,
1182 "Another block was committed; retrying."
1183 );
1184 }
1185 Err(Error::CommunicationError(CommunicationError::Trusted(
1186 NodeError::UnexpectedBlockHeight {
1187 expected_block_height,
1188 found_block_height,
1189 },
1190 ))) if expected_block_height > found_block_height => {
1191 tracing::info!(
1192 "Local state is outdated; synchronizing chain {:.8}",
1193 self.chain_id
1194 );
1195 self.synchronize_chain_state(self.chain_id).await?;
1196 }
1197 Err(err) => return Err(err),
1198 };
1199 };
1200
1201 self.send_timing(timing_start, TimingType::ExecuteOperations);
1202
1203 result
1204 }
1205
1206 pub async fn execute_operation(
1208 &self,
1209 operation: impl Into<Operation>,
1210 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1211 self.execute_operations(vec![operation.into()], vec![])
1212 .await
1213 }
1214
1215 #[instrument(level = "trace", skip(operations, blobs))]
1219 async fn execute_block(
1220 &self,
1221 operations: Vec<Operation>,
1222 blobs: Vec<Blob>,
1223 ) -> Result<ExecuteBlockOutcome, Error> {
1224 let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1225
1226 if transactions.is_empty() {
1227 return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1228 WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1229 )));
1230 }
1231
1232 self.execute_prepared_transactions(transactions, blobs)
1233 .await
1234 }
1235
1236 #[instrument(level = "trace", skip(transactions, blobs))]
1237 async fn execute_prepared_transactions(
1238 &self,
1239 transactions: Vec<Transaction>,
1240 blobs: Vec<Blob>,
1241 ) -> Result<ExecuteBlockOutcome, Error> {
1242 #[cfg(with_metrics)]
1243 let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1244
1245 let mutex = self.client_mutex();
1246 let _guard = mutex.lock_owned().await;
1247 match self.process_pending_block_without_prepare().await? {
1249 ClientOutcome::Committed(Some(certificate)) => {
1250 return Ok(ExecuteBlockOutcome::Conflict(certificate))
1251 }
1252 ClientOutcome::WaitForTimeout(timeout) => {
1253 return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
1254 }
1255 ClientOutcome::Committed(None) => {}
1256 }
1257
1258 let block = self.new_pending_block(transactions, blobs).await?;
1259
1260 match self.process_pending_block_without_prepare().await? {
1261 ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
1262 Ok(ExecuteBlockOutcome::Executed(certificate))
1263 }
1264 ClientOutcome::Committed(Some(certificate)) => {
1265 Ok(ExecuteBlockOutcome::Conflict(certificate))
1266 }
1267 ClientOutcome::Committed(None) => {
1269 Err(Error::BlockProposalError("Unexpected block proposal error"))
1270 }
1271 ClientOutcome::WaitForTimeout(timeout) => {
1272 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
1273 }
1274 }
1275 }
1276
1277 #[instrument(level = "trace", skip(operations))]
1283 async fn prepend_epochs_messages_and_events(
1284 &self,
1285 operations: Vec<Operation>,
1286 ) -> Result<Vec<Transaction>, Error> {
1287 let incoming_bundles = self.pending_message_bundles().await?;
1288 let stream_updates = self.collect_stream_updates().await?;
1289 Ok(self
1290 .collect_epoch_changes()
1291 .await?
1292 .into_iter()
1293 .map(Transaction::ExecuteOperation)
1294 .chain(
1295 incoming_bundles
1296 .into_iter()
1297 .map(Transaction::ReceiveMessages),
1298 )
1299 .chain(
1300 stream_updates
1301 .into_iter()
1302 .map(Transaction::ExecuteOperation),
1303 )
1304 .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1305 .collect::<Vec<_>>())
1306 }
1307
    /// Creates a new pending block from the given transactions and blobs, and stores it
    /// as the pending proposal in the client state.
    ///
    /// Returns the block obtained from staging the execution. Fails with a
    /// `BlockProposalError` if the client state already holds a pending proposal.
    #[instrument(level = "trace", skip(transactions, blobs))]
    async fn new_pending_block(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
    ) -> Result<Block, Error> {
        let identity = self.identity().await?;

        // Refuse to overwrite an existing pending proposal.
        ensure!(
            self.pending_proposal().is_none(),
            Error::BlockProposalError(
                "Client state already has a pending block; \
                 use the `linera retry-pending-block` command to commit that first"
            )
        );
        let info = self.chain_info_with_committees().await?;
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let proposed_block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_owner: Some(identity),
            timestamp,
        };

        // Only the multi-leader component of the round (if any) is passed to the staged
        // execution; when we would have to wait for a timeout instead, no round is passed.
        let round = match self.round_for_new_proposal(&info, &identity, true).await? {
            Either::Left(round) => round.multi_leader(),
            Either::Right(_) => None,
        };
        let (block, _) = self
            .client
            .stage_block_execution_and_discard_failing_messages(
                proposed_block,
                round,
                blobs.clone(),
            )
            .await?;
        // The stored proposal is derived from the staged block rather than from the
        // proposal built above.
        let (proposed_block, _) = block.clone().into_proposal();
        self.update_state(|state| {
            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
        });
        Ok(block)
    }
1362
1363 #[instrument(level = "trace", skip(transactions))]
1368 fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1369 let local_time = self.storage_client().clock().current_time();
1370 transactions
1371 .iter()
1372 .filter_map(Transaction::incoming_bundle)
1373 .map(|msg| msg.bundle.timestamp)
1374 .max()
1375 .map_or(local_time, |timestamp| timestamp.max(local_time))
1376 .max(block_time)
1377 }
1378
1379 #[instrument(level = "trace", skip(query))]
1381 pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, Error> {
1382 loop {
1383 let result = self
1384 .client
1385 .local_node
1386 .query_application(self.chain_id, query.clone())
1387 .await;
1388 if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1389 let validators = self.client.validator_nodes().await?;
1390 self.client
1391 .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1392 .await?;
1393 continue; }
1395 return Ok(result?);
1396 }
1397 }
1398
1399 #[instrument(level = "trace", skip(query))]
1401 pub async fn query_system_application(
1402 &self,
1403 query: SystemQuery,
1404 ) -> Result<QueryOutcome<SystemResponse>, Error> {
1405 let QueryOutcome {
1406 response,
1407 operations,
1408 } = self.query_application(Query::System(query)).await?;
1409 match response {
1410 QueryResponse::System(response) => Ok(QueryOutcome {
1411 response,
1412 operations,
1413 }),
1414 _ => Err(Error::InternalError("Unexpected response for system query")),
1415 }
1416 }
1417
1418 #[instrument(level = "trace", skip(application_id, query))]
1420 pub async fn query_user_application<A: Abi>(
1421 &self,
1422 application_id: ApplicationId<A>,
1423 query: &A::Query,
1424 ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1425 let query = Query::user(application_id, query)?;
1426 let QueryOutcome {
1427 response,
1428 operations,
1429 } = self.query_application(query).await?;
1430 match response {
1431 QueryResponse::User(response_bytes) => {
1432 let response = serde_json::from_slice(&response_bytes)?;
1433 Ok(QueryOutcome {
1434 response,
1435 operations,
1436 })
1437 }
1438 _ => Err(Error::InternalError("Unexpected response for user query")),
1439 }
1440 }
1441
1442 #[instrument(level = "trace")]
1449 pub async fn query_balance(&self) -> Result<Amount, Error> {
1450 let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1451 Ok(balance)
1452 }
1453
1454 #[instrument(level = "trace", skip(owner))]
1461 pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1462 if owner.is_chain() {
1463 self.query_balance().await
1464 } else {
1465 Ok(self
1466 .query_balances_with_owner(owner)
1467 .await?
1468 .1
1469 .unwrap_or(Amount::ZERO))
1470 }
1471 }
1472
    /// Returns the chain balance and the balance of `owner`, taking pending incoming
    /// message bundles into account.
    ///
    /// If there are no pending bundles, the balances are read from the local node.
    /// Otherwise a block receiving all pending bundles is staged (not proposed) and the
    /// balances are taken from the staged execution's response.
    #[instrument(level = "trace", skip(owner))]
    pub(crate) async fn query_balances_with_owner(
        &self,
        owner: AccountOwner,
    ) -> Result<(Amount, Option<Amount>), Error> {
        let incoming_bundles = self.pending_message_bundles().await?;
        // Nothing is pending: the local balances are already up to date.
        if incoming_bundles.is_empty() {
            let chain_balance = self.local_balance().await?;
            let owner_balance = self.local_owner_balance(owner).await?;
            return Ok((chain_balance, Some(owner_balance)));
        }
        let info = self.chain_info().await?;
        let transactions = incoming_bundles
            .into_iter()
            .map(Transaction::ReceiveMessages)
            .collect::<Vec<_>>();
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            // The chain account itself is not used as an authenticated owner.
            authenticated_owner: if owner == AccountOwner::CHAIN {
                None
            } else {
                Some(owner)
            },
            timestamp,
        };
        match self
            .client
            .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
            .await
        {
            Ok((_, response)) => Ok((
                response.info.chain_balance,
                response.info.requested_owner_balance,
            )),
            // A block-level `FeesExceedFunding` execution error is reported as zero
            // balances instead of an error.
            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
                error,
            )))) if matches!(
                &*error,
                ChainError::ExecutionError(
                    execution_error,
                    ChainExecutionContext::Block
                ) if matches!(
                    **execution_error,
                    ExecutionError::FeesExceedFunding { .. }
                )
            ) =>
            {
                Ok((Amount::ZERO, Some(Amount::ZERO)))
            }
            Err(error) => Err(error),
        }
    }
1539
1540 #[instrument(level = "trace")]
1544 pub async fn local_balance(&self) -> Result<Amount, Error> {
1545 let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1546 Ok(balance)
1547 }
1548
1549 #[instrument(level = "trace", skip(owner))]
1553 pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1554 if owner.is_chain() {
1555 self.local_balance().await
1556 } else {
1557 Ok(self
1558 .local_balances_with_owner(owner)
1559 .await?
1560 .1
1561 .unwrap_or(Amount::ZERO))
1562 }
1563 }
1564
1565 #[instrument(level = "trace", skip(owner))]
1569 pub(crate) async fn local_balances_with_owner(
1570 &self,
1571 owner: AccountOwner,
1572 ) -> Result<(Amount, Option<Amount>), Error> {
1573 ensure!(
1574 self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1575 Error::WalletSynchronizationError
1576 );
1577 let mut query = ChainInfoQuery::new(self.chain_id);
1578 query.request_owner_balance = owner;
1579 let response = self
1580 .client
1581 .local_node
1582 .handle_chain_info_query(query)
1583 .await?;
1584 Ok((
1585 response.info.chain_balance,
1586 response.info.requested_owner_balance,
1587 ))
1588 }
1589
1590 #[instrument(level = "trace")]
1592 pub async fn transfer_to_account(
1593 &self,
1594 from: AccountOwner,
1595 amount: Amount,
1596 account: Account,
1597 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1598 self.transfer(from, amount, account).await
1599 }
1600
1601 #[cfg(with_testing)]
1603 #[instrument(level = "trace")]
1604 pub async fn burn(
1605 &self,
1606 owner: AccountOwner,
1607 amount: Amount,
1608 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1609 let recipient = Account::burn_address(self.chain_id);
1610 self.transfer(owner, amount, recipient).await
1611 }
1612
1613 #[instrument(level = "trace")]
1614 pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1615 let validators = self.client.validator_nodes().await?;
1616 self.client
1617 .fetch_chain_info(self.chain_id, &validators)
1618 .await
1619 }
1620
1621 #[instrument(level = "trace")]
1627 pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1628 let info = self.prepare_chain().await?;
1629 self.synchronize_publisher_chains().await?;
1630 self.find_received_certificates(None).await?;
1631 Ok(info)
1632 }
1633
1634 #[instrument(level = "trace")]
1636 pub async fn process_pending_block(
1637 &self,
1638 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1639 self.prepare_chain().await?;
1640 self.process_pending_block_without_prepare().await
1641 }
1642
1643 #[instrument(level = "trace")]
1645 async fn process_pending_block_without_prepare(
1646 &self,
1647 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1648 let info = self.request_leader_timeout_if_needed().await?;
1649
1650 if info.manager.has_locking_block_in_current_round()
1652 && !info.manager.current_round.is_fast()
1653 {
1654 return self.finalize_locking_block(info).await;
1655 }
1656 let owner = self.identity().await?;
1657
1658 let local_node = &self.client.local_node;
1659 let pending_proposal = self.pending_proposal();
1661 let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1662 match &**locking {
1663 LockingBlock::Regular(certificate) => {
1664 let blob_ids = certificate.block().required_blob_ids();
1665 let blobs = local_node
1666 .get_locking_blobs(&blob_ids, self.chain_id)
1667 .await?
1668 .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1669 debug!("Retrying locking block from round {}", certificate.round);
1670 (certificate.block().clone(), blobs)
1671 }
1672 LockingBlock::Fast(proposal) => {
1673 let proposed_block = proposal.content.block.clone();
1674 let blob_ids = proposed_block.published_blob_ids();
1675 let blobs = local_node
1676 .get_locking_blobs(&blob_ids, self.chain_id)
1677 .await?
1678 .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1679 let block = self
1680 .client
1681 .stage_block_execution(proposed_block, None, blobs.clone())
1682 .await?
1683 .0;
1684 debug!("Retrying locking block from fast round.");
1685 (block, blobs)
1686 }
1687 }
1688 } else if let Some(pending_proposal) = pending_proposal {
1689 let proposed_block = pending_proposal.block;
1693 let round = match self.round_for_new_proposal(&info, &owner, true).await? {
1694 Either::Left(round) => round.multi_leader(),
1695 Either::Right(_) => None,
1696 };
1697 let (block, _) = self
1698 .client
1699 .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
1700 .await?;
1701 debug!("Proposing the local pending block.");
1702 (block, pending_proposal.blobs)
1703 } else {
1704 return Ok(ClientOutcome::Committed(None)); };
1706
1707 let has_oracle_responses = block.has_oracle_responses();
1708 let (proposed_block, outcome) = block.into_proposal();
1709 let round = match self
1710 .round_for_new_proposal(&info, &owner, has_oracle_responses)
1711 .await?
1712 {
1713 Either::Left(round) => round,
1714 Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1715 };
1716 debug!("Proposing block for round {}", round);
1717
1718 let already_handled_locally = info
1719 .manager
1720 .already_handled_proposal(round, &proposed_block);
1721 let proposal = if let Some(locking) = info.manager.requested_locking {
1723 Box::new(match *locking {
1724 LockingBlock::Regular(cert) => {
1725 BlockProposal::new_retry_regular(owner, round, cert, self.signer())
1726 .await
1727 .map_err(Error::signer_failure)?
1728 }
1729 LockingBlock::Fast(proposal) => {
1730 BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
1731 .await
1732 .map_err(Error::signer_failure)?
1733 }
1734 })
1735 } else {
1736 Box::new(
1737 BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
1738 .await
1739 .map_err(Error::signer_failure)?,
1740 )
1741 };
1742 if !already_handled_locally {
1743 if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
1745 match err {
1746 LocalNodeError::BlobsNotFound(_) => {
1747 local_node
1748 .handle_pending_blobs(self.chain_id, blobs)
1749 .await?;
1750 local_node.handle_block_proposal(*proposal.clone()).await?;
1751 }
1752 err => return Err(err.into()),
1753 }
1754 }
1755 }
1756 let committee = self.local_committee().await?;
1757 let block = Block::new(proposed_block, outcome);
1758 let submit_block_proposal_start = linera_base::time::Instant::now();
1760 let certificate = if round.is_fast() {
1761 let hashed_value = ConfirmedBlock::new(block);
1762 self.client
1763 .submit_block_proposal(&committee, proposal, hashed_value)
1764 .await?
1765 } else {
1766 let hashed_value = ValidatedBlock::new(block);
1767 let certificate = self
1768 .client
1769 .submit_block_proposal(&committee, proposal, hashed_value.clone())
1770 .await?;
1771 self.client.finalize_block(&committee, certificate).await?
1772 };
1773 self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
1774 debug!(round = %certificate.round, "Sending confirmed block to validators");
1775 self.update_validators(Some(&committee)).await?;
1776 Ok(ClientOutcome::Committed(Some(certificate)))
1777 }
1778
1779 fn send_timing(&self, start: Instant, timing_type: TimingType) {
1780 let Some(sender) = &self.timing_sender else {
1781 return;
1782 };
1783 if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
1784 tracing::warn!(%err, "Failed to send timing info");
1785 }
1786 }
1787
1788 async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
1791 let mut info = self.chain_info_with_manager_values().await?;
1792 if let Some(round_timeout) = info.manager.round_timeout {
1795 if round_timeout <= self.storage_client().clock().current_time() {
1796 if let Err(e) = self.request_leader_timeout().await {
1797 info!("Failed to obtain a timeout certificate: {}", e);
1798 } else {
1799 info = self.chain_info_with_manager_values().await?;
1800 }
1801 }
1802 }
1803 Ok(info)
1804 }
1805
1806 async fn finalize_locking_block(
1810 &self,
1811 info: Box<ChainInfo>,
1812 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1813 let locking = info
1814 .manager
1815 .requested_locking
1816 .expect("Should have a locking block");
1817 let LockingBlock::Regular(certificate) = *locking else {
1818 panic!("Should have a locking validated block");
1819 };
1820 debug!(
1821 round = %certificate.round,
1822 "Finalizing locking block"
1823 );
1824 let committee = self.local_committee().await?;
1825 match self
1826 .client
1827 .finalize_block(&committee, certificate.clone())
1828 .await
1829 {
1830 Ok(certificate) => {
1831 self.update_validators(Some(&committee)).await?;
1832 Ok(ClientOutcome::Committed(Some(certificate)))
1833 }
1834 Err(Error::CommunicationError(error))
1835 if info.manager.current_round >= Round::SingleLeader(0) =>
1836 {
1837 let timestamp = info.manager.round_timeout.ok_or(error)?;
1840 Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
1841 timestamp,
1842 current_round: info.manager.current_round,
1843 next_block_height: info.next_block_height,
1844 }))
1845 }
1846 Err(error) => Err(error),
1847 }
1848 }
1849
1850 async fn round_for_new_proposal(
1852 &self,
1853 info: &ChainInfo,
1854 identity: &AccountOwner,
1855 has_oracle_responses: bool,
1856 ) -> Result<Either<Round, RoundTimeout>, Error> {
1857 let seed = *self.chain_state_view().await?.manager.seed.get();
1858 let manager = &info.manager;
1859 let conflict = manager
1863 .requested_signed_proposal
1864 .as_ref()
1865 .into_iter()
1866 .chain(&manager.requested_proposed)
1867 .any(|proposal| proposal.content.round == manager.current_round)
1868 || (manager.current_round.is_fast() && has_oracle_responses);
1869 let round = if !conflict {
1870 manager.current_round
1871 } else if let Some(round) = manager
1872 .ownership
1873 .next_round(manager.current_round)
1874 .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
1875 {
1876 round
1877 } else if let Some(timeout) = info.round_timeout() {
1878 return Ok(Either::Right(timeout));
1879 } else {
1880 return Err(Error::BlockProposalError(
1881 "Conflicting proposal in the current round",
1882 ));
1883 };
1884 let current_committee = info
1885 .current_committee()?
1886 .validators
1887 .values()
1888 .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
1889 .collect();
1890 if manager.can_propose(identity, round, seed, ¤t_committee) {
1891 return Ok(Either::Left(round));
1892 }
1893 if let Some(timeout) = info.round_timeout() {
1894 return Ok(Either::Right(timeout));
1895 }
1896 Err(Error::BlockProposalError(
1897 "Not a leader in the current round",
1898 ))
1899 }
1900
    /// Clears the pending proposal stored in the client state.
    ///
    /// Only available in testing builds.
    #[cfg(with_testing)]
    #[instrument(level = "trace")]
    pub fn clear_pending_proposal(&self) {
        self.update_state(|state| state.clear_pending_proposal());
    }
1907
1908 #[instrument(level = "trace")]
1912 pub async fn rotate_key_pair(
1913 &self,
1914 public_key: AccountPublicKey,
1915 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1916 self.transfer_ownership(public_key.into()).await
1917 }
1918
1919 #[instrument(level = "trace")]
1921 pub async fn transfer_ownership(
1922 &self,
1923 new_owner: AccountOwner,
1924 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1925 self.execute_operation(SystemOperation::ChangeOwnership {
1926 super_owners: vec![new_owner],
1927 owners: Vec::new(),
1928 multi_leader_rounds: 2,
1929 open_multi_leader_rounds: false,
1930 timeout_config: TimeoutConfig::default(),
1931 })
1932 .await
1933 }
1934
1935 #[instrument(level = "trace")]
1937 pub async fn share_ownership(
1938 &self,
1939 new_owner: AccountOwner,
1940 new_weight: u64,
1941 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1942 loop {
1943 let ownership = self.prepare_chain().await?.manager.ownership;
1944 ensure!(
1945 ownership.is_active(),
1946 ChainError::InactiveChain(self.chain_id)
1947 );
1948 let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
1949 owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
1950 owners.push((new_owner, new_weight));
1951 let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
1952 super_owners: Vec::new(),
1953 owners,
1954 multi_leader_rounds: ownership.multi_leader_rounds,
1955 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
1956 timeout_config: ownership.timeout_config,
1957 })];
1958 match self.execute_block(operations, vec![]).await? {
1959 ExecuteBlockOutcome::Executed(certificate) => {
1960 return Ok(ClientOutcome::Committed(certificate));
1961 }
1962 ExecuteBlockOutcome::Conflict(certificate) => {
1963 info!(
1964 height = %certificate.block().header.height,
1965 "Another block was committed; retrying."
1966 );
1967 }
1968 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
1969 return Ok(ClientOutcome::WaitForTimeout(timeout));
1970 }
1971 };
1972 }
1973 }
1974
1975 #[instrument(level = "trace")]
1978 pub async fn change_ownership(
1979 &self,
1980 ownership: ChainOwnership,
1981 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1982 self.execute_operation(SystemOperation::ChangeOwnership {
1983 super_owners: ownership.super_owners.into_iter().collect(),
1984 owners: ownership.owners.into_iter().collect(),
1985 multi_leader_rounds: ownership.multi_leader_rounds,
1986 open_multi_leader_rounds: ownership.open_multi_leader_rounds,
1987 timeout_config: ownership.timeout_config.clone(),
1988 })
1989 .await
1990 }
1991
1992 #[instrument(level = "trace", skip(application_permissions))]
1994 pub async fn change_application_permissions(
1995 &self,
1996 application_permissions: ApplicationPermissions,
1997 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1998 self.execute_operation(SystemOperation::ChangeApplicationPermissions(
1999 application_permissions,
2000 ))
2001 .await
2002 }
2003
2004 #[instrument(level = "trace", skip(self))]
2006 pub async fn open_chain(
2007 &self,
2008 ownership: ChainOwnership,
2009 application_permissions: ApplicationPermissions,
2010 balance: Amount,
2011 ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2012 loop {
2013 let config = OpenChainConfig {
2014 ownership: ownership.clone(),
2015 balance,
2016 application_permissions: application_permissions.clone(),
2017 };
2018 let operation = Operation::system(SystemOperation::OpenChain(config));
2019 let certificate = match self.execute_block(vec![operation], vec![]).await? {
2020 ExecuteBlockOutcome::Executed(certificate) => certificate,
2021 ExecuteBlockOutcome::Conflict(_) => continue,
2022 ExecuteBlockOutcome::WaitForTimeout(timeout) => {
2023 return Ok(ClientOutcome::WaitForTimeout(timeout));
2024 }
2025 };
2026 let chain_blob = certificate
2028 .block()
2029 .body
2030 .blobs
2031 .last()
2032 .and_then(|blobs| blobs.last())
2033 .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2034 let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2035 self.client.track_chain(description.id());
2037 self.client
2038 .local_node
2039 .retry_pending_cross_chain_requests(self.chain_id)
2040 .await?;
2041 return Ok(ClientOutcome::Committed((description, certificate)));
2042 }
2043 }
2044
2045 #[instrument(level = "trace")]
2048 pub async fn close_chain(
2049 &self,
2050 ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2051 match self.execute_operation(SystemOperation::CloseChain).await {
2052 Ok(outcome) => Ok(outcome.map(Some)),
2053 Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2054 chain_error,
2055 )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2056 Ok(ClientOutcome::Committed(None)) }
2058 Err(error) => Err(error),
2059 }
2060 }
2061
2062 #[cfg(not(target_arch = "wasm32"))]
2064 #[instrument(level = "trace", skip(contract, service))]
2065 pub async fn publish_module(
2066 &self,
2067 contract: Bytecode,
2068 service: Bytecode,
2069 vm_runtime: VmRuntime,
2070 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2071 let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2072 self.publish_module_blobs(blobs, module_id).await
2073 }
2074
2075 #[cfg(not(target_arch = "wasm32"))]
2077 #[instrument(level = "trace", skip(blobs, module_id))]
2078 pub async fn publish_module_blobs(
2079 &self,
2080 blobs: Vec<Blob>,
2081 module_id: ModuleId,
2082 ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2083 self.execute_operations(
2084 vec![Operation::system(SystemOperation::PublishModule {
2085 module_id,
2086 })],
2087 blobs,
2088 )
2089 .await?
2090 .try_map(|certificate| Ok((module_id, certificate)))
2091 }
2092
2093 #[instrument(level = "trace", skip(bytes))]
2095 pub async fn publish_data_blobs(
2096 &self,
2097 bytes: Vec<Vec<u8>>,
2098 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2099 let blobs = bytes.into_iter().map(Blob::new_data);
2100 let publish_blob_operations = blobs
2101 .clone()
2102 .map(|blob| {
2103 Operation::system(SystemOperation::PublishDataBlob {
2104 blob_hash: blob.id().hash,
2105 })
2106 })
2107 .collect();
2108 self.execute_operations(publish_blob_operations, blobs.collect())
2109 .await
2110 }
2111
2112 #[instrument(level = "trace", skip(bytes))]
2114 pub async fn publish_data_blob(
2115 &self,
2116 bytes: Vec<u8>,
2117 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2118 self.publish_data_blobs(vec![bytes]).await
2119 }
2120
2121 #[instrument(
2123 level = "trace",
2124 skip(self, parameters, instantiation_argument, required_application_ids)
2125 )]
2126 pub async fn create_application<
2127 A: Abi,
2128 Parameters: Serialize,
2129 InstantiationArgument: Serialize,
2130 >(
2131 &self,
2132 module_id: ModuleId<A, Parameters, InstantiationArgument>,
2133 parameters: &Parameters,
2134 instantiation_argument: &InstantiationArgument,
2135 required_application_ids: Vec<ApplicationId>,
2136 ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2137 let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2138 let parameters = serde_json::to_vec(parameters)?;
2139 Ok(self
2140 .create_application_untyped(
2141 module_id.forget_abi(),
2142 parameters,
2143 instantiation_argument,
2144 required_application_ids,
2145 )
2146 .await?
2147 .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2148 }
2149
2150 #[instrument(
2152 level = "trace",
2153 skip(
2154 self,
2155 module_id,
2156 parameters,
2157 instantiation_argument,
2158 required_application_ids
2159 )
2160 )]
2161 pub async fn create_application_untyped(
2162 &self,
2163 module_id: ModuleId,
2164 parameters: Vec<u8>,
2165 instantiation_argument: Vec<u8>,
2166 required_application_ids: Vec<ApplicationId>,
2167 ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2168 self.execute_operation(SystemOperation::CreateApplication {
2169 module_id,
2170 parameters,
2171 instantiation_argument,
2172 required_application_ids,
2173 })
2174 .await?
2175 .try_map(|certificate| {
2176 let mut creation: Vec<_> = certificate
2178 .block()
2179 .created_blob_ids()
2180 .into_iter()
2181 .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2182 .collect();
2183 if creation.len() > 1 {
2184 return Err(Error::InternalError(
2185 "Unexpected number of application descriptions published",
2186 ));
2187 }
2188 let blob_id = creation.pop().ok_or(Error::InternalError(
2189 "ApplicationDescription blob not found.",
2190 ))?;
2191 let id = ApplicationId::new(blob_id.hash);
2192 Ok((id, certificate))
2193 })
2194 }
2195
2196 #[instrument(level = "trace", skip(committee))]
2198 pub async fn stage_new_committee(
2199 &self,
2200 committee: Committee,
2201 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2202 let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2203 let blob_hash = blob.id().hash;
2204 match self
2205 .execute_operations(
2206 vec![Operation::system(SystemOperation::Admin(
2207 AdminOperation::PublishCommitteeBlob { blob_hash },
2208 ))],
2209 vec![blob],
2210 )
2211 .await?
2212 {
2213 ClientOutcome::Committed(_) => {}
2214 outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
2215 }
2216 let epoch = self.chain_info().await?.epoch.try_add_one()?;
2217 self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2218 epoch,
2219 blob_hash,
2220 }))
2221 .await
2222 }
2223
2224 #[instrument(level = "trace")]
2230 pub async fn process_inbox(
2231 &self,
2232 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2233 self.prepare_chain().await?;
2234 self.process_inbox_without_prepare().await
2235 }
2236
2237 #[instrument(level = "trace")]
2243 pub async fn process_inbox_without_prepare(
2244 &self,
2245 ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2246 #[cfg(with_metrics)]
2247 let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2248
2249 let mut certificates = Vec::new();
2250 loop {
2251 let transactions = self.prepend_epochs_messages_and_events(vec![]).await?;
2255 if transactions.is_empty() {
2257 return Ok((certificates, None));
2258 }
2259 match self
2260 .execute_prepared_transactions(transactions, vec![])
2261 .await
2262 {
2263 Ok(ExecuteBlockOutcome::Executed(certificate))
2264 | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
2265 Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2266 return Ok((certificates, Some(timeout)));
2267 }
2268 Err(error) => return Err(error),
2269 };
2270 }
2271 }
2272
2273 async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2276 let (mut min_epoch, mut next_epoch) = {
2277 let (epoch, committees) = self.epoch_and_committees().await?;
2278 let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2279 (min_epoch, epoch.try_add_one()?)
2280 };
2281 let mut epoch_change_ops = Vec::new();
2282 while self
2283 .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2284 .await?
2285 {
2286 epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2287 next_epoch,
2288 )));
2289 next_epoch.try_add_assign_one()?;
2290 }
2291 while self
2292 .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2293 .await?
2294 {
2295 epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2296 min_epoch,
2297 )));
2298 min_epoch.try_add_assign_one()?;
2299 }
2300 Ok(epoch_change_ops)
2301 }
2302
2303 async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2306 let event_id = EventId {
2307 chain_id: self.client.admin_id,
2308 stream_id: StreamId::system(stream_name),
2309 index,
2310 };
2311 Ok(self
2312 .client
2313 .storage_client()
2314 .read_event(event_id)
2315 .await?
2316 .is_some())
2317 }
2318
2319 pub async fn events_from_index(
2321 &self,
2322 stream_id: StreamId,
2323 start_index: u32,
2324 ) -> Result<Vec<IndexAndEvent>, Error> {
2325 Ok(self
2326 .client
2327 .storage_client()
2328 .read_events_from_index(&self.chain_id, &stream_id, start_index)
2329 .await?)
2330 }
2331
2332 #[instrument(level = "trace")]
2337 pub async fn revoke_epochs(
2338 &self,
2339 revoked_epoch: Epoch,
2340 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2341 self.prepare_chain().await?;
2342 let (current_epoch, committees) = self.epoch_and_committees().await?;
2343 ensure!(
2344 revoked_epoch < current_epoch,
2345 Error::CannotRevokeCurrentEpoch(current_epoch)
2346 );
2347 ensure!(
2348 committees.contains_key(&revoked_epoch),
2349 Error::EpochAlreadyRevoked
2350 );
2351 let operations = committees
2352 .keys()
2353 .filter_map(|epoch| {
2354 if *epoch <= revoked_epoch {
2355 Some(Operation::system(SystemOperation::Admin(
2356 AdminOperation::RemoveCommittee { epoch: *epoch },
2357 )))
2358 } else {
2359 None
2360 }
2361 })
2362 .collect();
2363 self.execute_operations(operations, vec![]).await
2364 }
2365
2366 #[instrument(level = "trace")]
2370 pub async fn transfer_to_account_unsafe_unconfirmed(
2371 &self,
2372 owner: AccountOwner,
2373 amount: Amount,
2374 recipient: Account,
2375 ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2376 self.execute_operation(SystemOperation::Transfer {
2377 owner,
2378 recipient,
2379 amount,
2380 })
2381 .await
2382 }
2383
2384 #[instrument(level = "trace", skip(hash))]
2385 pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2386 let block = self
2387 .client
2388 .storage_client()
2389 .read_confirmed_block(hash)
2390 .await?;
2391 block.ok_or(Error::MissingConfirmedBlock(hash))
2392 }
2393
2394 #[instrument(level = "trace")]
2396 pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2397 self.client
2398 .local_node
2399 .retry_pending_cross_chain_requests(self.chain_id)
2400 .await?;
2401 Ok(())
2402 }
2403
2404 #[instrument(level = "trace", skip(local_node))]
2405 async fn local_chain_info(
2406 &self,
2407 chain_id: ChainId,
2408 local_node: &mut LocalNodeClient<Env::Storage>,
2409 ) -> Result<Option<Box<ChainInfo>>, Error> {
2410 match local_node.chain_info(chain_id).await {
2411 Ok(info) => {
2412 self.client.update_from_info(&info);
2414 Ok(Some(info))
2415 }
2416 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2417 Err(err) => Err(err.into()),
2418 }
2419 }
2420
2421 #[instrument(level = "trace", skip(chain_id, local_node))]
2422 async fn local_next_block_height(
2423 &self,
2424 chain_id: ChainId,
2425 local_node: &mut LocalNodeClient<Env::Storage>,
2426 ) -> Result<BlockHeight, Error> {
2427 Ok(self
2428 .local_chain_info(chain_id, local_node)
2429 .await?
2430 .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2431 }
2432
2433 #[instrument(level = "trace")]
2436 async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2437 Ok(self
2438 .client
2439 .local_node
2440 .get_inbox_next_height(self.chain_id, origin)
2441 .await?)
2442 }
2443
2444 #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2445 async fn process_notification(
2446 &self,
2447 remote_node: RemoteNode<Env::ValidatorNode>,
2448 mut local_node: LocalNodeClient<Env::Storage>,
2449 notification: Notification,
2450 listening_mode: &ListeningMode,
2451 ) -> Result<(), Error> {
2452 match notification.reason {
2453 Reason::NewIncomingBundle { origin, height } => {
2454 if self.local_next_height_to_receive(origin).await? > height {
2455 debug!(
2456 chain_id = %self.chain_id,
2457 "Accepting redundant notification for new message"
2458 );
2459 return Ok(());
2460 }
2461 self.client
2462 .download_sender_block_with_sending_ancestors(
2463 self.chain_id,
2464 origin,
2465 height,
2466 &remote_node,
2467 )
2468 .await?;
2469 if self.local_next_height_to_receive(origin).await? <= height {
2470 info!(
2471 chain_id = %self.chain_id,
2472 "NewIncomingBundle: Fail to synchronize new message after notification"
2473 );
2474 }
2475 }
2476 Reason::NewBlock { height, .. } => {
2477 let chain_id = notification.chain_id;
2478 if self
2479 .local_next_block_height(chain_id, &mut local_node)
2480 .await?
2481 > height
2482 {
2483 debug!(
2484 chain_id = %self.chain_id,
2485 "Accepting redundant notification for new block"
2486 );
2487 return Ok(());
2488 }
2489 match listening_mode {
2490 ListeningMode::FullChain => {
2491 self.client
2492 .synchronize_chain_state_from(&remote_node, chain_id)
2493 .await?;
2494 if self
2495 .local_next_block_height(chain_id, &mut local_node)
2496 .await?
2497 <= height
2498 {
2499 info!("NewBlock: Fail to synchronize new block after notification");
2500 }
2501 trace!(
2502 chain_id = %self.chain_id,
2503 %height,
2504 "NewBlock: processed notification",
2505 );
2506 }
2507 ListeningMode::EventsOnly(_) => {
2508 debug!(
2509 chain_id = %self.chain_id,
2510 %height,
2511 "NewBlock: ignoring notification due to listening in EventsOnly mode"
2512 );
2513 }
2514 }
2515 }
2516 Reason::NewEvents {
2517 height,
2518 hash,
2519 event_streams,
2520 } => {
2521 if self
2522 .local_next_block_height(notification.chain_id, &mut local_node)
2523 .await?
2524 > height
2525 {
2526 debug!(
2527 chain_id = %self.chain_id,
2528 "Accepting redundant notification for new block"
2529 );
2530 return Ok(());
2531 }
2532 let should_process = match listening_mode {
2533 ListeningMode::FullChain => true,
2534 ListeningMode::EventsOnly(relevant_events) => relevant_events
2535 .intersection(&event_streams)
2536 .next()
2537 .is_some(),
2538 };
2539 if !should_process {
2540 debug!(
2541 chain_id = %self.chain_id,
2542 %height,
2543 "NewEvents: got a notification, but no relevant event streams in it"
2544 );
2545 return Ok(());
2546 }
2547 trace!(
2548 chain_id = %self.chain_id,
2549 %height,
2550 "NewEvents: processing notification"
2551 );
2552 let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
2553 let certificate = certificates
2556 .pop()
2557 .expect("download_certificates should have returned one certificate");
2558 self.client
2559 .receive_sender_certificate(
2560 certificate,
2561 ReceiveCertificateMode::NeedsCheck,
2562 None,
2563 )
2564 .await?;
2565 }
2566 Reason::NewRound { height, round } => {
2567 if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
2568 debug!("NewRound: ignoring a notification due to listening mode");
2569 return Ok(());
2570 }
2571 let chain_id = notification.chain_id;
2572 if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2573 if (info.next_block_height, info.manager.current_round) >= (height, round) {
2574 debug!(
2575 chain_id = %self.chain_id,
2576 "Accepting redundant notification for new round"
2577 );
2578 return Ok(());
2579 }
2580 }
2581 self.client
2582 .synchronize_chain_state_from(&remote_node, chain_id)
2583 .await?;
2584 let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2585 error!(
2586 chain_id = %self.chain_id,
2587 "NewRound: Fail to read local chain info for {chain_id}"
2588 );
2589 return Ok(());
2590 };
2591 if (info.next_block_height, info.manager.current_round) < (height, round) {
2592 error!(
2593 chain_id = %self.chain_id,
2594 "NewRound: Fail to synchronize new block after notification"
2595 );
2596 }
2597 }
2598 }
2599 Ok(())
2600 }
2601
2602 pub fn is_tracked(&self) -> bool {
2604 self.client
2605 .tracked_chains
2606 .read()
2607 .unwrap()
2608 .contains(&self.chain_id)
2609 }
2610
2611 #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
2614 pub async fn listen(
2615 &self,
2616 listening_mode: ListeningMode,
2617 ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
2618 use future::FutureExt as _;
2619
2620 async fn await_while_polling<F: FusedFuture>(
2621 future: F,
2622 background_work: impl FusedStream<Item = ()>,
2623 ) -> F::Output {
2624 tokio::pin!(future);
2625 tokio::pin!(background_work);
2626 loop {
2627 futures::select! {
2628 _ = background_work.next() => (),
2629 result = future => return result,
2630 }
2631 }
2632 }
2633
2634 let mut senders = HashMap::new(); let notifications = self.subscribe()?;
2636 let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
2637
2638 let mut process_notifications = FuturesUnordered::new();
2645
2646 match self
2647 .update_notification_streams(&mut senders, &listening_mode)
2648 .await
2649 {
2650 Ok(handler) => process_notifications.push(handler),
2651 Err(error) => error!("Failed to update committee: {error}"),
2652 };
2653
2654 let this = self.clone();
2655 let update_streams = async move {
2656 let mut abortable_notifications = abortable_notifications.fuse();
2657
2658 while let Some(notification) =
2659 await_while_polling(abortable_notifications.next(), &mut process_notifications)
2660 .await
2661 {
2662 if let Reason::NewBlock { .. } = notification.reason {
2663 match Box::pin(await_while_polling(
2664 this.update_notification_streams(&mut senders, &listening_mode)
2665 .fuse(),
2666 &mut process_notifications,
2667 ))
2668 .await
2669 {
2670 Ok(handler) => process_notifications.push(handler),
2671 Err(error) => error!("Failed to update committee: {error}"),
2672 }
2673 }
2674 }
2675
2676 for abort in senders.into_values() {
2677 abort.abort();
2678 }
2679
2680 let () = process_notifications.collect().await;
2681 }
2682 .in_current_span();
2683
2684 Ok((update_streams, AbortOnDrop(abort), notifications))
2685 }
2686
2687 #[instrument(level = "trace", skip(senders))]
2688 async fn update_notification_streams(
2689 &self,
2690 senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
2691 listening_mode: &ListeningMode,
2692 ) -> Result<impl Future<Output = ()>, Error> {
2693 let (nodes, local_node) = {
2694 let committee = self.local_committee().await?;
2695 let nodes: HashMap<_, _> = self
2696 .client
2697 .validator_node_provider()
2698 .make_nodes(&committee)?
2699 .collect();
2700 (nodes, self.client.local_node.clone())
2701 };
2702 senders.retain(|validator, abort| {
2704 if !nodes.contains_key(validator) {
2705 abort.abort();
2706 }
2707 !abort.is_aborted()
2708 });
2709 let validator_tasks = FuturesUnordered::new();
2711 for (public_key, node) in nodes {
2712 let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
2713 continue;
2714 };
2715 let this = self.clone();
2716 let stream = stream::once({
2717 let node = node.clone();
2718 async move {
2719 let stream = node.subscribe(vec![this.chain_id]).await?;
2720 let remote_node = RemoteNode { public_key, node };
2723 this.client
2724 .synchronize_chain_state_from(&remote_node, this.chain_id)
2725 .await?;
2726 Ok::<_, Error>(stream)
2727 }
2728 })
2729 .filter_map(move |result| async move {
2730 if let Err(error) = &result {
2731 info!(?error, "Could not connect to validator {public_key}");
2732 } else {
2733 debug!("Connected to validator {public_key}");
2734 }
2735 result.ok()
2736 })
2737 .flatten();
2738 let (stream, abort) = stream::abortable(stream);
2739 let mut stream = Box::pin(stream);
2740 let this = self.clone();
2741 let local_node = local_node.clone();
2742 let remote_node = RemoteNode { public_key, node };
2743 let listening_mode_cloned = listening_mode.clone();
2744 validator_tasks.push(async move {
2745 while let Some(notification) = stream.next().await {
2746 if let Err(err) = this
2747 .process_notification(
2748 remote_node.clone(),
2749 local_node.clone(),
2750 notification.clone(),
2751 &listening_mode_cloned,
2752 )
2753 .await
2754 {
2755 tracing::info!(
2756 chain_id = %this.chain_id,
2757 validator_public_key = ?remote_node.public_key,
2758 ?notification,
2759 "Failed to process notification: {err}",
2760 );
2761 }
2762 }
2763 });
2764 entry.insert(abort);
2765 }
2766 Ok(validator_tasks.collect())
2767 }
2768
2769 #[instrument(level = "trace", skip(remote_node))]
2771 pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
2772 let validator_next_block_height = match remote_node
2773 .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
2774 .await
2775 {
2776 Ok(info) => info.info.next_block_height.0,
2777 Err(NodeError::BlobsNotFound(_)) => 0,
2778 Err(err) => return Err(err.into()),
2779 };
2780 let local_chain_state = self.chain_info().await?;
2781
2782 let Some(missing_certificate_count) = local_chain_state
2783 .next_block_height
2784 .0
2785 .checked_sub(validator_next_block_height)
2786 .filter(|count| *count > 0)
2787 else {
2788 debug!("Validator is up-to-date with local state");
2789 return Ok(());
2790 };
2791
2792 let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
2793 .expect("`usize` should be at least `u64`");
2794 let missing_certificates_start = missing_certificates_end
2795 - usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");
2796
2797 let missing_certificate_hashes = self
2798 .chain_state_view()
2799 .await?
2800 .confirmed_log
2801 .read(missing_certificates_start..missing_certificates_end)
2802 .await?;
2803
2804 let certificates = self
2805 .client
2806 .storage_client()
2807 .read_certificates(missing_certificate_hashes.clone())
2808 .await?;
2809 let certificates =
2810 match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
2811 ResultReadCertificates::Certificates(certificates) => certificates,
2812 ResultReadCertificates::InvalidHashes(hashes) => {
2813 return Err(Error::ReadCertificatesError(hashes))
2814 }
2815 };
2816 for certificate in certificates {
2817 match remote_node
2818 .handle_confirmed_certificate(
2819 certificate.clone(),
2820 CrossChainMessageDelivery::NonBlocking,
2821 )
2822 .await
2823 {
2824 Ok(_) => (),
2825 Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
2826 let missing_blobs: Vec<_> = self
2828 .client
2829 .storage_client()
2830 .read_blobs(&missing_blob_ids)
2831 .await?
2832 .into_iter()
2833 .flatten()
2834 .collect();
2835 remote_node.upload_blobs(missing_blobs).await?;
2836 remote_node
2837 .handle_confirmed_certificate(
2838 certificate,
2839 CrossChainMessageDelivery::NonBlocking,
2840 )
2841 .await?;
2842 }
2843 Err(err) => return Err(err.into()),
2844 }
2845 }
2846
2847 Ok(())
2848 }
2849}
2850
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Processes `notification` as if it had been received from `validator`, using
    /// `ListeningMode::FullChain`.
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created from `validator` or if processing the
    /// notification fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let outcome = self
            .process_notification(
                RemoteNode { node, public_key },
                self.client.local_node.clone(),
                notification,
                &ListeningMode::FullChain,
            )
            .await;
        outcome.unwrap();
    }
}