linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    sync::Arc,
7    time::Duration,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40    memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbDatabase,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    client::{chain_client, Client},
52    data_types::*,
53    environment::{TestSigner, TestWallet},
54    node::{
55        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
56        ValidatorNodeProvider,
57    },
58    notifier::ChannelNotifier,
59    worker::{Notification, ProcessableCertificate, WorkerState},
60};
61
/// The ways in which a test validator can be configured to misbehave.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FaultType {
    /// Handles every request normally.
    Honest,
    /// Answers block proposals, certificates and chain info queries with an "offline"
    /// I/O error.
    Offline,
    /// Like `Offline` for block proposals and certificates, but still answers chain
    /// info queries.
    OfflineWithInfo,
    /// Answers block proposals, certificates and chain info queries with an
    /// inactive-chain error.
    NoChains,
    /// Processes validated block certificates, but reports an error to the caller
    /// instead of returning the response.
    DontSendConfirmVote,
    /// Refuses to process validated block certificates.
    DontProcessValidated,
    /// Processes block proposals, but reports an error to the caller instead of
    /// returning the response.
    DontSendValidateVote,
}
72
73/// A validator used for testing. "Faulty" validators ignore block proposals (but not
74/// certificates or info queries) and have the wrong initial balance for all chains.
75///
76/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
77/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
78/// tasks if the client stopped waiting for the response.
79struct LocalValidator<S>
80where
81    S: Storage,
82{
83    state: WorkerState<S>,
84    notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90    S: Storage,
91{
92    public_key: ValidatorPublicKey,
93    client: Arc<Mutex<LocalValidator<S>>>,
94    fault_type: FaultType,
95}
96
97impl<S> ValidatorNode for LocalValidatorClient<S>
98where
99    S: Storage + Clone + Send + Sync + 'static,
100{
101    type NotificationStream = NotificationStream;
102
103    fn address(&self) -> String {
104        format!("local:{}", self.public_key)
105    }
106
107    async fn handle_block_proposal(
108        &self,
109        proposal: BlockProposal,
110    ) -> Result<ChainInfoResponse, NodeError> {
111        self.spawn_and_receive(move |validator, sender| {
112            validator.do_handle_block_proposal(proposal, sender)
113        })
114        .await
115    }
116
117    async fn handle_lite_certificate(
118        &self,
119        certificate: LiteCertificate<'_>,
120        _delivery: CrossChainMessageDelivery,
121    ) -> Result<ChainInfoResponse, NodeError> {
122        let certificate = certificate.cloned();
123        self.spawn_and_receive(move |validator, sender| {
124            validator.do_handle_lite_certificate(certificate, sender)
125        })
126        .await
127    }
128
129    async fn handle_timeout_certificate(
130        &self,
131        certificate: GenericCertificate<Timeout>,
132    ) -> Result<ChainInfoResponse, NodeError> {
133        self.spawn_and_receive(move |validator, sender| {
134            validator.do_handle_certificate(certificate, sender)
135        })
136        .await
137    }
138
139    async fn handle_validated_certificate(
140        &self,
141        certificate: GenericCertificate<ValidatedBlock>,
142    ) -> Result<ChainInfoResponse, NodeError> {
143        self.spawn_and_receive(move |validator, sender| {
144            validator.do_handle_certificate(certificate, sender)
145        })
146        .await
147    }
148
149    async fn handle_confirmed_certificate(
150        &self,
151        certificate: GenericCertificate<ConfirmedBlock>,
152        _delivery: CrossChainMessageDelivery,
153    ) -> Result<ChainInfoResponse, NodeError> {
154        self.spawn_and_receive(move |validator, sender| {
155            validator.do_handle_certificate(certificate, sender)
156        })
157        .await
158    }
159
160    async fn handle_chain_info_query(
161        &self,
162        query: ChainInfoQuery,
163    ) -> Result<ChainInfoResponse, NodeError> {
164        self.spawn_and_receive(move |validator, sender| {
165            validator.do_handle_chain_info_query(query, sender)
166        })
167        .await
168    }
169
170    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
171        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
172            .await
173    }
174
175    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
176        Ok(Default::default())
177    }
178
179    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
180        Ok(self
181            .client
182            .lock()
183            .await
184            .state
185            .storage_client()
186            .read_network_description()
187            .await
188            .transpose()
189            .ok_or(NodeError::ViewError {
190                error: "missing NetworkDescription".to_owned(),
191            })??)
192    }
193
194    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
195        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
196            .await
197    }
198
199    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
200        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
201            .await
202    }
203
204    async fn download_pending_blob(
205        &self,
206        chain_id: ChainId,
207        blob_id: BlobId,
208    ) -> Result<BlobContent, NodeError> {
209        self.spawn_and_receive(move |validator, sender| {
210            validator.do_download_pending_blob(chain_id, blob_id, sender)
211        })
212        .await
213    }
214
215    async fn handle_pending_blob(
216        &self,
217        chain_id: ChainId,
218        blob: BlobContent,
219    ) -> Result<ChainInfoResponse, NodeError> {
220        self.spawn_and_receive(move |validator, sender| {
221            validator.do_handle_pending_blob(chain_id, blob, sender)
222        })
223        .await
224    }
225
226    async fn download_certificate(
227        &self,
228        hash: CryptoHash,
229    ) -> Result<ConfirmedBlockCertificate, NodeError> {
230        self.spawn_and_receive(move |validator, sender| {
231            validator.do_download_certificate(hash, sender)
232        })
233        .await
234    }
235
236    async fn download_certificates(
237        &self,
238        hashes: Vec<CryptoHash>,
239    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
240        self.spawn_and_receive(move |validator, sender| {
241            validator.do_download_certificates(hashes, sender)
242        })
243        .await
244    }
245
246    async fn download_certificates_by_heights(
247        &self,
248        chain_id: ChainId,
249        heights: Vec<BlockHeight>,
250    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
251        self.spawn_and_receive(move |validator, sender| {
252            validator.do_download_certificates_by_heights(chain_id, heights, sender)
253        })
254        .await
255    }
256
257    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
258        self.spawn_and_receive(move |validator, sender| {
259            validator.do_blob_last_used_by(blob_id, sender)
260        })
261        .await
262    }
263
264    async fn blob_last_used_by_certificate(
265        &self,
266        blob_id: BlobId,
267    ) -> Result<ConfirmedBlockCertificate, NodeError> {
268        self.spawn_and_receive(move |validator, sender| {
269            validator.do_blob_last_used_by_certificate(blob_id, sender)
270        })
271        .await
272    }
273
274    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
275        self.spawn_and_receive(move |validator, sender| {
276            validator.do_missing_blob_ids(blob_ids, sender)
277        })
278        .await
279    }
280
281    async fn get_shard_info(
282        &self,
283        _chain_id: ChainId,
284    ) -> Result<crate::data_types::ShardInfo, NodeError> {
285        // For test purposes, return a dummy shard info
286        Ok(crate::data_types::ShardInfo {
287            shard_id: 0,
288            total_shards: 1,
289        })
290    }
291}
292
293impl<S> LocalValidatorClient<S>
294where
295    S: Storage + Clone + Send + Sync + 'static,
296{
297    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
298        let client = LocalValidator {
299            state,
300            notifier: Arc::new(ChannelNotifier::default()),
301        };
302        Self {
303            public_key,
304            client: Arc::new(Mutex::new(client)),
305            fault_type: FaultType::Honest,
306        }
307    }
308
309    pub fn name(&self) -> ValidatorPublicKey {
310        self.public_key
311    }
312
313    fn set_fault_type(&mut self, fault_type: FaultType) {
314        self.fault_type = fault_type;
315    }
316
317    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
318    pub async fn chain_info_with_manager_values(
319        &mut self,
320        chain_id: ChainId,
321    ) -> Result<Box<ChainInfo>, NodeError> {
322        let query = ChainInfoQuery::new(chain_id).with_manager_values();
323        let response = self.handle_chain_info_query(query).await?;
324        Ok(response.info)
325    }
326
327    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
328    /// Returns the value that the future puts into the sender.
329    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
330    where
331        T: Send + 'static,
332        R: Future<Output = Result<(), T>> + Send,
333        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
334    {
335        let validator = self.clone();
336        let (sender, receiver) = oneshot::channel();
337        tokio::spawn(async move {
338            if f(validator, sender).await.is_err() {
339                tracing::debug!("result could not be sent");
340            }
341        });
342        receiver.await.unwrap()
343    }
344
345    async fn do_handle_block_proposal(
346        self,
347        proposal: BlockProposal,
348        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
349    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
350        let result = match self.fault_type {
351            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
352                error: "offline".to_string(),
353            }),
354            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
355            FaultType::DontSendValidateVote
356            | FaultType::Honest
357            | FaultType::DontSendConfirmVote
358            | FaultType::DontProcessValidated => {
359                let result = self
360                    .client
361                    .lock()
362                    .await
363                    .state
364                    .handle_block_proposal(proposal)
365                    .await
366                    .map_err(Into::into);
367                if self.fault_type == FaultType::DontSendValidateVote {
368                    Err(NodeError::ClientIoError {
369                        error: "refusing to validate".to_string(),
370                    })
371                } else {
372                    result
373                }
374            }
375        };
376        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
377        sender.send(result.map(|(info, _actions)| info))
378    }
379
380    async fn do_handle_lite_certificate(
381        self,
382        certificate: LiteCertificate<'_>,
383        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
384    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
385        let client = self.client.clone();
386        let mut validator = client.lock().await;
387        let result = async move {
388            match validator.state.full_certificate(certificate).await? {
389                Either::Left(confirmed) => {
390                    self.do_handle_certificate_internal(confirmed, &mut validator)
391                        .await
392                }
393                Either::Right(validated) => {
394                    self.do_handle_certificate_internal(validated, &mut validator)
395                        .await
396                }
397            }
398        }
399        .await;
400        sender.send(result)
401    }
402
403    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
404        &self,
405        certificate: GenericCertificate<T>,
406        validator: &mut MutexGuard<'_, LocalValidator<S>>,
407    ) -> Result<ChainInfoResponse, NodeError> {
408        match self.fault_type {
409            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
410                Err(NodeError::ClientIoError {
411                    error: "refusing to process validated block".to_string(),
412                })
413            }
414            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
415            FaultType::Honest
416            | FaultType::DontSendConfirmVote
417            | FaultType::DontProcessValidated
418            | FaultType::DontSendValidateVote => {
419                let result = validator
420                    .state
421                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
422                    .await
423                    .map_err(Into::into);
424                if T::KIND == CertificateKind::Validated
425                    && self.fault_type == FaultType::DontSendConfirmVote
426                {
427                    Err(NodeError::ClientIoError {
428                        error: "refusing to confirm".to_string(),
429                    })
430                } else {
431                    result
432                }
433            }
434            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
435                error: "offline".to_string(),
436            }),
437        }
438    }
439
440    async fn do_handle_certificate<T: ProcessableCertificate>(
441        self,
442        certificate: GenericCertificate<T>,
443        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
444    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
445        let mut validator = self.client.lock().await;
446        let result = self
447            .do_handle_certificate_internal(certificate, &mut validator)
448            .await;
449        sender.send(result)
450    }
451
452    async fn do_handle_chain_info_query(
453        self,
454        query: ChainInfoQuery,
455        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
456    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
457        let validator = self.client.lock().await;
458        let result = match self.fault_type {
459            FaultType::Offline => Err(NodeError::ClientIoError {
460                error: "offline".to_string(),
461            }),
462            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
463            FaultType::Honest
464            | FaultType::DontSendConfirmVote
465            | FaultType::DontProcessValidated
466            | FaultType::DontSendValidateVote
467            | FaultType::OfflineWithInfo => validator
468                .state
469                .handle_chain_info_query(query)
470                .await
471                .map_err(Into::into),
472        };
473        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
474        sender.send(result.map(|(info, _actions)| info))
475    }
476
477    async fn do_subscribe(
478        self,
479        chains: Vec<ChainId>,
480        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
481    ) -> Result<(), Result<NotificationStream, NodeError>> {
482        let validator = self.client.lock().await;
483        let rx = validator.notifier.subscribe(chains);
484        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
485        sender.send(Ok(stream))
486    }
487
488    async fn do_upload_blob(
489        self,
490        content: BlobContent,
491        sender: oneshot::Sender<Result<BlobId, NodeError>>,
492    ) -> Result<(), Result<BlobId, NodeError>> {
493        let validator = self.client.lock().await;
494        let blob = Blob::new(content);
495        let id = blob.id();
496        let storage = validator.state.storage_client();
497        let result = match storage.maybe_write_blobs(&[blob]).await {
498            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
499            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
500            Err(error) => Err(error.into()),
501        };
502        sender.send(result)
503    }
504
505    async fn do_download_blob(
506        self,
507        blob_id: BlobId,
508        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
509    ) -> Result<(), Result<BlobContent, NodeError>> {
510        let validator = self.client.lock().await;
511        let blob = validator
512            .state
513            .storage_client()
514            .read_blob(blob_id)
515            .await
516            .map_err(Into::into);
517        let blob = match blob {
518            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
519            Err(error) => Err(error),
520        };
521        sender.send(blob.map(|blob| blob.into_content()))
522    }
523
524    async fn do_download_pending_blob(
525        self,
526        chain_id: ChainId,
527        blob_id: BlobId,
528        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
529    ) -> Result<(), Result<BlobContent, NodeError>> {
530        let validator = self.client.lock().await;
531        let result = validator
532            .state
533            .download_pending_blob(chain_id, blob_id)
534            .await
535            .map_err(Into::into);
536        sender.send(result.map(|blob| blob.into_content()))
537    }
538
539    async fn do_handle_pending_blob(
540        self,
541        chain_id: ChainId,
542        blob: BlobContent,
543        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
544    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
545        let validator = self.client.lock().await;
546        let result = validator
547            .state
548            .handle_pending_blob(chain_id, Blob::new(blob))
549            .await
550            .map_err(Into::into);
551        sender.send(result)
552    }
553
554    async fn do_download_certificate(
555        self,
556        hash: CryptoHash,
557        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
558    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
559        let validator = self.client.lock().await;
560        let certificate = validator
561            .state
562            .storage_client()
563            .read_certificate(hash)
564            .await
565            .map_err(Into::into);
566
567        let certificate = match certificate {
568            Err(error) => Err(error),
569            Ok(entry) => match entry {
570                Some(certificate) => Ok(certificate),
571                None => {
572                    panic!("Missing certificate: {hash}");
573                }
574            },
575        };
576
577        sender.send(certificate)
578    }
579
580    async fn do_download_certificates(
581        self,
582        hashes: Vec<CryptoHash>,
583        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
584    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
585        let validator = self.client.lock().await;
586        let certificates = validator
587            .state
588            .storage_client()
589            .read_certificates(hashes.clone())
590            .await
591            .map_err(Into::into);
592
593        let certificates = match certificates {
594            Err(error) => Err(error),
595            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
596                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
597                ResultReadCertificates::InvalidHashes(hashes) => {
598                    panic!("Missing certificates: {:?}", hashes)
599                }
600            },
601        };
602
603        sender.send(certificates)
604    }
605
606    async fn do_download_certificates_by_heights(
607        self,
608        chain_id: ChainId,
609        heights: Vec<BlockHeight>,
610        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
611    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
612        // First, use do_handle_chain_info_query to get the certificate hashes
613        let (query_sender, query_receiver) = oneshot::channel();
614        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
615
616        let self_clone = self.clone();
617        self.do_handle_chain_info_query(query, query_sender)
618            .await
619            .expect("Failed to handle chain info query");
620
621        // Get the response from the chain info query
622        let chain_info_response = query_receiver.await.map_err(|_| {
623            Err(NodeError::ClientIoError {
624                error: "Failed to receive chain info response".to_string(),
625            })
626        })?;
627
628        let hashes = match chain_info_response {
629            Ok(response) => response.info.requested_sent_certificate_hashes,
630            Err(e) => {
631                return sender.send(Err(e));
632            }
633        };
634
635        // Now use do_download_certificates to get the actual certificates
636        let (cert_sender, cert_receiver) = oneshot::channel();
637        self_clone
638            .do_download_certificates(hashes, cert_sender)
639            .await?;
640
641        // Forward the result to the original sender
642        let result = cert_receiver.await.map_err(|_| {
643            Err(NodeError::ClientIoError {
644                error: "Failed to receive certificates".to_string(),
645            })
646        })?;
647
648        sender.send(result)
649    }
650
651    async fn do_blob_last_used_by(
652        self,
653        blob_id: BlobId,
654        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
655    ) -> Result<(), Result<CryptoHash, NodeError>> {
656        let validator = self.client.lock().await;
657        let blob_state = validator
658            .state
659            .storage_client()
660            .read_blob_state(blob_id)
661            .await
662            .map_err(Into::into);
663        let certificate_hash = match blob_state {
664            Err(err) => Err(err),
665            Ok(blob_state) => match blob_state {
666                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
667                Some(blob_state) => blob_state
668                    .last_used_by
669                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
670            },
671        };
672
673        sender.send(certificate_hash)
674    }
675
676    async fn do_blob_last_used_by_certificate(
677        self,
678        blob_id: BlobId,
679        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
680    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
681        match self.blob_last_used_by(blob_id).await {
682            Ok(cert_hash) => {
683                let cert = self.download_certificate(cert_hash).await;
684                sender.send(cert)
685            }
686            Err(err) => sender.send(Err(err)),
687        }
688    }
689
690    async fn do_missing_blob_ids(
691        self,
692        blob_ids: Vec<BlobId>,
693        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
694    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
695        let validator = self.client.lock().await;
696        let missing_blob_ids = validator
697            .state
698            .storage_client()
699            .missing_blobs(&blob_ids)
700            .await
701            .map_err(Into::into);
702        sender.send(missing_blob_ids)
703    }
704}
705
706#[derive(Clone)]
707pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
708where
709    S: Storage;
710
711impl<S> NodeProvider<S>
712where
713    S: Storage + Clone,
714{
715    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
716        self.0.lock().unwrap().clone()
717    }
718}
719
720impl<S> ValidatorNodeProvider for NodeProvider<S>
721where
722    S: Storage + Clone + Send + Sync + 'static,
723{
724    type Node = LocalValidatorClient<S>;
725
726    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
727        unimplemented!()
728    }
729
730    fn make_nodes_from_list<A>(
731        &self,
732        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
733    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
734    where
735        A: AsRef<str>,
736    {
737        let list = self.0.lock().unwrap();
738        Ok(validators
739            .into_iter()
740            .map(|(public_key, address)| {
741                list.iter()
742                    .find(|client| client.public_key == public_key)
743                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
744                        address: address.as_ref().to_string(),
745                    })
746                    .map(|client| (public_key, client.clone()))
747            })
748            .collect::<Result<Vec<_>, _>>()?
749            .into_iter())
750    }
751}
752
753impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
754where
755    S: Storage,
756{
757    fn from_iter<T>(iter: T) -> Self
758    where
759        T: IntoIterator<Item = LocalValidatorClient<S>>,
760    {
761        Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
762    }
763}
764
// NOTE:
// * To communicate with a quorum of validators, chain clients iterate over a copy of
// `validator_clients` to spawn I/O tasks.
// * When using `LocalValidatorClient`, clients communicate with an exact quorum and then stop.
// * Most tests have 1 faulty validator out of 4, so that there is exactly one quorum to
// communicate with.
771pub struct TestBuilder<B: StorageBuilder> {
772    storage_builder: B,
773    pub initial_committee: Committee,
774    admin_description: Option<ChainDescription>,
775    network_description: Option<NetworkDescription>,
776    genesis_storage_builder: GenesisStorageBuilder,
777    node_provider: NodeProvider<B::Storage>,
778    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
779    chain_client_storages: Vec<B::Storage>,
780    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
781    pub signer: TestSigner,
782}
783
784#[async_trait]
785pub trait StorageBuilder {
786    type Storage: Storage + Clone + Send + Sync + 'static;
787
788    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
789
790    fn clock(&self) -> &TestClock;
791}
792
/// Collects the genesis accounts whose chains are created in a storage by `build`.
#[derive(Default)]
struct GenesisStorageBuilder {
    // Accounts in the order in which they were added.
    accounts: Vec<GenesisAccount>,
}
797
/// A chain recorded for genesis, together with an account public key.
struct GenesisAccount {
    // Description of the chain to create in storage.
    description: ChainDescription,
    // Stored alongside the description; `GenesisStorageBuilder::build` does not read it.
    public_key: AccountPublicKey,
}
802
803impl GenesisStorageBuilder {
804    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
805        self.accounts.push(GenesisAccount {
806            description,
807            public_key,
808        })
809    }
810
811    async fn build<S>(&self, storage: S) -> S
812    where
813        S: Storage + Clone + Send + Sync + 'static,
814    {
815        for account in &self.accounts {
816            storage
817                .create_chain(account.description.clone())
818                .await
819                .unwrap();
820        }
821        storage
822    }
823}
824
/// The chain client type used in tests: a `crate::client::ChainClient` whose environment
/// combines storage `S` with the local `NodeProvider<S>`.
pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
826
827impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
828    /// Reads the hashed certificate values in descending order from the given hash.
829    pub async fn read_confirmed_blocks_downward(
830        &self,
831        from: CryptoHash,
832        limit: u32,
833    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
834        let mut hash = Some(from);
835        let mut values = Vec::new();
836        for _ in 0..limit {
837            let Some(next_hash) = hash else {
838                break;
839            };
840            let value = self.read_confirmed_block(next_hash).await?;
841            hash = value.block().header.previous_block_hash;
842            values.push(value);
843        }
844        Ok(values)
845    }
846}
847
848impl<B> TestBuilder<B>
849where
850    B: StorageBuilder,
851{
852    pub async fn new(
853        mut storage_builder: B,
854        count: usize,
855        with_faulty_validators: usize,
856        mut signer: TestSigner,
857    ) -> Result<Self, anyhow::Error> {
858        let mut validators = Vec::new();
859        for _ in 0..count {
860            let validator_keypair = ValidatorKeypair::generate();
861            let account_public_key = signer.generate_new();
862            validators.push((validator_keypair, account_public_key));
863        }
864        let for_committee = validators
865            .iter()
866            .map(|(validating, account)| (validating.public_key, *account))
867            .collect::<Vec<_>>();
868        let initial_committee = Committee::make_simple(for_committee);
869        let mut validator_clients = Vec::new();
870        let mut validator_storages = HashMap::new();
871        let mut faulty_validators = HashSet::new();
872        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
873            let validator_public_key = validator_keypair.public_key;
874            let storage = storage_builder.build().await?;
875            let state = WorkerState::new(
876                format!("Node {}", i),
877                Some(validator_keypair.secret_key),
878                storage.clone(),
879                5_000,
880                10_000,
881            )
882            .with_allow_inactive_chains(false)
883            .with_allow_messages_from_deprecated_epochs(false);
884            let mut validator = LocalValidatorClient::new(validator_public_key, state);
885            if i < with_faulty_validators {
886                faulty_validators.insert(validator_public_key);
887                validator.set_fault_type(FaultType::NoChains);
888            }
889            validator_clients.push(validator);
890            validator_storages.insert(validator_public_key, storage);
891        }
892        tracing::info!(
893            "Test will use the following faulty validators: {:?}",
894            faulty_validators
895        );
896        Ok(Self {
897            storage_builder,
898            initial_committee,
899            admin_description: None,
900            network_description: None,
901            genesis_storage_builder: GenesisStorageBuilder::default(),
902            node_provider: NodeProvider::from_iter(validator_clients),
903            validator_storages,
904            chain_client_storages: Vec::new(),
905            chain_owners: BTreeMap::new(),
906            signer,
907        })
908    }
909
910    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
911        let validators = self.initial_committee.validators().clone();
912        self.initial_committee = Committee::new(validators, policy);
913        self
914    }
915
916    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
917        let mut faulty_validators = vec![];
918        let mut validator_clients = self.node_provider.0.lock().unwrap();
919        for index in indexes.as_ref() {
920            let validator = &mut validator_clients[*index];
921            validator.set_fault_type(fault_type);
922            faulty_validators.push(validator.public_key);
923        }
924        tracing::info!(
925            "Making the following validators {:?}: {:?}",
926            fault_type,
927            faulty_validators
928        );
929    }
930
931    /// Creates the root chain with the given `index`, and returns a client for it.
932    ///
933    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
934    /// is automatically set to zero.
935    pub async fn add_root_chain(
936        &mut self,
937        index: u32,
938        balance: Amount,
939    ) -> anyhow::Result<ChainClient<B::Storage>> {
940        // Make sure the admin chain is initialized.
941        if self.admin_description.is_none() && index != 0 {
942            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
943        }
944        let origin = ChainOrigin::Root(index);
945        let public_key = self.signer.generate_new();
946        let open_chain_config = InitialChainConfig {
947            ownership: ChainOwnership::single(public_key.into()),
948            epoch: Epoch(0),
949            min_active_epoch: Epoch(0),
950            max_active_epoch: Epoch(0),
951            balance,
952            application_permissions: ApplicationPermissions::default(),
953        };
954        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
955        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
956        if index == 0 {
957            self.admin_description = Some(description.clone());
958            self.network_description = Some(NetworkDescription {
959                admin_chain_id: description.id(),
960                // dummy values to fill the description
961                genesis_config_hash: CryptoHash::test_hash("genesis config"),
962                genesis_timestamp: Timestamp::from(0),
963                genesis_committee_blob_hash: committee_blob.id().hash,
964                name: "test network".to_string(),
965            });
966        }
967        // Remember what's in the genesis store for future clients to join.
968        self.genesis_storage_builder
969            .add(description.clone(), public_key);
970
971        let network_description = self.network_description.as_ref().unwrap();
972
973        for validator in self.node_provider.all_nodes() {
974            let storage = self
975                .validator_storages
976                .get_mut(&validator.public_key)
977                .unwrap();
978            storage
979                .write_network_description(network_description)
980                .await
981                .expect("writing the NetworkDescription should succeed");
982            storage
983                .write_blob(&committee_blob)
984                .await
985                .expect("writing a blob should succeed");
986            storage.create_chain(description.clone()).await.unwrap();
987        }
988        for storage in &mut self.chain_client_storages {
989            storage.create_chain(description.clone()).await.unwrap();
990        }
991        let chain_id = description.id();
992        self.chain_owners.insert(chain_id, public_key.into());
993        self.make_client(chain_id, None, BlockHeight::ZERO).await
994    }
995
996    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
997        let mut result = Vec::new();
998        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
999            assert_eq!(
1000                genesis_account.description.origin(),
1001                ChainOrigin::Root(i as u32)
1002            );
1003            result.push((
1004                genesis_account.public_key,
1005                genesis_account.description.config().balance,
1006            ));
1007        }
1008        result
1009    }
1010
1011    pub fn admin_id(&self) -> ChainId {
1012        self.admin_description
1013            .as_ref()
1014            .expect("admin chain not initialized")
1015            .id()
1016    }
1017
1018    pub fn admin_description(&self) -> Option<&ChainDescription> {
1019        self.admin_description.as_ref()
1020    }
1021
1022    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1023        self.node_provider.clone()
1024    }
1025
1026    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1027        self.node_provider.0.lock().unwrap()[index].clone()
1028    }
1029
1030    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1031        let storage = self.storage_builder.build().await?;
1032        let network_description = self.network_description.as_ref().unwrap();
1033        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1034        storage
1035            .write_network_description(network_description)
1036            .await
1037            .expect("writing the NetworkDescription should succeed");
1038        storage
1039            .write_blob(&committee_blob)
1040            .await
1041            .expect("writing a blob should succeed");
1042        Ok(self.genesis_storage_builder.build(storage).await)
1043    }
1044
1045    pub async fn make_client_with_options(
1046        &mut self,
1047        chain_id: ChainId,
1048        block_hash: Option<CryptoHash>,
1049        block_height: BlockHeight,
1050        options: chain_client::Options,
1051        follow_only: bool,
1052    ) -> anyhow::Result<ChainClient<B::Storage>> {
1053        // Note that new clients are only given the genesis store: they must figure out
1054        // the rest by asking validators.
1055        let storage = self.make_storage().await?;
1056        self.chain_client_storages.push(storage.clone());
1057        let mode = if follow_only {
1058            crate::client::ListeningMode::FollowChain
1059        } else {
1060            crate::client::ListeningMode::FullChain
1061        };
1062        let client = Arc::new(Client::new(
1063            crate::environment::Impl {
1064                network: self.make_node_provider(),
1065                storage,
1066                signer: self.signer.clone(),
1067                wallet: TestWallet::default(),
1068            },
1069            self.admin_id(),
1070            false,
1071            [(chain_id, mode)],
1072            format!("Client node for {:.8}", chain_id),
1073            Duration::from_secs(30),
1074            Duration::from_secs(1),
1075            options,
1076            5_000,
1077            10_000,
1078            crate::client::RequestsSchedulerConfig::default(),
1079        ));
1080        Ok(client.create_chain_client(
1081            chain_id,
1082            block_hash,
1083            block_height,
1084            None,
1085            self.chain_owners.get(&chain_id).copied(),
1086            None,
1087            follow_only,
1088        ))
1089    }
1090
1091    pub async fn make_client(
1092        &mut self,
1093        chain_id: ChainId,
1094        block_hash: Option<CryptoHash>,
1095        block_height: BlockHeight,
1096    ) -> anyhow::Result<ChainClient<B::Storage>> {
1097        self.make_client_with_options(
1098            chain_id,
1099            block_hash,
1100            block_height,
1101            chain_client::Options::test_default(),
1102            false,
1103        )
1104        .await
1105    }
1106
1107    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1108    pub async fn check_that_validators_have_certificate(
1109        &self,
1110        chain_id: ChainId,
1111        block_height: BlockHeight,
1112        target_count: usize,
1113    ) -> Option<ConfirmedBlockCertificate> {
1114        let query = ChainInfoQuery::new(chain_id)
1115            .with_sent_certificate_hashes_by_heights(vec![block_height]);
1116        let mut count = 0;
1117        let mut certificate = None;
1118        for validator in self.node_provider.all_nodes() {
1119            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1120                if response.check(validator.public_key).is_ok() {
1121                    let ChainInfo {
1122                        mut requested_sent_certificate_hashes,
1123                        ..
1124                    } = *response.info;
1125                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1126                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1127                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1128                            if cert.inner().block().header.chain_id == chain_id
1129                                && cert.inner().block().header.height == block_height
1130                            {
1131                                cert.check(&self.initial_committee).unwrap();
1132                                count += 1;
1133                                certificate = Some(cert);
1134                            }
1135                        }
1136                    }
1137                }
1138            }
1139        }
1140        assert!(count >= target_count);
1141        certificate
1142    }
1143
1144    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1145    /// in the expected round.
1146    pub async fn check_that_validators_are_in_round(
1147        &self,
1148        chain_id: ChainId,
1149        block_height: BlockHeight,
1150        round: Round,
1151        target_count: usize,
1152    ) {
1153        let query = ChainInfoQuery::new(chain_id);
1154        let mut count = 0;
1155        for validator in self.node_provider.all_nodes() {
1156            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1157                if response.info.manager.current_round == round
1158                    && response.info.next_block_height == block_height
1159                    && response.check(validator.public_key).is_ok()
1160                {
1161                    count += 1;
1162                }
1163            }
1164        }
1165        assert!(count >= target_count);
1166    }
1167
1168    /// Panics if any validator has a nonempty outbox for the given chain.
1169    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1170        for validator in self.node_provider.all_nodes() {
1171            let guard = validator.client.lock().await;
1172            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1173            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1174        }
1175    }
1176}
1177
/// Limits the concurrency of RocksDB tests, to avoid "too many open files" errors.
///
/// Every `RocksDbStorageBuilder` acquires one of the 5 permits when it is created and keeps
/// it in its `_permit` field.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1181
1182#[derive(Default)]
1183pub struct MemoryStorageBuilder {
1184    namespace: String,
1185    instance_counter: usize,
1186    wasm_runtime: Option<WasmRuntime>,
1187    clock: TestClock,
1188}
1189
1190#[async_trait]
1191impl StorageBuilder for MemoryStorageBuilder {
1192    type Storage = DbStorage<MemoryDatabase, TestClock>;
1193
1194    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1195        self.instance_counter += 1;
1196        let config = MemoryDatabase::new_test_config().await?;
1197        if self.namespace.is_empty() {
1198            self.namespace = generate_test_namespace();
1199        }
1200        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1201        Ok(
1202            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1203                .await?,
1204        )
1205    }
1206
1207    fn clock(&self) -> &TestClock {
1208        &self.clock
1209    }
1210}
1211
1212impl MemoryStorageBuilder {
1213    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1214    /// applications.
1215    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1216        MemoryStorageBuilder {
1217            wasm_runtime: wasm_runtime.into(),
1218            ..MemoryStorageBuilder::default()
1219        }
1220    }
1221}
1222
/// A `StorageBuilder` that creates test storages backed by `RocksDbDatabase`.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Namespace prefix common to all storages built here; generated on the first `build`.
    namespace: String,
    /// Number of times `build` was called; used as the namespace suffix of each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock passed (as a clone) to every storage that is built.
    clock: TestClock,
    /// Permit from `ROCKS_DB_SEMAPHORE`, kept for as long as the builder exists.
    _permit: SemaphorePermit<'static>,
}

#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a [`RocksDbStorageBuilder`] without a Wasm runtime, after waiting for a permit
    /// from `ROCKS_DB_SEMAPHORE`.
    pub async fn new() -> Self {
        let permit = ROCKS_DB_SEMAPHORE.acquire().await.unwrap();
        Self {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock: TestClock::default(),
            _permit: permit,
        }
    }

    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
    /// applications.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        let base = Self::new().await;
        Self {
            wasm_runtime,
            ..base
        }
    }
}

#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `n` counts the calls to
    /// this method, starting at 1.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let store_config = RocksDbDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for all later storages.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock that is handed to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1277
/// A `StorageBuilder` that creates test storages backed by `StorageServiceDatabase`.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Namespace prefix common to all storages built here; generated on the first `build`.
    namespace: String,
    /// Number of times `build` was called; used as the namespace suffix of each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock passed (as a clone) to every storage that is built.
    clock: TestClock,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a `ServiceStorageBuilder` without a Wasm runtime.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a `ServiceStorageBuilder` with the given Wasm runtime.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `n` counts the calls to
    /// this method, starting at 1.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let store_config = StorageServiceDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for all later storages.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock that is handed to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1325
/// A `StorageBuilder` that creates test storages backed by `DynamoDbDatabase`.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Namespace prefix common to all storages built here; generated on the first `build`.
    namespace: String,
    /// Number of times `build` was called; used as the namespace suffix of each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock passed (as a clone) to every storage that is built.
    clock: TestClock,
}

#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a [`DynamoDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
    /// applications.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `n` counts the calls to
    /// this method, starting at 1.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let store_config = DynamoDbDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for all later storages.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock that is handed to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1369
/// A `StorageBuilder` that creates test storages backed by `ScyllaDbDatabase`.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Namespace prefix common to all storages built here; generated on the first `build`.
    namespace: String,
    /// Number of times `build` was called; used as the namespace suffix of each storage.
    instance_counter: usize,
    /// Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock passed (as a clone) to every storage that is built.
    clock: TestClock,
}

#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a [`ScyllaDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
    /// applications.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `n` counts the calls to
    /// this method, starting at 1.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let store_config = ScyllaDbDatabase::new_test_config().await?;
        // The prefix is generated lazily and then reused for all later storages.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let instance_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            store_config,
            &instance_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the clock that is handed to the storages.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1413
1414pub trait ClientOutcomeResultExt<T, E> {
1415    fn unwrap_ok_committed(self) -> T;
1416}
1417
1418impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1419    fn unwrap_ok_committed(self) -> T {
1420        self.unwrap().unwrap()
1421    }
1422}