linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    sync::Arc,
7    time::Duration,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40    memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbDatabase,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    client::{chain_client, Client},
52    data_types::*,
53    environment::{TestSigner, TestWallet},
54    node::{
55        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
56        ValidatorNodeProvider,
57    },
58    notifier::ChannelNotifier,
59    worker::{Notification, ProcessableCertificate, WorkerState},
60};
61
62#[derive(Debug, PartialEq, Clone, Copy)]
63pub enum FaultType {
64    Honest,
65    Offline,
66    OfflineWithInfo,
67    NoChains,
68    DontSendConfirmVote,
69    DontProcessValidated,
70    DontSendValidateVote,
71}
72
73/// A validator used for testing. "Faulty" validators ignore block proposals (but not
74/// certificates or info queries) and have the wrong initial balance for all chains.
75///
76/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
77/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
78/// tasks if the client stopped waiting for the response.
79struct LocalValidator<S>
80where
81    S: Storage,
82{
83    state: WorkerState<S>,
84    notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90    S: Storage,
91{
92    public_key: ValidatorPublicKey,
93    client: Arc<Mutex<LocalValidator<S>>>,
94    fault_type: FaultType,
95}
96
97impl<S> ValidatorNode for LocalValidatorClient<S>
98where
99    S: Storage + Clone + Send + Sync + 'static,
100{
101    type NotificationStream = NotificationStream;
102
103    fn address(&self) -> String {
104        format!("local:{}", self.public_key)
105    }
106
107    async fn handle_block_proposal(
108        &self,
109        proposal: BlockProposal,
110    ) -> Result<ChainInfoResponse, NodeError> {
111        self.spawn_and_receive(move |validator, sender| {
112            validator.do_handle_block_proposal(proposal, sender)
113        })
114        .await
115    }
116
117    async fn handle_lite_certificate(
118        &self,
119        certificate: LiteCertificate<'_>,
120        _delivery: CrossChainMessageDelivery,
121    ) -> Result<ChainInfoResponse, NodeError> {
122        let certificate = certificate.cloned();
123        self.spawn_and_receive(move |validator, sender| {
124            validator.do_handle_lite_certificate(certificate, sender)
125        })
126        .await
127    }
128
129    async fn handle_timeout_certificate(
130        &self,
131        certificate: GenericCertificate<Timeout>,
132    ) -> Result<ChainInfoResponse, NodeError> {
133        self.spawn_and_receive(move |validator, sender| {
134            validator.do_handle_certificate(certificate, sender)
135        })
136        .await
137    }
138
139    async fn handle_validated_certificate(
140        &self,
141        certificate: GenericCertificate<ValidatedBlock>,
142    ) -> Result<ChainInfoResponse, NodeError> {
143        self.spawn_and_receive(move |validator, sender| {
144            validator.do_handle_certificate(certificate, sender)
145        })
146        .await
147    }
148
149    async fn handle_confirmed_certificate(
150        &self,
151        certificate: GenericCertificate<ConfirmedBlock>,
152        _delivery: CrossChainMessageDelivery,
153    ) -> Result<ChainInfoResponse, NodeError> {
154        self.spawn_and_receive(move |validator, sender| {
155            validator.do_handle_certificate(certificate, sender)
156        })
157        .await
158    }
159
160    async fn handle_chain_info_query(
161        &self,
162        query: ChainInfoQuery,
163    ) -> Result<ChainInfoResponse, NodeError> {
164        self.spawn_and_receive(move |validator, sender| {
165            validator.do_handle_chain_info_query(query, sender)
166        })
167        .await
168    }
169
170    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
171        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
172            .await
173    }
174
175    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
176        Ok(Default::default())
177    }
178
179    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
180        Ok(self
181            .client
182            .lock()
183            .await
184            .state
185            .storage_client()
186            .read_network_description()
187            .await
188            .transpose()
189            .ok_or(NodeError::ViewError {
190                error: "missing NetworkDescription".to_owned(),
191            })??)
192    }
193
194    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
195        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
196            .await
197    }
198
199    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
200        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
201            .await
202    }
203
204    async fn download_pending_blob(
205        &self,
206        chain_id: ChainId,
207        blob_id: BlobId,
208    ) -> Result<BlobContent, NodeError> {
209        self.spawn_and_receive(move |validator, sender| {
210            validator.do_download_pending_blob(chain_id, blob_id, sender)
211        })
212        .await
213    }
214
215    async fn handle_pending_blob(
216        &self,
217        chain_id: ChainId,
218        blob: BlobContent,
219    ) -> Result<ChainInfoResponse, NodeError> {
220        self.spawn_and_receive(move |validator, sender| {
221            validator.do_handle_pending_blob(chain_id, blob, sender)
222        })
223        .await
224    }
225
226    async fn download_certificate(
227        &self,
228        hash: CryptoHash,
229    ) -> Result<ConfirmedBlockCertificate, NodeError> {
230        self.spawn_and_receive(move |validator, sender| {
231            validator.do_download_certificate(hash, sender)
232        })
233        .await
234    }
235
236    async fn download_certificates(
237        &self,
238        hashes: Vec<CryptoHash>,
239    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
240        self.spawn_and_receive(move |validator, sender| {
241            validator.do_download_certificates(hashes, sender)
242        })
243        .await
244    }
245
246    async fn download_certificates_by_heights(
247        &self,
248        chain_id: ChainId,
249        heights: Vec<BlockHeight>,
250    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
251        self.spawn_and_receive(move |validator, sender| {
252            validator.do_download_certificates_by_heights(chain_id, heights, sender)
253        })
254        .await
255    }
256
257    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
258        self.spawn_and_receive(move |validator, sender| {
259            validator.do_blob_last_used_by(blob_id, sender)
260        })
261        .await
262    }
263
264    async fn blob_last_used_by_certificate(
265        &self,
266        blob_id: BlobId,
267    ) -> Result<ConfirmedBlockCertificate, NodeError> {
268        self.spawn_and_receive(move |validator, sender| {
269            validator.do_blob_last_used_by_certificate(blob_id, sender)
270        })
271        .await
272    }
273
274    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
275        self.spawn_and_receive(move |validator, sender| {
276            validator.do_missing_blob_ids(blob_ids, sender)
277        })
278        .await
279    }
280
281    async fn get_shard_info(
282        &self,
283        _chain_id: ChainId,
284    ) -> Result<crate::data_types::ShardInfo, NodeError> {
285        // For test purposes, return a dummy shard info
286        Ok(crate::data_types::ShardInfo {
287            shard_id: 0,
288            total_shards: 1,
289        })
290    }
291}
292
293impl<S> LocalValidatorClient<S>
294where
295    S: Storage + Clone + Send + Sync + 'static,
296{
297    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
298        let client = LocalValidator {
299            state,
300            notifier: Arc::new(ChannelNotifier::default()),
301        };
302        Self {
303            public_key,
304            client: Arc::new(Mutex::new(client)),
305            fault_type: FaultType::Honest,
306        }
307    }
308
309    pub fn name(&self) -> ValidatorPublicKey {
310        self.public_key
311    }
312
313    fn set_fault_type(&mut self, fault_type: FaultType) {
314        self.fault_type = fault_type;
315    }
316
317    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
318    pub async fn chain_info_with_manager_values(
319        &mut self,
320        chain_id: ChainId,
321    ) -> Result<Box<ChainInfo>, NodeError> {
322        let query = ChainInfoQuery::new(chain_id).with_manager_values();
323        let response = self.handle_chain_info_query(query).await?;
324        Ok(response.info)
325    }
326
327    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
328    /// Returns the value that the future puts into the sender.
329    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
330    where
331        T: Send + 'static,
332        R: Future<Output = Result<(), T>> + Send,
333        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
334    {
335        let validator = self.clone();
336        let (sender, receiver) = oneshot::channel();
337        tokio::spawn(async move {
338            if f(validator, sender).await.is_err() {
339                tracing::debug!("result could not be sent");
340            }
341        });
342        receiver.await.unwrap()
343    }
344
345    async fn do_handle_block_proposal(
346        self,
347        proposal: BlockProposal,
348        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
349    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
350        let result = match self.fault_type {
351            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
352                error: "offline".to_string(),
353            }),
354            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
355            FaultType::DontSendValidateVote
356            | FaultType::Honest
357            | FaultType::DontSendConfirmVote
358            | FaultType::DontProcessValidated => {
359                let result = self
360                    .client
361                    .lock()
362                    .await
363                    .state
364                    .handle_block_proposal(proposal)
365                    .await
366                    .map_err(Into::into);
367                if self.fault_type == FaultType::DontSendValidateVote {
368                    Err(NodeError::ClientIoError {
369                        error: "refusing to validate".to_string(),
370                    })
371                } else {
372                    result
373                }
374            }
375        };
376        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
377        sender.send(result.map(|(info, _actions)| info))
378    }
379
380    async fn do_handle_lite_certificate(
381        self,
382        certificate: LiteCertificate<'_>,
383        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
384    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
385        let client = self.client.clone();
386        let mut validator = client.lock().await;
387        let result = async move {
388            match validator.state.full_certificate(certificate).await? {
389                Either::Left(confirmed) => {
390                    self.do_handle_certificate_internal(confirmed, &mut validator)
391                        .await
392                }
393                Either::Right(validated) => {
394                    self.do_handle_certificate_internal(validated, &mut validator)
395                        .await
396                }
397            }
398        }
399        .await;
400        sender.send(result)
401    }
402
403    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
404        &self,
405        certificate: GenericCertificate<T>,
406        validator: &mut MutexGuard<'_, LocalValidator<S>>,
407    ) -> Result<ChainInfoResponse, NodeError> {
408        match self.fault_type {
409            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
410                Err(NodeError::ClientIoError {
411                    error: "refusing to process validated block".to_string(),
412                })
413            }
414            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
415            FaultType::Honest
416            | FaultType::DontSendConfirmVote
417            | FaultType::DontProcessValidated
418            | FaultType::DontSendValidateVote => {
419                let result = validator
420                    .state
421                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
422                    .await
423                    .map_err(Into::into);
424                if T::KIND == CertificateKind::Validated
425                    && self.fault_type == FaultType::DontSendConfirmVote
426                {
427                    Err(NodeError::ClientIoError {
428                        error: "refusing to confirm".to_string(),
429                    })
430                } else {
431                    result
432                }
433            }
434            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
435                error: "offline".to_string(),
436            }),
437        }
438    }
439
440    async fn do_handle_certificate<T: ProcessableCertificate>(
441        self,
442        certificate: GenericCertificate<T>,
443        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
444    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
445        let mut validator = self.client.lock().await;
446        let result = self
447            .do_handle_certificate_internal(certificate, &mut validator)
448            .await;
449        sender.send(result)
450    }
451
452    async fn do_handle_chain_info_query(
453        self,
454        query: ChainInfoQuery,
455        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
456    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
457        let validator = self.client.lock().await;
458        let result = match self.fault_type {
459            FaultType::Offline => Err(NodeError::ClientIoError {
460                error: "offline".to_string(),
461            }),
462            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
463            FaultType::Honest
464            | FaultType::DontSendConfirmVote
465            | FaultType::DontProcessValidated
466            | FaultType::DontSendValidateVote
467            | FaultType::OfflineWithInfo => validator
468                .state
469                .handle_chain_info_query(query)
470                .await
471                .map_err(Into::into),
472        };
473        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
474        sender.send(result.map(|(info, _actions)| info))
475    }
476
477    async fn do_subscribe(
478        self,
479        chains: Vec<ChainId>,
480        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
481    ) -> Result<(), Result<NotificationStream, NodeError>> {
482        let validator = self.client.lock().await;
483        let rx = validator.notifier.subscribe(chains);
484        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
485        sender.send(Ok(stream))
486    }
487
488    async fn do_upload_blob(
489        self,
490        content: BlobContent,
491        sender: oneshot::Sender<Result<BlobId, NodeError>>,
492    ) -> Result<(), Result<BlobId, NodeError>> {
493        let validator = self.client.lock().await;
494        let blob = Blob::new(content);
495        let id = blob.id();
496        let storage = validator.state.storage_client();
497        let result = match storage.maybe_write_blobs(&[blob]).await {
498            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
499            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
500            Err(error) => Err(error.into()),
501        };
502        sender.send(result)
503    }
504
505    async fn do_download_blob(
506        self,
507        blob_id: BlobId,
508        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
509    ) -> Result<(), Result<BlobContent, NodeError>> {
510        let validator = self.client.lock().await;
511        let blob = validator
512            .state
513            .storage_client()
514            .read_blob(blob_id)
515            .await
516            .map_err(Into::into);
517        let blob = match blob {
518            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
519            Err(error) => Err(error),
520        };
521        sender.send(blob.map(|blob| blob.into_content()))
522    }
523
524    async fn do_download_pending_blob(
525        self,
526        chain_id: ChainId,
527        blob_id: BlobId,
528        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
529    ) -> Result<(), Result<BlobContent, NodeError>> {
530        let validator = self.client.lock().await;
531        let result = validator
532            .state
533            .download_pending_blob(chain_id, blob_id)
534            .await
535            .map_err(Into::into);
536        sender.send(result.map(|blob| blob.into_content()))
537    }
538
539    async fn do_handle_pending_blob(
540        self,
541        chain_id: ChainId,
542        blob: BlobContent,
543        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
544    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
545        let validator = self.client.lock().await;
546        let result = validator
547            .state
548            .handle_pending_blob(chain_id, Blob::new(blob))
549            .await
550            .map_err(Into::into);
551        sender.send(result)
552    }
553
554    async fn do_download_certificate(
555        self,
556        hash: CryptoHash,
557        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
558    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
559        let validator = self.client.lock().await;
560        let certificate = validator
561            .state
562            .storage_client()
563            .read_certificate(hash)
564            .await
565            .map_err(Into::into);
566
567        let certificate = match certificate {
568            Err(error) => Err(error),
569            Ok(entry) => match entry {
570                Some(certificate) => Ok(certificate),
571                None => {
572                    panic!("Missing certificate: {hash}");
573                }
574            },
575        };
576
577        sender.send(certificate)
578    }
579
580    async fn do_download_certificates(
581        self,
582        hashes: Vec<CryptoHash>,
583        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
584    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
585        let validator = self.client.lock().await;
586        let certificates = validator
587            .state
588            .storage_client()
589            .read_certificates(hashes.clone())
590            .await
591            .map_err(Into::into);
592
593        let certificates = match certificates {
594            Err(error) => Err(error),
595            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
596                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
597                ResultReadCertificates::InvalidHashes(hashes) => {
598                    panic!("Missing certificates: {:?}", hashes)
599                }
600            },
601        };
602
603        sender.send(certificates)
604    }
605
606    async fn do_download_certificates_by_heights(
607        self,
608        chain_id: ChainId,
609        heights: Vec<BlockHeight>,
610        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
611    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
612        // First, use do_handle_chain_info_query to get the certificate hashes
613        let (query_sender, query_receiver) = oneshot::channel();
614        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
615
616        let self_clone = self.clone();
617        self.do_handle_chain_info_query(query, query_sender)
618            .await
619            .expect("Failed to handle chain info query");
620
621        // Get the response from the chain info query
622        let chain_info_response = query_receiver.await.map_err(|_| {
623            Err(NodeError::ClientIoError {
624                error: "Failed to receive chain info response".to_string(),
625            })
626        })?;
627
628        let hashes = match chain_info_response {
629            Ok(response) => response.info.requested_sent_certificate_hashes,
630            Err(e) => {
631                return sender.send(Err(e));
632            }
633        };
634
635        // Now use do_download_certificates to get the actual certificates
636        let (cert_sender, cert_receiver) = oneshot::channel();
637        self_clone
638            .do_download_certificates(hashes, cert_sender)
639            .await?;
640
641        // Forward the result to the original sender
642        let result = cert_receiver.await.map_err(|_| {
643            Err(NodeError::ClientIoError {
644                error: "Failed to receive certificates".to_string(),
645            })
646        })?;
647
648        sender.send(result)
649    }
650
651    async fn do_blob_last_used_by(
652        self,
653        blob_id: BlobId,
654        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
655    ) -> Result<(), Result<CryptoHash, NodeError>> {
656        let validator = self.client.lock().await;
657        let blob_state = validator
658            .state
659            .storage_client()
660            .read_blob_state(blob_id)
661            .await
662            .map_err(Into::into);
663        let certificate_hash = match blob_state {
664            Err(err) => Err(err),
665            Ok(blob_state) => match blob_state {
666                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
667                Some(blob_state) => blob_state
668                    .last_used_by
669                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
670            },
671        };
672
673        sender.send(certificate_hash)
674    }
675
676    async fn do_blob_last_used_by_certificate(
677        self,
678        blob_id: BlobId,
679        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
680    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
681        match self.blob_last_used_by(blob_id).await {
682            Ok(cert_hash) => {
683                let cert = self.download_certificate(cert_hash).await;
684                sender.send(cert)
685            }
686            Err(err) => sender.send(Err(err)),
687        }
688    }
689
690    async fn do_missing_blob_ids(
691        self,
692        blob_ids: Vec<BlobId>,
693        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
694    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
695        let validator = self.client.lock().await;
696        let missing_blob_ids = validator
697            .state
698            .storage_client()
699            .missing_blobs(&blob_ids)
700            .await
701            .map_err(Into::into);
702        sender.send(missing_blob_ids)
703    }
704}
705
706#[derive(Clone)]
707pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
708where
709    S: Storage;
710
711impl<S> NodeProvider<S>
712where
713    S: Storage + Clone,
714{
715    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
716        self.0.lock().unwrap().clone()
717    }
718}
719
720impl<S> ValidatorNodeProvider for NodeProvider<S>
721where
722    S: Storage + Clone + Send + Sync + 'static,
723{
724    type Node = LocalValidatorClient<S>;
725
726    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
727        unimplemented!()
728    }
729
730    fn make_nodes_from_list<A>(
731        &self,
732        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
733    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
734    where
735        A: AsRef<str>,
736    {
737        let list = self.0.lock().unwrap();
738        Ok(validators
739            .into_iter()
740            .map(|(public_key, address)| {
741                list.iter()
742                    .find(|client| client.public_key == public_key)
743                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
744                        address: address.as_ref().to_string(),
745                    })
746                    .map(|client| (public_key, client.clone()))
747            })
748            .collect::<Result<Vec<_>, _>>()?
749            .into_iter())
750    }
751}
752
753impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
754where
755    S: Storage,
756{
757    fn from_iter<T>(iter: T) -> Self
758    where
759        T: IntoIterator<Item = LocalValidatorClient<S>>,
760    {
761        Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
762    }
763}
764
765// NOTE:
766// * To communicate with a quorum of validators, chain clients iterate over a copy of
767// `validator_clients` to spawn I/O tasks.
768// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
769// * Most tests have 1 faulty validator out 4 so that there is exactly only 1 quorum to
770// communicate with.
771pub struct TestBuilder<B: StorageBuilder> {
772    storage_builder: B,
773    pub initial_committee: Committee,
774    admin_description: Option<ChainDescription>,
775    network_description: Option<NetworkDescription>,
776    genesis_storage_builder: GenesisStorageBuilder,
777    node_provider: NodeProvider<B::Storage>,
778    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
779    chain_client_storages: Vec<B::Storage>,
780    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
781    pub signer: TestSigner,
782}
783
784#[async_trait]
785pub trait StorageBuilder {
786    type Storage: Storage + Clone + Send + Sync + 'static;
787
788    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
789
790    fn clock(&self) -> &TestClock;
791}
792
793#[derive(Default)]
794struct GenesisStorageBuilder {
795    accounts: Vec<GenesisAccount>,
796}
797
798struct GenesisAccount {
799    description: ChainDescription,
800    public_key: AccountPublicKey,
801}
802
803impl GenesisStorageBuilder {
804    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
805        self.accounts.push(GenesisAccount {
806            description,
807            public_key,
808        })
809    }
810
811    async fn build<S>(&self, storage: S) -> S
812    where
813        S: Storage + Clone + Send + Sync + 'static,
814    {
815        for account in &self.accounts {
816            storage
817                .create_chain(account.description.clone())
818                .await
819                .unwrap();
820        }
821        storage
822    }
823}
824
825pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
826
827impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
828    /// Reads the hashed certificate values in descending order from the given hash.
829    pub async fn read_confirmed_blocks_downward(
830        &self,
831        from: CryptoHash,
832        limit: u32,
833    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
834        let mut hash = Some(from);
835        let mut values = Vec::new();
836        for _ in 0..limit {
837            let Some(next_hash) = hash else {
838                break;
839            };
840            let value = self.read_confirmed_block(next_hash).await?;
841            hash = value.block().header.previous_block_hash;
842            values.push(value);
843        }
844        Ok(values)
845    }
846}
847
848impl<B> TestBuilder<B>
849where
850    B: StorageBuilder,
851{
852    pub async fn new(
853        mut storage_builder: B,
854        count: usize,
855        with_faulty_validators: usize,
856        mut signer: TestSigner,
857    ) -> Result<Self, anyhow::Error> {
858        let mut validators = Vec::new();
859        for _ in 0..count {
860            let validator_keypair = ValidatorKeypair::generate();
861            let account_public_key = signer.generate_new();
862            validators.push((validator_keypair, account_public_key));
863        }
864        let for_committee = validators
865            .iter()
866            .map(|(validating, account)| (validating.public_key, *account))
867            .collect::<Vec<_>>();
868        let initial_committee = Committee::make_simple(for_committee);
869        let mut validator_clients = Vec::new();
870        let mut validator_storages = HashMap::new();
871        let mut faulty_validators = HashSet::new();
872        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
873            let validator_public_key = validator_keypair.public_key;
874            let storage = storage_builder.build().await?;
875            let state = WorkerState::new(
876                format!("Node {}", i),
877                Some(validator_keypair.secret_key),
878                storage.clone(),
879                5_000,
880                10_000,
881            )
882            .with_allow_inactive_chains(false)
883            .with_allow_messages_from_deprecated_epochs(false);
884            let mut validator = LocalValidatorClient::new(validator_public_key, state);
885            if i < with_faulty_validators {
886                faulty_validators.insert(validator_public_key);
887                validator.set_fault_type(FaultType::NoChains);
888            }
889            validator_clients.push(validator);
890            validator_storages.insert(validator_public_key, storage);
891        }
892        tracing::info!(
893            "Test will use the following faulty validators: {:?}",
894            faulty_validators
895        );
896        Ok(Self {
897            storage_builder,
898            initial_committee,
899            admin_description: None,
900            network_description: None,
901            genesis_storage_builder: GenesisStorageBuilder::default(),
902            node_provider: NodeProvider::from_iter(validator_clients),
903            validator_storages,
904            chain_client_storages: Vec::new(),
905            chain_owners: BTreeMap::new(),
906            signer,
907        })
908    }
909
910    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
911        let validators = self.initial_committee.validators().clone();
912        self.initial_committee = Committee::new(validators, policy);
913        self
914    }
915
916    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
917        let mut faulty_validators = vec![];
918        let mut validator_clients = self.node_provider.0.lock().unwrap();
919        for index in indexes.as_ref() {
920            let validator = &mut validator_clients[*index];
921            validator.set_fault_type(fault_type);
922            faulty_validators.push(validator.public_key);
923        }
924        tracing::info!(
925            "Making the following validators {:?}: {:?}",
926            fault_type,
927            faulty_validators
928        );
929    }
930
931    /// Creates the root chain with the given `index`, and returns a client for it.
932    ///
933    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
934    /// is automatically set to zero.
935    pub async fn add_root_chain(
936        &mut self,
937        index: u32,
938        balance: Amount,
939    ) -> anyhow::Result<ChainClient<B::Storage>> {
940        // Make sure the admin chain is initialized.
941        if self.admin_description.is_none() && index != 0 {
942            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
943        }
944        let origin = ChainOrigin::Root(index);
945        let public_key = self.signer.generate_new();
946        let open_chain_config = InitialChainConfig {
947            ownership: ChainOwnership::single(public_key.into()),
948            epoch: Epoch(0),
949            min_active_epoch: Epoch(0),
950            max_active_epoch: Epoch(0),
951            balance,
952            application_permissions: ApplicationPermissions::default(),
953        };
954        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
955        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
956        if index == 0 {
957            self.admin_description = Some(description.clone());
958            self.network_description = Some(NetworkDescription {
959                admin_chain_id: description.id(),
960                // dummy values to fill the description
961                genesis_config_hash: CryptoHash::test_hash("genesis config"),
962                genesis_timestamp: Timestamp::from(0),
963                genesis_committee_blob_hash: committee_blob.id().hash,
964                name: "test network".to_string(),
965            });
966        }
967        // Remember what's in the genesis store for future clients to join.
968        self.genesis_storage_builder
969            .add(description.clone(), public_key);
970
971        let network_description = self.network_description.as_ref().unwrap();
972
973        for validator in self.node_provider.all_nodes() {
974            let storage = self
975                .validator_storages
976                .get_mut(&validator.public_key)
977                .unwrap();
978            storage
979                .write_network_description(network_description)
980                .await
981                .expect("writing the NetworkDescription should succeed");
982            storage
983                .write_blob(&committee_blob)
984                .await
985                .expect("writing a blob should succeed");
986            storage.create_chain(description.clone()).await.unwrap();
987        }
988        for storage in &mut self.chain_client_storages {
989            storage.create_chain(description.clone()).await.unwrap();
990        }
991        let chain_id = description.id();
992        self.chain_owners.insert(chain_id, public_key.into());
993        self.make_client(chain_id, None, BlockHeight::ZERO).await
994    }
995
996    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
997        let mut result = Vec::new();
998        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
999            assert_eq!(
1000                genesis_account.description.origin(),
1001                ChainOrigin::Root(i as u32)
1002            );
1003            result.push((
1004                genesis_account.public_key,
1005                genesis_account.description.config().balance,
1006            ));
1007        }
1008        result
1009    }
1010
1011    pub fn admin_id(&self) -> ChainId {
1012        self.admin_description
1013            .as_ref()
1014            .expect("admin chain not initialized")
1015            .id()
1016    }
1017
1018    pub fn admin_description(&self) -> Option<&ChainDescription> {
1019        self.admin_description.as_ref()
1020    }
1021
1022    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1023        self.node_provider.clone()
1024    }
1025
1026    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1027        self.node_provider.0.lock().unwrap()[index].clone()
1028    }
1029
1030    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1031        let storage = self.storage_builder.build().await?;
1032        let network_description = self.network_description.as_ref().unwrap();
1033        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1034        storage
1035            .write_network_description(network_description)
1036            .await
1037            .expect("writing the NetworkDescription should succeed");
1038        storage
1039            .write_blob(&committee_blob)
1040            .await
1041            .expect("writing a blob should succeed");
1042        Ok(self.genesis_storage_builder.build(storage).await)
1043    }
1044
1045    pub async fn make_client_with_options(
1046        &mut self,
1047        chain_id: ChainId,
1048        block_hash: Option<CryptoHash>,
1049        block_height: BlockHeight,
1050        options: chain_client::Options,
1051        follow_only: bool,
1052    ) -> anyhow::Result<ChainClient<B::Storage>> {
1053        // Note that new clients are only given the genesis store: they must figure out
1054        // the rest by asking validators.
1055        let storage = self.make_storage().await?;
1056        self.chain_client_storages.push(storage.clone());
1057        let client = Arc::new(Client::new(
1058            crate::environment::Impl {
1059                network: self.make_node_provider(),
1060                storage,
1061                signer: self.signer.clone(),
1062                wallet: TestWallet::default(),
1063            },
1064            self.admin_id(),
1065            false,
1066            [chain_id],
1067            format!("Client node for {:.8}", chain_id),
1068            Duration::from_secs(30),
1069            Duration::from_secs(1),
1070            options,
1071            5_000,
1072            10_000,
1073            crate::client::RequestsSchedulerConfig::default(),
1074        ));
1075        Ok(client.create_chain_client(
1076            chain_id,
1077            block_hash,
1078            block_height,
1079            None,
1080            self.chain_owners.get(&chain_id).copied(),
1081            None,
1082            follow_only,
1083        ))
1084    }
1085
1086    pub async fn make_client(
1087        &mut self,
1088        chain_id: ChainId,
1089        block_hash: Option<CryptoHash>,
1090        block_height: BlockHeight,
1091    ) -> anyhow::Result<ChainClient<B::Storage>> {
1092        self.make_client_with_options(
1093            chain_id,
1094            block_hash,
1095            block_height,
1096            chain_client::Options::test_default(),
1097            false,
1098        )
1099        .await
1100    }
1101
1102    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1103    pub async fn check_that_validators_have_certificate(
1104        &self,
1105        chain_id: ChainId,
1106        block_height: BlockHeight,
1107        target_count: usize,
1108    ) -> Option<ConfirmedBlockCertificate> {
1109        let query = ChainInfoQuery::new(chain_id)
1110            .with_sent_certificate_hashes_by_heights(vec![block_height]);
1111        let mut count = 0;
1112        let mut certificate = None;
1113        for validator in self.node_provider.all_nodes() {
1114            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1115                if response.check(validator.public_key).is_ok() {
1116                    let ChainInfo {
1117                        mut requested_sent_certificate_hashes,
1118                        ..
1119                    } = *response.info;
1120                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1121                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1122                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1123                            if cert.inner().block().header.chain_id == chain_id
1124                                && cert.inner().block().header.height == block_height
1125                            {
1126                                cert.check(&self.initial_committee).unwrap();
1127                                count += 1;
1128                                certificate = Some(cert);
1129                            }
1130                        }
1131                    }
1132                }
1133            }
1134        }
1135        assert!(count >= target_count);
1136        certificate
1137    }
1138
1139    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1140    /// in the expected round.
1141    pub async fn check_that_validators_are_in_round(
1142        &self,
1143        chain_id: ChainId,
1144        block_height: BlockHeight,
1145        round: Round,
1146        target_count: usize,
1147    ) {
1148        let query = ChainInfoQuery::new(chain_id);
1149        let mut count = 0;
1150        for validator in self.node_provider.all_nodes() {
1151            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1152                if response.info.manager.current_round == round
1153                    && response.info.next_block_height == block_height
1154                    && response.check(validator.public_key).is_ok()
1155                {
1156                    count += 1;
1157                }
1158            }
1159        }
1160        assert!(count >= target_count);
1161    }
1162
1163    /// Panics if any validator has a nonempty outbox for the given chain.
1164    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1165        for validator in self.node_provider.all_nodes() {
1166            let guard = validator.client.lock().await;
1167            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1168            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1169        }
1170    }
1171}
1172
1173#[cfg(feature = "rocksdb")]
1174/// Limit concurrency for RocksDB tests to avoid "too many open files" errors.
1175static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1176
1177#[derive(Default)]
1178pub struct MemoryStorageBuilder {
1179    namespace: String,
1180    instance_counter: usize,
1181    wasm_runtime: Option<WasmRuntime>,
1182    clock: TestClock,
1183}
1184
1185#[async_trait]
1186impl StorageBuilder for MemoryStorageBuilder {
1187    type Storage = DbStorage<MemoryDatabase, TestClock>;
1188
1189    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1190        self.instance_counter += 1;
1191        let config = MemoryDatabase::new_test_config().await?;
1192        if self.namespace.is_empty() {
1193            self.namespace = generate_test_namespace();
1194        }
1195        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1196        Ok(
1197            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1198                .await?,
1199        )
1200    }
1201
1202    fn clock(&self) -> &TestClock {
1203        &self.clock
1204    }
1205}
1206
1207impl MemoryStorageBuilder {
1208    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1209    /// applications.
1210    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1211        MemoryStorageBuilder {
1212            wasm_runtime: wasm_runtime.into(),
1213            ..MemoryStorageBuilder::default()
1214        }
1215    }
1216}
1217
1218#[cfg(feature = "rocksdb")]
1219pub struct RocksDbStorageBuilder {
1220    namespace: String,
1221    instance_counter: usize,
1222    wasm_runtime: Option<WasmRuntime>,
1223    clock: TestClock,
1224    _permit: SemaphorePermit<'static>,
1225}
1226
1227#[cfg(feature = "rocksdb")]
1228impl RocksDbStorageBuilder {
1229    pub async fn new() -> Self {
1230        RocksDbStorageBuilder {
1231            namespace: String::new(),
1232            instance_counter: 0,
1233            wasm_runtime: None,
1234            clock: TestClock::default(),
1235            _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1236        }
1237    }
1238
1239    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1240    /// applications.
1241    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1242    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1243        RocksDbStorageBuilder {
1244            wasm_runtime: wasm_runtime.into(),
1245            ..RocksDbStorageBuilder::new().await
1246        }
1247    }
1248}
1249
1250#[cfg(feature = "rocksdb")]
1251#[async_trait]
1252impl StorageBuilder for RocksDbStorageBuilder {
1253    type Storage = DbStorage<RocksDbDatabase, TestClock>;
1254
1255    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1256        self.instance_counter += 1;
1257        let config = RocksDbDatabase::new_test_config().await?;
1258        if self.namespace.is_empty() {
1259            self.namespace = generate_test_namespace();
1260        }
1261        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1262        Ok(
1263            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1264                .await?,
1265        )
1266    }
1267
1268    fn clock(&self) -> &TestClock {
1269        &self.clock
1270    }
1271}
1272
1273#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1274#[derive(Default)]
1275pub struct ServiceStorageBuilder {
1276    namespace: String,
1277    instance_counter: usize,
1278    wasm_runtime: Option<WasmRuntime>,
1279    clock: TestClock,
1280}
1281
1282#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1283impl ServiceStorageBuilder {
1284    /// Creates a `ServiceStorage`.
1285    pub fn new() -> Self {
1286        Self::with_wasm_runtime(None)
1287    }
1288
1289    /// Creates a `ServiceStorage` with the given Wasm runtime.
1290    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1291        ServiceStorageBuilder {
1292            wasm_runtime: wasm_runtime.into(),
1293            ..ServiceStorageBuilder::default()
1294        }
1295    }
1296}
1297
1298#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1299#[async_trait]
1300impl StorageBuilder for ServiceStorageBuilder {
1301    type Storage = DbStorage<StorageServiceDatabase, TestClock>;
1302
1303    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1304        self.instance_counter += 1;
1305        let config = StorageServiceDatabase::new_test_config().await?;
1306        if self.namespace.is_empty() {
1307            self.namespace = generate_test_namespace();
1308        }
1309        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1310        Ok(
1311            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1312                .await?,
1313        )
1314    }
1315
1316    fn clock(&self) -> &TestClock {
1317        &self.clock
1318    }
1319}
1320
1321#[cfg(feature = "dynamodb")]
1322#[derive(Default)]
1323pub struct DynamoDbStorageBuilder {
1324    namespace: String,
1325    instance_counter: usize,
1326    wasm_runtime: Option<WasmRuntime>,
1327    clock: TestClock,
1328}
1329
1330#[cfg(feature = "dynamodb")]
1331impl DynamoDbStorageBuilder {
1332    /// Creates a [`DynamoDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1333    /// applications.
1334    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1335        DynamoDbStorageBuilder {
1336            wasm_runtime: wasm_runtime.into(),
1337            ..DynamoDbStorageBuilder::default()
1338        }
1339    }
1340}
1341
1342#[cfg(feature = "dynamodb")]
1343#[async_trait]
1344impl StorageBuilder for DynamoDbStorageBuilder {
1345    type Storage = DbStorage<DynamoDbDatabase, TestClock>;
1346
1347    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1348        self.instance_counter += 1;
1349        let config = DynamoDbDatabase::new_test_config().await?;
1350        if self.namespace.is_empty() {
1351            self.namespace = generate_test_namespace();
1352        }
1353        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1354        Ok(
1355            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1356                .await?,
1357        )
1358    }
1359
1360    fn clock(&self) -> &TestClock {
1361        &self.clock
1362    }
1363}
1364
1365#[cfg(feature = "scylladb")]
1366#[derive(Default)]
1367pub struct ScyllaDbStorageBuilder {
1368    namespace: String,
1369    instance_counter: usize,
1370    wasm_runtime: Option<WasmRuntime>,
1371    clock: TestClock,
1372}
1373
1374#[cfg(feature = "scylladb")]
1375impl ScyllaDbStorageBuilder {
1376    /// Creates a [`ScyllaDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1377    /// applications.
1378    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1379        ScyllaDbStorageBuilder {
1380            wasm_runtime: wasm_runtime.into(),
1381            ..ScyllaDbStorageBuilder::default()
1382        }
1383    }
1384}
1385
1386#[cfg(feature = "scylladb")]
1387#[async_trait]
1388impl StorageBuilder for ScyllaDbStorageBuilder {
1389    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;
1390
1391    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1392        self.instance_counter += 1;
1393        let config = ScyllaDbDatabase::new_test_config().await?;
1394        if self.namespace.is_empty() {
1395            self.namespace = generate_test_namespace();
1396        }
1397        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1398        Ok(
1399            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1400                .await?,
1401        )
1402    }
1403
1404    fn clock(&self) -> &TestClock {
1405        &self.clock
1406    }
1407}
1408
1409pub trait ClientOutcomeResultExt<T, E> {
1410    fn unwrap_ok_committed(self) -> T;
1411}
1412
1413impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1414    fn unwrap_ok_committed(self) -> T {
1415        self.unwrap().unwrap()
1416    }
1417}