1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 sync::Arc,
7 time::Duration,
8 vec,
9};
10
11use async_trait::async_trait;
12use futures::{
13 future::Either,
14 lock::{Mutex, MutexGuard},
15 Future,
16};
17use linera_base::{
18 crypto::{AccountPublicKey, CryptoHash, InMemorySigner, ValidatorKeypair, ValidatorPublicKey},
19 data_types::*,
20 identifiers::{AccountOwner, BlobId, ChainId},
21 ownership::ChainOwnership,
22};
23use linera_chain::{
24 data_types::BlockProposal,
25 types::{
26 CertificateKind, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_execution::{committee::Committee, ResourceControlPolicy, WasmRuntime};
31use linera_storage::{DbStorage, ResultReadCertificates, Storage, TestClock};
32#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
33use linera_storage_service::client::StorageServiceDatabase;
34use linera_version::VersionInfo;
35#[cfg(feature = "dynamodb")]
36use linera_views::dynamo_db::DynamoDbDatabase;
37#[cfg(feature = "scylladb")]
38use linera_views::scylla_db::ScyllaDbDatabase;
39use linera_views::{
40 memory::MemoryDatabase, random::generate_test_namespace, store::TestKeyValueDatabase as _,
41};
42use tokio::sync::oneshot;
43use tokio_stream::wrappers::UnboundedReceiverStream;
44#[cfg(feature = "rocksdb")]
45use {
46 linera_views::rocks_db::RocksDbDatabase,
47 tokio::sync::{Semaphore, SemaphorePermit},
48};
49
50use crate::{
51 client::{chain_client, Client},
52 data_types::*,
53 node::{
54 CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
55 ValidatorNodeProvider,
56 },
57 notifier::ChannelNotifier,
58 worker::{Notification, ProcessableCertificate, WorkerState},
59};
60
61#[derive(Debug, PartialEq, Clone, Copy)]
62pub enum FaultType {
63 Honest,
64 Offline,
65 OfflineWithInfo,
66 NoChains,
67 DontSendConfirmVote,
68 DontProcessValidated,
69 DontSendValidateVote,
70}
71
72struct LocalValidator<S>
79where
80 S: Storage,
81{
82 state: WorkerState<S>,
83 notifier: Arc<ChannelNotifier<Notification>>,
84}
85
86#[derive(Clone)]
87pub struct LocalValidatorClient<S>
88where
89 S: Storage,
90{
91 public_key: ValidatorPublicKey,
92 client: Arc<Mutex<LocalValidator<S>>>,
93 fault_type: FaultType,
94}
95
96impl<S> ValidatorNode for LocalValidatorClient<S>
97where
98 S: Storage + Clone + Send + Sync + 'static,
99{
100 type NotificationStream = NotificationStream;
101
102 fn address(&self) -> String {
103 format!("local:{}", self.public_key)
104 }
105
106 async fn handle_block_proposal(
107 &self,
108 proposal: BlockProposal,
109 ) -> Result<ChainInfoResponse, NodeError> {
110 self.spawn_and_receive(move |validator, sender| {
111 validator.do_handle_block_proposal(proposal, sender)
112 })
113 .await
114 }
115
116 async fn handle_lite_certificate(
117 &self,
118 certificate: LiteCertificate<'_>,
119 _delivery: CrossChainMessageDelivery,
120 ) -> Result<ChainInfoResponse, NodeError> {
121 let certificate = certificate.cloned();
122 self.spawn_and_receive(move |validator, sender| {
123 validator.do_handle_lite_certificate(certificate, sender)
124 })
125 .await
126 }
127
128 async fn handle_timeout_certificate(
129 &self,
130 certificate: GenericCertificate<Timeout>,
131 ) -> Result<ChainInfoResponse, NodeError> {
132 self.spawn_and_receive(move |validator, sender| {
133 validator.do_handle_certificate(certificate, sender)
134 })
135 .await
136 }
137
138 async fn handle_validated_certificate(
139 &self,
140 certificate: GenericCertificate<ValidatedBlock>,
141 ) -> Result<ChainInfoResponse, NodeError> {
142 self.spawn_and_receive(move |validator, sender| {
143 validator.do_handle_certificate(certificate, sender)
144 })
145 .await
146 }
147
148 async fn handle_confirmed_certificate(
149 &self,
150 certificate: GenericCertificate<ConfirmedBlock>,
151 _delivery: CrossChainMessageDelivery,
152 ) -> Result<ChainInfoResponse, NodeError> {
153 self.spawn_and_receive(move |validator, sender| {
154 validator.do_handle_certificate(certificate, sender)
155 })
156 .await
157 }
158
159 async fn handle_chain_info_query(
160 &self,
161 query: ChainInfoQuery,
162 ) -> Result<ChainInfoResponse, NodeError> {
163 self.spawn_and_receive(move |validator, sender| {
164 validator.do_handle_chain_info_query(query, sender)
165 })
166 .await
167 }
168
169 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
170 self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
171 .await
172 }
173
174 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
175 Ok(Default::default())
176 }
177
178 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
179 Ok(self
180 .client
181 .lock()
182 .await
183 .state
184 .storage_client()
185 .read_network_description()
186 .await
187 .transpose()
188 .ok_or(NodeError::ViewError {
189 error: "missing NetworkDescription".to_owned(),
190 })??)
191 }
192
193 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
194 self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
195 .await
196 }
197
198 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
199 self.spawn_and_receive(move |validator, sender| validator.do_download_blob(blob_id, sender))
200 .await
201 }
202
203 async fn download_pending_blob(
204 &self,
205 chain_id: ChainId,
206 blob_id: BlobId,
207 ) -> Result<BlobContent, NodeError> {
208 self.spawn_and_receive(move |validator, sender| {
209 validator.do_download_pending_blob(chain_id, blob_id, sender)
210 })
211 .await
212 }
213
214 async fn handle_pending_blob(
215 &self,
216 chain_id: ChainId,
217 blob: BlobContent,
218 ) -> Result<ChainInfoResponse, NodeError> {
219 self.spawn_and_receive(move |validator, sender| {
220 validator.do_handle_pending_blob(chain_id, blob, sender)
221 })
222 .await
223 }
224
225 async fn download_certificate(
226 &self,
227 hash: CryptoHash,
228 ) -> Result<ConfirmedBlockCertificate, NodeError> {
229 self.spawn_and_receive(move |validator, sender| {
230 validator.do_download_certificate(hash, sender)
231 })
232 .await
233 }
234
235 async fn download_certificates(
236 &self,
237 hashes: Vec<CryptoHash>,
238 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
239 self.spawn_and_receive(move |validator, sender| {
240 validator.do_download_certificates(hashes, sender)
241 })
242 .await
243 }
244
245 async fn download_certificates_by_heights(
246 &self,
247 chain_id: ChainId,
248 heights: Vec<BlockHeight>,
249 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
250 self.spawn_and_receive(move |validator, sender| {
251 validator.do_download_certificates_by_heights(chain_id, heights, sender)
252 })
253 .await
254 }
255
256 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
257 self.spawn_and_receive(move |validator, sender| {
258 validator.do_blob_last_used_by(blob_id, sender)
259 })
260 .await
261 }
262
263 async fn blob_last_used_by_certificate(
264 &self,
265 blob_id: BlobId,
266 ) -> Result<ConfirmedBlockCertificate, NodeError> {
267 self.spawn_and_receive(move |validator, sender| {
268 validator.do_blob_last_used_by_certificate(blob_id, sender)
269 })
270 .await
271 }
272
273 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
274 self.spawn_and_receive(move |validator, sender| {
275 validator.do_missing_blob_ids(blob_ids, sender)
276 })
277 .await
278 }
279
280 async fn get_shard_info(
281 &self,
282 _chain_id: ChainId,
283 ) -> Result<crate::data_types::ShardInfo, NodeError> {
284 Ok(crate::data_types::ShardInfo {
286 shard_id: 0,
287 total_shards: 1,
288 })
289 }
290}
291
292impl<S> LocalValidatorClient<S>
293where
294 S: Storage + Clone + Send + Sync + 'static,
295{
296 fn new(public_key: ValidatorPublicKey, state: WorkerState<S>) -> Self {
297 let client = LocalValidator {
298 state,
299 notifier: Arc::new(ChannelNotifier::default()),
300 };
301 Self {
302 public_key,
303 client: Arc::new(Mutex::new(client)),
304 fault_type: FaultType::Honest,
305 }
306 }
307
308 pub fn name(&self) -> ValidatorPublicKey {
309 self.public_key
310 }
311
312 fn set_fault_type(&mut self, fault_type: FaultType) {
313 self.fault_type = fault_type;
314 }
315
316 pub async fn chain_info_with_manager_values(
318 &mut self,
319 chain_id: ChainId,
320 ) -> Result<Box<ChainInfo>, NodeError> {
321 let query = ChainInfoQuery::new(chain_id).with_manager_values();
322 let response = self.handle_chain_info_query(query).await?;
323 Ok(response.info)
324 }
325
326 async fn spawn_and_receive<F, R, T>(&self, f: F) -> T
329 where
330 T: Send + 'static,
331 R: Future<Output = Result<(), T>> + Send,
332 F: FnOnce(Self, oneshot::Sender<T>) -> R + Send + 'static,
333 {
334 let validator = self.clone();
335 let (sender, receiver) = oneshot::channel();
336 tokio::spawn(async move {
337 if f(validator, sender).await.is_err() {
338 tracing::debug!("result could not be sent");
339 }
340 });
341 receiver.await.unwrap()
342 }
343
344 async fn do_handle_block_proposal(
345 self,
346 proposal: BlockProposal,
347 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
348 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
349 let result = match self.fault_type {
350 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
351 error: "offline".to_string(),
352 }),
353 FaultType::NoChains => Err(NodeError::InactiveChain(proposal.content.block.chain_id)),
354 FaultType::DontSendValidateVote
355 | FaultType::Honest
356 | FaultType::DontSendConfirmVote
357 | FaultType::DontProcessValidated => {
358 let result = self
359 .client
360 .lock()
361 .await
362 .state
363 .handle_block_proposal(proposal)
364 .await
365 .map_err(Into::into);
366 if self.fault_type == FaultType::DontSendValidateVote {
367 Err(NodeError::ClientIoError {
368 error: "refusing to validate".to_string(),
369 })
370 } else {
371 result
372 }
373 }
374 };
375 sender.send(result.map(|(info, _actions)| info))
377 }
378
379 async fn do_handle_lite_certificate(
380 self,
381 certificate: LiteCertificate<'_>,
382 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
383 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
384 let client = self.client.clone();
385 let mut validator = client.lock().await;
386 let result = async move {
387 match validator.state.full_certificate(certificate).await? {
388 Either::Left(confirmed) => {
389 self.do_handle_certificate_internal(confirmed, &mut validator)
390 .await
391 }
392 Either::Right(validated) => {
393 self.do_handle_certificate_internal(validated, &mut validator)
394 .await
395 }
396 }
397 }
398 .await;
399 sender.send(result)
400 }
401
402 async fn do_handle_certificate_internal<T: ProcessableCertificate>(
403 &self,
404 certificate: GenericCertificate<T>,
405 validator: &mut MutexGuard<'_, LocalValidator<S>>,
406 ) -> Result<ChainInfoResponse, NodeError> {
407 match self.fault_type {
408 FaultType::DontProcessValidated if T::KIND == CertificateKind::Validated => {
409 Err(NodeError::ClientIoError {
410 error: "refusing to process validated block".to_string(),
411 })
412 }
413 FaultType::NoChains => Err(NodeError::InactiveChain(certificate.value().chain_id())),
414 FaultType::Honest
415 | FaultType::DontSendConfirmVote
416 | FaultType::DontProcessValidated
417 | FaultType::DontSendValidateVote => {
418 let result = validator
419 .state
420 .fully_handle_certificate_with_notifications(certificate, &validator.notifier)
421 .await
422 .map_err(Into::into);
423 if T::KIND == CertificateKind::Validated
424 && self.fault_type == FaultType::DontSendConfirmVote
425 {
426 Err(NodeError::ClientIoError {
427 error: "refusing to confirm".to_string(),
428 })
429 } else {
430 result
431 }
432 }
433 FaultType::Offline | FaultType::OfflineWithInfo => Err(NodeError::ClientIoError {
434 error: "offline".to_string(),
435 }),
436 }
437 }
438
439 async fn do_handle_certificate<T: ProcessableCertificate>(
440 self,
441 certificate: GenericCertificate<T>,
442 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
443 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
444 let mut validator = self.client.lock().await;
445 let result = self
446 .do_handle_certificate_internal(certificate, &mut validator)
447 .await;
448 sender.send(result)
449 }
450
451 async fn do_handle_chain_info_query(
452 self,
453 query: ChainInfoQuery,
454 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
455 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
456 let validator = self.client.lock().await;
457 let result = match self.fault_type {
458 FaultType::Offline => Err(NodeError::ClientIoError {
459 error: "offline".to_string(),
460 }),
461 FaultType::NoChains => Err(NodeError::InactiveChain(query.chain_id)),
462 FaultType::Honest
463 | FaultType::DontSendConfirmVote
464 | FaultType::DontProcessValidated
465 | FaultType::DontSendValidateVote
466 | FaultType::OfflineWithInfo => validator
467 .state
468 .handle_chain_info_query(query)
469 .await
470 .map_err(Into::into),
471 };
472 sender.send(result.map(|(info, _actions)| info))
474 }
475
476 async fn do_subscribe(
477 self,
478 chains: Vec<ChainId>,
479 sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
480 ) -> Result<(), Result<NotificationStream, NodeError>> {
481 let validator = self.client.lock().await;
482 let rx = validator.notifier.subscribe(chains);
483 let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
484 sender.send(Ok(stream))
485 }
486
487 async fn do_upload_blob(
488 self,
489 content: BlobContent,
490 sender: oneshot::Sender<Result<BlobId, NodeError>>,
491 ) -> Result<(), Result<BlobId, NodeError>> {
492 let validator = self.client.lock().await;
493 let blob = Blob::new(content);
494 let id = blob.id();
495 let storage = validator.state.storage_client();
496 let result = match storage.maybe_write_blobs(&[blob]).await {
497 Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
498 Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
499 Err(error) => Err(error.into()),
500 };
501 sender.send(result)
502 }
503
504 async fn do_download_blob(
505 self,
506 blob_id: BlobId,
507 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
508 ) -> Result<(), Result<BlobContent, NodeError>> {
509 let validator = self.client.lock().await;
510 let blob = validator
511 .state
512 .storage_client()
513 .read_blob(blob_id)
514 .await
515 .map_err(Into::into);
516 let blob = match blob {
517 Ok(blob) => blob.ok_or(NodeError::BlobsNotFound(vec![blob_id])),
518 Err(error) => Err(error),
519 };
520 sender.send(blob.map(|blob| blob.into_content()))
521 }
522
523 async fn do_download_pending_blob(
524 self,
525 chain_id: ChainId,
526 blob_id: BlobId,
527 sender: oneshot::Sender<Result<BlobContent, NodeError>>,
528 ) -> Result<(), Result<BlobContent, NodeError>> {
529 let validator = self.client.lock().await;
530 let result = validator
531 .state
532 .download_pending_blob(chain_id, blob_id)
533 .await
534 .map_err(Into::into);
535 sender.send(result.map(|blob| blob.into_content()))
536 }
537
538 async fn do_handle_pending_blob(
539 self,
540 chain_id: ChainId,
541 blob: BlobContent,
542 sender: oneshot::Sender<Result<ChainInfoResponse, NodeError>>,
543 ) -> Result<(), Result<ChainInfoResponse, NodeError>> {
544 let validator = self.client.lock().await;
545 let result = validator
546 .state
547 .handle_pending_blob(chain_id, Blob::new(blob))
548 .await
549 .map_err(Into::into);
550 sender.send(result)
551 }
552
553 async fn do_download_certificate(
554 self,
555 hash: CryptoHash,
556 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
557 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
558 let validator = self.client.lock().await;
559 let certificate = validator
560 .state
561 .storage_client()
562 .read_certificate(hash)
563 .await
564 .map_err(Into::into);
565
566 let certificate = match certificate {
567 Err(error) => Err(error),
568 Ok(entry) => match entry {
569 Some(certificate) => Ok(certificate),
570 None => {
571 panic!("Missing certificate: {hash}");
572 }
573 },
574 };
575
576 sender.send(certificate)
577 }
578
579 async fn do_download_certificates(
580 self,
581 hashes: Vec<CryptoHash>,
582 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
583 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
584 let validator = self.client.lock().await;
585 let certificates = validator
586 .state
587 .storage_client()
588 .read_certificates(hashes.clone())
589 .await
590 .map_err(Into::into);
591
592 let certificates = match certificates {
593 Err(error) => Err(error),
594 Ok(certificates) => match ResultReadCertificates::new(certificates, hashes) {
595 ResultReadCertificates::Certificates(certificates) => Ok(certificates),
596 ResultReadCertificates::InvalidHashes(hashes) => {
597 panic!("Missing certificates: {:?}", hashes)
598 }
599 },
600 };
601
602 sender.send(certificates)
603 }
604
605 async fn do_download_certificates_by_heights(
606 self,
607 chain_id: ChainId,
608 heights: Vec<BlockHeight>,
609 sender: oneshot::Sender<Result<Vec<ConfirmedBlockCertificate>, NodeError>>,
610 ) -> Result<(), Result<Vec<ConfirmedBlockCertificate>, NodeError>> {
611 let (query_sender, query_receiver) = oneshot::channel();
613 let query = ChainInfoQuery::new(chain_id).with_sent_certificate_hashes_by_heights(heights);
614
615 let self_clone = self.clone();
616 self.do_handle_chain_info_query(query, query_sender)
617 .await
618 .expect("Failed to handle chain info query");
619
620 let chain_info_response = query_receiver.await.map_err(|_| {
622 Err(NodeError::ClientIoError {
623 error: "Failed to receive chain info response".to_string(),
624 })
625 })?;
626
627 let hashes = match chain_info_response {
628 Ok(response) => response.info.requested_sent_certificate_hashes,
629 Err(e) => {
630 return sender.send(Err(e));
631 }
632 };
633
634 let (cert_sender, cert_receiver) = oneshot::channel();
636 self_clone
637 .do_download_certificates(hashes, cert_sender)
638 .await?;
639
640 let result = cert_receiver.await.map_err(|_| {
642 Err(NodeError::ClientIoError {
643 error: "Failed to receive certificates".to_string(),
644 })
645 })?;
646
647 sender.send(result)
648 }
649
650 async fn do_blob_last_used_by(
651 self,
652 blob_id: BlobId,
653 sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
654 ) -> Result<(), Result<CryptoHash, NodeError>> {
655 let validator = self.client.lock().await;
656 let blob_state = validator
657 .state
658 .storage_client()
659 .read_blob_state(blob_id)
660 .await
661 .map_err(Into::into);
662 let certificate_hash = match blob_state {
663 Err(err) => Err(err),
664 Ok(blob_state) => match blob_state {
665 None => Err(NodeError::BlobsNotFound(vec![blob_id])),
666 Some(blob_state) => blob_state
667 .last_used_by
668 .ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
669 },
670 };
671
672 sender.send(certificate_hash)
673 }
674
675 async fn do_blob_last_used_by_certificate(
676 self,
677 blob_id: BlobId,
678 sender: oneshot::Sender<Result<ConfirmedBlockCertificate, NodeError>>,
679 ) -> Result<(), Result<ConfirmedBlockCertificate, NodeError>> {
680 match self.blob_last_used_by(blob_id).await {
681 Ok(cert_hash) => {
682 let cert = self.download_certificate(cert_hash).await;
683 sender.send(cert)
684 }
685 Err(err) => sender.send(Err(err)),
686 }
687 }
688
689 async fn do_missing_blob_ids(
690 self,
691 blob_ids: Vec<BlobId>,
692 sender: oneshot::Sender<Result<Vec<BlobId>, NodeError>>,
693 ) -> Result<(), Result<Vec<BlobId>, NodeError>> {
694 let validator = self.client.lock().await;
695 let missing_blob_ids = validator
696 .state
697 .storage_client()
698 .missing_blobs(&blob_ids)
699 .await
700 .map_err(Into::into);
701 sender.send(missing_blob_ids)
702 }
703}
704
705#[derive(Clone)]
706pub struct NodeProvider<S>(Arc<std::sync::Mutex<Vec<LocalValidatorClient<S>>>>)
707where
708 S: Storage;
709
710impl<S> NodeProvider<S>
711where
712 S: Storage + Clone,
713{
714 fn all_nodes(&self) -> Vec<LocalValidatorClient<S>> {
715 self.0.lock().unwrap().clone()
716 }
717}
718
719impl<S> ValidatorNodeProvider for NodeProvider<S>
720where
721 S: Storage + Clone + Send + Sync + 'static,
722{
723 type Node = LocalValidatorClient<S>;
724
725 fn make_node(&self, _name: &str) -> Result<Self::Node, NodeError> {
726 unimplemented!()
727 }
728
729 fn make_nodes_from_list<A>(
730 &self,
731 validators: impl IntoIterator<Item = (ValidatorPublicKey, A)>,
732 ) -> Result<impl Iterator<Item = (ValidatorPublicKey, Self::Node)>, NodeError>
733 where
734 A: AsRef<str>,
735 {
736 let list = self.0.lock().unwrap();
737 Ok(validators
738 .into_iter()
739 .map(|(public_key, address)| {
740 list.iter()
741 .find(|client| client.public_key == public_key)
742 .ok_or_else(|| NodeError::CannotResolveValidatorAddress {
743 address: address.as_ref().to_string(),
744 })
745 .map(|client| (public_key, client.clone()))
746 })
747 .collect::<Result<Vec<_>, _>>()?
748 .into_iter())
749 }
750}
751
752impl<S> FromIterator<LocalValidatorClient<S>> for NodeProvider<S>
753where
754 S: Storage,
755{
756 fn from_iter<T>(iter: T) -> Self
757 where
758 T: IntoIterator<Item = LocalValidatorClient<S>>,
759 {
760 Self(Arc::new(std::sync::Mutex::new(iter.into_iter().collect())))
761 }
762}
763
764pub struct TestBuilder<B: StorageBuilder> {
771 storage_builder: B,
772 pub initial_committee: Committee,
773 admin_description: Option<ChainDescription>,
774 network_description: Option<NetworkDescription>,
775 genesis_storage_builder: GenesisStorageBuilder,
776 node_provider: NodeProvider<B::Storage>,
777 validator_storages: HashMap<ValidatorPublicKey, B::Storage>,
778 chain_client_storages: Vec<B::Storage>,
779 pub chain_owners: BTreeMap<ChainId, AccountOwner>,
780 pub signer: InMemorySigner,
781}
782
783#[async_trait]
784pub trait StorageBuilder {
785 type Storage: Storage + Clone + Send + Sync + 'static;
786
787 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error>;
788
789 fn clock(&self) -> &TestClock;
790}
791
792#[derive(Default)]
793struct GenesisStorageBuilder {
794 accounts: Vec<GenesisAccount>,
795}
796
797struct GenesisAccount {
798 description: ChainDescription,
799 public_key: AccountPublicKey,
800}
801
802impl GenesisStorageBuilder {
803 fn add(&mut self, description: ChainDescription, public_key: AccountPublicKey) {
804 self.accounts.push(GenesisAccount {
805 description,
806 public_key,
807 })
808 }
809
810 async fn build<S>(&self, storage: S) -> S
811 where
812 S: Storage + Clone + Send + Sync + 'static,
813 {
814 for account in &self.accounts {
815 storage
816 .create_chain(account.description.clone())
817 .await
818 .unwrap();
819 }
820 storage
821 }
822}
823
824pub type ChainClient<S> =
825 crate::client::ChainClient<crate::environment::Impl<S, NodeProvider<S>, InMemorySigner>>;
826
827impl<S: Storage + Clone + Send + Sync + 'static> ChainClient<S> {
828 pub async fn read_confirmed_blocks_downward(
830 &self,
831 from: CryptoHash,
832 limit: u32,
833 ) -> anyhow::Result<Vec<ConfirmedBlock>> {
834 let mut hash = Some(from);
835 let mut values = Vec::new();
836 for _ in 0..limit {
837 let Some(next_hash) = hash else {
838 break;
839 };
840 let value = self.read_confirmed_block(next_hash).await?;
841 hash = value.block().header.previous_block_hash;
842 values.push(value);
843 }
844 Ok(values)
845 }
846}
847
848impl<B> TestBuilder<B>
849where
850 B: StorageBuilder,
851{
852 pub async fn new(
853 mut storage_builder: B,
854 count: usize,
855 with_faulty_validators: usize,
856 mut signer: InMemorySigner,
857 ) -> Result<Self, anyhow::Error> {
858 let mut validators = Vec::new();
859 for _ in 0..count {
860 let validator_keypair = ValidatorKeypair::generate();
861 let account_public_key = signer.generate_new();
862 validators.push((validator_keypair, account_public_key));
863 }
864 let for_committee = validators
865 .iter()
866 .map(|(validating, account)| (validating.public_key, *account))
867 .collect::<Vec<_>>();
868 let initial_committee = Committee::make_simple(for_committee);
869 let mut validator_clients = Vec::new();
870 let mut validator_storages = HashMap::new();
871 let mut faulty_validators = HashSet::new();
872 for (i, (validator_keypair, _account_public_key)) in validators.into_iter().enumerate() {
873 let validator_public_key = validator_keypair.public_key;
874 let storage = storage_builder.build().await?;
875 let state = WorkerState::new(
876 format!("Node {}", i),
877 Some(validator_keypair.secret_key),
878 storage.clone(),
879 5_000,
880 10_000,
881 )
882 .with_allow_inactive_chains(false)
883 .with_allow_messages_from_deprecated_epochs(false);
884 let mut validator = LocalValidatorClient::new(validator_public_key, state);
885 if i < with_faulty_validators {
886 faulty_validators.insert(validator_public_key);
887 validator.set_fault_type(FaultType::NoChains);
888 }
889 validator_clients.push(validator);
890 validator_storages.insert(validator_public_key, storage);
891 }
892 tracing::info!(
893 "Test will use the following faulty validators: {:?}",
894 faulty_validators
895 );
896 Ok(Self {
897 storage_builder,
898 initial_committee,
899 admin_description: None,
900 network_description: None,
901 genesis_storage_builder: GenesisStorageBuilder::default(),
902 node_provider: NodeProvider::from_iter(validator_clients),
903 validator_storages,
904 chain_client_storages: Vec::new(),
905 chain_owners: BTreeMap::new(),
906 signer,
907 })
908 }
909
910 pub fn with_policy(mut self, policy: ResourceControlPolicy) -> Self {
911 let validators = self.initial_committee.validators().clone();
912 self.initial_committee = Committee::new(validators, policy);
913 self
914 }
915
916 pub fn set_fault_type(&mut self, indexes: impl AsRef<[usize]>, fault_type: FaultType) {
917 let mut faulty_validators = vec![];
918 let mut validator_clients = self.node_provider.0.lock().unwrap();
919 for index in indexes.as_ref() {
920 let validator = &mut validator_clients[*index];
921 validator.set_fault_type(fault_type);
922 faulty_validators.push(validator.public_key);
923 }
924 tracing::info!(
925 "Making the following validators {:?}: {:?}",
926 fault_type,
927 faulty_validators
928 );
929 }
930
931 pub async fn add_root_chain(
936 &mut self,
937 index: u32,
938 balance: Amount,
939 ) -> anyhow::Result<ChainClient<B::Storage>> {
940 if self.admin_description.is_none() && index != 0 {
942 Box::pin(self.add_root_chain(0, Amount::ZERO)).await?;
943 }
944 let origin = ChainOrigin::Root(index);
945 let public_key = self.signer.generate_new();
946 let open_chain_config = InitialChainConfig {
947 ownership: ChainOwnership::single(public_key.into()),
948 epoch: Epoch(0),
949 min_active_epoch: Epoch(0),
950 max_active_epoch: Epoch(0),
951 balance,
952 application_permissions: ApplicationPermissions::default(),
953 };
954 let description = ChainDescription::new(origin, open_chain_config, Timestamp::from(0));
955 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
956 if index == 0 {
957 self.admin_description = Some(description.clone());
958 self.network_description = Some(NetworkDescription {
959 admin_chain_id: description.id(),
960 genesis_config_hash: CryptoHash::test_hash("genesis config"),
962 genesis_timestamp: Timestamp::from(0),
963 genesis_committee_blob_hash: committee_blob.id().hash,
964 name: "test network".to_string(),
965 });
966 }
967 self.genesis_storage_builder
969 .add(description.clone(), public_key);
970
971 let network_description = self.network_description.as_ref().unwrap();
972
973 for validator in self.node_provider.all_nodes() {
974 let storage = self
975 .validator_storages
976 .get_mut(&validator.public_key)
977 .unwrap();
978 storage
979 .write_network_description(network_description)
980 .await
981 .expect("writing the NetworkDescription should succeed");
982 storage
983 .write_blob(&committee_blob)
984 .await
985 .expect("writing a blob should succeed");
986 storage.create_chain(description.clone()).await.unwrap();
987 }
988 for storage in &mut self.chain_client_storages {
989 storage.create_chain(description.clone()).await.unwrap();
990 }
991 let chain_id = description.id();
992 self.chain_owners.insert(chain_id, public_key.into());
993 self.make_client(chain_id, None, BlockHeight::ZERO).await
994 }
995
996 pub fn genesis_chains(&self) -> Vec<(AccountPublicKey, Amount)> {
997 let mut result = Vec::new();
998 for (i, genesis_account) in self.genesis_storage_builder.accounts.iter().enumerate() {
999 assert_eq!(
1000 genesis_account.description.origin(),
1001 ChainOrigin::Root(i as u32)
1002 );
1003 result.push((
1004 genesis_account.public_key,
1005 genesis_account.description.config().balance,
1006 ));
1007 }
1008 result
1009 }
1010
1011 pub fn admin_id(&self) -> ChainId {
1012 self.admin_description
1013 .as_ref()
1014 .expect("admin chain not initialized")
1015 .id()
1016 }
1017
1018 pub fn admin_description(&self) -> Option<&ChainDescription> {
1019 self.admin_description.as_ref()
1020 }
1021
1022 pub fn make_node_provider(&self) -> NodeProvider<B::Storage> {
1023 self.node_provider.clone()
1024 }
1025
1026 pub fn node(&mut self, index: usize) -> LocalValidatorClient<B::Storage> {
1027 self.node_provider.0.lock().unwrap()[index].clone()
1028 }
1029
1030 pub async fn make_storage(&mut self) -> anyhow::Result<B::Storage> {
1031 let storage = self.storage_builder.build().await?;
1032 let network_description = self.network_description.as_ref().unwrap();
1033 let committee_blob = Blob::new_committee(bcs::to_bytes(&self.initial_committee).unwrap());
1034 storage
1035 .write_network_description(network_description)
1036 .await
1037 .expect("writing the NetworkDescription should succeed");
1038 storage
1039 .write_blob(&committee_blob)
1040 .await
1041 .expect("writing a blob should succeed");
1042 Ok(self.genesis_storage_builder.build(storage).await)
1043 }
1044
1045 pub async fn make_client_with_options(
1046 &mut self,
1047 chain_id: ChainId,
1048 block_hash: Option<CryptoHash>,
1049 block_height: BlockHeight,
1050 options: chain_client::Options,
1051 ) -> anyhow::Result<ChainClient<B::Storage>> {
1052 let storage = self.make_storage().await?;
1055 self.chain_client_storages.push(storage.clone());
1056 let client = Arc::new(Client::new(
1057 crate::environment::Impl {
1058 network: self.make_node_provider(),
1059 storage,
1060 signer: self.signer.clone(),
1061 },
1062 self.admin_id(),
1063 false,
1064 [chain_id],
1065 format!("Client node for {:.8}", chain_id),
1066 Duration::from_secs(30),
1067 Duration::from_secs(1),
1068 options,
1069 5_000,
1070 10_000,
1071 crate::client::RequestsSchedulerConfig::default(),
1072 ));
1073 Ok(client.create_chain_client(
1074 chain_id,
1075 block_hash,
1076 block_height,
1077 None,
1078 self.chain_owners.get(&chain_id).copied(),
1079 None,
1080 ))
1081 }
1082
1083 pub async fn make_client(
1084 &mut self,
1085 chain_id: ChainId,
1086 block_hash: Option<CryptoHash>,
1087 block_height: BlockHeight,
1088 ) -> anyhow::Result<ChainClient<B::Storage>> {
1089 self.make_client_with_options(
1090 chain_id,
1091 block_hash,
1092 block_height,
1093 chain_client::Options::test_default(),
1094 )
1095 .await
1096 }
1097
1098 pub async fn check_that_validators_have_certificate(
1100 &self,
1101 chain_id: ChainId,
1102 block_height: BlockHeight,
1103 target_count: usize,
1104 ) -> Option<ConfirmedBlockCertificate> {
1105 let query = ChainInfoQuery::new(chain_id)
1106 .with_sent_certificate_hashes_by_heights(vec![block_height]);
1107 let mut count = 0;
1108 let mut certificate = None;
1109 for validator in self.node_provider.all_nodes() {
1110 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1111 if response.check(validator.public_key).is_ok() {
1112 let ChainInfo {
1113 mut requested_sent_certificate_hashes,
1114 ..
1115 } = *response.info;
1116 debug_assert!(requested_sent_certificate_hashes.len() <= 1);
1117 if let Some(cert_hash) = requested_sent_certificate_hashes.pop() {
1118 if let Ok(cert) = validator.download_certificate(cert_hash).await {
1119 if cert.inner().block().header.chain_id == chain_id
1120 && cert.inner().block().header.height == block_height
1121 {
1122 cert.check(&self.initial_committee).unwrap();
1123 count += 1;
1124 certificate = Some(cert);
1125 }
1126 }
1127 }
1128 }
1129 }
1130 }
1131 assert!(count >= target_count);
1132 certificate
1133 }
1134
1135 pub async fn check_that_validators_are_in_round(
1138 &self,
1139 chain_id: ChainId,
1140 block_height: BlockHeight,
1141 round: Round,
1142 target_count: usize,
1143 ) {
1144 let query = ChainInfoQuery::new(chain_id);
1145 let mut count = 0;
1146 for validator in self.node_provider.all_nodes() {
1147 if let Ok(response) = validator.handle_chain_info_query(query.clone()).await {
1148 if response.info.manager.current_round == round
1149 && response.info.next_block_height == block_height
1150 && response.check(validator.public_key).is_ok()
1151 {
1152 count += 1;
1153 }
1154 }
1155 }
1156 assert!(count >= target_count);
1157 }
1158
1159 pub async fn check_that_validators_have_empty_outboxes(&self, chain_id: ChainId) {
1161 for validator in self.node_provider.all_nodes() {
1162 let guard = validator.client.lock().await;
1163 let chain = guard.state.chain_state_view(chain_id).await.unwrap();
1164 assert_eq!(chain.outboxes.indices().await.unwrap(), []);
1165 }
1166 }
1167}
1168
1169#[cfg(feature = "rocksdb")]
1170static ROCKS_DB_SEMAPHORE: Semaphore = Semaphore::const_new(5);
1172
1173#[derive(Default)]
1174pub struct MemoryStorageBuilder {
1175 namespace: String,
1176 instance_counter: usize,
1177 wasm_runtime: Option<WasmRuntime>,
1178 clock: TestClock,
1179}
1180
1181#[async_trait]
1182impl StorageBuilder for MemoryStorageBuilder {
1183 type Storage = DbStorage<MemoryDatabase, TestClock>;
1184
1185 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1186 self.instance_counter += 1;
1187 let config = MemoryDatabase::new_test_config().await?;
1188 if self.namespace.is_empty() {
1189 self.namespace = generate_test_namespace();
1190 }
1191 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1192 Ok(
1193 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1194 .await?,
1195 )
1196 }
1197
1198 fn clock(&self) -> &TestClock {
1199 &self.clock
1200 }
1201}
1202
1203impl MemoryStorageBuilder {
1204 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1207 MemoryStorageBuilder {
1208 wasm_runtime: wasm_runtime.into(),
1209 ..MemoryStorageBuilder::default()
1210 }
1211 }
1212}
1213
1214#[cfg(feature = "rocksdb")]
1215pub struct RocksDbStorageBuilder {
1216 namespace: String,
1217 instance_counter: usize,
1218 wasm_runtime: Option<WasmRuntime>,
1219 clock: TestClock,
1220 _permit: SemaphorePermit<'static>,
1221}
1222
1223#[cfg(feature = "rocksdb")]
1224impl RocksDbStorageBuilder {
1225 pub async fn new() -> Self {
1226 RocksDbStorageBuilder {
1227 namespace: String::new(),
1228 instance_counter: 0,
1229 wasm_runtime: None,
1230 clock: TestClock::default(),
1231 _permit: ROCKS_DB_SEMAPHORE.acquire().await.unwrap(),
1232 }
1233 }
1234
1235 #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
1238 pub async fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1239 RocksDbStorageBuilder {
1240 wasm_runtime: wasm_runtime.into(),
1241 ..RocksDbStorageBuilder::new().await
1242 }
1243 }
1244}
1245
1246#[cfg(feature = "rocksdb")]
1247#[async_trait]
1248impl StorageBuilder for RocksDbStorageBuilder {
1249 type Storage = DbStorage<RocksDbDatabase, TestClock>;
1250
1251 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1252 self.instance_counter += 1;
1253 let config = RocksDbDatabase::new_test_config().await?;
1254 if self.namespace.is_empty() {
1255 self.namespace = generate_test_namespace();
1256 }
1257 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1258 Ok(
1259 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1260 .await?,
1261 )
1262 }
1263
1264 fn clock(&self) -> &TestClock {
1265 &self.clock
1266 }
1267}
1268
1269#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1270#[derive(Default)]
1271pub struct ServiceStorageBuilder {
1272 namespace: String,
1273 instance_counter: usize,
1274 wasm_runtime: Option<WasmRuntime>,
1275 clock: TestClock,
1276}
1277
1278#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1279impl ServiceStorageBuilder {
1280 pub fn new() -> Self {
1282 Self::with_wasm_runtime(None)
1283 }
1284
1285 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1287 ServiceStorageBuilder {
1288 wasm_runtime: wasm_runtime.into(),
1289 ..ServiceStorageBuilder::default()
1290 }
1291 }
1292}
1293
1294#[cfg(all(not(target_arch = "wasm32"), feature = "storage-service"))]
1295#[async_trait]
1296impl StorageBuilder for ServiceStorageBuilder {
1297 type Storage = DbStorage<StorageServiceDatabase, TestClock>;
1298
1299 async fn build(&mut self) -> anyhow::Result<Self::Storage> {
1300 self.instance_counter += 1;
1301 let config = StorageServiceDatabase::new_test_config().await?;
1302 if self.namespace.is_empty() {
1303 self.namespace = generate_test_namespace();
1304 }
1305 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1306 Ok(
1307 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1308 .await?,
1309 )
1310 }
1311
1312 fn clock(&self) -> &TestClock {
1313 &self.clock
1314 }
1315}
1316
1317#[cfg(feature = "dynamodb")]
1318#[derive(Default)]
1319pub struct DynamoDbStorageBuilder {
1320 namespace: String,
1321 instance_counter: usize,
1322 wasm_runtime: Option<WasmRuntime>,
1323 clock: TestClock,
1324}
1325
1326#[cfg(feature = "dynamodb")]
1327impl DynamoDbStorageBuilder {
1328 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1331 DynamoDbStorageBuilder {
1332 wasm_runtime: wasm_runtime.into(),
1333 ..DynamoDbStorageBuilder::default()
1334 }
1335 }
1336}
1337
1338#[cfg(feature = "dynamodb")]
1339#[async_trait]
1340impl StorageBuilder for DynamoDbStorageBuilder {
1341 type Storage = DbStorage<DynamoDbDatabase, TestClock>;
1342
1343 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1344 self.instance_counter += 1;
1345 let config = DynamoDbDatabase::new_test_config().await?;
1346 if self.namespace.is_empty() {
1347 self.namespace = generate_test_namespace();
1348 }
1349 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1350 Ok(
1351 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1352 .await?,
1353 )
1354 }
1355
1356 fn clock(&self) -> &TestClock {
1357 &self.clock
1358 }
1359}
1360
1361#[cfg(feature = "scylladb")]
1362#[derive(Default)]
1363pub struct ScyllaDbStorageBuilder {
1364 namespace: String,
1365 instance_counter: usize,
1366 wasm_runtime: Option<WasmRuntime>,
1367 clock: TestClock,
1368}
1369
1370#[cfg(feature = "scylladb")]
1371impl ScyllaDbStorageBuilder {
1372 pub fn with_wasm_runtime(wasm_runtime: impl Into<Option<WasmRuntime>>) -> Self {
1375 ScyllaDbStorageBuilder {
1376 wasm_runtime: wasm_runtime.into(),
1377 ..ScyllaDbStorageBuilder::default()
1378 }
1379 }
1380}
1381
1382#[cfg(feature = "scylladb")]
1383#[async_trait]
1384impl StorageBuilder for ScyllaDbStorageBuilder {
1385 type Storage = DbStorage<ScyllaDbDatabase, TestClock>;
1386
1387 async fn build(&mut self) -> Result<Self::Storage, anyhow::Error> {
1388 self.instance_counter += 1;
1389 let config = ScyllaDbDatabase::new_test_config().await?;
1390 if self.namespace.is_empty() {
1391 self.namespace = generate_test_namespace();
1392 }
1393 let namespace = format!("{}_{}", self.namespace, self.instance_counter);
1394 Ok(
1395 DbStorage::new_for_testing(config, &namespace, self.wasm_runtime, self.clock.clone())
1396 .await?,
1397 )
1398 }
1399
1400 fn clock(&self) -> &TestClock {
1401 &self.clock
1402 }
1403}
1404
1405pub trait ClientOutcomeResultExt<T, E> {
1406 fn unwrap_ok_committed(self) -> T;
1407}
1408
1409impl<T, E: std::fmt::Debug> ClientOutcomeResultExt<T, E> for Result<ClientOutcome<T>, E> {
1410 fn unwrap_ok_committed(self) -> T {
1411 self.unwrap().unwrap()
1412 }
1413}