linera_core/unit_tests/
test_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    sync::Arc,
7    time::Duration,
8    vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13    future::Either,
14    lock::{Mutex, MutexGuard},
15    Future,
16};
17use linera_base::{
18    crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19    data_types::*,
20    identifiers::{AccountOwner, BlobId, ChainId},
21    ownership::ChainOwnership,
22};
23use linera_chain::{
24    data_types::BlockProposal,
25    types::{
26        CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40    memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46    linera_views::rocks_db::RocksDbDatabase,
47    tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51    client::{chain_client, Client},
52    data_types::*,
53    node::{
54        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55        ValidatorNodeProvider,
56    },
57    notifier::ChannelNotifier,
58    worker::{Notification, ProcessableCertificate, WorkerState},
59};
60
/// The ways a `LocalValidatorClient` can be configured to misbehave in tests.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum FaultType {
    /// Handles block proposals, certificates and chain info queries normally.
    Honest,
    /// Answers block proposals, certificates and chain info queries with an "offline"
    /// `ClientIoError`.
    Offline,
    /// Like `Offline` for block proposals and certificates, but still answers chain info
    /// queries.
    OfflineWithInfo,
    /// Answers block proposals, certificates and chain info queries with `InactiveChain`.
    NoChains,
    /// Processes validated block certificates, but reports a "refusing to confirm" error
    /// instead of returning the result.
    DontSendConfirmVote,
    /// Rejects validated block certificates without processing them.
    DontProcessValidated,
    /// Processes block proposals, but reports a "refusing to validate" error instead of
    /// returning the result.
    DontSendValidateVote,
}
71
/// A validator used for testing. "Faulty" validators ignore block proposals (but not
/// certificates or info queries) and have the wrong initial balance for all chains.
///
/// All methods are executed in spawned Tokio tasks, so that canceling a client task doesn't cause
/// the validator's tasks to be canceled: In a real network, a validator also wouldn't cancel
/// tasks if the client stopped waiting for the response.
struct LocalValidator<S>
where
    S: Storage,
{
    /// The worker state that processes proposals, certificates, queries and blobs.
    state: WorkerState<S>,
    /// Notifier that subscriptions are created from and that is handed to certificate handling.
    notifier: Arc<ChannelNotifier<Notification>>,
}
85
/// A cloneable handle to a `LocalValidator`, used as the `ValidatorNode` in tests.
///
/// Clones share the same validator through `client`, but each clone carries its own copy of
/// `fault_type`.
#[derive(Clone)]
pub struct LocalValidatorClient<S>
where
    S: Storage,
{
    /// Public key identifying the validator; also used to build its address.
    public_key: ValidatorPublicKey,
    /// The shared validator, protected by an async mutex.
    client: Arc<Mutex<LocalValidator<S>>>,
    /// The fault this handle simulates when handling requests.
    fault_type: FaultType,
}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98    S: Storage + Clone + Send + Sync + 'static,
99{
100    type NotificationStream = NotificationStream;
101
102    fn address(&self) -> String {
103        format!("local:{}", self.public_key)
104    }
105
106    async fn handle_block_proposal(
107        &self,
108        proposal: BlockProposal,
109    ) -> Result<ChainInfoResponse, NodeError> {
110        self.spawn_and_receive(move |validator, sender| {
111            validator.do_handle_block_proposal(proposal, sender)
112        })
113        .await
114    }
115
116    async fn handle_lite_certificate(
117        &self,
118        certificate: LiteCertificate<'_>,
119        _delivery: CrossChainMessageDelivery,
120    ) -> Result<ChainInfoResponse, NodeError> {
121        let certificate = certificate.cloned();
122        self.spawn_and_receive(move |validator, sender| {
123            validator.do_handle_lite_certificate(certificate, sender)
124        })
125        .await
126    }
127
128    async fn handle_timeout_certificate(
129        &self,
130        certificate: GenericCertificate<Timeout>,
131    ) -> Result<ChainInfoResponse, NodeError> {
132        self.spawn_and_receive(move |validator, sender| {
133            validator.do_handle_certificate(certificate, sender)
134        })
135        .await
136    }
137
138    async fn handle_validated_certificate(
139        &self,
140        certificate: GenericCertificate<ValidatedBlock>,
141    ) -> Result<ChainInfoResponse, NodeError> {
142        self.spawn_and_receive(move |validator, sender| {
143            validator.do_handle_certificate(certificate, sender)
144        })
145        .await
146    }
147
148    async fn handle_confirmed_certificate(
149        &self,
150        certificate: GenericCertificate<ConfirmedBlock>,
151        _delivery: CrossChainMessageDelivery,
152    ) -> Result<ChainInfoResponse, NodeError> {
153        self.spawn_and_receive(move |validator, sender| {
154            validator.do_handle_certificate(certificate, sender)
155        })
156        .await
157    }
158
159    async fn handle_chain_info_query(
160        &self,
161        query: ChainInfoQuery,
162    ) -> Result<ChainInfoResponse, NodeError> {
163        self.spawn_and_receive(move |validator, sender| {
164            validator.do_handle_chain_info_query(query, sender)
165        })
166        .await
167    }
168
169    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
170        self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
171            .await
172    }
173
174    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
175        Ok(Default::default())
176    }
177
178    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
179        Ok(self
180            .client
181            .lock()
182            .await
183            .state
184            .storage_client()
185            .read_network_description()
186            .await
187            .transpose()
188            .ok_or(NodeError::ViewError {
189                error: "missing NetworkDescription".to_owned(),
190            })??)
191    }
192
193    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
194        self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
195            .await
196    }
197
198    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
199        self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
200            .await
201    }
202
203    async fn download_pending_blob(
204        &self,
205        chain_id: ChainId,
206        blob_id: BlobId,
207    ) -> Result<BlobContent, NodeError> {
208        self.spawn_and_receive(move |validator, sender| {
209            validator.do_download_pending_blob(chain_id, blob_id, sender)
210        })
211        .await
212    }
213
214    async fn handle_pending_blob(
215        &self,
216        chain_id: ChainId,
217        blob: BlobContent,
218    ) -> Result<ChainInfoResponse, NodeError> {
219        self.spawn_and_receive(move |validator, sender| {
220            validator.do_handle_pending_blob(chain_id, blob, sender)
221        })
222        .await
223    }
224
225    async fn download_certificate(
226        &self,
227        hash: CryptoHash,
228    ) -> Result<ConfirmedBlockCertificate, NodeError> {
229        self.spawn_and_receive(move |validator, sender| {
230            validator.do_download_certificate(hash, sender)
231        })
232        .await
233    }
234
235    async fn download_certificates(
236        &self,
237        hashes: Vec<CryptoHash>,
238    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
239        self.spawn_and_receive(move |validator, sender| {
240            validator.do_download_certificates(hashes, sender)
241        })
242        .await
243    }
244
245    async fn download_certificates_by_heights(
246        &self,
247        chain_id: ChainId,
248        heights: Vec<BlockHeight>,
249    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
250        self.spawn_and_receive(move |validator, sender| {
251            validator.do_download_certificates_by_heights(chain_id, heights, sender)
252        })
253        .await
254    }
255
256    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
257        self.spawn_and_receive(move |validator, sender| {
258            validator.do_blob_last_used_by(blob_id, sender)
259        })
260        .await
261    }
262
263    async fn blob_last_used_by_certificate(
264        &self,
265        blob_id: BlobId,
266    ) -> Result<ConfirmedBlockCertificate, NodeError> {
267        self.spawn_and_receive(move |validator, sender| {
268            validator.do_blob_last_used_by_certificate(blob_id, sender)
269        })
270        .await
271    }
272
273    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
274        self.spawn_and_receive(move |validator, sender| {
275            validator.do_missing_blob_ids(blob_ids, sender)
276        })
277        .await
278    }
279
280    async fn get_shard_info(
281        &self,
282        _chain_id: ChainId,
283    ) -> Result<crate::data_types::ShardInfo, NodeError> {
284        // For test purposes, return a dummy shard info
285        Ok(crate::data_types::ShardInfo {
286            shard_id: 0,
287            total_shards: 1,
288        })
289    }
290}
291
292impl<S> LocalValidatorClient<S>
293where
294    S: Storage + Clone + Send + Sync + 'static,
295{
296    fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
297        let client = LocalValidator {
298            state,
299            notifier: Arc::new(ChannelNotifier::default()),
300        };
301        Self {
302            public_key,
303            client: Arc::new(Mutex::new(client)),
304            fault_type: FaultType::Honest,
305        }
306    }
307
308    pub fn name(&self) -> ValidatorPublicKey {
309        self.public_key
310    }
311
    /// Sets the fault that this client handle simulates from now on.
    fn set_fault_type(&mut self, fault_type: FaultType) {
        self.fault_type = fault_type;
    }
315
316    /// Obtains the basic `ChainInfo` data for the local validator chain, with chain manager values.
317    pub async fn chain_info_with_manager_values(
318        &mut self,
319        chain_id: ChainId,
320    ) -> Result<Box<ChainInfo>, NodeError> {
321        let query = ChainInfoQuery::new(chain_id).with_manager_values();
322        let response = self.handle_chain_info_query(query).await?;
323        Ok(response.info)
324    }
325
326    /// Executes the future produced by `f` in a new thread in a new Tokio runtime.
327    /// Returns the value that the future puts into the sender.
328    async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
329    where
330        T: Send + 'static,
331        R: Future<Output = Result<(), T>> + Send,
332        F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
333    {
334        let validator = self.clone();
335        let (sender, receiver) = oneshot::channel();
336        tokio::spawn(async move {
337            if f(validator, sender).await.is_err() {
338                tracing::debug!("result could not be sent");
339            }
340        });
341        receiver.await.unwrap()
342    }
343
344    async fn do_handle_block_proposal(
345        self,
346        proposal: BlockProposal,
347        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
348    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
349        let result = match self.fault_type {
350            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
351                error: "offline".to_string(),
352            }),
353            FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
354            FaultType::DontSendValidateVote
355            | FaultType::Honest
356            | FaultType::DontSendConfirmVote
357            | FaultType::DontProcessValidated => {
358                let result = self
359                    .client
360                    .lock()
361                    .await
362                    .state
363                    .handle_block_proposal(proposal)
364                    .await
365                    .map_err(Into::into);
366                if self.fault_type == FaultType::DontSendValidateVote {
367                    Err(NodeError::ClientIoError {
368                        error: "refusing to validate".to_string(),
369                    })
370                } else {
371                    result
372                }
373            }
374        };
375        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
376        sender.send(result.map(|(info, _actions)| info))
377    }
378
379    async fn do_handle_lite_certificate(
380        self,
381        certificate: LiteCertificate<'_>,
382        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
383    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
384        let client = self.client.clone();
385        let mut validator = client.lock().await;
386        let result = async move {
387            match validator.state.full_certificate(certificate).await? {
388                Either::Left(confirmed) => {
389                    self.do_handle_certificate_internal(confirmed, &mut validator)
390                        .await
391                }
392                Either::Right(validated) => {
393                    self.do_handle_certificate_internal(validated, &mut validator)
394                        .await
395                }
396            }
397        }
398        .await;
399        sender.send(result)
400    }
401
402    async fn do_handle_certificate_internal<T: ProcessableCertificate>(
403        &self,
404        certificate: GenericCertificate<T>,
405        validator: &mut MutexGuard<'_, LocalValidator<S>>,
406    ) -> Result<ChainInfoResponse, NodeError> {
407        match self.fault_type {
408            FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
409                Err(NodeError::ClientIoError {
410                    error: "refusing to process validated block".to_string(),
411                })
412            }
413            FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
414            FaultType::Honest
415            | FaultType::DontSendConfirmVote
416            | FaultType::DontProcessValidated
417            | FaultType::DontSendValidateVote => {
418                let result = validator
419                    .state
420                    .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
421                    .await
422                    .map_err(Into::into);
423                if T::KIND == CertificateKind::Validated
424                    && self.fault_type == FaultType::DontSendConfirmVote
425                {
426                    Err(NodeError::ClientIoError {
427                        error: "refusing to confirm".to_string(),
428                    })
429                } else {
430                    result
431                }
432            }
433            FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
434                error: "offline".to_string(),
435            }),
436        }
437    }
438
439    async fn do_handle_certificate<T: ProcessableCertificate>(
440        self,
441        certificate: GenericCertificate<T>,
442        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
443    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
444        let mut validator = self.client.lock().await;
445        let result = self
446            .do_handle_certificate_internal(certificate, &mut validator)
447            .await;
448        sender.send(result)
449    }
450
451    async fn do_handle_chain_info_query(
452        self,
453        query: ChainInfoQuery,
454        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
455    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
456        let validator = self.client.lock().await;
457        let result = match self.fault_type {
458            FaultType::Offline => Err(NodeError::ClientIoError {
459                error: "offline".to_string(),
460            }),
461            FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
462            FaultType::Honest
463            | FaultType::DontSendConfirmVote
464            | FaultType::DontProcessValidated
465            | FaultType::DontSendValidateVote
466            | FaultType::OfflineWithInfo => validator
467                .state
468                .handle_chain_info_query(query)
469                .await
470                .map_err(Into::into),
471        };
472        // In a local node cross-chain messages can't get lost, so we can ignore the actions here.
473        sender.send(result.map(|(info, _actions)| info))
474    }
475
476    async fn do_subscribe(
477        self,
478        chains: Vec<ChainId>,
479        sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
480    ) -> Result<(), Result<NotificationStream, NodeError>> {
481        let validator = self.client.lock().await;
482        let rx = validator.notifier.subscribe(chains);
483        let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
484        sender.send(Ok(stream))
485    }
486
487    async fn do_upload_blob(
488        self,
489        content: BlobContent,
490        sender: oneshot::Sender<Result<BlobId, NodeError>>,
491    ) -> Result<(), Result<BlobId, NodeError>> {
492        let validator = self.client.lock().await;
493        let blob = Blob::new(content);
494        let id = blob.id();
495        let storage = validator.state.storage_client();
496        let result = match storage.maybe_write_blobs(&[blob]).await {
497            Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
498            Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
499            Err(error) => Err(error.into()),
500        };
501        sender.send(result)
502    }
503
504    async fn do_download_blob(
505        self,
506        blob_id: BlobId,
507        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
508    ) -> Result<(), Result<BlobContent, NodeError>> {
509        let validator = self.client.lock().await;
510        let blob = validator
511            .state
512            .storage_client()
513            .read_blob(blob_id)
514            .await
515            .map_err(Into::into);
516        let blob = match blob {
517            Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
518            Err(error) => Err(error),
519        };
520        sender.send(blob.map(|blob| blob.into_content()))
521    }
522
523    async fn do_download_pending_blob(
524        self,
525        chain_id: ChainId,
526        blob_id: BlobId,
527        sender: oneshot::Sender<Result<BlobContent, NodeError>>,
528    ) -> Result<(), Result<BlobContent, NodeError>> {
529        let validator = self.client.lock().await;
530        let result = validator
531            .state
532            .download_pending_blob(chain_id, blob_id)
533            .await
534            .map_err(Into::into);
535        sender.send(result.map(|blob| blob.into_content()))
536    }
537
538    async fn do_handle_pending_blob(
539        self,
540        chain_id: ChainId,
541        blob: BlobContent,
542        sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
543    ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
544        let validator = self.client.lock().await;
545        let result = validator
546            .state
547            .handle_pending_blob(chain_id, Blob::new(blob))
548            .await
549            .map_err(Into::into);
550        sender.send(result)
551    }
552
553    async fn do_download_certificate(
554        self,
555        hash: CryptoHash,
556        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
557    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
558        let validator = self.client.lock().await;
559        let certificate = validator
560            .state
561            .storage_client()
562            .read_certificate(hash)
563            .await
564            .map_err(Into::into);
565
566        let certificate = match certificate {
567            Err(error) => Err(error),
568            Ok(entry) => match entry {
569                Some(certificate) => Ok(certificate),
570                None => {
571                    panic!("Missing certificate: {hash}");
572                }
573            },
574        };
575
576        sender.send(certificate)
577    }
578
579    async fn do_download_certificates(
580        self,
581        hashes: Vec<CryptoHash>,
582        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
583    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
584        let validator = self.client.lock().await;
585        let certificates = validator
586            .state
587            .storage_client()
588            .read_certificates(hashes.clone())
589            .await
590            .map_err(Into::into);
591
592        let certificates = match certificates {
593            Err(error) => Err(error),
594            Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
595                ResultReadCertificates::Certificates(certificates) => Ok(certificates),
596                ResultReadCertificates::InvalidHashes(hashes) => {
597                    panic!("Missing certificates: {:?}", hashes)
598                }
599            },
600        };
601
602        sender.send(certificates)
603    }
604
605    async fn do_download_certificates_by_heights(
606        self,
607        chain_id: ChainId,
608        heights: Vec<BlockHeight>,
609        sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
610    ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
611        // First, use do_handle_chain_info_query to get the certificate hashes
612        let (query_sender, query_receiver) = oneshot::channel();
613        let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
614
615        let self_clone = self.clone();
616        self.do_handle_chain_info_query(query, query_sender)
617            .await
618            .expect("Failed to handle chain info query");
619
620        // Get the response from the chain info query
621        let chain_info_response = query_receiver.await.map_err(|_| {
622            Err(NodeError::ClientIoError {
623                error: "Failed to receive chain info response".to_string(),
624            })
625        })?;
626
627        let hashes = match chain_info_response {
628            Ok(response) => response.info.requested_sent_certificate_hashes,
629            Err(e) => {
630                return sender.send(Err(e));
631            }
632        };
633
634        // Now use do_download_certificates to get the actual certificates
635        let (cert_sender, cert_receiver) = oneshot::channel();
636        self_clone
637            .do_download_certificates(hashes, cert_sender)
638            .await?;
639
640        // Forward the result to the original sender
641        let result = cert_receiver.await.map_err(|_| {
642            Err(NodeError::ClientIoError {
643                error: "Failed to receive certificates".to_string(),
644            })
645        })?;
646
647        sender.send(result)
648    }
649
650    async fn do_blob_last_used_by(
651        self,
652        blob_id: BlobId,
653        sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
654    ) -> Result<(), Result<CryptoHash, NodeError>> {
655        let validator = self.client.lock().await;
656        let blob_state = validator
657            .state
658            .storage_client()
659            .read_blob_state(blob_id)
660            .await
661            .map_err(Into::into);
662        let certificate_hash = match blob_state {
663            Err(err) => Err(err),
664            Ok(blob_state) => match blob_state {
665                None => Err(NodeError::BlobsNotFound(vec![blob_id])),
666                Some(blob_state) => blob_state
667                    .last_used_by
668                    .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
669            },
670        };
671
672        sender.send(certificate_hash)
673    }
674
675    async fn do_blob_last_used_by_certificate(
676        self,
677        blob_id: BlobId,
678        sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
679    ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
680        match self.blob_last_used_by(blob_id).await {
681            Ok(cert_hash) => {
682                let cert = self.download_certificate(cert_hash).await;
683                sender.send(cert)
684            }
685            Err(err) => sender.send(Err(err)),
686        }
687    }
688
689    async fn do_missing_blob_ids(
690        self,
691        blob_ids: Vec<BlobId>,
692        sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
693    ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
694        let validator = self.client.lock().await;
695        let missing_blob_ids = validator
696            .state
697            .storage_client()
698            .missing_blobs(&blob_ids)
699            .await
700            .map_err(Into::into);
701        sender.send(missing_blob_ids)
702    }
703}
704
/// A shared list of local validator clients, used to look up validator nodes by public key.
///
/// Clones share the same underlying list.
#[derive(Clone)]
pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
where
    S: Storage;
709
710impl<S> NodeProvider<S>
711where
712    S: Storage + Clone,
713{
714    fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
715        self.0.lock().unwrap().clone()
716    }
717}
718
719impl<S> ValidatorNodeProvider for NodeProvider<S>
720where
721    S: Storage + Clone + Send + Sync + 'static,
722{
723    type Node = LocalValidatorClient<S>;
724
725    fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
726        unimplemented!()
727    }
728
729    fn make_nodes_from_list<A>(
730        &self,
731        validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
732    ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
733    where
734        A: AsRef<str>,
735    {
736        let list = self.0.lock().unwrap();
737        Ok(validators
738            .into_iter()
739            .map(|(public_key, address)| {
740                list.iter()
741                    .find(|client| client.public_key == public_key)
742                    .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
743                        address: address.as_ref().to_string(),
744                    })
745                    .map(|client| (public_key, client.clone()))
746            })
747            .collect::<Result<Vec<_>, _>>()?
748            .into_iter())
749    }
750}
751
752impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
753where
754    S: Storage,
755{
756    fn from_iter<T>(iter: T) -> Self
757    where
758        T: IntoIterator<Item = LocalValidatorClient<S>>,
759    {
760        Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
761    }
762}
763
// NOTE:
// * To communicate with a quorum of validators, chain clients iterate over a copy of
// `validator_clients` to spawn I/O tasks.
// * When using `LocalValidatorClient`, clients communicate with an exact quorum then stop.
// * Most tests have 1 faulty validator out of 4, so that there is exactly one quorum to
// communicate with.
/// Test fixture holding a set of local validators together with their storages and keys.
pub struct TestBuilder<B: StorageBuilder> {
    /// Builder that creates one storage per validator.
    storage_builder: B,
    /// The committee made of the validators generated in `new`.
    pub initial_committee: Committee,
    /// Starts as `None`; presumably the admin chain description once it exists — TODO confirm.
    admin_description: Option<ChainDescription>,
    /// Starts as `None`; presumably the network description once it exists — TODO confirm.
    network_description: Option<NetworkDescription>,
    /// Collects the chains to create in newly built storages.
    genesis_storage_builder: GenesisStorageBuilder,
    /// Provider holding all local validator clients.
    node_provider: NodeProvider<B::Storage>,
    /// The storage of each validator, keyed by its public key.
    validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
    /// Starts empty; presumably the storages created for chain clients — TODO confirm.
    chain_client_storages: Vec<B::Storage>,
    /// Starts empty; presumably the owner of each chain created by the builder — TODO confirm.
    pub chain_owners: BTreeMap<ChainId, AccountOwner>,
    /// The signer passed to `new`, which also holds the validators' account keys.
    pub signer: InMemorySigner,
}
782
/// A factory for the storages used by test validators.
#[async_trait]
pub trait StorageBuilder {
    /// The storage type produced by this builder.
    type Storage: Storage + Clone + Send + Sync + 'static;

    /// Builds a storage instance.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;

    /// Returns the test clock associated with this builder.
    fn clock(&self) -> &TestClock;
}
791
/// Collects the genesis accounts whose chains are created when a storage is built.
#[derive(Default)]
struct GenesisStorageBuilder {
    /// The accounts recorded so far, in insertion order.
    accounts: Vec<GenesisAccount>,
}
796
/// A chain to create at genesis, together with an account public key.
struct GenesisAccount {
    /// Description of the chain to create.
    description: ChainDescription,
    /// Public key recorded with the chain; presumably its owner's key — TODO confirm.
    public_key: AccountPublicKey,
}
801
802impl GenesisStorageBuilder {
803    fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
804        self.accounts.push(GenesisAccount {
805            description,
806            public_key,
807        })
808    }
809
810    async fn build<S>(&self, storage: S) -> S
811    where
812        S: Storage + Clone + Send + Sync + 'static,
813    {
814        for account in &self.accounts {
815            storage
816                .create_chain(account.description.clone())
817                .await
818                .unwrap();
819        }
820        storage
821    }
822}
823
/// The chain client type used in tests: storage `S`, local validator nodes and an in-memory
/// signer.
pub type ChainClient<S> =
    crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
826
827impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
828    /// Reads the hashed certificate values in descending order from the given hash.
829    pub async fn read_confirmed_blocks_downward(
830        &self,
831        from: CryptoHash,
832        limit: u32,
833    ) -> anyhow::Result<Vec<ConfirmedBlock>> {
834        let mut hash = Some(from);
835        let mut values = Vec::new();
836        for _ in 0..limit {
837            let Some(next_hash) = hash else {
838                break;
839            };
840            let value = self.read_confirmed_block(next_hash).await?;
841            hash = value.block().header.previous_block_hash;
842            values.push(value);
843        }
844        Ok(values)
845    }
846}
847
848impl<B> TestBuilder<B>
849where
850    B: StorageBuilder,
851{
852    pub async fn new(
853        mut storage_builder: B,
854        count: usize,
855        with_faulty_validators: usize,
856        mut signer: InMemorySigner,
857    ) -> Result<Self, anyhow::Error> {
858        let mut validators = Vec::new();
859        for _ in 0..count {
860            let validator_keypair = ValidatorKeypair::generate();
861            let account_public_key = signer.generate_new();
862            validators.push((validator_keypair, account_public_key));
863        }
864        let for_committee = validators
865            .iter()
866            .map(|(validating, account)| (validating.public_key, *account))
867            .collect::<Vec<_>>();
868        let initial_committee = Committee::make_simple(for_committee);
869        let mut validator_clients = Vec::new();
870        let mut validator_storages = HashMap::new();
871        let mut faulty_validators = HashSet::new();
872        for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
873            let validator_public_key = validator_keypair.public_key;
874            let storage = storage_builder.build().await?;
875            let state = WorkerState::new(
876                format!("Node {}", i),
877                Some(validator_keypair.secret_key),
878                storage.clone(),
879                5_000,
880                10_000,
881            )
882            .with_allow_inactive_chains(false)
883            .with_allow_messages_from_deprecated_epochs(false);
884            let mut validator = LocalValidatorClient::new(validator_public_key, state);
885            if i < with_faulty_validators {
886                faulty_validators.insert(validator_public_key);
887                validator.set_fault_type(FaultType::NoChains);
888            }
889            validator_clients.push(validator);
890            validator_storages.insert(validator_public_key, storage);
891        }
892        tracing::info!(
893            "Test will use the following faulty validators: {:?}",
894            faulty_validators
895        );
896        Ok(Self {
897            storage_builder,
898            initial_committee,
899            admin_description: None,
900            network_description: None,
901            genesis_storage_builder: GenesisStorageBuilder::default(),
902            node_provider: NodeProvider::from_iter(validator_clients),
903            validator_storages,
904            chain_client_storages: Vec::new(),
905            chain_owners: BTreeMap::new(),
906            signer,
907        })
908    }
909
910    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
911        let validators = self.initial_committee.validators().clone();
912        self.initial_committee = Committee::new(validators, policy);
913        self
914    }
915
916    pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
917        let mut faulty_validators = vec![];
918        let mut validator_clients = self.node_provider.0.lock().unwrap();
919        for index in indexes.as_ref() {
920            let validator = &mut validator_clients[*index];
921            validator.set_fault_type(fault_type);
922            faulty_validators.push(validator.public_key);
923        }
924        tracing::info!(
925            "Making the following validators {:?}: {:?}",
926            fault_type,
927            faulty_validators
928        );
929    }
930
931    /// Creates the root chain with the given `index`, and returns a client for it.
932    ///
933    /// Root chain 0 is the admin chain and needs to be initialized first, otherwise its balance
934    /// is automatically set to zero.
935    pub async fn add_root_chain(
936        &mut self,
937        index: u32,
938        balance: Amount,
939    ) -> anyhow::Result<ChainClient<B::Storage>> {
940        // Make sure the admin chain is initialized.
941        if self.admin_description.is_none() && index != 0 {
942            Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
943        }
944        let origin = ChainOrigin::Root(index);
945        let public_key = self.signer.generate_new();
946        let open_chain_config = InitialChainConfig {
947            ownership: ChainOwnership::single(public_key.into()),
948            epoch: Epoch(0),
949            min_active_epoch: Epoch(0),
950            max_active_epoch: Epoch(0),
951            balance,
952            application_permissions: ApplicationPermissions::default(),
953        };
954        let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
955        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
956        if index == 0 {
957            self.admin_description = Some(description.clone());
958            self.network_description = Some(NetworkDescription {
959                admin_chain_id: description.id(),
960                // dummy values to fill the description
961                genesis_config_hash: CryptoHash::test_hash("genesis config"),
962                genesis_timestamp: Timestamp::from(0),
963                genesis_committee_blob_hash: committee_blob.id().hash,
964                name: "test network".to_string(),
965            });
966        }
967        // Remember what's in the genesis store for future clients to join.
968        self.genesis_storage_builder
969            .add(description.clone(), public_key);
970
971        let network_description = self.network_description.as_ref().unwrap();
972
973        for validator in self.node_provider.all_nodes() {
974            let storage = self
975                .validator_storages
976                .get_mut(&validator.public_key)
977                .unwrap();
978            storage
979                .write_network_description(network_description)
980                .await
981                .expect("writing the NetworkDescription should succeed");
982            storage
983                .write_blob(&committee_blob)
984                .await
985                .expect("writing a blob should succeed");
986            storage.create_chain(description.clone()).await.unwrap();
987        }
988        for storage in &mut self.chain_client_storages {
989            storage.create_chain(description.clone()).await.unwrap();
990        }
991        let chain_id = description.id();
992        self.chain_owners.insert(chain_id, public_key.into());
993        self.make_client(chain_id, None, BlockHeight::ZERO).await
994    }
995
996    pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
997        let mut result = Vec::new();
998        for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
999            assert_eq!(
1000                genesis_account.description.origin(),
1001                ChainOrigin::Root(i as u32)
1002            );
1003            result.push((
1004                genesis_account.public_key,
1005                genesis_account.description.config().balance,
1006            ));
1007        }
1008        result
1009    }
1010
1011    pub fn admin_id(&self) -> ChainId {
1012        self.admin_description
1013            .as_ref()
1014            .expect("admin chain not initialized")
1015            .id()
1016    }
1017
1018    pub fn admin_description(&self) -> Option<&ChainDescription> {
1019        self.admin_description.as_ref()
1020    }
1021
1022    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1023        self.node_provider.clone()
1024    }
1025
1026    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1027        self.node_provider.0.lock().unwrap()[index].clone()
1028    }
1029
1030    pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1031        let storage = self.storage_builder.build().await?;
1032        let network_description = self.network_description.as_ref().unwrap();
1033        let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1034        storage
1035            .write_network_description(network_description)
1036            .await
1037            .expect("writing the NetworkDescription should succeed");
1038        storage
1039            .write_blob(&committee_blob)
1040            .await
1041            .expect("writing a blob should succeed");
1042        Ok(self.genesis_storage_builder.build(storage).await)
1043    }
1044
1045    pub async fn make_client_with_options(
1046        &mut self,
1047        chain_id: ChainId,
1048        block_hash: Option<CryptoHash>,
1049        block_height: BlockHeight,
1050        options: chain_client::Options,
1051    ) -> anyhow::Result<ChainClient<B::Storage>> {
1052        // Note that new clients are only given the genesis store: they must figure out
1053        // the rest by asking validators.
1054        let storage = self.make_storage().await?;
1055        self.chain_client_storages.push(storage.clone());
1056        let client = Arc::new(Client::new(
1057            crate::environment::Impl {
1058                network: self.make_node_provider(),
1059                storage,
1060                signer: self.signer.clone(),
1061            },
1062            self.admin_id(),
1063            false,
1064            [chain_id],
1065            format!("Client node for {:.8}", chain_id),
1066            Duration::from_secs(30),
1067            Duration::from_secs(1),
1068            options,
1069            5_000,
1070            10_000,
1071            crate::client::RequestsSchedulerConfig::default(),
1072        ));
1073        Ok(client.create_chain_client(
1074            chain_id,
1075            block_hash,
1076            block_height,
1077            None,
1078            self.chain_owners.get(&chain_id).copied(),
1079            None,
1080        ))
1081    }
1082
1083    pub async fn make_client(
1084        &mut self,
1085        chain_id: ChainId,
1086        block_hash: Option<CryptoHash>,
1087        block_height: BlockHeight,
1088    ) -> anyhow::Result<ChainClient<B::Storage>> {
1089        self.make_client_with_options(
1090            chain_id,
1091            block_hash,
1092            block_height,
1093            chain_client::Options::test_default(),
1094        )
1095        .await
1096    }
1097
1098    /// Tries to find a (confirmation) certificate for the given chain_id and block height.
1099    pub async fn check_that_validators_have_certificate(
1100        &self,
1101        chain_id: ChainId,
1102        block_height: BlockHeight,
1103        target_count: usize,
1104    ) -> Option<ConfirmedBlockCertificate> {
1105        let query = ChainInfoQuery::new(chain_id)
1106            .with_sent_certificate_hashes_by_heights(vec![block_height]);
1107        let mut count = 0;
1108        let mut certificate = None;
1109        for validator in self.node_provider.all_nodes() {
1110            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1111                if response.check(validator.public_key).is_ok() {
1112                    let ChainInfo {
1113                        mut requested_sent_certificate_hashes,
1114                        ..
1115                    } = *response.info;
1116                    debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1117                    if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1118                        if let Ok(cert) = validator.download_certificate(cert_hash).await {
1119                            if cert.inner().block().header.chain_id == chain_id
1120                                && cert.inner().block().header.height == block_height
1121                            {
1122                                cert.check(&self.initial_committee).unwrap();
1123                                count += 1;
1124                                certificate = Some(cert);
1125                            }
1126                        }
1127                    }
1128                }
1129            }
1130        }
1131        assert!(count >= target_count);
1132        certificate
1133    }
1134
1135    /// Tries to find a (confirmation) certificate for the given chain_id and block height, and are
1136    /// in the expected round.
1137    pub async fn check_that_validators_are_in_round(
1138        &self,
1139        chain_id: ChainId,
1140        block_height: BlockHeight,
1141        round: Round,
1142        target_count: usize,
1143    ) {
1144        let query = ChainInfoQuery::new(chain_id);
1145        let mut count = 0;
1146        for validator in self.node_provider.all_nodes() {
1147            if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1148                if response.info.manager.current_round == round
1149                    && response.info.next_block_height == block_height
1150                    && response.check(validator.public_key).is_ok()
1151                {
1152                    count += 1;
1153                }
1154            }
1155        }
1156        assert!(count >= target_count);
1157    }
1158
1159    /// Panics if any validator has a nonempty outbox for the given chain.
1160    pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1161        for validator in self.node_provider.all_nodes() {
1162            let guard = validator.client.lock().await;
1163            let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1164            assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1165        }
1166    }
1167}
1168
/// Semaphore with 5 permits that limits how many RocksDB tests run concurrently, so that
/// they do not fail with "too many open files" errors.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1172
1173#[derive(Default)]
1174pub struct MemoryStorageBuilder {
1175    namespace: String,
1176    instance_counter: usize,
1177    wasm_runtime: Option<WasmRuntime>,
1178    clock: TestClock,
1179}
1180
1181#[async_trait]
1182impl StorageBuilder for MemoryStorageBuilder {
1183    type Storage = DbStorage<MemoryDatabase, TestClock>;
1184
1185    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1186        self.instance_counter += 1;
1187        let config = MemoryDatabase::new_test_config().await?;
1188        if self.namespace.is_empty() {
1189            self.namespace = generate_test_namespace();
1190        }
1191        let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1192        Ok(
1193            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1194                .await?,
1195        )
1196    }
1197
1198    fn clock(&self) -> &TestClock {
1199        &self.clock
1200    }
1201}
1202
1203impl MemoryStorageBuilder {
1204    /// Creates a [`MemoryStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
1205    /// applications.
1206    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1207        MemoryStorageBuilder {
1208            wasm_runtime: wasm_runtime.into(),
1209            ..MemoryStorageBuilder::default()
1210        }
1211    }
1212}
1213
/// A [`StorageBuilder`] that creates RocksDB storages for tests.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Namespace prefix common to all storages built by this builder. It stays empty until
    /// the first storage is built.
    namespace: String,
    /// Number of storages requested so far; appended to the prefix to form each storage's
    /// namespace.
    instance_counter: usize,
    /// Wasm runtime passed to every storage built by this builder, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock of which a clone is passed to every storage built by this builder.
    clock: TestClock,
    /// Permit of the RocksDB test semaphore, held for as long as the builder exists.
    _permit: SemaphorePermit<'static>,
}
1222
#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a [`RocksDbStorageBuilder`] without a Wasm runtime, waiting until a permit
    /// of the RocksDB test semaphore is available.
    pub async fn new() -> Self {
        let clock = TestClock::default();
        let permit = ROCKS_DB_SEMAPHORE.acquire().await.unwrap();
        Self {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock,
            _permit: permit,
        }
    }

    /// Creates a [`RocksDbStorageBuilder`] that uses the specified [`WasmRuntime`] to run Wasm
    /// applications.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        let base = Self::new().await;
        Self {
            wasm_runtime,
            ..base
        }
    }
}
1245
#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Builds a new RocksDB storage in a namespace of its own.
    ///
    /// Every call increments the instance counter and uses `<prefix>_<counter>` as the
    /// namespace, where the prefix is generated the first time it is needed.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = RocksDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the builder's clock, a clone of which is given to every storage it builds.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1268
/// A [`StorageBuilder`] that creates storages backed by the storage service, for tests.
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Namespace prefix common to all storages built by this builder. It stays empty until
    /// the first storage is built.
    namespace: String,
    /// Number of storages requested so far; appended to the prefix to form each storage's
    /// namespace.
    instance_counter: usize,
    /// Wasm runtime passed to every storage built by this builder, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock of which a clone is passed to every storage built by this builder.
    clock: TestClock,
}
1277
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a [`ServiceStorageBuilder`] without a Wasm runtime.
    pub fn new() -> Self {
        Self::with_wasm_runtime(None)
    }

    /// Creates a [`ServiceStorageBuilder`] that uses the specified [`WasmRuntime`] to run
    /// Wasm applications; all other fields take their default values.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}
1293
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Builds a new storage-service storage in a namespace of its own.
    ///
    /// Every call increments the instance counter and uses `<prefix>_<counter>` as the
    /// namespace, where the prefix is generated the first time it is needed.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let test_config = StorageServiceDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the builder's clock, a clone of which is given to every storage it builds.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1316
/// A [`StorageBuilder`] that creates DynamoDB storages for tests.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Namespace prefix common to all storages built by this builder. It stays empty until
    /// the first storage is built.
    namespace: String,
    /// Number of storages requested so far; appended to the prefix to form each storage's
    /// namespace.
    instance_counter: usize,
    /// Wasm runtime passed to every storage built by this builder, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock of which a clone is passed to every storage built by this builder.
    clock: TestClock,
}
1325
#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a [`DynamoDbStorageBuilder`] whose storages run Wasm applications with the
    /// given [`WasmRuntime`]; all other fields take their default values.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1337
#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Builds a new DynamoDB storage in a namespace of its own.
    ///
    /// Every call increments the instance counter and uses `<prefix>_<counter>` as the
    /// namespace, where the prefix is generated the first time it is needed.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = DynamoDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the builder's clock, a clone of which is given to every storage it builds.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1360
/// A [`StorageBuilder`] that creates ScyllaDB storages for tests.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Namespace prefix common to all storages built by this builder. It stays empty until
    /// the first storage is built.
    namespace: String,
    /// Number of storages requested so far; appended to the prefix to form each storage's
    /// namespace.
    instance_counter: usize,
    /// Wasm runtime passed to every storage built by this builder, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Clock of which a clone is passed to every storage built by this builder.
    clock: TestClock,
}
1369
#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a [`ScyllaDbStorageBuilder`] whose storages run Wasm applications with the
    /// given [`WasmRuntime`]; all other fields take their default values.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        let wasm_runtime = wasm_runtime.into();
        Self {
            wasm_runtime,
            ..Self::default()
        }
    }
}
1381
#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Builds a new ScyllaDB storage in a namespace of its own.
    ///
    /// Every call increments the instance counter and uses `<prefix>_<counter>` as the
    /// namespace, where the prefix is generated the first time it is needed.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let test_config = ScyllaDbDatabase::new_test_config().await?;
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let storage_namespace = format!("{}_{}", self.namespace, self.instance_counter);
        let storage = DbStorage::new_for_testing(
            test_config,
            &storage_namespace,
            self.wasm_runtime,
            self.clock.clone(),
        )
        .await?;
        Ok(storage)
    }

    /// Returns the builder's clock, a clone of which is given to every storage it builds.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1404
1405pub trait ClientOutcomeResultExt<T, E> {
1406    fn unwrap_ok_committed(self) -> T;
1407}
1408
1409impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1410    fn unwrap_ok_committed(self) -> T {
1411        self.unwrap().unwrap()
1412    }
1413}