1#![allow(clippy::cast_possible_truncation)]
5
6use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 sync::Arc,
9 time::Duration,
10 vec,
11};
12
13use async_trait::async_trait;
14use futures::{
15 future::Either,
16 lock::{Mutex, MutexGuard},
17 Future,
18};
19use linera_base::{
20 crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
21 data_types::*,
22 identifiers::{AccountOwner, BlobId, ChainId, EventId},
23 ownership::ChainOwnership,
24};
25use linera_chain::{
26 data_types::BlockProposal,
27 types::{
28 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
29 LiteCertificate, Timeout, ValidatedBlock,
30 },
31};
32use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
33use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
34#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
35use linera_storage_service::client::StorageServiceDatabase;
36use linera_version::VersionInfo;
37#[cfg(feature = "dynamodb")]
38use linera_views::dynamo_db::DynamoDbDatabase;
39#[cfg(feature = "scylladb")]
40use linera_views::scylla_db::ScyllaDbDatabase;
41use linera_views::{
42 memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
43};
44use tokio::sync::oneshot;
45use tokio_stream::wrappers::UnboundedReceiverStream;
46#[cfg(feature = "rocksdb")]
47use {
48 linera_views::rocks_db::RocksDbDatabase,
49 tokio::sync::{Semaphore, SemaphorePermit},
50};
51
52use crate::{
53 chain_worker::ChainWorkerConfig,
54 client::{chain_client, Client},
55 data_types::*,
56 environment::{TestSigner, TestWallet},
57 node::{
58 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
59 ValidatorNodeProvider,
60 },
61 notifier::ChannelNotifier,
62 worker::{
63 Notification, ProcessableCertificate, WorkerState, DEFAULT_BLOCK_CACHE_SIZE,
64 DEFAULT_EXECUTION_STATE_CACHE_SIZE,
65 },
66};
67
68#[derive(Debug, PartialEq, Clone, Copy)]
69pub enum FaultType {
70 Honest,
71 Offline,
72 OfflineWithInfo,
73 NoChains,
74 DontSendConfirmVote,
75 DontProcessValidated,
76 DontSendValidateVote,
77}
78
79struct LocalValidator<S>
86where
87 S: Storage,
88{
89 state: WorkerState<S>,
90 notifier: Arc<ChannelNotifier<Notification>>,
91}
92
93#[derive(Clone)]
94pub struct LocalValidatorClient<S>
95where
96 S: Storage,
97{
98 public_key: ValidatorPublicKey,
99 client: Arc<Mutex<LocalValidator<S>>>,
100 fault_type: FaultType,
101}
102
103impl<S> ValidatorNode for LocalValidatorClient<S>
104where
105 S: Storage + Clone + Send + Sync + 'static,
106{
107 type NotificationStream = NotificationStream;
108
109 fn address(&self) -> String {
110 format!("local:{}", self.public_key)
111 }
112
113 async fn handle_block_proposal(
114 &self,
115 proposal: BlockProposal,
116 ) -> Result<ChainInfoResponse, NodeError> {
117 self.spawn_and_receive(move |validator, sender| {
118 validator.do_handle_block_proposal(proposal, sender)
119 })
120 .await
121 }
122
123 async fn handle_lite_certificate(
124 &self,
125 certificate: LiteCertificate<'_>,
126 _delivery: CrossChainMessageDelivery,
127 ) -> Result<ChainInfoResponse, NodeError> {
128 let certificate = certificate.cloned();
129 self.spawn_and_receive(move |validator, sender| {
130 validator.do_handle_lite_certificate(certificate, sender)
131 })
132 .await
133 }
134
135 async fn handle_timeout_certificate(
136 &self,
137 certificate: GenericCertificate<Timeout>,
138 ) -> Result<ChainInfoResponse, NodeError> {
139 self.spawn_and_receive(move |validator, sender| {
140 validator.do_handle_certificate(certificate, sender)
141 })
142 .await
143 }
144
145 async fn handle_validated_certificate(
146 &self,
147 certificate: GenericCertificate<ValidatedBlock>,
148 ) -> Result<ChainInfoResponse, NodeError> {
149 self.spawn_and_receive(move |validator, sender| {
150 validator.do_handle_certificate(certificate, sender)
151 })
152 .await
153 }
154
155 async fn handle_confirmed_certificate(
156 &self,
157 certificate: Arc<GenericCertificate<ConfirmedBlock>>,
158 _delivery: CrossChainMessageDelivery,
159 ) -> Result<ChainInfoResponse, NodeError> {
160 self.spawn_and_receive(move |validator, sender| {
161 validator.do_handle_certificate(Arc::unwrap_or_clone(certificate), sender)
162 })
163 .await
164 }
165
166 async fn handle_chain_info_query(
167 &self,
168 query: ChainInfoQuery,
169 ) -> Result<ChainInfoResponse, NodeError> {
170 self.spawn_and_receive(move |validator, sender| {
171 validator.do_handle_chain_info_query(query, sender)
172 })
173 .await
174 }
175
176 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
177 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
178 .await
179 }
180
181 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
182 Ok(Default::default())
183 }
184
185 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
186 Ok(self
187 .client
188 .lock()
189 .await
190 .state
191 .storage_client()
192 .read_network_description()
193 .await
194 .transpose()
195 .ok_or_else(|| NodeError::ViewError {
196 error: "missing NetworkDescription".to_owned(),
197 })??)
198 }
199
200 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
201 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
202 .await
203 }
204
205 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
206 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
207 .await
208 }
209
210 async fn download_blobs(
211 &self,
212 blob_ids: Vec<BlobId>,
213 ) -> Result<crate::node::BlobStream, NodeError> {
214 let this = self.clone();
215 let stream = futures::stream::unfold(blob_ids.into_iter(), move |mut iter| {
216 let this = this.clone();
217 async move {
218 let blob_id = iter.next()?;
219 let result = this.download_blob(blob_id).await;
220 Some((result, iter))
221 }
222 });
223 Ok(Box::pin(stream))
224 }
225
226 async fn download_pending_blob(
227 &self,
228 chain_id: ChainId,
229 blob_id: BlobId,
230 ) -> Result<BlobContent, NodeError> {
231 self.spawn_and_receive(move |validator, sender| {
232 validator.do_download_pending_blob(chain_id, blob_id, sender)
233 })
234 .await
235 }
236
237 async fn handle_pending_blob(
238 &self,
239 chain_id: ChainId,
240 blob: BlobContent,
241 ) -> Result<ChainInfoResponse, NodeError> {
242 self.spawn_and_receive(move |validator, sender| {
243 validator.do_handle_pending_blob(chain_id, blob, sender)
244 })
245 .await
246 }
247
248 async fn download_certificate(
249 &self,
250 hash: CryptoHash,
251 ) -> Result<ConfirmedBlockCertificate, NodeError> {
252 self.spawn_and_receive(move |validator, sender| {
253 validator.do_download_certificate(hash, sender)
254 })
255 .await
256 }
257
258 async fn download_certificates(
259 &self,
260 hashes: Vec<CryptoHash>,
261 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
262 self.spawn_and_receive(move |validator, sender| {
263 validator.do_download_certificates(hashes, sender)
264 })
265 .await
266 }
267
268 async fn download_certificates_by_heights(
269 &self,
270 chain_id: ChainId,
271 heights: Vec<BlockHeight>,
272 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
273 self.spawn_and_receive(move |validator, sender| {
274 validator.do_download_certificates_by_heights(chain_id, heights, sender)
275 })
276 .await
277 }
278
279 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
280 self.spawn_and_receive(move |validator, sender| {
281 validator.do_blob_last_used_by(blob_id, sender)
282 })
283 .await
284 }
285
286 async fn blob_last_used_by_certificate(
287 &self,
288 blob_id: BlobId,
289 ) -> Result<ConfirmedBlockCertificate, NodeError> {
290 self.spawn_and_receive(move |validator, sender| {
291 validator.do_blob_last_used_by_certificate(blob_id, sender)
292 })
293 .await
294 }
295
296 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
297 self.spawn_and_receive(move |validator, sender| {
298 validator.do_missing_blob_ids(blob_ids, sender)
299 })
300 .await
301 }
302
303 async fn event_block_heights(
304 &self,
305 event_ids: Vec<EventId>,
306 ) -> Result<Vec<Option<BlockHeight>>, NodeError> {
307 self.spawn_and_receive(move |validator, sender| {
308 validator.do_event_block_heights(event_ids, sender)
309 })
310 .await
311 }
312
313 async fn get_shard_info(
314 &self,
315 _chain_id: ChainId,
316 ) -> Result<crate::data_types::ShardInfo, NodeError> {
317 Ok(crate::data_types::ShardInfo {
319 shard_id: 0,
320 total_shards: 1,
321 })
322 }
323}
324
325impl<S> LocalValidatorClient<S>
326where
327 S: Storage + Clone + Send + Sync + 'static,
328{
329 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
330 let client = LocalValidator {
331 state,
332 notifier: Arc::new(ChannelNotifier::default()),
333 };
334 Self {
335 public_key,
336 client: Arc::new(Mutex::new(client)),
337 fault_type: FaultType::Honest,
338 }
339 }
340
341 pub fn name(&self) -> ValidatorPublicKey {
342 self.public_key
343 }
344
345 pub fn fault_type(&self) -> FaultType {
346 self.fault_type
347 }
348
349 fn set_fault_type(&mut self, fault_type: FaultType) {
350 self.fault_type = fault_type;
351 }
352
353 pub async fn chain_info_with_manager_values(
355 &mut self,
356 chain_id: ChainId,
357 ) -> Result<Box<ChainInfo>, NodeError> {
358 let query = ChainInfoQuery::new(chain_id).with_manager_values();
359 let response = self.handle_chain_info_query(query).await?;
360 Ok(response.info)
361 }
362
363 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
366 where
367 T: Send + 'static,
368 R: Future<Output = Result<(), T>> + Send,
369 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
370 {
371 let validator = self.clone();
372 let (sender, receiver) = oneshot::channel();
373 tokio::spawn(async move {
374 if f(validator, sender).await.is_err() {
375 tracing::debug!("result could not be sent");
376 }
377 });
378 receiver.await.unwrap()
379 }
380
381 async fn do_handle_block_proposal(
382 self,
383 proposal: BlockProposal,
384 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
385 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
386 let result = match self.fault_type {
387 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
388 error: "offline".to_string(),
389 }),
390 FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
391 FaultType::DontSendValidateVote
392 | FaultType::Honest
393 | FaultType::DontSendConfirmVote
394 | FaultType::DontProcessValidated => {
395 let result = self
396 .client
397 .lock()
398 .await
399 .state
400 .handle_block_proposal(proposal)
401 .await
402 .map_err(Into::into);
403 if self.fault_type == FaultType::DontSendValidateVote {
404 Err(NodeError::ClientIoError {
405 error: "refusing to validate".to_string(),
406 })
407 } else {
408 result
409 }
410 }
411 };
412 sender.send(result.map(|(info, _actions)| info))
414 }
415
416 async fn do_handle_lite_certificate(
417 self,
418 certificate: LiteCertificate<'_>,
419 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
420 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
421 let client = self.client.clone();
422 let validator = client.lock().await;
423 let result = async move {
424 match validator.state.full_certificate(certificate).await? {
425 Either::Left(confirmed) => {
426 self.do_handle_certificate_internal(confirmed, &validator)
427 .await
428 }
429 Either::Right(validated) => {
430 self.do_handle_certificate_internal(validated, &validator)
431 .await
432 }
433 }
434 }
435 .await;
436 sender.send(result)
437 }
438
439 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
440 &self,
441 certificate: GenericCertificate<T>,
442 validator: &MutexGuard<'_, LocalValidator<S>>,
443 ) -> Result<ChainInfoResponse, NodeError> {
444 match self.fault_type {
445 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
446 Err(NodeError::ClientIoError {
447 error: "refusing to process validated block".to_string(),
448 })
449 }
450 FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
451 FaultType::Honest
452 | FaultType::DontSendConfirmVote
453 | FaultType::DontProcessValidated
454 | FaultType::DontSendValidateVote => {
455 let result = validator
456 .state
457 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
458 .await
459 .map_err(Into::into);
460 if T::KIND == CertificateKind::Validated
461 && self.fault_type == FaultType::DontSendConfirmVote
462 {
463 Err(NodeError::ClientIoError {
464 error: "refusing to confirm".to_string(),
465 })
466 } else {
467 result
468 }
469 }
470 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
471 error: "offline".to_string(),
472 }),
473 }
474 }
475
476 async fn do_handle_certificate<T: ProcessableCertificate>(
477 self,
478 certificate: GenericCertificate<T>,
479 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
480 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
481 let validator = self.client.lock().await;
482 let result = self
483 .do_handle_certificate_internal(certificate, &validator)
484 .await;
485 sender.send(result)
486 }
487
488 async fn do_handle_chain_info_query(
489 self,
490 query: ChainInfoQuery,
491 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
492 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
493 let validator = self.client.lock().await;
494 let result = match self.fault_type {
495 FaultType::Offline => Err(NodeError::ClientIoError {
496 error: "offline".to_string(),
497 }),
498 FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
499 FaultType::Honest
500 | FaultType::DontSendConfirmVote
501 | FaultType::DontProcessValidated
502 | FaultType::DontSendValidateVote
503 | FaultType::OfflineWithInfo => validator
504 .state
505 .handle_chain_info_query(query)
506 .await
507 .map_err(Into::into),
508 };
509 sender.send(result)
510 }
511
512 async fn do_subscribe(
513 self,
514 chains: Vec<ChainId>,
515 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
516 ) -> Result<(), Result<NotificationStream, NodeError>> {
517 let validator = self.client.lock().await;
518 let rx = validator.notifier.subscribe(chains);
519 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
520 sender.send(Ok(stream))
521 }
522
523 async fn do_upload_blob(
524 self,
525 content: BlobContent,
526 sender: oneshot::Sender<Result<BlobId, NodeError>>,
527 ) -> Result<(), Result<BlobId, NodeError>> {
528 let validator = self.client.lock().await;
529 let blob = Blob::new(content);
530 let id = blob.id();
531 let storage = validator.state.storage_client();
532 let result = match storage.maybe_write_blobs(&[blob]).await {
533 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
534 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
535 Err(error) => Err(error.into()),
536 };
537 sender.send(result)
538 }
539
540 async fn do_download_blob(
541 self,
542 blob_id: BlobId,
543 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
544 ) -> Result<(), Result<BlobContent, NodeError>> {
545 let validator = self.client.lock().await;
546 let blob = validator
547 .state
548 .storage_client()
549 .read_blob(blob_id)
550 .await
551 .map_err(Into::into);
552 let blob = match blob {
553 Ok(blob) => blob.ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
554 Err(error) => Err(error),
555 };
556 sender.send(blob.map(|blob| Arc::unwrap_or_clone(blob).into_content()))
557 }
558
559 async fn do_download_pending_blob(
560 self,
561 chain_id: ChainId,
562 blob_id: BlobId,
563 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
564 ) -> Result<(), Result<BlobContent, NodeError>> {
565 let validator = self.client.lock().await;
566 let result = validator
567 .state
568 .download_pending_blob(chain_id, blob_id)
569 .await
570 .map_err(Into::into);
571 sender.send(result.map(|blob| blob.content().clone()))
572 }
573
574 async fn do_handle_pending_blob(
575 self,
576 chain_id: ChainId,
577 blob: BlobContent,
578 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
579 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
580 let validator = self.client.lock().await;
581 let result = validator
582 .state
583 .handle_pending_blob(chain_id, Blob::new(blob))
584 .await
585 .map_err(Into::into);
586 sender.send(result)
587 }
588
589 async fn do_download_certificate(
590 self,
591 hash: CryptoHash,
592 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
593 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
594 let validator = self.client.lock().await;
595 let certificate = validator
596 .state
597 .storage_client()
598 .read_certificate(hash)
599 .await
600 .map_err(Into::into);
601
602 let certificate = match certificate {
603 Err(error) => Err(error),
604 Ok(entry) => match entry {
605 Some(certificate) => Ok(Arc::unwrap_or_clone(certificate)),
606 None => {
607 panic!("Missing certificate: {hash}");
608 }
609 },
610 };
611
612 sender.send(certificate)
613 }
614
615 async fn do_download_certificates(
616 self,
617 hashes: Vec<CryptoHash>,
618 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
619 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
620 let validator = self.client.lock().await;
621 let certificates = validator
622 .state
623 .storage_client()
624 .read_certificates(&hashes)
625 .await
626 .map_err(Into::into);
627
628 let certificates = match certificates {
629 Err(error) => Err(error),
630 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
631 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
632 ResultReadCertificates::InvalidHashes(hashes) => {
633 panic!("Missing certificates: {hashes:?}")
634 }
635 },
636 };
637
638 sender.send(certificates)
639 }
640
641 async fn do_download_certificates_by_heights(
642 self,
643 chain_id: ChainId,
644 heights: Vec<BlockHeight>,
645 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
646 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
647 let (query_sender, query_receiver) = oneshot::channel();
649 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
650
651 let self_clone = self.clone();
652 self.do_handle_chain_info_query(query, query_sender)
653 .await
654 .expect("Failed to handle chain info query");
655
656 let chain_info_response = query_receiver.await.map_err(|_| {
658 Err(NodeError::ClientIoError {
659 error: "Failed to receive chain info response".to_string(),
660 })
661 })?;
662
663 let hashes = match chain_info_response {
664 Ok(response) => response.info.requested_sent_certificate_hashes,
665 Err(e) => {
666 return sender.send(Err(e));
667 }
668 };
669
670 let (cert_sender, cert_receiver) = oneshot::channel();
672 self_clone
673 .do_download_certificates(hashes, cert_sender)
674 .await?;
675
676 let result = cert_receiver.await.map_err(|_| {
678 Err(NodeError::ClientIoError {
679 error: "Failed to receive certificates".to_string(),
680 })
681 })?;
682
683 sender.send(result)
684 }
685
686 async fn do_blob_last_used_by(
687 self,
688 blob_id: BlobId,
689 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
690 ) -> Result<(), Result<CryptoHash, NodeError>> {
691 let validator = self.client.lock().await;
692 let blob_state = validator
693 .state
694 .storage_client()
695 .read_blob_state(blob_id)
696 .await
697 .map_err(Into::into);
698 let certificate_hash = match blob_state {
699 Err(err) => Err(err),
700 Ok(blob_state) => match blob_state {
701 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
702 Some(blob_state) => blob_state
703 .last_used_by
704 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
705 },
706 };
707
708 sender.send(certificate_hash)
709 }
710
711 async fn do_blob_last_used_by_certificate(
712 self,
713 blob_id: BlobId,
714 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
715 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
716 match self.blob_last_used_by(blob_id).await {
717 Ok(cert_hash) => {
718 let cert = self.download_certificate(cert_hash).await;
719 sender.send(cert)
720 }
721 Err(err) => sender.send(Err(err)),
722 }
723 }
724
725 async fn do_missing_blob_ids(
726 self,
727 blob_ids: Vec<BlobId>,
728 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
729 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
730 let validator = self.client.lock().await;
731 let missing_blob_ids = validator
732 .state
733 .storage_client()
734 .missing_blobs(&blob_ids)
735 .await
736 .map_err(Into::into);
737 sender.send(missing_blob_ids)
738 }
739
740 async fn do_event_block_heights(
741 self,
742 event_ids: Vec<EventId>,
743 sender: oneshot::Sender<Result<Vec<Option<BlockHeight>>, NodeError>>,
744 ) -> Result<(), Result<Vec<Option<BlockHeight>>, NodeError>> {
745 let validator = self.client.lock().await;
746 let heights = validator
747 .state
748 .storage_client()
749 .read_event_block_heights(&event_ids)
750 .await
751 .map_err(Into::into);
752 sender.send(heights)
753 }
754}
755
756#[derive(Clone)]
757pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
758where
759 S: Storage;
760
761impl<S> NodeProvider<S>
762where
763 S: Storage + Clone,
764{
765 fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
766 self.0.lock().unwrap().clone()
767 }
768}
769
770impl<S> ValidatorNodeProvider for NodeProvider<S>
771where
772 S: Storage + Clone + Send + Sync + 'static,
773{
774 type Node = LocalValidatorClient<S>;
775
776 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
777 unimplemented!()
778 }
779
780 fn make_nodes_from_list<A>(
781 &self,
782 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
783 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
784 where
785 A: AsRef<str>,
786 {
787 let list = self.0.lock().unwrap();
788 Ok(validators
789 .into_iter()
790 .map(|(public_key, address)| {
791 list.iter()
792 .find(|client| client.public_key == public_key)
793 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
794 address: address.as_ref().to_string(),
795 })
796 .map(|client| (public_key, client.clone()))
797 })
798 .collect::<Result<Vec<_>, _>>()?
799 .into_iter())
800 }
801}
802
803impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
804where
805 S: Storage,
806{
807 fn from_iter<T>(iter: T) -> Self
808 where
809 T: IntoIterator<Item = LocalValidatorClient<S>>,
810 {
811 Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
812 }
813}
814
815pub struct TestBuilder<B: StorageBuilder> {
822 storage_builder: B,
823 pub initial_committee: Committee,
824 admin_description: Option<ChainDescription>,
825 network_description: Option<NetworkDescription>,
826 genesis_storage_builder: GenesisStorageBuilder,
827 node_provider: NodeProvider<B::Storage>,
828 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
829 chain_client_storages: Vec<B::Storage>,
830 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
831 pub signer: TestSigner,
832}
833
834#[async_trait]
835pub trait StorageBuilder {
836 type Storage: Storage + Clone + Send + Sync + 'static;
837
838 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
839
840 fn clock(&self) -> &TestClock;
841}
842
843#[derive(Default)]
844struct GenesisStorageBuilder {
845 accounts: Vec<GenesisAccount>,
846}
847
848struct GenesisAccount {
849 description: ChainDescription,
850 public_key: AccountPublicKey,
851}
852
853impl GenesisStorageBuilder {
854 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
855 self.accounts.push(GenesisAccount {
856 description,
857 public_key,
858 })
859 }
860
861 async fn build<S>(&self, storage: S) -> S
862 where
863 S: Storage + Clone + Send + Sync + 'static,
864 {
865 for account in &self.accounts {
866 storage
867 .create_chain(account.description.clone())
868 .await
869 .unwrap();
870 }
871 storage
872 }
873}
874
875pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
876
877impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
878 pub async fn read_confirmed_blocks_downward(
880 &self,
881 from: CryptoHash,
882 limit: u32,
883 ) -> anyhow::Result<Vec<Arc<ConfirmedBlock>>> {
884 let mut hash = Some(from);
885 let mut values = Vec::new();
886 for _ in 0..limit {
887 let Some(next_hash) = hash else {
888 break;
889 };
890 let value = self.read_confirmed_block(next_hash).await?;
891 hash = value.block().header.previous_block_hash;
892 values.push(value);
893 }
894 Ok(values)
895 }
896}
897
898impl<B> TestBuilder<B>
899where
900 B: StorageBuilder,
901{
902 pub async fn new(
903 mut storage_builder: B,
904 count: usize,
905 with_faulty_validators: usize,
906 mut signer: TestSigner,
907 ) -> Result<Self, anyhow::Error> {
908 let mut validators = Vec::new();
909 for _ in 0..count {
910 let validator_keypair = ValidatorKeypair::generate();
911 let account_public_key = signer.generate_new();
912 validators.push((validator_keypair, account_public_key));
913 }
914 let for_committee = validators
915 .iter()
916 .map(|(validating, account)| (validating.public_key, *account))
917 .collect::<Vec<_>>();
918 let initial_committee = Committee::make_simple(for_committee);
919 let mut validator_clients = Vec::new();
920 let mut validator_storages = HashMap::new();
921 let mut faulty_validators = HashSet::new();
922 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
923 let validator_public_key = validator_keypair.public_key;
924 let storage = storage_builder.build().await?;
925 let config = ChainWorkerConfig {
926 nickname: format!("Node {i}"),
927 ..ChainWorkerConfig::default()
928 }
929 .with_key_pair(Some(validator_keypair.secret_key));
930 let state = WorkerState::new(storage.clone(), config, None);
931 let mut validator = LocalValidatorClient::new(validator_public_key, state);
932 if i < with_faulty_validators {
933 faulty_validators.insert(validator_public_key);
934 validator.set_fault_type(FaultType::NoChains);
935 }
936 validator_clients.push(validator);
937 validator_storages.insert(validator_public_key, storage);
938 }
939 tracing::info!(
940 "Test will use the following faulty validators: {:?}",
941 faulty_validators
942 );
943 Ok(Self {
944 storage_builder,
945 initial_committee,
946 admin_description: None,
947 network_description: None,
948 genesis_storage_builder: GenesisStorageBuilder::default(),
949 node_provider: NodeProvider::from_iter(validator_clients),
950 validator_storages,
951 chain_client_storages: Vec::new(),
952 chain_owners: BTreeMap::new(),
953 signer,
954 })
955 }
956
957 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
958 let validators = self.initial_committee.validators().clone();
959 self.initial_committee =
960 Committee::new(validators, policy).expect("committee votes should not overflow");
961 self
962 }
963
964 pub fn with_cross_chain_message_chunk_limit(self, limit: usize) -> Self {
965 let validator_clients = self.node_provider.0.lock().unwrap();
966 for validator in validator_clients.iter() {
967 let mut inner = validator.client.try_lock().expect("no contention at setup");
968 inner.state.set_cross_chain_message_chunk_limit(limit);
969 }
970 drop(validator_clients);
971 self
972 }
973
974 pub fn fault_type(&self, public_key: &ValidatorPublicKey) -> Option<FaultType> {
977 self.node_provider
978 .0
979 .lock()
980 .unwrap()
981 .iter()
982 .find(|client| client.public_key == *public_key)
983 .map(|client| client.fault_type())
984 }
985
986 pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
987 let mut faulty_validators = vec![];
988 let mut validator_clients = self.node_provider.0.lock().unwrap();
989 for index in indexes.as_ref() {
990 let validator = &mut validator_clients[*index];
991 validator.set_fault_type(fault_type);
992 faulty_validators.push(validator.public_key);
993 }
994 tracing::info!(
995 "Making the following validators {:?}: {:?}",
996 fault_type,
997 faulty_validators
998 );
999 }
1000
1001 pub async fn add_root_chain(
1006 &mut self,
1007 index: u32,
1008 balance: Amount,
1009 ) -> anyhow::Result<ChainClient<B::Storage>> {
1010 if self.admin_description.is_none() && index != 0 {
1012 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
1013 }
1014 let origin = ChainOrigin::Root(index);
1015 let public_key = self.signer.generate_new();
1016 let open_chain_config = InitialChainConfig {
1017 ownership: ChainOwnership::single(public_key.into()),
1018 epoch: Epoch(0),
1019 balance,
1020 application_permissions: ApplicationPermissions::default(),
1021 };
1022 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
1023 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1024 if index == 0 {
1025 self.admin_description = Some(description.clone());
1026 self.network_description = Some(NetworkDescription {
1027 admin_chain_id: description.id(),
1028 genesis_config_hash: CryptoHash::test_hash("genesis config"),
1030 genesis_timestamp: Timestamp::from(0),
1031 genesis_committee_blob_hash: committee_blob.id().hash,
1032 name: "test network".to_string(),
1033 });
1034 }
1035 self.genesis_storage_builder
1037 .add(description.clone(), public_key);
1038
1039 let network_description = self.network_description.as_ref().unwrap();
1040
1041 for validator in self.node_provider.all_nodes() {
1042 let storage = self
1043 .validator_storages
1044 .get_mut(&validator.public_key)
1045 .unwrap();
1046 storage
1047 .write_network_description(network_description)
1048 .await
1049 .expect("writing the NetworkDescription should succeed");
1050 storage
1051 .write_blob(&committee_blob)
1052 .await
1053 .expect("writing a blob should succeed");
1054 storage.create_chain(description.clone()).await.unwrap();
1055 }
1056 for storage in &mut self.chain_client_storages {
1057 storage.create_chain(description.clone()).await.unwrap();
1058 }
1059 let chain_id = description.id();
1060 self.chain_owners.insert(chain_id, public_key.into());
1061 self.make_client(chain_id, None, BlockHeight::ZERO).await
1062 }
1063
1064 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
1065 let mut result = Vec::new();
1066 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
1067 assert_eq!(
1068 genesis_account.description.origin(),
1069 ChainOrigin::Root(i as u32)
1070 );
1071 result.push((
1072 genesis_account.public_key,
1073 genesis_account.description.config().balance,
1074 ));
1075 }
1076 result
1077 }
1078
1079 pub fn admin_chain_id(&self) -> ChainId {
1080 self.admin_description
1081 .as_ref()
1082 .expect("admin chain not initialized")
1083 .id()
1084 }
1085
1086 pub fn admin_description(&self) -> Option<&ChainDescription> {
1087 self.admin_description.as_ref()
1088 }
1089
1090 pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1091 self.node_provider.clone()
1092 }
1093
1094 pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1095 self.node_provider.0.lock().unwrap()[index].clone()
1096 }
1097
1098 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1099 let storage = self.storage_builder.build().await?;
1100 let network_description = self.network_description.as_ref().unwrap();
1101 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1102 storage
1103 .write_network_description(network_description)
1104 .await
1105 .expect("writing the NetworkDescription should succeed");
1106 storage
1107 .write_blob(&committee_blob)
1108 .await
1109 .expect("writing a blob should succeed");
1110 Ok(self.genesis_storage_builder.build(storage).await)
1111 }
1112
1113 pub async fn make_client_with_options(
1114 &mut self,
1115 chain_id: ChainId,
1116 block_hash: Option<CryptoHash>,
1117 block_height: BlockHeight,
1118 options: chain_client::Options,
1119 follow_only: bool,
1120 ) -> anyhow::Result<ChainClient<B::Storage>> {
1121 let storage = self.make_storage().await?;
1124 self.chain_client_storages.push(storage.clone());
1125 let mode = if follow_only {
1126 crate::client::ListeningMode::FollowChain
1127 } else {
1128 crate::client::ListeningMode::FullChain
1129 };
1130 let client = Arc::new(Client::new(
1131 crate::environment::Impl {
1132 network: self.make_node_provider(),
1133 storage,
1134 signer: self.signer.clone(),
1135 wallet: TestWallet::default(),
1136 },
1137 self.admin_chain_id(),
1138 false,
1139 [(chain_id, mode)],
1140 format!("Client node for {chain_id:.8}"),
1141 Some(Duration::from_secs(30)),
1142 Some(Duration::from_secs(1)),
1143 1000,
1144 options,
1145 DEFAULT_BLOCK_CACHE_SIZE,
1146 DEFAULT_EXECUTION_STATE_CACHE_SIZE,
1147 &crate::client::RequestsSchedulerConfig::default(),
1148 ));
1149 Ok(client.create_chain_client(
1150 chain_id,
1151 block_hash,
1152 block_height,
1153 &None,
1154 self.chain_owners.get(&chain_id).copied(),
1155 None,
1156 follow_only,
1157 ))
1158 }
1159
1160 pub async fn make_client(
1161 &mut self,
1162 chain_id: ChainId,
1163 block_hash: Option<CryptoHash>,
1164 block_height: BlockHeight,
1165 ) -> anyhow::Result<ChainClient<B::Storage>> {
1166 self.make_client_with_options(
1167 chain_id,
1168 block_hash,
1169 block_height,
1170 chain_client::Options::test_default(),
1171 false,
1172 )
1173 .await
1174 }
1175
1176 pub async fn check_that_validators_have_certificate(
1178 &self,
1179 chain_id: ChainId,
1180 block_height: BlockHeight,
1181 target_count: usize,
1182 ) -> Option<ConfirmedBlockCertificate> {
1183 let query = ChainInfoQuery::new(chain_id)
1184 .with_sent_certificate_hashes_by_heights(vec![block_height]);
1185 let mut count = 0;
1186 let mut certificate = None;
1187 for validator in self.node_provider.all_nodes() {
1188 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1189 if response.check(validator.public_key).is_ok() {
1190 let ChainInfo {
1191 mut requested_sent_certificate_hashes,
1192 ..
1193 } = *response.info;
1194 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1195 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1196 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1197 if cert.inner().block().header.chain_id == chain_id
1198 && cert.inner().block().header.height == block_height
1199 {
1200 cert.check(&self.initial_committee).unwrap();
1201 count += 1;
1202 certificate = Some(cert);
1203 }
1204 }
1205 }
1206 }
1207 }
1208 }
1209 assert!(count >= target_count);
1210 certificate
1211 }
1212
1213 pub async fn check_that_validators_are_in_round(
1216 &self,
1217 chain_id: ChainId,
1218 block_height: BlockHeight,
1219 round: Round,
1220 target_count: usize,
1221 ) {
1222 let query = ChainInfoQuery::new(chain_id);
1223 let mut count = 0;
1224 for validator in self.node_provider.all_nodes() {
1225 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1226 if response.info.manager.current_round == round
1227 && response.info.next_block_height == block_height
1228 && response.check(validator.public_key).is_ok()
1229 {
1230 count += 1;
1231 }
1232 }
1233 }
1234 assert!(count >= target_count);
1235 }
1236
1237 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1239 for validator in self.node_provider.all_nodes() {
1240 let guard = validator.client.lock().await;
1241 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1242 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1243 }
1244 }
1245}
1246
1247#[cfg(feature = "rocksdb")]
1248static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1250
1251#[derive(Default)]
1252pub struct MemoryStorageBuilder {
1253 namespace: String,
1254 instance_counter: usize,
1255 wasm_runtime: Option<WasmRuntime>,
1256 clock: TestClock,
1257}
1258
1259#[async_trait]
1260impl StorageBuilder for MemoryStorageBuilder {
1261 type Storage = DbStorage<MemoryDatabase, TestClock>;
1262
1263 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1264 self.instance_counter += 1;
1265 let config = MemoryDatabase::new_test_config().await?;
1266 if self.namespace.is_empty() {
1267 self.namespace = generate_test_namespace();
1268 }
1269 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1270 Ok(
1271 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1272 .await?,
1273 )
1274 }
1275
1276 fn clock(&self) -> &TestClock {
1277 &self.clock
1278 }
1279}
1280
1281impl MemoryStorageBuilder {
1282 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1285 MemoryStorageBuilder {
1286 wasm_runtime: wasm_runtime.into(),
1287 ..MemoryStorageBuilder::default()
1288 }
1289 }
1290}
1291
1292#[cfg(feature = "rocksdb")]
1293pub struct RocksDbStorageBuilder {
1294 namespace: String,
1295 instance_counter: usize,
1296 wasm_runtime: Option<WasmRuntime>,
1297 clock: TestClock,
1298 _permit: SemaphorePermit<'static>,
1299}
1300
1301#[cfg(feature = "rocksdb")]
1302impl RocksDbStorageBuilder {
1303 pub async fn new() -> Self {
1304 RocksDbStorageBuilder {
1305 namespace: String::new(),
1306 instance_counter: 0,
1307 wasm_runtime: None,
1308 clock: TestClock::default(),
1309 _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1310 }
1311 }
1312
1313 #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1316 pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1317 RocksDbStorageBuilder {
1318 wasm_runtime: wasm_runtime.into(),
1319 ..RocksDbStorageBuilder::new().await
1320 }
1321 }
1322}
1323
1324#[cfg(feature = "rocksdb")]
1325#[async_trait]
1326impl StorageBuilder for RocksDbStorageBuilder {
1327 type Storage = DbStorage<RocksDbDatabase, TestClock>;
1328
1329 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1330 self.instance_counter += 1;
1331 let config = RocksDbDatabase::new_test_config().await?;
1332 if self.namespace.is_empty() {
1333 self.namespace = generate_test_namespace();
1334 }
1335 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1336 Ok(
1337 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1338 .await?,
1339 )
1340 }
1341
1342 fn clock(&self) -> &TestClock {
1343 &self.clock
1344 }
1345}
1346
1347#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1348#[derive(Default)]
1349pub struct ServiceStorageBuilder {
1350 namespace: String,
1351 instance_counter: usize,
1352 wasm_runtime: Option<WasmRuntime>,
1353 clock: TestClock,
1354}
1355
1356#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1357impl ServiceStorageBuilder {
1358 pub fn new() -> Self {
1360 Self::with_wasm_runtime(None)
1361 }
1362
1363 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1365 ServiceStorageBuilder {
1366 wasm_runtime: wasm_runtime.into(),
1367 ..ServiceStorageBuilder::default()
1368 }
1369 }
1370}
1371
1372#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1373#[async_trait]
1374impl StorageBuilder for ServiceStorageBuilder {
1375 type Storage = DbStorage<StorageServiceDatabase, TestClock>;
1376
1377 async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1378 self.instance_counter += 1;
1379 let config = StorageServiceDatabase::new_test_config().await?;
1380 if self.namespace.is_empty() {
1381 self.namespace = generate_test_namespace();
1382 }
1383 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1384 Ok(
1385 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1386 .await?,
1387 )
1388 }
1389
1390 fn clock(&self) -> &TestClock {
1391 &self.clock
1392 }
1393}
1394
1395#[cfg(feature = "dynamodb")]
1396#[derive(Default)]
1397pub struct DynamoDbStorageBuilder {
1398 namespace: String,
1399 instance_counter: usize,
1400 wasm_runtime: Option<WasmRuntime>,
1401 clock: TestClock,
1402}
1403
1404#[cfg(feature = "dynamodb")]
1405impl DynamoDbStorageBuilder {
1406 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1409 DynamoDbStorageBuilder {
1410 wasm_runtime: wasm_runtime.into(),
1411 ..DynamoDbStorageBuilder::default()
1412 }
1413 }
1414}
1415
1416#[cfg(feature = "dynamodb")]
1417#[async_trait]
1418impl StorageBuilder for DynamoDbStorageBuilder {
1419 type Storage = DbStorage<DynamoDbDatabase, TestClock>;
1420
1421 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1422 self.instance_counter += 1;
1423 let config = DynamoDbDatabase::new_test_config().await?;
1424 if self.namespace.is_empty() {
1425 self.namespace = generate_test_namespace();
1426 }
1427 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1428 Ok(
1429 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1430 .await?,
1431 )
1432 }
1433
1434 fn clock(&self) -> &TestClock {
1435 &self.clock
1436 }
1437}
1438
1439#[cfg(feature = "scylladb")]
1440#[derive(Default)]
1441pub struct ScyllaDbStorageBuilder {
1442 namespace: String,
1443 instance_counter: usize,
1444 wasm_runtime: Option<WasmRuntime>,
1445 clock: TestClock,
1446}
1447
1448#[cfg(feature = "scylladb")]
1449impl ScyllaDbStorageBuilder {
1450 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1453 ScyllaDbStorageBuilder {
1454 wasm_runtime: wasm_runtime.into(),
1455 ..ScyllaDbStorageBuilder::default()
1456 }
1457 }
1458}
1459
1460#[cfg(feature = "scylladb")]
1461#[async_trait]
1462impl StorageBuilder for ScyllaDbStorageBuilder {
1463 type Storage = DbStorage<ScyllaDbDatabase, TestClock>;
1464
1465 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1466 self.instance_counter += 1;
1467 let config = ScyllaDbDatabase::new_test_config().await?;
1468 if self.namespace.is_empty() {
1469 self.namespace = generate_test_namespace();
1470 }
1471 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1472 Ok(
1473 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1474 .await?,
1475 )
1476 }
1477
1478 fn clock(&self) -> &TestClock {
1479 &self.clock
1480 }
1481}
1482
1483pub trait ClientOutcomeResultExt<T, E> {
1484 fn unwrap_ok_committed(self) -> T;
1487
1488 fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>>;
1491}
1492
1493impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1494 fn unwrap_ok_committed(self) -> T {
1495 match self.unwrap() {
1496 ClientOutcome::Committed(t) => t,
1497 ClientOutcome::WaitForTimeout(timeout) => {
1498 panic!("unexpected timeout: {timeout}")
1499 }
1500 ClientOutcome::Conflict(certificate) => {
1501 panic!("unexpected conflict: {}", certificate.hash())
1502 }
1503 }
1504 }
1505
1506 fn unwrap_ok_or_conflict(self) -> Result<T, Box<ConfirmedBlockCertificate>> {
1507 match self.unwrap() {
1508 ClientOutcome::Committed(t) => Ok(t),
1509 ClientOutcome::Conflict(certificate) => Err(certificate),
1510 ClientOutcome::WaitForTimeout(timeout) => {
1511 panic!("unexpected timeout: {timeout}")
1512 }
1513 }
1514 }
1515}