linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    sync::Arc,
7    time::Duration,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40    memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbDatabase,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    chain_worker::ChainWorkerConfig,
52    client::{chain_client, Client},
53    data_types::*,
54    environment::{TestSigner, TestWallet},
55    node::{
56        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
57        ValidatorNodeProvider,
58    },
59    notifier::ChannelNotifier,
60    worker::{Notification, ProcessableCertificate, WorkerState},
61};
62
/// The way a `LocalValidatorClient` misbehaves when it handles requests.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum FaultType {
    /// The validator handles every request normally.
    Honest,
    /// Block proposals, certificates and chain info queries all fail with an "offline" error.
    Offline,
    /// Like `Offline`, except that chain info queries are still answered.
    OfflineWithInfo,
    /// Block proposals, certificates and chain info queries fail with `InactiveChain`.
    NoChains,
    /// Validated block certificates are processed, but the response is replaced by an error.
    DontSendConfirmVote,
    /// Validated block certificates are rejected without being processed.
    DontProcessValidated,
    /// Block proposals are processed, but the response is replaced by an error.
    DontSendValidateVote,
}
73
74/// A validator used for testing. "Faulty" validators ignore block proposals (but not
75/// certificates or info queries) and have the wrong initial balance for all chains.
76///
77/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
78/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
79/// tasks if the client stopped waiting for the response.
80struct LocalValidator<S>
81where
82    S: Storage,
83{
84    state: WorkerState<S>,
85    notifier: Arc<ChannelNotifier<Notification>>,
86}
87
88#[derive(Clone)]
89pub struct LocalValidatorClient<S>
90where
91    S: Storage,
92{
93    public_key: ValidatorPublicKey,
94    client: Arc<Mutex<LocalValidator<S>>>,
95    fault_type: FaultType,
96}
97
98impl<S> ValidatorNode for LocalValidatorClient<S>
99where
100    S: Storage + Clone + Send + Sync + 'static,
101{
102    type NotificationStream = NotificationStream;
103
104    fn address(&self) -> String {
105        format!("local:{}", self.public_key)
106    }
107
108    async fn handle_block_proposal(
109        &self,
110        proposal: BlockProposal,
111    ) -> Result<ChainInfoResponse, NodeError> {
112        self.spawn_and_receive(move |validator, sender| {
113            validator.do_handle_block_proposal(proposal, sender)
114        })
115        .await
116    }
117
118    async fn handle_lite_certificate(
119        &self,
120        certificate: LiteCertificate<'_>,
121        _delivery: CrossChainMessageDelivery,
122    ) -> Result<ChainInfoResponse, NodeError> {
123        let certificate = certificate.cloned();
124        self.spawn_and_receive(move |validator, sender| {
125            validator.do_handle_lite_certificate(certificate, sender)
126        })
127        .await
128    }
129
130    async fn handle_timeout_certificate(
131        &self,
132        certificate: GenericCertificate<Timeout>,
133    ) -> Result<ChainInfoResponse, NodeError> {
134        self.spawn_and_receive(move |validator, sender| {
135            validator.do_handle_certificate(certificate, sender)
136        })
137        .await
138    }
139
140    async fn handle_validated_certificate(
141        &self,
142        certificate: GenericCertificate<ValidatedBlock>,
143    ) -> Result<ChainInfoResponse, NodeError> {
144        self.spawn_and_receive(move |validator, sender| {
145            validator.do_handle_certificate(certificate, sender)
146        })
147        .await
148    }
149
150    async fn handle_confirmed_certificate(
151        &self,
152        certificate: GenericCertificate<ConfirmedBlock>,
153        _delivery: CrossChainMessageDelivery,
154    ) -> Result<ChainInfoResponse, NodeError> {
155        self.spawn_and_receive(move |validator, sender| {
156            validator.do_handle_certificate(certificate, sender)
157        })
158        .await
159    }
160
161    async fn handle_chain_info_query(
162        &self,
163        query: ChainInfoQuery,
164    ) -> Result<ChainInfoResponse, NodeError> {
165        self.spawn_and_receive(move |validator, sender| {
166            validator.do_handle_chain_info_query(query, sender)
167        })
168        .await
169    }
170
171    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
172        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
173            .await
174    }
175
176    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
177        Ok(Default::default())
178    }
179
180    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
181        Ok(self
182            .client
183            .lock()
184            .await
185            .state
186            .storage_client()
187            .read_network_description()
188            .await
189            .transpose()
190            .ok_or_else(|| NodeError::ViewError {
191                error: "missing NetworkDescription".to_owned(),
192            })??)
193    }
194
195    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
196        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
197            .await
198    }
199
200    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
201        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
202            .await
203    }
204
205    async fn download_pending_blob(
206        &self,
207        chain_id: ChainId,
208        blob_id: BlobId,
209    ) -> Result<BlobContent, NodeError> {
210        self.spawn_and_receive(move |validator, sender| {
211            validator.do_download_pending_blob(chain_id, blob_id, sender)
212        })
213        .await
214    }
215
216    async fn handle_pending_blob(
217        &self,
218        chain_id: ChainId,
219        blob: BlobContent,
220    ) -> Result<ChainInfoResponse, NodeError> {
221        self.spawn_and_receive(move |validator, sender| {
222            validator.do_handle_pending_blob(chain_id, blob, sender)
223        })
224        .await
225    }
226
227    async fn download_certificate(
228        &self,
229        hash: CryptoHash,
230    ) -> Result<ConfirmedBlockCertificate, NodeError> {
231        self.spawn_and_receive(move |validator, sender| {
232            validator.do_download_certificate(hash, sender)
233        })
234        .await
235    }
236
237    async fn download_certificates(
238        &self,
239        hashes: Vec<CryptoHash>,
240    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
241        self.spawn_and_receive(move |validator, sender| {
242            validator.do_download_certificates(hashes, sender)
243        })
244        .await
245    }
246
247    async fn download_certificates_by_heights(
248        &self,
249        chain_id: ChainId,
250        heights: Vec<BlockHeight>,
251    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
252        self.spawn_and_receive(move |validator, sender| {
253            validator.do_download_certificates_by_heights(chain_id, heights, sender)
254        })
255        .await
256    }
257
258    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
259        self.spawn_and_receive(move |validator, sender| {
260            validator.do_blob_last_used_by(blob_id, sender)
261        })
262        .await
263    }
264
265    async fn blob_last_used_by_certificate(
266        &self,
267        blob_id: BlobId,
268    ) -> Result<ConfirmedBlockCertificate, NodeError> {
269        self.spawn_and_receive(move |validator, sender| {
270            validator.do_blob_last_used_by_certificate(blob_id, sender)
271        })
272        .await
273    }
274
275    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
276        self.spawn_and_receive(move |validator, sender| {
277            validator.do_missing_blob_ids(blob_ids, sender)
278        })
279        .await
280    }
281
282    async fn get_shard_info(
283        &self,
284        _chain_id: ChainId,
285    ) -> Result<crate::data_types::ShardInfo, NodeError> {
286        // For test purposes, return a dummy shard info
287        Ok(crate::data_types::ShardInfo {
288            shard_id: 0,
289            total_shards: 1,
290        })
291    }
292}
293
294impl<S> LocalValidatorClient<S>
295where
296    S: Storage + Clone + Send + Sync + 'static,
297{
298    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
299        let client = LocalValidator {
300            state,
301            notifier: Arc::new(ChannelNotifier::default()),
302        };
303        Self {
304            public_key,
305            client: Arc::new(Mutex::new(client)),
306            fault_type: FaultType::Honest,
307        }
308    }
309
310    pub fn name(&self) -> ValidatorPublicKey {
311        self.public_key
312    }
313
314    fn set_fault_type(&mut self, fault_type: FaultType) {
315        self.fault_type = fault_type;
316    }
317
318    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
319    pub async fn chain_info_with_manager_values(
320        &mut self,
321        chain_id: ChainId,
322    ) -> Result<Box<ChainInfo>, NodeError> {
323        let query = ChainInfoQuery::new(chain_id).with_manager_values();
324        let response = self.handle_chain_info_query(query).await?;
325        Ok(response.info)
326    }
327
328    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
329    /// Returns the value that the future puts into the sender.
330    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
331    where
332        T: Send + 'static,
333        R: Future<Output = Result<(), T>> + Send,
334        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
335    {
336        let validator = self.clone();
337        let (sender, receiver) = oneshot::channel();
338        tokio::spawn(async move {
339            if f(validator, sender).await.is_err() {
340                tracing::debug!("result could not be sent");
341            }
342        });
343        receiver.await.unwrap()
344    }
345
346    async fn do_handle_block_proposal(
347        self,
348        proposal: BlockProposal,
349        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
350    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
351        let result = match self.fault_type {
352            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
353                error: "offline".to_string(),
354            }),
355            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
356            FaultType::DontSendValidateVote
357            | FaultType::Honest
358            | FaultType::DontSendConfirmVote
359            | FaultType::DontProcessValidated => {
360                let result = self
361                    .client
362                    .lock()
363                    .await
364                    .state
365                    .handle_block_proposal(proposal)
366                    .await
367                    .map_err(Into::into);
368                if self.fault_type == FaultType::DontSendValidateVote {
369                    Err(NodeError::ClientIoError {
370                        error: "refusing to validate".to_string(),
371                    })
372                } else {
373                    result
374                }
375            }
376        };
377        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
378        sender.send(result.map(|(info, _actions)| info))
379    }
380
381    async fn do_handle_lite_certificate(
382        self,
383        certificate: LiteCertificate<'_>,
384        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
385    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
386        let client = self.client.clone();
387        let validator = client.lock().await;
388        let result = async move {
389            match validator.state.full_certificate(certificate).await? {
390                Either::Left(confirmed) => {
391                    self.do_handle_certificate_internal(confirmed, &validator)
392                        .await
393                }
394                Either::Right(validated) => {
395                    self.do_handle_certificate_internal(validated, &validator)
396                        .await
397                }
398            }
399        }
400        .await;
401        sender.send(result)
402    }
403
404    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
405        &self,
406        certificate: GenericCertificate<T>,
407        validator: &MutexGuard<'_, LocalValidator<S>>,
408    ) -> Result<ChainInfoResponse, NodeError> {
409        match self.fault_type {
410            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
411                Err(NodeError::ClientIoError {
412                    error: "refusing to process validated block".to_string(),
413                })
414            }
415            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
416            FaultType::Honest
417            | FaultType::DontSendConfirmVote
418            | FaultType::DontProcessValidated
419            | FaultType::DontSendValidateVote => {
420                let result = validator
421                    .state
422                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
423                    .await
424                    .map_err(Into::into);
425                if T::KIND == CertificateKind::Validated
426                    && self.fault_type == FaultType::DontSendConfirmVote
427                {
428                    Err(NodeError::ClientIoError {
429                        error: "refusing to confirm".to_string(),
430                    })
431                } else {
432                    result
433                }
434            }
435            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
436                error: "offline".to_string(),
437            }),
438        }
439    }
440
441    async fn do_handle_certificate<T: ProcessableCertificate>(
442        self,
443        certificate: GenericCertificate<T>,
444        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
445    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
446        let validator = self.client.lock().await;
447        let result = self
448            .do_handle_certificate_internal(certificate, &validator)
449            .await;
450        sender.send(result)
451    }
452
453    async fn do_handle_chain_info_query(
454        self,
455        query: ChainInfoQuery,
456        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
457    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
458        let validator = self.client.lock().await;
459        let result = match self.fault_type {
460            FaultType::Offline => Err(NodeError::ClientIoError {
461                error: "offline".to_string(),
462            }),
463            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
464            FaultType::Honest
465            | FaultType::DontSendConfirmVote
466            | FaultType::DontProcessValidated
467            | FaultType::DontSendValidateVote
468            | FaultType::OfflineWithInfo => validator
469                .state
470                .handle_chain_info_query(query)
471                .await
472                .map_err(Into::into),
473        };
474        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
475        sender.send(result.map(|(info, _actions)| info))
476    }
477
478    async fn do_subscribe(
479        self,
480        chains: Vec<ChainId>,
481        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
482    ) -> Result<(), Result<NotificationStream, NodeError>> {
483        let validator = self.client.lock().await;
484        let rx = validator.notifier.subscribe(chains);
485        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
486        sender.send(Ok(stream))
487    }
488
489    async fn do_upload_blob(
490        self,
491        content: BlobContent,
492        sender: oneshot::Sender<Result<BlobId, NodeError>>,
493    ) -> Result<(), Result<BlobId, NodeError>> {
494        let validator = self.client.lock().await;
495        let blob = Blob::new(content);
496        let id = blob.id();
497        let storage = validator.state.storage_client();
498        let result = match storage.maybe_write_blobs(&[blob]).await {
499            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
500            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
501            Err(error) => Err(error.into()),
502        };
503        sender.send(result)
504    }
505
506    async fn do_download_blob(
507        self,
508        blob_id: BlobId,
509        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
510    ) -> Result<(), Result<BlobContent, NodeError>> {
511        let validator = self.client.lock().await;
512        let blob = validator
513            .state
514            .storage_client()
515            .read_blob(blob_id)
516            .await
517            .map_err(Into::into);
518        let blob = match blob {
519            Ok(blob) => blob.ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
520            Err(error) => Err(error),
521        };
522        sender.send(blob.map(|blob| blob.into_content()))
523    }
524
525    async fn do_download_pending_blob(
526        self,
527        chain_id: ChainId,
528        blob_id: BlobId,
529        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
530    ) -> Result<(), Result<BlobContent, NodeError>> {
531        let validator = self.client.lock().await;
532        let result = validator
533            .state
534            .download_pending_blob(chain_id, blob_id)
535            .await
536            .map_err(Into::into);
537        sender.send(result.map(|blob| blob.into_content()))
538    }
539
540    async fn do_handle_pending_blob(
541        self,
542        chain_id: ChainId,
543        blob: BlobContent,
544        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
545    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
546        let validator = self.client.lock().await;
547        let result = validator
548            .state
549            .handle_pending_blob(chain_id, Blob::new(blob))
550            .await
551            .map_err(Into::into);
552        sender.send(result)
553    }
554
555    async fn do_download_certificate(
556        self,
557        hash: CryptoHash,
558        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
559    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
560        let validator = self.client.lock().await;
561        let certificate = validator
562            .state
563            .storage_client()
564            .read_certificate(hash)
565            .await
566            .map_err(Into::into);
567
568        let certificate = match certificate {
569            Err(error) => Err(error),
570            Ok(entry) => match entry {
571                Some(certificate) => Ok(certificate),
572                None => {
573                    panic!("Missing certificate: {hash}");
574                }
575            },
576        };
577
578        sender.send(certificate)
579    }
580
581    async fn do_download_certificates(
582        self,
583        hashes: Vec<CryptoHash>,
584        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
585    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
586        let validator = self.client.lock().await;
587        let certificates = validator
588            .state
589            .storage_client()
590            .read_certificates(&hashes)
591            .await
592            .map_err(Into::into);
593
594        let certificates = match certificates {
595            Err(error) => Err(error),
596            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
597                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
598                ResultReadCertificates::InvalidHashes(hashes) => {
599                    panic!("Missing certificates: {:?}", hashes)
600                }
601            },
602        };
603
604        sender.send(certificates)
605    }
606
607    async fn do_download_certificates_by_heights(
608        self,
609        chain_id: ChainId,
610        heights: Vec<BlockHeight>,
611        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
612    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
613        // First, use do_handle_chain_info_query to get the certificate hashes
614        let (query_sender, query_receiver) = oneshot::channel();
615        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
616
617        let self_clone = self.clone();
618        self.do_handle_chain_info_query(query, query_sender)
619            .await
620            .expect("Failed to handle chain info query");
621
622        // Get the response from the chain info query
623        let chain_info_response = query_receiver.await.map_err(|_| {
624            Err(NodeError::ClientIoError {
625                error: "Failed to receive chain info response".to_string(),
626            })
627        })?;
628
629        let hashes = match chain_info_response {
630            Ok(response) => response.info.requested_sent_certificate_hashes,
631            Err(e) => {
632                return sender.send(Err(e));
633            }
634        };
635
636        // Now use do_download_certificates to get the actual certificates
637        let (cert_sender, cert_receiver) = oneshot::channel();
638        self_clone
639            .do_download_certificates(hashes, cert_sender)
640            .await?;
641
642        // Forward the result to the original sender
643        let result = cert_receiver.await.map_err(|_| {
644            Err(NodeError::ClientIoError {
645                error: "Failed to receive certificates".to_string(),
646            })
647        })?;
648
649        sender.send(result)
650    }
651
652    async fn do_blob_last_used_by(
653        self,
654        blob_id: BlobId,
655        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
656    ) -> Result<(), Result<CryptoHash, NodeError>> {
657        let validator = self.client.lock().await;
658        let blob_state = validator
659            .state
660            .storage_client()
661            .read_blob_state(blob_id)
662            .await
663            .map_err(Into::into);
664        let certificate_hash = match blob_state {
665            Err(err) => Err(err),
666            Ok(blob_state) => match blob_state {
667                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
668                Some(blob_state) => blob_state
669                    .last_used_by
670                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
671            },
672        };
673
674        sender.send(certificate_hash)
675    }
676
677    async fn do_blob_last_used_by_certificate(
678        self,
679        blob_id: BlobId,
680        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
681    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
682        match self.blob_last_used_by(blob_id).await {
683            Ok(cert_hash) => {
684                let cert = self.download_certificate(cert_hash).await;
685                sender.send(cert)
686            }
687            Err(err) => sender.send(Err(err)),
688        }
689    }
690
691    async fn do_missing_blob_ids(
692        self,
693        blob_ids: Vec<BlobId>,
694        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
695    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
696        let validator = self.client.lock().await;
697        let missing_blob_ids = validator
698            .state
699            .storage_client()
700            .missing_blobs(&blob_ids)
701            .await
702            .map_err(Into::into);
703        sender.send(missing_blob_ids)
704    }
705}
706
707#[derive(Clone)]
708pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
709where
710    S: Storage;
711
712impl<S> NodeProvider<S>
713where
714    S: Storage + Clone,
715{
716    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
717        self.0.lock().unwrap().clone()
718    }
719}
720
721impl<S> ValidatorNodeProvider for NodeProvider<S>
722where
723    S: Storage + Clone + Send + Sync + 'static,
724{
725    type Node = LocalValidatorClient<S>;
726
727    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
728        unimplemented!()
729    }
730
731    fn make_nodes_from_list<A>(
732        &self,
733        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
734    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
735    where
736        A: AsRef<str>,
737    {
738        let list = self.0.lock().unwrap();
739        Ok(validators
740            .into_iter()
741            .map(|(public_key, address)| {
742                list.iter()
743                    .find(|client| client.public_key == public_key)
744                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
745                        address: address.as_ref().to_string(),
746                    })
747                    .map(|client| (public_key, client.clone()))
748            })
749            .collect::<Result<Vec<_>, _>>()?
750            .into_iter())
751    }
752}
753
754impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
755where
756    S: Storage,
757{
758    fn from_iter<T>(iter: T) -> Self
759    where
760        T: IntoIterator<Item = LocalValidatorClient<S>>,
761    {
762        Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
763    }
764}
765
// NOTE:
// * To communicate with a quorum of validators, chain clients iterate over a copy of
// `validator_clients` to spawn I/O tasks.
// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
// * Most tests have 1 faulty validator out of 4 so that there is exactly one quorum to
// communicate with.
772pub struct TestBuilder<B: StorageBuilder> {
773    storage_builder: B,
774    pub initial_committee: Committee,
775    admin_description: Option<ChainDescription>,
776    network_description: Option<NetworkDescription>,
777    genesis_storage_builder: GenesisStorageBuilder,
778    node_provider: NodeProvider<B::Storage>,
779    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
780    chain_client_storages: Vec<B::Storage>,
781    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
782    pub signer: TestSigner,
783}
784
785#[async_trait]
786pub trait StorageBuilder {
787    type Storage: Storage + Clone + Send + Sync + 'static;
788
789    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
790
791    fn clock(&self) -> &TestClock;
792}
793
794#[derive(Default)]
795struct GenesisStorageBuilder {
796    accounts: Vec<GenesisAccount>,
797}
798
799struct GenesisAccount {
800    description: ChainDescription,
801    public_key: AccountPublicKey,
802}
803
804impl GenesisStorageBuilder {
805    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
806        self.accounts.push(GenesisAccount {
807            description,
808            public_key,
809        })
810    }
811
812    async fn build<S>(&self, storage: S) -> S
813    where
814        S: Storage + Clone + Send + Sync + 'static,
815    {
816        for account in &self.accounts {
817            storage
818                .create_chain(account.description.clone())
819                .await
820                .unwrap();
821        }
822        storage
823    }
824}
825
/// The chain client type used in tests: `crate::client::ChainClient` over an environment
/// parameterized by the storage `S` and the local `NodeProvider<S>`.
pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
827
828impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
829    /// Reads the hashed certificate values in descending order from the given hash.
830    pub async fn read_confirmed_blocks_downward(
831        &self,
832        from: CryptoHash,
833        limit: u32,
834    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
835        let mut hash = Some(from);
836        let mut values = Vec::new();
837        for _ in 0..limit {
838            let Some(next_hash) = hash else {
839                break;
840            };
841            let value = self.read_confirmed_block(next_hash).await?;
842            hash = value.block().header.previous_block_hash;
843            values.push(value);
844        }
845        Ok(values)
846    }
847}
848
849impl<B> TestBuilder<B>
850where
851    B: StorageBuilder,
852{
853    pub async fn new(
854        mut storage_builder: B,
855        count: usize,
856        with_faulty_validators: usize,
857        mut signer: TestSigner,
858    ) -> Result<Self, anyhow::Error> {
859        let mut validators = Vec::new();
860        for _ in 0..count {
861            let validator_keypair = ValidatorKeypair::generate();
862            let account_public_key = signer.generate_new();
863            validators.push((validator_keypair, account_public_key));
864        }
865        let for_committee = validators
866            .iter()
867            .map(|(validating, account)| (validating.public_key, *account))
868            .collect::<Vec<_>>();
869        let initial_committee = Committee::make_simple(for_committee);
870        let mut validator_clients = Vec::new();
871        let mut validator_storages = HashMap::new();
872        let mut faulty_validators = HashSet::new();
873        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
874            let validator_public_key = validator_keypair.public_key;
875            let storage = storage_builder.build().await?;
876            let config = ChainWorkerConfig {
877                nickname: format!("Node {}", i),
878                ..ChainWorkerConfig::default()
879            }
880            .with_key_pair(Some(validator_keypair.secret_key));
881            let state = WorkerState::new(storage.clone(), config, None);
882            let mut validator = LocalValidatorClient::new(validator_public_key, state);
883            if i < with_faulty_validators {
884                faulty_validators.insert(validator_public_key);
885                validator.set_fault_type(FaultType::NoChains);
886            }
887            validator_clients.push(validator);
888            validator_storages.insert(validator_public_key, storage);
889        }
890        tracing::info!(
891            "Test will use the following faulty validators: {:?}",
892            faulty_validators
893        );
894        Ok(Self {
895            storage_builder,
896            initial_committee,
897            admin_description: None,
898            network_description: None,
899            genesis_storage_builder: GenesisStorageBuilder::default(),
900            node_provider: NodeProvider::from_iter(validator_clients),
901            validator_storages,
902            chain_client_storages: Vec::new(),
903            chain_owners: BTreeMap::new(),
904            signer,
905        })
906    }
907
908    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
909        let validators = self.initial_committee.validators().clone();
910        self.initial_committee =
911            Committee::new(validators, policy).expect("committee votes should not overflow");
912        self
913    }
914
915    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
916        let mut faulty_validators = vec![];
917        let mut validator_clients = self.node_provider.0.lock().unwrap();
918        for index in indexes.as_ref() {
919            let validator = &mut validator_clients[*index];
920            validator.set_fault_type(fault_type);
921            faulty_validators.push(validator.public_key);
922        }
923        tracing::info!(
924            "Making the following validators {:?}: {:?}",
925            fault_type,
926            faulty_validators
927        );
928    }
929
930    /// Creates the root chain with the given `index`, and returns a client for it.
931    ///
932    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
933    /// is automatically set to zero.
934    pub async fn add_root_chain(
935        &mut self,
936        index: u32,
937        balance: Amount,
938    ) -> anyhow::Result<ChainClient<B::Storage>> {
939        // Make sure the admin chain is initialized.
940        if self.admin_description.is_none() && index != 0 {
941            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
942        }
943        let origin = ChainOrigin::Root(index);
944        let public_key = self.signer.generate_new();
945        let open_chain_config = InitialChainConfig {
946            ownership: ChainOwnership::single(public_key.into()),
947            epoch: Epoch(0),
948            min_active_epoch: Epoch(0),
949            max_active_epoch: Epoch(0),
950            balance,
951            application_permissions: ApplicationPermissions::default(),
952        };
953        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
954        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
955        if index == 0 {
956            self.admin_description = Some(description.clone());
957            self.network_description = Some(NetworkDescription {
958                admin_chain_id: description.id(),
959                // dummy values to fill the description
960                genesis_config_hash: CryptoHash::test_hash("genesis config"),
961                genesis_timestamp: Timestamp::from(0),
962                genesis_committee_blob_hash: committee_blob.id().hash,
963                name: "test network".to_string(),
964            });
965        }
966        // Remember what's in the genesis store for future clients to join.
967        self.genesis_storage_builder
968            .add(description.clone(), public_key);
969
970        let network_description = self.network_description.as_ref().unwrap();
971
972        for validator in self.node_provider.all_nodes() {
973            let storage = self
974                .validator_storages
975                .get_mut(&validator.public_key)
976                .unwrap();
977            storage
978                .write_network_description(network_description)
979                .await
980                .expect("writing the NetworkDescription should succeed");
981            storage
982                .write_blob(&committee_blob)
983                .await
984                .expect("writing a blob should succeed");
985            storage.create_chain(description.clone()).await.unwrap();
986        }
987        for storage in &mut self.chain_client_storages {
988            storage.create_chain(description.clone()).await.unwrap();
989        }
990        let chain_id = description.id();
991        self.chain_owners.insert(chain_id, public_key.into());
992        self.make_client(chain_id, None, BlockHeight::ZERO).await
993    }
994
995    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
996        let mut result = Vec::new();
997        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
998            assert_eq!(
999                genesis_account.description.origin(),
1000                ChainOrigin::Root(i as u32)
1001            );
1002            result.push((
1003                genesis_account.public_key,
1004                genesis_account.description.config().balance,
1005            ));
1006        }
1007        result
1008    }
1009
1010    pub fn admin_chain_id(&self) -> ChainId {
1011        self.admin_description
1012            .as_ref()
1013            .expect("admin chain not initialized")
1014            .id()
1015    }
1016
1017    pub fn admin_description(&self) -> Option<&ChainDescription> {
1018        self.admin_description.as_ref()
1019    }
1020
1021    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1022        self.node_provider.clone()
1023    }
1024
1025    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1026        self.node_provider.0.lock().unwrap()[index].clone()
1027    }
1028
1029    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1030        let storage = self.storage_builder.build().await?;
1031        let network_description = self.network_description.as_ref().unwrap();
1032        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1033        storage
1034            .write_network_description(network_description)
1035            .await
1036            .expect("writing the NetworkDescription should succeed");
1037        storage
1038            .write_blob(&committee_blob)
1039            .await
1040            .expect("writing a blob should succeed");
1041        Ok(self.genesis_storage_builder.build(storage).await)
1042    }
1043
1044    pub async fn make_client_with_options(
1045        &mut self,
1046        chain_id: ChainId,
1047        block_hash: Option<CryptoHash>,
1048        block_height: BlockHeight,
1049        options: chain_client::Options,
1050        follow_only: bool,
1051    ) -> anyhow::Result<ChainClient<B::Storage>> {
1052        // Note that new clients are only given the genesis store: they must figure out
1053        // the rest by asking validators.
1054        let storage = self.make_storage().await?;
1055        self.chain_client_storages.push(storage.clone());
1056        let mode = if follow_only {
1057            crate::client::ListeningMode::FollowChain
1058        } else {
1059            crate::client::ListeningMode::FullChain
1060        };
1061        let client = Arc::new(Client::new(
1062            crate::environment::Impl {
1063                network: self.make_node_provider(),
1064                storage,
1065                signer: self.signer.clone(),
1066                wallet: TestWallet::default(),
1067            },
1068            self.admin_chain_id(),
1069            false,
1070            [(chain_id, mode)],
1071            format!("Client node for {:.8}", chain_id),
1072            Some(Duration::from_secs(30)),
1073            Some(Duration::from_secs(1)),
1074            options,
1075            5_000,
1076            10_000,
1077            &crate::client::RequestsSchedulerConfig::default(),
1078        ));
1079        Ok(client.create_chain_client(
1080            chain_id,
1081            block_hash,
1082            block_height,
1083            &None,
1084            self.chain_owners.get(&chain_id).copied(),
1085            None,
1086            follow_only,
1087        ))
1088    }
1089
1090    pub async fn make_client(
1091        &mut self,
1092        chain_id: ChainId,
1093        block_hash: Option<CryptoHash>,
1094        block_height: BlockHeight,
1095    ) -> anyhow::Result<ChainClient<B::Storage>> {
1096        self.make_client_with_options(
1097            chain_id,
1098            block_hash,
1099            block_height,
1100            chain_client::Options::test_default(),
1101            false,
1102        )
1103        .await
1104    }
1105
1106    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1107    pub async fn check_that_validators_have_certificate(
1108        &self,
1109        chain_id: ChainId,
1110        block_height: BlockHeight,
1111        target_count: usize,
1112    ) -> Option<ConfirmedBlockCertificate> {
1113        let query = ChainInfoQuery::new(chain_id)
1114            .with_sent_certificate_hashes_by_heights(vec![block_height]);
1115        let mut count = 0;
1116        let mut certificate = None;
1117        for validator in self.node_provider.all_nodes() {
1118            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1119                if response.check(validator.public_key).is_ok() {
1120                    let ChainInfo {
1121                        mut requested_sent_certificate_hashes,
1122                        ..
1123                    } = *response.info;
1124                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1125                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1126                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1127                            if cert.inner().block().header.chain_id == chain_id
1128                                && cert.inner().block().header.height == block_height
1129                            {
1130                                cert.check(&self.initial_committee).unwrap();
1131                                count += 1;
1132                                certificate = Some(cert);
1133                            }
1134                        }
1135                    }
1136                }
1137            }
1138        }
1139        assert!(count >= target_count);
1140        certificate
1141    }
1142
1143    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1144    /// in the expected round.
1145    pub async fn check_that_validators_are_in_round(
1146        &self,
1147        chain_id: ChainId,
1148        block_height: BlockHeight,
1149        round: Round,
1150        target_count: usize,
1151    ) {
1152        let query = ChainInfoQuery::new(chain_id);
1153        let mut count = 0;
1154        for validator in self.node_provider.all_nodes() {
1155            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1156                if response.info.manager.current_round == round
1157                    && response.info.next_block_height == block_height
1158                    && response.check(validator.public_key).is_ok()
1159                {
1160                    count += 1;
1161                }
1162            }
1163        }
1164        assert!(count >= target_count);
1165    }
1166
1167    /// Panics if any validator has a nonempty outbox for the given chain.
1168    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1169        for validator in self.node_provider.all_nodes() {
1170            let guard = validator.client.lock().await;
1171            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1172            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1173        }
1174    }
1175}
1176
/// Semaphore with five permits that limits concurrency for RocksDB tests, to avoid
/// "too many open files" errors.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1180
1181#[derive(Default)]
1182pub struct MemoryStorageBuilder {
1183    namespace: String,
1184    instance_counter: usize,
1185    wasm_runtime: Option<WasmRuntime>,
1186    clock: TestClock,
1187}
1188
1189#[async_trait]
1190impl StorageBuilder for MemoryStorageBuilder {
1191    type Storage = DbStorage<MemoryDatabase, TestClock>;
1192
1193    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1194        self.instance_counter += 1;
1195        let config = MemoryDatabase::new_test_config().await?;
1196        if self.namespace.is_empty() {
1197            self.namespace = generate_test_namespace();
1198        }
1199        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1200        Ok(
1201            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1202                .await?,
1203        )
1204    }
1205
1206    fn clock(&self) -> &TestClock {
1207        &self.clock
1208    }
1209}
1210
1211impl MemoryStorageBuilder {
1212    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1213    /// applications.
1214    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1215        MemoryStorageBuilder {
1216            wasm_runtime: wasm_runtime.into(),
1217            ..MemoryStorageBuilder::default()
1218        }
1219    }
1220}
1221
/// A [`StorageBuilder`] whose storages are backed by [`RocksDbDatabase`].
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Namespace prefix shared by all built storages; generated on the first `build` call.
    namespace: String,
    /// Number of `build` calls so far; appended to the prefix to name each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every built storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock that is cloned into every built storage.
    clock: TestClock,
    /// Permit from `ROCKS_DB_SEMAPHORE`, kept for as long as the builder exists.
    _permit: SemaphorePermit<'static>,
}

#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder without a Wasm runtime, waiting until a permit from
    /// `ROCKS_DB_SEMAPHORE` is available.
    pub async fn new() -> Self {
        Self {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock: TestClock::default(),
            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
        }
    }

    /// Creates a [`RocksDbStorageBuilder`] whose storages run Wasm applications with the given
    /// [`WasmRuntime`], if any.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::new().await
        }
    }
}

#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<counter>`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = RocksDbDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for every later storage.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock shared with the built storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1276
/// A [`StorageBuilder`] whose storages are backed by [`StorageServiceDatabase`].
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Namespace prefix shared by all built storages; generated on the first `build` call.
    namespace: String,
    /// Number of `build` calls so far; appended to the prefix to name each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every built storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock that is cloned into every built storage.
    clock: TestClock,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a builder without a Wasm runtime.
    pub fn new() -> Self {
        Self::with_wasm_runtime(None)
    }

    /// Creates a builder whose storages use the given Wasm runtime, if any.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<counter>`.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let test_config = StorageServiceDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for every later storage.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock shared with the built storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1324
/// A [`StorageBuilder`] whose storages are backed by [`DynamoDbDatabase`].
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Namespace prefix shared by all built storages; generated on the first `build` call.
    namespace: String,
    /// Number of `build` calls so far; appended to the prefix to name each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every built storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock that is cloned into every built storage.
    clock: TestClock,
}

#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a [`DynamoDbStorageBuilder`] whose storages run Wasm applications with the
    /// given [`WasmRuntime`], if any.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<counter>`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = DynamoDbDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for every later storage.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock shared with the built storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1368
/// A [`StorageBuilder`] whose storages are backed by [`ScyllaDbDatabase`].
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Namespace prefix shared by all built storages; generated on the first `build` call.
    namespace: String,
    /// Number of `build` calls so far; appended to the prefix to name each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every built storage.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock that is cloned into every built storage.
    clock: TestClock,
}

#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a [`ScyllaDbStorageBuilder`] whose storages run Wasm applications with the
    /// given [`WasmRuntime`], if any.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<counter>`.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = ScyllaDbDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for every later storage.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock shared with the built storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1412
1413pub trait ClientOutcomeResultExt<T, E> {
1414    /// Unwraps the result and panics if it's not `Committed`.
1415    /// Use this when you expect the operation to succeed without conflicts.
1416    fn unwrap_ok_committed(self) -> T;
1417
1418    /// Unwraps the result, accepting both `Committed` and `Conflict` outcomes.
1419    /// Returns the committed value or the conflicting certificate (boxed).
1420    fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>>;
1421}
1422
1423impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1424    fn unwrap_ok_committed(self) -> T {
1425        match self.unwrap() {
1426            ClientOutcome::Committed(t) => t,
1427            ClientOutcome::WaitForTimeout(timeout) => {
1428                panic!("unexpected timeout: {timeout}")
1429            }
1430            ClientOutcome::Conflict(certificate) => {
1431                panic!("unexpected conflict: {}", certificate.hash())
1432            }
1433        }
1434    }
1435
1436    fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>> {
1437        match self.unwrap() {
1438            ClientOutcome::Committed(t) => Ok(t),
1439            ClientOutcome::Conflict(certificate) => Err(certificate),
1440            ClientOutcome::WaitForTimeout(timeout) => {
1441                panic!("unexpected timeout: {timeout}")
1442            }
1443        }
1444    }
1445}