1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 sync::Arc,
7 time::Duration,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40 memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbDatabase,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 client::{chain_client, Client},
52 data_types::*,
53 environment::{TestSigner, TestWallet},
54 node::{
55 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
56 ValidatorNodeProvider,
57 },
58 notifier::ChannelNotifier,
59 worker::{Notification, ProcessableCertificate, WorkerState},
60};
61
/// Misbehavior that a `LocalValidatorClient` can be configured to simulate.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FaultType {
    /// Every request is processed normally.
    Honest,
    /// Block proposals, certificates and chain info queries all fail with an
    /// "offline" I/O error.
    Offline,
    /// Like `Offline` for block proposals and certificates, but chain info
    /// queries are still answered.
    OfflineWithInfo,
    /// Block proposals, certificates and chain info queries fail with
    /// `NodeError::InactiveChain`.
    NoChains,
    /// Validated-block certificates are processed, but the response is
    /// replaced by a "refusing to confirm" error.
    DontSendConfirmVote,
    /// Validated-block certificates are rejected without being processed.
    DontProcessValidated,
    /// Block proposals are processed, but the response is replaced by a
    /// "refusing to validate" error.
    DontSendValidateVote,
}
72
73struct LocalValidator<S>
80where
81 S: Storage,
82{
83 state: WorkerState<S>,
84 notifier: Arc<ChannelNotifier<Notification>>,
85}
86
87#[derive(Clone)]
88pub struct LocalValidatorClient<S>
89where
90 S: Storage,
91{
92 public_key: ValidatorPublicKey,
93 client: Arc<Mutex<LocalValidator<S>>>,
94 fault_type: FaultType,
95}
96
97impl<S> ValidatorNode for LocalValidatorClient<S>
98where
99 S: Storage + Clone + Send + Sync + 'static,
100{
101 type NotificationStream = NotificationStream;
102
103 fn address(&self) -> String {
104 format!("local:{}", self.public_key)
105 }
106
107 async fn handle_block_proposal(
108 &self,
109 proposal: BlockProposal,
110 ) -> Result<ChainInfoResponse, NodeError> {
111 self.spawn_and_receive(move |validator, sender| {
112 validator.do_handle_block_proposal(proposal, sender)
113 })
114 .await
115 }
116
117 async fn handle_lite_certificate(
118 &self,
119 certificate: LiteCertificate<'_>,
120 _delivery: CrossChainMessageDelivery,
121 ) -> Result<ChainInfoResponse, NodeError> {
122 let certificate = certificate.cloned();
123 self.spawn_and_receive(move |validator, sender| {
124 validator.do_handle_lite_certificate(certificate, sender)
125 })
126 .await
127 }
128
129 async fn handle_timeout_certificate(
130 &self,
131 certificate: GenericCertificate<Timeout>,
132 ) -> Result<ChainInfoResponse, NodeError> {
133 self.spawn_and_receive(move |validator, sender| {
134 validator.do_handle_certificate(certificate, sender)
135 })
136 .await
137 }
138
139 async fn handle_validated_certificate(
140 &self,
141 certificate: GenericCertificate<ValidatedBlock>,
142 ) -> Result<ChainInfoResponse, NodeError> {
143 self.spawn_and_receive(move |validator, sender| {
144 validator.do_handle_certificate(certificate, sender)
145 })
146 .await
147 }
148
149 async fn handle_confirmed_certificate(
150 &self,
151 certificate: GenericCertificate<ConfirmedBlock>,
152 _delivery: CrossChainMessageDelivery,
153 ) -> Result<ChainInfoResponse, NodeError> {
154 self.spawn_and_receive(move |validator, sender| {
155 validator.do_handle_certificate(certificate, sender)
156 })
157 .await
158 }
159
160 async fn handle_chain_info_query(
161 &self,
162 query: ChainInfoQuery,
163 ) -> Result<ChainInfoResponse, NodeError> {
164 self.spawn_and_receive(move |validator, sender| {
165 validator.do_handle_chain_info_query(query, sender)
166 })
167 .await
168 }
169
170 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
171 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
172 .await
173 }
174
175 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
176 Ok(Default::default())
177 }
178
179 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
180 Ok(self
181 .client
182 .lock()
183 .await
184 .state
185 .storage_client()
186 .read_network_description()
187 .await
188 .transpose()
189 .ok_or(NodeError::ViewError {
190 error: "missing NetworkDescription".to_owned(),
191 })??)
192 }
193
194 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
195 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
196 .await
197 }
198
199 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
200 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
201 .await
202 }
203
204 async fn download_pending_blob(
205 &self,
206 chain_id: ChainId,
207 blob_id: BlobId,
208 ) -> Result<BlobContent, NodeError> {
209 self.spawn_and_receive(move |validator, sender| {
210 validator.do_download_pending_blob(chain_id, blob_id, sender)
211 })
212 .await
213 }
214
215 async fn handle_pending_blob(
216 &self,
217 chain_id: ChainId,
218 blob: BlobContent,
219 ) -> Result<ChainInfoResponse, NodeError> {
220 self.spawn_and_receive(move |validator, sender| {
221 validator.do_handle_pending_blob(chain_id, blob, sender)
222 })
223 .await
224 }
225
226 async fn download_certificate(
227 &self,
228 hash: CryptoHash,
229 ) -> Result<ConfirmedBlockCertificate, NodeError> {
230 self.spawn_and_receive(move |validator, sender| {
231 validator.do_download_certificate(hash, sender)
232 })
233 .await
234 }
235
236 async fn download_certificates(
237 &self,
238 hashes: Vec<CryptoHash>,
239 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
240 self.spawn_and_receive(move |validator, sender| {
241 validator.do_download_certificates(hashes, sender)
242 })
243 .await
244 }
245
246 async fn download_certificates_by_heights(
247 &self,
248 chain_id: ChainId,
249 heights: Vec<BlockHeight>,
250 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
251 self.spawn_and_receive(move |validator, sender| {
252 validator.do_download_certificates_by_heights(chain_id, heights, sender)
253 })
254 .await
255 }
256
257 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
258 self.spawn_and_receive(move |validator, sender| {
259 validator.do_blob_last_used_by(blob_id, sender)
260 })
261 .await
262 }
263
264 async fn blob_last_used_by_certificate(
265 &self,
266 blob_id: BlobId,
267 ) -> Result<ConfirmedBlockCertificate, NodeError> {
268 self.spawn_and_receive(move |validator, sender| {
269 validator.do_blob_last_used_by_certificate(blob_id, sender)
270 })
271 .await
272 }
273
274 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
275 self.spawn_and_receive(move |validator, sender| {
276 validator.do_missing_blob_ids(blob_ids, sender)
277 })
278 .await
279 }
280
281 async fn get_shard_info(
282 &self,
283 _chain_id: ChainId,
284 ) -> Result<crate::data_types::ShardInfo, NodeError> {
285 Ok(crate::data_types::ShardInfo {
287 shard_id: 0,
288 total_shards: 1,
289 })
290 }
291}
292
293impl<S> LocalValidatorClient<S>
294where
295 S: Storage + Clone + Send + Sync + 'static,
296{
297 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
298 let client = LocalValidator {
299 state,
300 notifier: Arc::new(ChannelNotifier::default()),
301 };
302 Self {
303 public_key,
304 client: Arc::new(Mutex::new(client)),
305 fault_type: FaultType::Honest,
306 }
307 }
308
    /// Returns the validator's public key.
    pub fn name(&self) -> ValidatorPublicKey {
        self.public_key
    }
312
    /// Sets the misbehavior simulated by this handle.
    ///
    /// `fault_type` is stored on the handle itself, so clones made earlier
    /// keep their previous value.
    fn set_fault_type(&mut self, fault_type: FaultType) {
        self.fault_type = fault_type;
    }
316
317 pub async fn chain_info_with_manager_values(
319 &mut self,
320 chain_id: ChainId,
321 ) -> Result<Box<ChainInfo>, NodeError> {
322 let query = ChainInfoQuery::new(chain_id).with_manager_values();
323 let response = self.handle_chain_info_query(query).await?;
324 Ok(response.info)
325 }
326
327 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
330 where
331 T: Send + 'static,
332 R: Future<Output = Result<(), T>> + Send,
333 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
334 {
335 let validator = self.clone();
336 let (sender, receiver) = oneshot::channel();
337 tokio::spawn(async move {
338 if f(validator, sender).await.is_err() {
339 tracing::debug!("result could not be sent");
340 }
341 });
342 receiver.await.unwrap()
343 }
344
345 async fn do_handle_block_proposal(
346 self,
347 proposal: BlockProposal,
348 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
349 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
350 let result = match self.fault_type {
351 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
352 error: "offline".to_string(),
353 }),
354 FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
355 FaultType::DontSendValidateVote
356 | FaultType::Honest
357 | FaultType::DontSendConfirmVote
358 | FaultType::DontProcessValidated => {
359 let result = self
360 .client
361 .lock()
362 .await
363 .state
364 .handle_block_proposal(proposal)
365 .await
366 .map_err(Into::into);
367 if self.fault_type == FaultType::DontSendValidateVote {
368 Err(NodeError::ClientIoError {
369 error: "refusing to validate".to_string(),
370 })
371 } else {
372 result
373 }
374 }
375 };
376 sender.send(result.map(|(info, _actions)| info))
378 }
379
380 async fn do_handle_lite_certificate(
381 self,
382 certificate: LiteCertificate<'_>,
383 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
384 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
385 let client = self.client.clone();
386 let mut validator = client.lock().await;
387 let result = async move {
388 match validator.state.full_certificate(certificate).await? {
389 Either::Left(confirmed) => {
390 self.do_handle_certificate_internal(confirmed, &mut validator)
391 .await
392 }
393 Either::Right(validated) => {
394 self.do_handle_certificate_internal(validated, &mut validator)
395 .await
396 }
397 }
398 }
399 .await;
400 sender.send(result)
401 }
402
403 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
404 &self,
405 certificate: GenericCertificate<T>,
406 validator: &mut MutexGuard<'_, LocalValidator<S>>,
407 ) -> Result<ChainInfoResponse, NodeError> {
408 match self.fault_type {
409 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
410 Err(NodeError::ClientIoError {
411 error: "refusing to process validated block".to_string(),
412 })
413 }
414 FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
415 FaultType::Honest
416 | FaultType::DontSendConfirmVote
417 | FaultType::DontProcessValidated
418 | FaultType::DontSendValidateVote => {
419 let result = validator
420 .state
421 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
422 .await
423 .map_err(Into::into);
424 if T::KIND == CertificateKind::Validated
425 && self.fault_type == FaultType::DontSendConfirmVote
426 {
427 Err(NodeError::ClientIoError {
428 error: "refusing to confirm".to_string(),
429 })
430 } else {
431 result
432 }
433 }
434 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
435 error: "offline".to_string(),
436 }),
437 }
438 }
439
440 async fn do_handle_certificate<T: ProcessableCertificate>(
441 self,
442 certificate: GenericCertificate<T>,
443 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
444 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
445 let mut validator = self.client.lock().await;
446 let result = self
447 .do_handle_certificate_internal(certificate, &mut validator)
448 .await;
449 sender.send(result)
450 }
451
452 async fn do_handle_chain_info_query(
453 self,
454 query: ChainInfoQuery,
455 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
456 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
457 let validator = self.client.lock().await;
458 let result = match self.fault_type {
459 FaultType::Offline => Err(NodeError::ClientIoError {
460 error: "offline".to_string(),
461 }),
462 FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
463 FaultType::Honest
464 | FaultType::DontSendConfirmVote
465 | FaultType::DontProcessValidated
466 | FaultType::DontSendValidateVote
467 | FaultType::OfflineWithInfo => validator
468 .state
469 .handle_chain_info_query(query)
470 .await
471 .map_err(Into::into),
472 };
473 sender.send(result.map(|(info, _actions)| info))
475 }
476
477 async fn do_subscribe(
478 self,
479 chains: Vec<ChainId>,
480 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
481 ) -> Result<(), Result<NotificationStream, NodeError>> {
482 let validator = self.client.lock().await;
483 let rx = validator.notifier.subscribe(chains);
484 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
485 sender.send(Ok(stream))
486 }
487
488 async fn do_upload_blob(
489 self,
490 content: BlobContent,
491 sender: oneshot::Sender<Result<BlobId, NodeError>>,
492 ) -> Result<(), Result<BlobId, NodeError>> {
493 let validator = self.client.lock().await;
494 let blob = Blob::new(content);
495 let id = blob.id();
496 let storage = validator.state.storage_client();
497 let result = match storage.maybe_write_blobs(&[blob]).await {
498 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
499 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
500 Err(error) => Err(error.into()),
501 };
502 sender.send(result)
503 }
504
505 async fn do_download_blob(
506 self,
507 blob_id: BlobId,
508 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
509 ) -> Result<(), Result<BlobContent, NodeError>> {
510 let validator = self.client.lock().await;
511 let blob = validator
512 .state
513 .storage_client()
514 .read_blob(blob_id)
515 .await
516 .map_err(Into::into);
517 let blob = match blob {
518 Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
519 Err(error) => Err(error),
520 };
521 sender.send(blob.map(|blob| blob.into_content()))
522 }
523
524 async fn do_download_pending_blob(
525 self,
526 chain_id: ChainId,
527 blob_id: BlobId,
528 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
529 ) -> Result<(), Result<BlobContent, NodeError>> {
530 let validator = self.client.lock().await;
531 let result = validator
532 .state
533 .download_pending_blob(chain_id, blob_id)
534 .await
535 .map_err(Into::into);
536 sender.send(result.map(|blob| blob.into_content()))
537 }
538
539 async fn do_handle_pending_blob(
540 self,
541 chain_id: ChainId,
542 blob: BlobContent,
543 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
544 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
545 let validator = self.client.lock().await;
546 let result = validator
547 .state
548 .handle_pending_blob(chain_id, Blob::new(blob))
549 .await
550 .map_err(Into::into);
551 sender.send(result)
552 }
553
554 async fn do_download_certificate(
555 self,
556 hash: CryptoHash,
557 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
558 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
559 let validator = self.client.lock().await;
560 let certificate = validator
561 .state
562 .storage_client()
563 .read_certificate(hash)
564 .await
565 .map_err(Into::into);
566
567 let certificate = match certificate {
568 Err(error) => Err(error),
569 Ok(entry) => match entry {
570 Some(certificate) => Ok(certificate),
571 None => {
572 panic!("Missing certificate: {hash}");
573 }
574 },
575 };
576
577 sender.send(certificate)
578 }
579
580 async fn do_download_certificates(
581 self,
582 hashes: Vec<CryptoHash>,
583 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
584 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
585 let validator = self.client.lock().await;
586 let certificates = validator
587 .state
588 .storage_client()
589 .read_certificates(hashes.clone())
590 .await
591 .map_err(Into::into);
592
593 let certificates = match certificates {
594 Err(error) => Err(error),
595 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
596 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
597 ResultReadCertificates::InvalidHashes(hashes) => {
598 panic!("Missing certificates: {:?}", hashes)
599 }
600 },
601 };
602
603 sender.send(certificates)
604 }
605
606 async fn do_download_certificates_by_heights(
607 self,
608 chain_id: ChainId,
609 heights: Vec<BlockHeight>,
610 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
611 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
612 let (query_sender, query_receiver) = oneshot::channel();
614 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
615
616 let self_clone = self.clone();
617 self.do_handle_chain_info_query(query, query_sender)
618 .await
619 .expect("Failed to handle chain info query");
620
621 let chain_info_response = query_receiver.await.map_err(|_| {
623 Err(NodeError::ClientIoError {
624 error: "Failed to receive chain info response".to_string(),
625 })
626 })?;
627
628 let hashes = match chain_info_response {
629 Ok(response) => response.info.requested_sent_certificate_hashes,
630 Err(e) => {
631 return sender.send(Err(e));
632 }
633 };
634
635 let (cert_sender, cert_receiver) = oneshot::channel();
637 self_clone
638 .do_download_certificates(hashes, cert_sender)
639 .await?;
640
641 let result = cert_receiver.await.map_err(|_| {
643 Err(NodeError::ClientIoError {
644 error: "Failed to receive certificates".to_string(),
645 })
646 })?;
647
648 sender.send(result)
649 }
650
651 async fn do_blob_last_used_by(
652 self,
653 blob_id: BlobId,
654 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
655 ) -> Result<(), Result<CryptoHash, NodeError>> {
656 let validator = self.client.lock().await;
657 let blob_state = validator
658 .state
659 .storage_client()
660 .read_blob_state(blob_id)
661 .await
662 .map_err(Into::into);
663 let certificate_hash = match blob_state {
664 Err(err) => Err(err),
665 Ok(blob_state) => match blob_state {
666 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
667 Some(blob_state) => blob_state
668 .last_used_by
669 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
670 },
671 };
672
673 sender.send(certificate_hash)
674 }
675
676 async fn do_blob_last_used_by_certificate(
677 self,
678 blob_id: BlobId,
679 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
680 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
681 match self.blob_last_used_by(blob_id).await {
682 Ok(cert_hash) => {
683 let cert = self.download_certificate(cert_hash).await;
684 sender.send(cert)
685 }
686 Err(err) => sender.send(Err(err)),
687 }
688 }
689
690 async fn do_missing_blob_ids(
691 self,
692 blob_ids: Vec<BlobId>,
693 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
694 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
695 let validator = self.client.lock().await;
696 let missing_blob_ids = validator
697 .state
698 .storage_client()
699 .missing_blobs(&blob_ids)
700 .await
701 .map_err(Into::into);
702 sender.send(missing_blob_ids)
703 }
704}
705
706#[derive(Clone)]
707pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
708where
709 S: Storage;
710
711impl<S> NodeProvider<S>
712where
713 S: Storage + Clone,
714{
715 fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
716 self.0.lock().unwrap().clone()
717 }
718}
719
720impl<S> ValidatorNodeProvider for NodeProvider<S>
721where
722 S: Storage + Clone + Send + Sync + 'static,
723{
724 type Node = LocalValidatorClient<S>;
725
726 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
727 unimplemented!()
728 }
729
730 fn make_nodes_from_list<A>(
731 &self,
732 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
733 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
734 where
735 A: AsRef<str>,
736 {
737 let list = self.0.lock().unwrap();
738 Ok(validators
739 .into_iter()
740 .map(|(public_key, address)| {
741 list.iter()
742 .find(|client| client.public_key == public_key)
743 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
744 address: address.as_ref().to_string(),
745 })
746 .map(|client| (public_key, client.clone()))
747 })
748 .collect::<Result<Vec<_>, _>>()?
749 .into_iter())
750 }
751}
752
753impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
754where
755 S: Storage,
756{
757 fn from_iter<T>(iter: T) -> Self
758 where
759 T: IntoIterator<Item = LocalValidatorClient<S>>,
760 {
761 Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
762 }
763}
764
765pub struct TestBuilder<B: StorageBuilder> {
772 storage_builder: B,
773 pub initial_committee: Committee,
774 admin_description: Option<ChainDescription>,
775 network_description: Option<NetworkDescription>,
776 genesis_storage_builder: GenesisStorageBuilder,
777 node_provider: NodeProvider<B::Storage>,
778 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
779 chain_client_storages: Vec<B::Storage>,
780 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
781 pub signer: TestSigner,
782}
783
/// Factory for the storages used by `TestBuilder`.
#[async_trait]
pub trait StorageBuilder {
    /// The storage type produced by this builder.
    type Storage: Storage + Clone + Send + Sync + 'static;

    /// Builds a storage instance.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;

    /// Returns the test clock associated with this builder.
    fn clock(&self) -> &TestClock;
}
792
/// Records the root chains added so far so they can be created in storages
/// built later.
#[derive(Default)]
struct GenesisStorageBuilder {
    // Recorded chains, in insertion order.
    accounts: Vec<GenesisAccount>,
}
797
/// A recorded root chain: its description and the public key passed
/// alongside it.
struct GenesisAccount {
    // Description used to create the chain in a storage.
    description: ChainDescription,
    // Public key recorded together with the chain.
    public_key: AccountPublicKey,
}
802
803impl GenesisStorageBuilder {
804 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
805 self.accounts.push(GenesisAccount {
806 description,
807 public_key,
808 })
809 }
810
811 async fn build<S>(&self, storage: S) -> S
812 where
813 S: Storage + Clone + Send + Sync + 'static,
814 {
815 for account in &self.accounts {
816 storage
817 .create_chain(account.description.clone())
818 .await
819 .unwrap();
820 }
821 storage
822 }
823}
824
/// Chain client whose environment uses storage `S` and the in-process
/// `NodeProvider<S>` as its network.
pub type ChainClient<S> = crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>>>;
826
827impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
828 pub async fn read_confirmed_blocks_downward(
830 &self,
831 from: CryptoHash,
832 limit: u32,
833 ) -> anyhow::Result<Vec<ConfirmedBlock>> {
834 let mut hash = Some(from);
835 let mut values = Vec::new();
836 for _ in 0..limit {
837 let Some(next_hash) = hash else {
838 break;
839 };
840 let value = self.read_confirmed_block(next_hash).await?;
841 hash = value.block().header.previous_block_hash;
842 values.push(value);
843 }
844 Ok(values)
845 }
846}
847
848impl<B> TestBuilder<B>
849where
850 B: StorageBuilder,
851{
852 pub async fn new(
853 mut storage_builder: B,
854 count: usize,
855 with_faulty_validators: usize,
856 mut signer: TestSigner,
857 ) -> Result<Self, anyhow::Error> {
858 let mut validators = Vec::new();
859 for _ in 0..count {
860 let validator_keypair = ValidatorKeypair::generate();
861 let account_public_key = signer.generate_new();
862 validators.push((validator_keypair, account_public_key));
863 }
864 let for_committee = validators
865 .iter()
866 .map(|(validating, account)| (validating.public_key, *account))
867 .collect::<Vec<_>>();
868 let initial_committee = Committee::make_simple(for_committee);
869 let mut validator_clients = Vec::new();
870 let mut validator_storages = HashMap::new();
871 let mut faulty_validators = HashSet::new();
872 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
873 let validator_public_key = validator_keypair.public_key;
874 let storage = storage_builder.build().await?;
875 let state = WorkerState::new(
876 format!("Node {}", i),
877 Some(validator_keypair.secret_key),
878 storage.clone(),
879 5_000,
880 10_000,
881 )
882 .with_allow_inactive_chains(false)
883 .with_allow_messages_from_deprecated_epochs(false);
884 let mut validator = LocalValidatorClient::new(validator_public_key, state);
885 if i < with_faulty_validators {
886 faulty_validators.insert(validator_public_key);
887 validator.set_fault_type(FaultType::NoChains);
888 }
889 validator_clients.push(validator);
890 validator_storages.insert(validator_public_key, storage);
891 }
892 tracing::info!(
893 "Test will use the following faulty validators: {:?}",
894 faulty_validators
895 );
896 Ok(Self {
897 storage_builder,
898 initial_committee,
899 admin_description: None,
900 network_description: None,
901 genesis_storage_builder: GenesisStorageBuilder::default(),
902 node_provider: NodeProvider::from_iter(validator_clients),
903 validator_storages,
904 chain_client_storages: Vec::new(),
905 chain_owners: BTreeMap::new(),
906 signer,
907 })
908 }
909
    /// Rebuilds the initial committee with the same validators and the given
    /// resource control policy.
    pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
        let validators = self.initial_committee.validators().clone();
        self.initial_committee = Committee::new(validators, policy);
        self
    }
915
916 pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
917 let mut faulty_validators = vec![];
918 let mut validator_clients = self.node_provider.0.lock().unwrap();
919 for index in indexes.as_ref() {
920 let validator = &mut validator_clients[*index];
921 validator.set_fault_type(fault_type);
922 faulty_validators.push(validator.public_key);
923 }
924 tracing::info!(
925 "Making the following validators {:?}: {:?}",
926 fault_type,
927 faulty_validators
928 );
929 }
930
931 pub async fn add_root_chain(
936 &mut self,
937 index: u32,
938 balance: Amount,
939 ) -> anyhow::Result<ChainClient<B::Storage>> {
940 if self.admin_description.is_none() && index != 0 {
942 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
943 }
944 let origin = ChainOrigin::Root(index);
945 let public_key = self.signer.generate_new();
946 let open_chain_config = InitialChainConfig {
947 ownership: ChainOwnership::single(public_key.into()),
948 epoch: Epoch(0),
949 min_active_epoch: Epoch(0),
950 max_active_epoch: Epoch(0),
951 balance,
952 application_permissions: ApplicationPermissions::default(),
953 };
954 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
955 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
956 if index == 0 {
957 self.admin_description = Some(description.clone());
958 self.network_description = Some(NetworkDescription {
959 admin_chain_id: description.id(),
960 genesis_config_hash: CryptoHash::test_hash("genesis config"),
962 genesis_timestamp: Timestamp::from(0),
963 genesis_committee_blob_hash: committee_blob.id().hash,
964 name: "test network".to_string(),
965 });
966 }
967 self.genesis_storage_builder
969 .add(description.clone(), public_key);
970
971 let network_description = self.network_description.as_ref().unwrap();
972
973 for validator in self.node_provider.all_nodes() {
974 let storage = self
975 .validator_storages
976 .get_mut(&validator.public_key)
977 .unwrap();
978 storage
979 .write_network_description(network_description)
980 .await
981 .expect("writing the NetworkDescription should succeed");
982 storage
983 .write_blob(&committee_blob)
984 .await
985 .expect("writing a blob should succeed");
986 storage.create_chain(description.clone()).await.unwrap();
987 }
988 for storage in &mut self.chain_client_storages {
989 storage.create_chain(description.clone()).await.unwrap();
990 }
991 let chain_id = description.id();
992 self.chain_owners.insert(chain_id, public_key.into());
993 self.make_client(chain_id, None, BlockHeight::ZERO).await
994 }
995
996 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
997 let mut result = Vec::new();
998 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
999 assert_eq!(
1000 genesis_account.description.origin(),
1001 ChainOrigin::Root(i as u32)
1002 );
1003 result.push((
1004 genesis_account.public_key,
1005 genesis_account.description.config().balance,
1006 ));
1007 }
1008 result
1009 }
1010
    /// Returns the ID of the admin chain.
    ///
    /// # Panics
    ///
    /// Panics if the admin chain has not been added yet.
    pub fn admin_id(&self) -> ChainId {
        self.admin_description
            .as_ref()
            .expect("admin chain not initialized")
            .id()
    }
1017
    /// Returns the admin chain's description, or `None` if it has not been
    /// added yet.
    pub fn admin_description(&self) -> Option<&ChainDescription> {
        self.admin_description.as_ref()
    }
1021
    /// Returns a clone of the node provider; the clone shares the same
    /// validator list.
    pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
        self.node_provider.clone()
    }
1025
    /// Returns a clone of the validator client at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range or the node list mutex is poisoned.
    pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
        self.node_provider.0.lock().unwrap()[index].clone()
    }
1029
1030 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1031 let storage = self.storage_builder.build().await?;
1032 let network_description = self.network_description.as_ref().unwrap();
1033 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1034 storage
1035 .write_network_description(network_description)
1036 .await
1037 .expect("writing the NetworkDescription should succeed");
1038 storage
1039 .write_blob(&committee_blob)
1040 .await
1041 .expect("writing a blob should succeed");
1042 Ok(self.genesis_storage_builder.build(storage).await)
1043 }
1044
1045 pub async fn make_client_with_options(
1046 &mut self,
1047 chain_id: ChainId,
1048 block_hash: Option<CryptoHash>,
1049 block_height: BlockHeight,
1050 options: chain_client::Options,
1051 follow_only: bool,
1052 ) -> anyhow::Result<ChainClient<B::Storage>> {
1053 let storage = self.make_storage().await?;
1056 self.chain_client_storages.push(storage.clone());
1057 let client = Arc::new(Client::new(
1058 crate::environment::Impl {
1059 network: self.make_node_provider(),
1060 storage,
1061 signer: self.signer.clone(),
1062 wallet: TestWallet::default(),
1063 },
1064 self.admin_id(),
1065 false,
1066 [chain_id],
1067 format!("Client node for {:.8}", chain_id),
1068 Duration::from_secs(30),
1069 Duration::from_secs(1),
1070 options,
1071 5_000,
1072 10_000,
1073 crate::client::RequestsSchedulerConfig::default(),
1074 ));
1075 Ok(client.create_chain_client(
1076 chain_id,
1077 block_hash,
1078 block_height,
1079 None,
1080 self.chain_owners.get(&chain_id).copied(),
1081 None,
1082 follow_only,
1083 ))
1084 }
1085
1086 pub async fn make_client(
1087 &mut self,
1088 chain_id: ChainId,
1089 block_hash: Option<CryptoHash>,
1090 block_height: BlockHeight,
1091 ) -> anyhow::Result<ChainClient<B::Storage>> {
1092 self.make_client_with_options(
1093 chain_id,
1094 block_hash,
1095 block_height,
1096 chain_client::Options::test_default(),
1097 false,
1098 )
1099 .await
1100 }
1101
1102 pub async fn check_that_validators_have_certificate(
1104 &self,
1105 chain_id: ChainId,
1106 block_height: BlockHeight,
1107 target_count: usize,
1108 ) -> Option<ConfirmedBlockCertificate> {
1109 let query = ChainInfoQuery::new(chain_id)
1110 .with_sent_certificate_hashes_by_heights(vec![block_height]);
1111 let mut count = 0;
1112 let mut certificate = None;
1113 for validator in self.node_provider.all_nodes() {
1114 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1115 if response.check(validator.public_key).is_ok() {
1116 let ChainInfo {
1117 mut requested_sent_certificate_hashes,
1118 ..
1119 } = *response.info;
1120 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1121 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1122 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1123 if cert.inner().block().header.chain_id == chain_id
1124 && cert.inner().block().header.height == block_height
1125 {
1126 cert.check(&self.initial_committee).unwrap();
1127 count += 1;
1128 certificate = Some(cert);
1129 }
1130 }
1131 }
1132 }
1133 }
1134 }
1135 assert!(count >= target_count);
1136 certificate
1137 }
1138
1139 pub async fn check_that_validators_are_in_round(
1142 &self,
1143 chain_id: ChainId,
1144 block_height: BlockHeight,
1145 round: Round,
1146 target_count: usize,
1147 ) {
1148 let query = ChainInfoQuery::new(chain_id);
1149 let mut count = 0;
1150 for validator in self.node_provider.all_nodes() {
1151 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1152 if response.info.manager.current_round == round
1153 && response.info.next_block_height == block_height
1154 && response.check(validator.public_key).is_ok()
1155 {
1156 count += 1;
1157 }
1158 }
1159 }
1160 assert!(count >= target_count);
1161 }
1162
1163 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1165 for validator in self.node_provider.all_nodes() {
1166 let guard = validator.client.lock().await;
1167 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1168 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1169 }
1170 }
1171}
1172
/// Number of permits initially available in `ROCKS_DB_SEMAPHORE`.
#[cfg(feature = "rocksdb")]
const ROCKS_DB_PERMITS: usize = 5;

/// Semaphore from which every `RocksDbStorageBuilder` takes one permit in `new` and
/// keeps it in its `_permit` field, so at most `ROCKS_DB_PERMITS` such builders can
/// exist at the same time.
#[cfg(feature = "rocksdb")]
static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(ROCKS_DB_PERMITS);
1176
1177#[derive(Default)]
1178pub struct MemoryStorageBuilder {
1179 namespace: String,
1180 instance_counter: usize,
1181 wasm_runtime: Option<WasmRuntime>,
1182 clock: TestClock,
1183}
1184
1185#[async_trait]
1186impl StorageBuilder for MemoryStorageBuilder {
1187 type Storage = DbStorage<MemoryDatabase, TestClock>;
1188
1189 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1190 self.instance_counter += 1;
1191 let config = MemoryDatabase::new_test_config().await?;
1192 if self.namespace.is_empty() {
1193 self.namespace = generate_test_namespace();
1194 }
1195 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1196 Ok(
1197 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1198 .await?,
1199 )
1200 }
1201
1202 fn clock(&self) -> &TestClock {
1203 &self.clock
1204 }
1205}
1206
1207impl MemoryStorageBuilder {
1208 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1211 MemoryStorageBuilder {
1212 wasm_runtime: wasm_runtime.into(),
1213 ..MemoryStorageBuilder::default()
1214 }
1215 }
1216}
1217
/// A `StorageBuilder` that creates RocksDB-backed (`RocksDbDatabase`) test storages.
///
/// Each builder owns one permit of `ROCKS_DB_SEMAPHORE` for as long as it lives.
#[cfg(feature = "rocksdb")]
pub struct RocksDbStorageBuilder {
    /// Namespace prefix shared by all storages from this builder; empty until the
    /// first call to `build` generates it.
    namespace: String,
    /// Number of times `build` has been entered; used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone of it is passed to every storage that is built.
    clock: TestClock,
    /// Permit acquired in `new`; never read, only held until the builder is dropped.
    _permit: SemaphorePermit<'static>,
}

#[cfg(feature = "rocksdb")]
impl RocksDbStorageBuilder {
    /// Creates a builder without a Wasm runtime, waiting until a permit of
    /// `ROCKS_DB_SEMAPHORE` becomes available.
    ///
    /// # Panics
    ///
    /// Panics if acquiring the permit fails.
    pub async fn new() -> Self {
        let clock = TestClock::default();
        let permit = ROCKS_DB_SEMAPHORE.acquire().await.unwrap();
        Self {
            namespace: String::new(),
            instance_counter: 0,
            wasm_runtime: None,
            clock,
            _permit: permit,
        }
    }

    /// Creates a builder like `new`, but with the given (optional) Wasm runtime.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::new().await
        }
    }
}

#[cfg(feature = "rocksdb")]
#[async_trait]
impl StorageBuilder for RocksDbStorageBuilder {
    type Storage = DbStorage<RocksDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `<n>` is the
    /// updated instance counter.
    ///
    /// The counter is incremented before anything else, so it advances even when
    /// building fails afterwards.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = RocksDbDatabase::new_test_config().await?;
        // The prefix is generated once, on first use, and then reused.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let prefix = &self.namespace;
        let index = self.instance_counter;
        let namespace = format!("{}_{}", prefix, index);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    /// Returns the builder's test clock.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1272
/// A `StorageBuilder` that creates test storages backed by the storage service
/// (`StorageServiceDatabase`).
#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[derive(Default)]
pub struct ServiceStorageBuilder {
    /// Namespace prefix shared by all storages from this builder; empty until the
    /// first call to `build` generates it.
    namespace: String,
    /// Number of times `build` has been entered; used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone of it is passed to every storage that is built.
    clock: TestClock,
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
impl ServiceStorageBuilder {
    /// Creates a builder without a Wasm runtime.
    pub fn new() -> Self {
        Self::with_wasm_runtime(None)
    }

    /// Creates a builder with the given (optional) Wasm runtime; every other field
    /// takes its default value.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
#[async_trait]
impl StorageBuilder for ServiceStorageBuilder {
    type Storage = DbStorage<StorageServiceDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `<n>` is the
    /// updated instance counter.
    ///
    /// The counter is incremented before anything else, so it advances even when
    /// building fails afterwards.
    async fn build(&mut self) -> anyhow::Result<Self::Storage> {
        self.instance_counter += 1;
        let config = StorageServiceDatabase::new_test_config().await?;
        // The prefix is generated once, on first use, and then reused.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let prefix = &self.namespace;
        let index = self.instance_counter;
        let namespace = format!("{}_{}", prefix, index);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    /// Returns the builder's test clock.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1320
/// A `StorageBuilder` that creates DynamoDB-backed (`DynamoDbDatabase`) test storages.
#[cfg(feature = "dynamodb")]
#[derive(Default)]
pub struct DynamoDbStorageBuilder {
    /// Namespace prefix shared by all storages from this builder; empty until the
    /// first call to `build` generates it.
    namespace: String,
    /// Number of times `build` has been entered; used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone of it is passed to every storage that is built.
    clock: TestClock,
}

#[cfg(feature = "dynamodb")]
impl DynamoDbStorageBuilder {
    /// Creates a builder with the given (optional) Wasm runtime; every other field
    /// takes its default value.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "dynamodb")]
#[async_trait]
impl StorageBuilder for DynamoDbStorageBuilder {
    type Storage = DbStorage<DynamoDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `<n>` is the
    /// updated instance counter.
    ///
    /// The counter is incremented before anything else, so it advances even when
    /// building fails afterwards.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = DynamoDbDatabase::new_test_config().await?;
        // The prefix is generated once, on first use, and then reused.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let prefix = &self.namespace;
        let index = self.instance_counter;
        let namespace = format!("{}_{}", prefix, index);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    /// Returns the builder's test clock.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1364
/// A `StorageBuilder` that creates ScyllaDB-backed (`ScyllaDbDatabase`) test storages.
#[cfg(feature = "scylladb")]
#[derive(Default)]
pub struct ScyllaDbStorageBuilder {
    /// Namespace prefix shared by all storages from this builder; empty until the
    /// first call to `build` generates it.
    namespace: String,
    /// Number of times `build` has been entered; used as the namespace suffix.
    instance_counter: usize,
    /// Optional Wasm runtime passed to every storage that is built.
    wasm_runtime: Option<WasmRuntime>,
    /// Test clock; a clone of it is passed to every storage that is built.
    clock: TestClock,
}

#[cfg(feature = "scylladb")]
impl ScyllaDbStorageBuilder {
    /// Creates a builder with the given (optional) Wasm runtime; every other field
    /// takes its default value.
    pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
        Self {
            wasm_runtime: wasm_runtime.into(),
            ..Self::default()
        }
    }
}

#[cfg(feature = "scylladb")]
#[async_trait]
impl StorageBuilder for ScyllaDbStorageBuilder {
    type Storage = DbStorage<ScyllaDbDatabase, TestClock>;

    /// Builds a new storage in the namespace `<prefix>_<n>`, where `<n>` is the
    /// updated instance counter.
    ///
    /// The counter is incremented before anything else, so it advances even when
    /// building fails afterwards.
    async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
        self.instance_counter += 1;
        let config = ScyllaDbDatabase::new_test_config().await?;
        // The prefix is generated once, on first use, and then reused.
        if self.namespace.is_empty() {
            self.namespace = generate_test_namespace();
        }
        let prefix = &self.namespace;
        let index = self.instance_counter;
        let namespace = format!("{}_{}", prefix, index);
        let storage =
            DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
                .await?;
        Ok(storage)
    }

    /// Returns the builder's test clock.
    fn clock(&self) -> &TestClock {
        &self.clock
    }
}
1408
1409pub trait ClientOutcomeResultExt<T, E> {
1410 fn unwrap_ok_committed(self) -> T;
1411}
1412
1413impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1414 fn unwrap_ok_committed(self) -> T {
1415 self.unwrap().unwrap()
1416 }
1417}