1use std::{
5 collections::{BTreeMap, HashMap},
6 fmt::Debug,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_chain::{
19 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
20 ChainStateView,
21};
22use linera_execution::{
23 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
24};
25use linera_views::{
26 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
27 batch::Batch,
28 context::ViewContext,
29 store::{
30 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
31 },
32 views::View,
33 ViewError,
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37#[cfg(with_testing)]
38use {
39 futures::channel::oneshot::{self, Receiver},
40 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
41 std::cmp::Reverse,
42};
43
44use crate::{ChainRuntimeContext, Clock, Storage};
45
46#[cfg(with_metrics)]
47pub mod metrics {
48 use std::sync::LazyLock;
49
50 use linera_base::prometheus_util::{
51 exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
52 };
53 use prometheus::{HistogramVec, IntCounterVec};
54
55 pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
57 register_int_counter_vec(
58 "contains_blob",
59 "The metric counting how often a blob is tested for existence from storage",
60 &[],
61 )
62 });
63
64 pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
66 register_int_counter_vec(
67 "contains_blobs",
68 "The metric counting how often multiple blobs are tested for existence from storage",
69 &[],
70 )
71 });
72
73 pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
75 register_int_counter_vec(
76 "contains_blob_state",
77 "The metric counting how often a blob state is tested for existence from storage",
78 &[],
79 )
80 });
81
82 pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
84 register_int_counter_vec(
85 "contains_certificate",
86 "The metric counting how often a certificate is tested for existence from storage",
87 &[],
88 )
89 });
90
91 #[doc(hidden)]
93 pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
94 register_int_counter_vec(
95 "read_confirmed_block",
96 "The metric counting how often a hashed confirmed block is read from storage",
97 &[],
98 )
99 });
100
101 #[doc(hidden)]
103 pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
104 LazyLock::new(|| {
105 register_int_counter_vec(
106 "read_confirmed_blocks",
107 "The metric counting how often confirmed blocks are read from storage",
108 &[],
109 )
110 });
111
112 #[doc(hidden)]
114 pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
115 register_int_counter_vec(
116 "read_blob",
117 "The metric counting how often a blob is read from storage",
118 &[],
119 )
120 });
121
122 #[doc(hidden)]
124 pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
125 register_int_counter_vec(
126 "read_blob_state",
127 "The metric counting how often a blob state is read from storage",
128 &[],
129 )
130 });
131
132 #[doc(hidden)]
134 pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
135 register_int_counter_vec(
136 "read_blob_states",
137 "The metric counting how often blob states are read from storage",
138 &[],
139 )
140 });
141
142 #[doc(hidden)]
144 pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
145 register_int_counter_vec(
146 "write_blob",
147 "The metric counting how often a blob is written to storage",
148 &[],
149 )
150 });
151
152 #[doc(hidden)]
154 pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
155 register_int_counter_vec(
156 "read_certificate",
157 "The metric counting how often a certificate is read from storage",
158 &[],
159 )
160 });
161
162 #[doc(hidden)]
164 pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
165 register_int_counter_vec(
166 "read_certificates",
167 "The metric counting how often certificate are read from storage",
168 &[],
169 )
170 });
171
172 #[doc(hidden)]
174 pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
175 register_int_counter_vec(
176 "write_certificate",
177 "The metric counting how often a certificate is written to storage",
178 &[],
179 )
180 });
181
182 #[doc(hidden)]
184 pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
185 register_histogram_vec(
186 "load_chain_latency",
187 "The latency to load a chain state",
188 &[],
189 exponential_bucket_latencies(1000.0),
190 )
191 });
192
193 #[doc(hidden)]
195 pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
196 register_int_counter_vec(
197 "read_event",
198 "The metric counting how often an event is read from storage",
199 &[],
200 )
201 });
202
203 pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
205 register_int_counter_vec(
206 "contains_event",
207 "The metric counting how often an event is tested for existence from storage",
208 &[],
209 )
210 });
211
212 #[doc(hidden)]
214 pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
215 register_int_counter_vec(
216 "write_event",
217 "The metric counting how often an event is written to storage",
218 &[],
219 )
220 });
221
222 #[doc(hidden)]
224 pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
225 register_int_counter_vec(
226 "network_description",
227 "The metric counting how often the network description is read from storage",
228 &[],
229 )
230 });
231
232 #[doc(hidden)]
234 pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
235 register_int_counter_vec(
236 "write_network_description",
237 "The metric counting how often the network description is written to storage",
238 &[],
239 )
240 });
241}
242
243const BLOB_KEY: &[u8] = &[0];
245
246const BLOB_STATE_KEY: &[u8] = &[1];
248
249const LITE_CERTIFICATE_KEY: &[u8] = &[2];
251
252const BLOCK_KEY: &[u8] = &[3];
254
255const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
257
258fn get_block_keys() -> Vec<Vec<u8>> {
259 vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
260}
261
262#[derive(Default)]
263#[allow(clippy::type_complexity)]
264struct MultiPartitionBatch {
265 keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
266}
267
268impl MultiPartitionBatch {
269 fn new() -> Self {
270 Self::default()
271 }
272
273 fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
274 let entry = self.keys_value_bytes.entry(root_key).or_default();
275 entry.extend(key_values);
276 }
277
278 fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
279 self.put_key_values(root_key, vec![(key, value)]);
280 }
281
282 fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
283 #[cfg(with_metrics)]
284 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
285 let root_key = RootKey::BlobId(blob.id()).bytes();
286 let key = BLOB_KEY.to_vec();
287 self.put_key_value(root_key, key, blob.bytes().to_vec());
288 Ok(())
289 }
290
291 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
292 let root_key = RootKey::BlobId(blob_id).bytes();
293 let key = BLOB_STATE_KEY.to_vec();
294 let value = bcs::to_bytes(blob_state)?;
295 self.put_key_value(root_key, key, value);
296 Ok(())
297 }
298
299 fn add_certificate(
308 &mut self,
309 certificate: &ConfirmedBlockCertificate,
310 ) -> Result<(), ViewError> {
311 #[cfg(with_metrics)]
312 metrics::WRITE_CERTIFICATE_COUNTER
313 .with_label_values(&[])
314 .inc();
315 let hash = certificate.hash();
316
317 let root_key = RootKey::BlockHash(hash).bytes();
319 let mut key_values = Vec::new();
320 let key = LITE_CERTIFICATE_KEY.to_vec();
321 let value = bcs::to_bytes(&certificate.lite_certificate())?;
322 key_values.push((key, value));
323 let key = BLOCK_KEY.to_vec();
324 let value = bcs::to_bytes(&certificate.value())?;
325 key_values.push((key, value));
326 self.put_key_values(root_key, key_values);
327
328 let chain_id = certificate.value().block().header.chain_id;
330 let height = certificate.value().block().header.height;
331 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
332 let height_key = to_height_key(height);
333 let index_value = bcs::to_bytes(&hash)?;
334 self.put_key_value(index_root_key, height_key, index_value);
335
336 Ok(())
337 }
338
339 fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
340 #[cfg(with_metrics)]
341 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
342 let key = to_event_key(&event_id);
343 let root_key = RootKey::Event(event_id.chain_id).bytes();
344 self.put_key_value(root_key, key, value);
345 Ok(())
346 }
347
348 fn add_network_description(
349 &mut self,
350 information: &NetworkDescription,
351 ) -> Result<(), ViewError> {
352 #[cfg(with_metrics)]
353 metrics::WRITE_NETWORK_DESCRIPTION
354 .with_label_values(&[])
355 .inc();
356 let root_key = RootKey::NetworkDescription.bytes();
357 let key = NETWORK_DESCRIPTION_KEY.to_vec();
358 let value = bcs::to_bytes(information)?;
359 self.put_key_value(root_key, key, value);
360 Ok(())
361 }
362}
363
364#[derive(Clone)]
366pub struct DbStorage<Database, Clock = WallClock> {
367 database: Arc<Database>,
368 clock: Clock,
369 thread_pool: Arc<linera_execution::ThreadPool>,
370 wasm_runtime: Option<WasmRuntime>,
371 user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
372 user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
373 execution_runtime_config: ExecutionRuntimeConfig,
374}
375
376#[derive(Debug, Serialize, Deserialize)]
377enum RootKey {
378 NetworkDescription,
379 BlockExporterState(u32),
380 ChainState(ChainId),
381 BlockHash(CryptoHash),
382 BlobId(BlobId),
383 Event(ChainId),
384 BlockByHeight(ChainId),
385}
386
387const CHAIN_ID_TAG: u8 = 2;
388const BLOB_ID_TAG: u8 = 4;
389const EVENT_ID_TAG: u8 = 5;
390
391impl RootKey {
392 fn bytes(&self) -> Vec<u8> {
393 bcs::to_bytes(self).unwrap()
394 }
395}
396
397#[derive(Debug, Serialize, Deserialize)]
398struct RestrictedEventId {
399 pub stream_id: StreamId,
400 pub index: u32,
401}
402
403fn to_event_key(event_id: &EventId) -> Vec<u8> {
404 let restricted_event_id = RestrictedEventId {
405 stream_id: event_id.stream_id.clone(),
406 index: event_id.index,
407 };
408 bcs::to_bytes(&restricted_event_id).unwrap()
409}
410
411pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
412 bcs::to_bytes(&height).unwrap()
413}
414
415fn is_chain_state(root_key: &[u8]) -> bool {
416 if root_key.is_empty() {
417 return false;
418 }
419 root_key[0] == CHAIN_ID_TAG
420}
421
422#[derive(Clone, Copy)]
425pub struct ChainStatesFirstAssignment;
426
427impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
428 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
429 if root_key.is_empty() {
430 return Ok(StoreInUse::Second);
431 }
432 let store = match is_chain_state(root_key) {
433 true => StoreInUse::First,
434 false => StoreInUse::Second,
435 };
436 Ok(store)
437 }
438}
439
440#[derive(Clone)]
442pub struct WallClock;
443
444#[cfg_attr(not(web), async_trait)]
445#[cfg_attr(web, async_trait(?Send))]
446impl Clock for WallClock {
447 fn current_time(&self) -> Timestamp {
448 Timestamp::now()
449 }
450
451 async fn sleep(&self, delta: TimeDelta) {
452 linera_base::time::timer::sleep(delta.as_duration()).await
453 }
454
455 async fn sleep_until(&self, timestamp: Timestamp) {
456 let delta = timestamp.delta_since(Timestamp::now());
457 if delta > TimeDelta::ZERO {
458 self.sleep(delta).await
459 }
460 }
461}
462
463#[cfg(with_testing)]
464#[derive(Default)]
465struct TestClockInner {
466 time: Timestamp,
467 sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
468 sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
471}
472
473#[cfg(with_testing)]
474impl TestClockInner {
475 fn set(&mut self, time: Timestamp) {
476 self.time = time;
477 let senders = self.sleeps.split_off(&Reverse(time));
478 for sender in senders.into_values().flatten() {
479 sender.send(()).ok();
481 }
482 }
483
484 fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
485 let target_time = self.time.saturating_add(delta);
486 self.add_sleep_until(target_time)
487 }
488
489 fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
490 let (sender, receiver) = oneshot::channel();
491 let should_auto_advance = self
492 .sleep_callback
493 .as_ref()
494 .is_some_and(|callback| callback(time));
495 if should_auto_advance && time > self.time {
496 self.set(time);
498 sender.send(()).ok();
500 } else if self.time >= time {
501 sender.send(()).ok();
503 } else {
504 self.sleeps.entry(Reverse(time)).or_default().push(sender);
505 }
506 receiver
507 }
508}
509
510#[cfg(with_testing)]
513#[derive(Clone, Default)]
514pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
515
516#[cfg(with_testing)]
517#[cfg_attr(not(web), async_trait)]
518#[cfg_attr(web, async_trait(?Send))]
519impl Clock for TestClock {
520 fn current_time(&self) -> Timestamp {
521 self.lock().time
522 }
523
524 async fn sleep(&self, delta: TimeDelta) {
525 if delta == TimeDelta::ZERO {
526 return;
527 }
528 let receiver = self.lock().add_sleep(delta);
529 receiver.await.ok();
531 }
532
533 async fn sleep_until(&self, timestamp: Timestamp) {
534 let receiver = self.lock().add_sleep_until(timestamp);
535 receiver.await.ok();
537 }
538}
539
540#[cfg(with_testing)]
541impl TestClock {
542 pub fn new() -> Self {
544 TestClock(Arc::default())
545 }
546
547 pub fn set(&self, time: Timestamp) {
549 self.lock().set(time);
550 }
551
552 pub fn add(&self, delta: TimeDelta) {
554 let mut guard = self.lock();
555 let time = guard.time.saturating_add(delta);
556 guard.set(time);
557 }
558
559 pub fn current_time(&self) -> Timestamp {
561 self.lock().time
562 }
563
564 pub fn set_sleep_callback<F>(&self, callback: F)
569 where
570 F: Fn(Timestamp) -> bool + Send + Sync + 'static,
571 {
572 self.lock().sleep_callback = Some(Box::new(callback));
573 }
574
575 pub fn clear_sleep_callback(&self) {
577 self.lock().sleep_callback = None;
578 }
579
580 fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
581 self.0.lock().expect("poisoned TestClock mutex")
582 }
583}
584
585#[cfg_attr(not(web), async_trait)]
586#[cfg_attr(web, async_trait(?Send))]
587impl<Database, C> Storage for DbStorage<Database, C>
588where
589 Database: KeyValueDatabase<
590 Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
591 Error: Send + Sync,
592 > + Clone
593 + linera_base::util::traits::AutoTraits
594 + 'static,
595 C: Clock + Clone + Send + Sync + 'static,
596{
597 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
598 type Clock = C;
599 type BlockExporterContext = ViewContext<u32, Database::Store>;
600
601 fn clock(&self) -> &C {
602 &self.clock
603 }
604
605 fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
606 &self.thread_pool
607 }
608
609 #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
610 async fn load_chain(
611 &self,
612 chain_id: ChainId,
613 ) -> Result<ChainStateView<Self::Context>, ViewError> {
614 #[cfg(with_metrics)]
615 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
616 let runtime_context = ChainRuntimeContext {
617 storage: self.clone(),
618 thread_pool: self.thread_pool.clone(),
619 chain_id,
620 execution_runtime_config: self.execution_runtime_config,
621 user_contracts: self.user_contracts.clone(),
622 user_services: self.user_services.clone(),
623 };
624 let root_key = RootKey::ChainState(chain_id).bytes();
625 let store = self.database.open_exclusive(&root_key)?;
626 let context = ViewContext::create_root_context(store, runtime_context).await?;
627 ChainStateView::load(context).await
628 }
629
630 #[instrument(level = "trace", skip_all, fields(%blob_id))]
631 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
632 let root_key = RootKey::BlobId(blob_id).bytes();
633 let store = self.database.open_shared(&root_key)?;
634 let test = store.contains_key(BLOB_KEY).await?;
635 #[cfg(with_metrics)]
636 metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
637 Ok(test)
638 }
639
640 #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
641 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
642 let mut missing_blobs = Vec::new();
643 for blob_id in blob_ids {
644 let root_key = RootKey::BlobId(*blob_id).bytes();
645 let store = self.database.open_shared(&root_key)?;
646 if !store.contains_key(BLOB_KEY).await? {
647 missing_blobs.push(*blob_id);
648 }
649 }
650 #[cfg(with_metrics)]
651 metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
652 Ok(missing_blobs)
653 }
654
655 #[instrument(skip_all, fields(%blob_id))]
656 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
657 let root_key = RootKey::BlobId(blob_id).bytes();
658 let store = self.database.open_shared(&root_key)?;
659 let test = store.contains_key(BLOB_STATE_KEY).await?;
660 #[cfg(with_metrics)]
661 metrics::CONTAINS_BLOB_STATE_COUNTER
662 .with_label_values(&[])
663 .inc();
664 Ok(test)
665 }
666
667 #[instrument(skip_all, fields(%hash))]
668 async fn read_confirmed_block(
669 &self,
670 hash: CryptoHash,
671 ) -> Result<Option<ConfirmedBlock>, ViewError> {
672 let root_key = RootKey::BlockHash(hash).bytes();
673 let store = self.database.open_shared(&root_key)?;
674 let value = store.read_value(BLOCK_KEY).await?;
675 #[cfg(with_metrics)]
676 metrics::READ_CONFIRMED_BLOCK_COUNTER
677 .with_label_values(&[])
678 .inc();
679 Ok(value)
680 }
681
682 #[instrument(skip_all)]
683 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
684 &self,
685 hashes: I,
686 ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError> {
687 let hashes = hashes.into_iter().collect::<Vec<_>>();
688 if hashes.is_empty() {
689 return Ok(Vec::new());
690 }
691 let root_keys = Self::get_root_keys_for_certificates(&hashes);
692 let mut blocks = Vec::new();
693 for root_key in root_keys {
694 let store = self.database.open_shared(&root_key)?;
695 blocks.push(store.read_value(BLOCK_KEY).await?);
696 }
697 #[cfg(with_metrics)]
698 metrics::READ_CONFIRMED_BLOCKS_COUNTER
699 .with_label_values(&[])
700 .inc_by(hashes.len() as u64);
701 Ok(blocks)
702 }
703
704 #[instrument(skip_all, fields(%blob_id))]
705 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
706 let root_key = RootKey::BlobId(blob_id).bytes();
707 let store = self.database.open_shared(&root_key)?;
708 let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
709 #[cfg(with_metrics)]
710 metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
711 Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
712 }
713
714 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
715 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
716 if blob_ids.is_empty() {
717 return Ok(Vec::new());
718 }
719 let mut blobs = Vec::new();
720 for blob_id in blob_ids {
721 blobs.push(self.read_blob(*blob_id).await?);
722 }
723 #[cfg(with_metrics)]
724 metrics::READ_BLOB_COUNTER
725 .with_label_values(&[])
726 .inc_by(blob_ids.len() as u64);
727 Ok(blobs)
728 }
729
730 #[instrument(skip_all, fields(%blob_id))]
731 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
732 let root_key = RootKey::BlobId(blob_id).bytes();
733 let store = self.database.open_shared(&root_key)?;
734 let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
735 #[cfg(with_metrics)]
736 metrics::READ_BLOB_STATE_COUNTER
737 .with_label_values(&[])
738 .inc();
739 Ok(blob_state)
740 }
741
742 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
743 async fn read_blob_states(
744 &self,
745 blob_ids: &[BlobId],
746 ) -> Result<Vec<Option<BlobState>>, ViewError> {
747 if blob_ids.is_empty() {
748 return Ok(Vec::new());
749 }
750 let mut blob_states = Vec::new();
751 for blob_id in blob_ids {
752 blob_states.push(self.read_blob_state(*blob_id).await?);
753 }
754 #[cfg(with_metrics)]
755 metrics::READ_BLOB_STATES_COUNTER
756 .with_label_values(&[])
757 .inc_by(blob_ids.len() as u64);
758 Ok(blob_states)
759 }
760
761 #[instrument(skip_all, fields(blob_id = %blob.id()))]
762 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
763 let mut batch = MultiPartitionBatch::new();
764 batch.add_blob(blob)?;
765 self.write_batch(batch).await?;
766 Ok(())
767 }
768
769 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
770 async fn maybe_write_blob_states(
771 &self,
772 blob_ids: &[BlobId],
773 blob_state: BlobState,
774 ) -> Result<(), ViewError> {
775 if blob_ids.is_empty() {
776 return Ok(());
777 }
778 let mut maybe_blob_states = Vec::new();
779 for blob_id in blob_ids {
780 let root_key = RootKey::BlobId(*blob_id).bytes();
781 let store = self.database.open_shared(&root_key)?;
782 let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
783 maybe_blob_states.push(maybe_blob_state);
784 }
785 let mut batch = MultiPartitionBatch::new();
786 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
787 match maybe_blob_state {
788 None => {
789 batch.add_blob_state(*blob_id, &blob_state)?;
790 }
791 Some(state) => {
792 if state.epoch < blob_state.epoch {
793 batch.add_blob_state(*blob_id, &blob_state)?;
794 }
795 }
796 }
797 }
798 self.write_batch(batch).await?;
802 Ok(())
803 }
804
805 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
806 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
807 if blobs.is_empty() {
808 return Ok(Vec::new());
809 }
810 let mut batch = MultiPartitionBatch::new();
811 let mut blob_states = Vec::new();
812 for blob in blobs {
813 let root_key = RootKey::BlobId(blob.id()).bytes();
814 let store = self.database.open_shared(&root_key)?;
815 let has_state = store.contains_key(BLOB_STATE_KEY).await?;
816 blob_states.push(has_state);
817 if has_state {
818 batch.add_blob(blob)?;
819 }
820 }
821 self.write_batch(batch).await?;
822 Ok(blob_states)
823 }
824
825 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
826 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
827 if blobs.is_empty() {
828 return Ok(());
829 }
830 let mut batch = MultiPartitionBatch::new();
831 for blob in blobs {
832 batch.add_blob(blob)?;
833 }
834 self.write_batch(batch).await
835 }
836
837 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
838 async fn write_blobs_and_certificate(
839 &self,
840 blobs: &[Blob],
841 certificate: &ConfirmedBlockCertificate,
842 ) -> Result<(), ViewError> {
843 let mut batch = MultiPartitionBatch::new();
844 for blob in blobs {
845 batch.add_blob(blob)?;
846 }
847 batch.add_certificate(certificate)?;
848 self.write_batch(batch).await
849 }
850
851 #[instrument(skip_all, fields(%hash))]
852 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
853 let root_key = RootKey::BlockHash(hash).bytes();
854 let store = self.database.open_shared(&root_key)?;
855 let results = store.contains_keys(&get_block_keys()).await?;
856 #[cfg(with_metrics)]
857 metrics::CONTAINS_CERTIFICATE_COUNTER
858 .with_label_values(&[])
859 .inc();
860 Ok(results[0] && results[1])
861 }
862
863 #[instrument(skip_all, fields(%hash))]
864 async fn read_certificate(
865 &self,
866 hash: CryptoHash,
867 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
868 let root_key = RootKey::BlockHash(hash).bytes();
869 let store = self.database.open_shared(&root_key)?;
870 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
871 #[cfg(with_metrics)]
872 metrics::READ_CERTIFICATE_COUNTER
873 .with_label_values(&[])
874 .inc();
875 Self::deserialize_certificate(&values, hash)
876 }
877
878 #[instrument(skip_all)]
879 async fn read_certificates(
880 &self,
881 hashes: &[CryptoHash],
882 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
883 let raw_certs = self.read_certificates_raw(hashes).await?;
884
885 raw_certs
886 .into_iter()
887 .zip(hashes)
888 .map(|(maybe_raw, hash)| {
889 let Some((lite_cert_bytes, confirmed_block_bytes)) = maybe_raw else {
890 return Ok(None);
891 };
892 let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
893 let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
894 assert_eq!(&value.hash(), hash);
895 let certificate = cert
896 .with_value(value)
897 .ok_or(ViewError::InconsistentEntries)?;
898 Ok(Some(certificate))
899 })
900 .collect()
901 }
902
903 #[instrument(skip_all)]
904 async fn read_certificates_raw(
905 &self,
906 hashes: &[CryptoHash],
907 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
908 if hashes.is_empty() {
909 return Ok(Vec::new());
910 }
911 let root_keys = Self::get_root_keys_for_certificates(hashes);
912 let mut values = Vec::new();
913 for root_key in root_keys {
914 let store = self.database.open_shared(&root_key)?;
915 values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
916 }
917 #[cfg(with_metrics)]
918 metrics::READ_CERTIFICATES_COUNTER
919 .with_label_values(&[])
920 .inc_by(hashes.len() as u64);
921 Ok(values
922 .chunks_exact(2)
923 .map(|chunk| {
924 let lite_cert_bytes = chunk[0].as_ref()?;
925 let confirmed_block_bytes = chunk[1].as_ref()?;
926 Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
927 })
928 .collect())
929 }
930
931 async fn read_certificate_hashes_by_heights(
932 &self,
933 chain_id: ChainId,
934 heights: &[BlockHeight],
935 ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
936 if heights.is_empty() {
937 return Ok(Vec::new());
938 }
939
940 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
941 let store = self.database.open_shared(&index_root_key)?;
942 let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
943 let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
944 let hash_options: Vec<Option<CryptoHash>> = hash_bytes
945 .into_iter()
946 .map(|opt| {
947 opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
948 .transpose()
949 })
950 .collect::<Result<_, _>>()?;
951
952 Ok(hash_options)
953 }
954
955 #[instrument(skip_all)]
956 async fn read_certificates_by_heights_raw(
957 &self,
958 chain_id: ChainId,
959 heights: &[BlockHeight],
960 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
961 let hashes: Vec<Option<CryptoHash>> = self
962 .read_certificate_hashes_by_heights(chain_id, heights)
963 .await?;
964
965 let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
967 for (index, maybe_hash) in hashes.iter().enumerate() {
968 if let Some(hash) = maybe_hash {
969 indices.entry(*hash).or_default().push(index);
970 }
971 }
972
973 let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
975
976 let mut result = vec![None; heights.len()];
977
978 for (raw_cert, hash) in self
979 .read_certificates_raw(&unique_hashes)
980 .await?
981 .into_iter()
982 .zip(unique_hashes)
983 {
984 if let Some(idx_list) = indices.get(&hash) {
985 for &index in idx_list {
986 result[index] = raw_cert.clone();
987 }
988 } else {
989 tracing::error!(?hash, "certificate hash not found in indices map",);
991 }
992 }
993
994 Ok(result)
995 }
996
997 #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
998 async fn read_certificates_by_heights(
999 &self,
1000 chain_id: ChainId,
1001 heights: &[BlockHeight],
1002 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1003 self.read_certificates_by_heights_raw(chain_id, heights)
1004 .await?
1005 .into_iter()
1006 .map(|maybe_raw| match maybe_raw {
1007 None => Ok(None),
1008 Some((lite_cert_bytes, confirmed_block_bytes)) => {
1009 let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1010 let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1011 let certificate = cert
1012 .with_value(value)
1013 .ok_or(ViewError::InconsistentEntries)?;
1014 Ok(Some(certificate))
1015 }
1016 })
1017 .collect()
1018 }
1019
1020 #[instrument(skip_all, fields(event_id = ?event_id))]
1021 async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
1022 let event_key = to_event_key(&event_id);
1023 let root_key = RootKey::Event(event_id.chain_id).bytes();
1024 let store = self.database.open_shared(&root_key)?;
1025 let event = store.read_value_bytes(&event_key).await?;
1026 #[cfg(with_metrics)]
1027 metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
1028 Ok(event)
1029 }
1030
1031 #[instrument(skip_all, fields(event_id = ?event_id))]
1032 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1033 let event_key = to_event_key(&event_id);
1034 let root_key = RootKey::Event(event_id.chain_id).bytes();
1035 let store = self.database.open_shared(&root_key)?;
1036 let exists = store.contains_key(&event_key).await?;
1037 #[cfg(with_metrics)]
1038 metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
1039 Ok(exists)
1040 }
1041
1042 #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1043 async fn read_events_from_index(
1044 &self,
1045 chain_id: &ChainId,
1046 stream_id: &StreamId,
1047 start_index: u32,
1048 ) -> Result<Vec<IndexAndEvent>, ViewError> {
1049 let root_key = RootKey::Event(*chain_id).bytes();
1050 let store = self.database.open_shared(&root_key)?;
1051 let mut keys = Vec::new();
1052 let mut indices = Vec::new();
1053 let prefix = bcs::to_bytes(stream_id).unwrap();
1054 for short_key in store.find_keys_by_prefix(&prefix).await? {
1055 let index = bcs::from_bytes::<u32>(&short_key)?;
1056 if index >= start_index {
1057 let mut key = prefix.clone();
1058 key.extend(short_key);
1059 keys.push(key);
1060 indices.push(index);
1061 }
1062 }
1063 let values = store.read_multi_values_bytes(&keys).await?;
1064 let mut returned_values = Vec::new();
1065 for (index, value) in indices.into_iter().zip(values) {
1066 let event = value.unwrap();
1067 returned_values.push(IndexAndEvent { index, event });
1068 }
1069 Ok(returned_values)
1070 }
1071
1072 #[instrument(skip_all)]
1073 async fn write_events(
1074 &self,
1075 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1076 ) -> Result<(), ViewError> {
1077 let mut batch = MultiPartitionBatch::new();
1078 for (event_id, value) in events {
1079 batch.add_event(event_id, value)?;
1080 }
1081 self.write_batch(batch).await
1082 }
1083
1084 #[instrument(skip_all)]
1085 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1086 let root_key = RootKey::NetworkDescription.bytes();
1087 let store = self.database.open_shared(&root_key)?;
1088 let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1089 #[cfg(with_metrics)]
1090 metrics::READ_NETWORK_DESCRIPTION
1091 .with_label_values(&[])
1092 .inc();
1093 Ok(maybe_value)
1094 }
1095
1096 #[instrument(skip_all)]
1097 async fn write_network_description(
1098 &self,
1099 information: &NetworkDescription,
1100 ) -> Result<(), ViewError> {
1101 let mut batch = MultiPartitionBatch::new();
1102 batch.add_network_description(information)?;
1103 self.write_batch(batch).await?;
1104 Ok(())
1105 }
1106
1107 fn wasm_runtime(&self) -> Option<WasmRuntime> {
1108 self.wasm_runtime
1109 }
1110
1111 #[instrument(skip_all)]
1112 async fn block_exporter_context(
1113 &self,
1114 block_exporter_id: u32,
1115 ) -> Result<Self::BlockExporterContext, ViewError> {
1116 let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1117 let store = self.database.open_exclusive(&root_key)?;
1118 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1119 }
1120
1121 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1122 let root_keys = self.database.list_root_keys().await?;
1123 let mut blob_ids = Vec::new();
1124 for root_key in root_keys {
1125 if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1126 let root_key_red = &root_key[1..];
1127 let blob_id = bcs::from_bytes(root_key_red)?;
1128 blob_ids.push(blob_id);
1129 }
1130 }
1131 Ok(blob_ids)
1132 }
1133
1134 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1135 let root_keys = self.database.list_root_keys().await?;
1136 let mut chain_ids = Vec::new();
1137 for root_key in root_keys {
1138 if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1139 let root_key_red = &root_key[1..];
1140 let chain_id = bcs::from_bytes(root_key_red)?;
1141 chain_ids.push(chain_id);
1142 }
1143 }
1144 Ok(chain_ids)
1145 }
1146
1147 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1148 let root_keys = self.database.list_root_keys().await?;
1149 let mut event_ids = Vec::new();
1150 for root_key in root_keys {
1151 if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1152 let root_key_red = &root_key[1..];
1153 let chain_id = bcs::from_bytes(root_key_red)?;
1154 let store = self.database.open_shared(&root_key)?;
1155 let keys = store.find_keys_by_prefix(&[]).await?;
1156 for key in keys {
1157 let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1158 let event_id = EventId {
1159 chain_id,
1160 stream_id: restricted_event_id.stream_id,
1161 index: restricted_event_id.index,
1162 };
1163 event_ids.push(event_id);
1164 }
1165 }
1166 }
1167 Ok(event_ids)
1168 }
1169}
1170
1171impl<Database, C> DbStorage<Database, C>
1172where
1173 Database: KeyValueDatabase + Clone,
1174 Database::Store: KeyValueStore + Clone,
1175 C: Clock,
1176 Database::Error: Send + Sync,
1177{
1178 #[instrument(skip_all)]
1179 fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1180 hashes
1181 .iter()
1182 .map(|hash| RootKey::BlockHash(*hash).bytes())
1183 .collect()
1184 }
1185
1186 #[instrument(skip_all)]
1187 fn deserialize_certificate(
1188 pair: &[Option<Vec<u8>>],
1189 hash: CryptoHash,
1190 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1191 let Some(cert_bytes) = pair[0].as_ref() else {
1192 return Ok(None);
1193 };
1194 let Some(value_bytes) = pair[1].as_ref() else {
1195 return Ok(None);
1196 };
1197 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1198 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1199 assert_eq!(value.hash(), hash);
1200 let certificate = cert
1201 .with_value(value)
1202 .ok_or(ViewError::InconsistentEntries)?;
1203 Ok(Some(certificate))
1204 }
1205
1206 #[instrument(skip_all)]
1207 async fn write_entry(
1208 store: &Database::Store,
1209 key_values: Vec<(Vec<u8>, Vec<u8>)>,
1210 ) -> Result<(), ViewError> {
1211 let mut batch = Batch::new();
1212 for (key, value) in key_values {
1213 batch.put_key_value_bytes(key, value);
1214 }
1215 store.write_batch(batch).await?;
1216 Ok(())
1217 }
1218
1219 #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1220 async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1221 if batch.keys_value_bytes.is_empty() {
1222 return Ok(());
1223 }
1224 let mut futures = Vec::new();
1225 for (root_key, key_values) in batch.keys_value_bytes {
1226 let store = self.database.open_shared(&root_key)?;
1227 futures.push(async move { Self::write_entry(&store, key_values).await });
1228 }
1229 futures::future::try_join_all(futures).await?;
1230 Ok(())
1231 }
1232}
1233
1234impl<Database, C> DbStorage<Database, C> {
1235 fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
1236 Self {
1237 database: Arc::new(database),
1238 clock,
1239 #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
1241 thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
1242 wasm_runtime,
1243 user_contracts: Arc::new(papaya::HashMap::new()),
1244 user_services: Arc::new(papaya::HashMap::new()),
1245 execution_runtime_config: ExecutionRuntimeConfig::default(),
1246 }
1247 }
1248
1249 pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
1251 self.execution_runtime_config.allow_application_logs = allow;
1252 self
1253 }
1254}
1255
1256impl<Database> DbStorage<Database, WallClock>
1257where
1258 Database: KeyValueDatabase + Clone + 'static,
1259 Database::Error: Send + Sync,
1260 Database::Store: KeyValueStore + Clone + 'static,
1261{
1262 pub async fn maybe_create_and_connect(
1263 config: &Database::Config,
1264 namespace: &str,
1265 wasm_runtime: Option<WasmRuntime>,
1266 ) -> Result<Self, Database::Error> {
1267 let database = Database::maybe_create_and_connect(config, namespace).await?;
1268 Ok(Self::new(database, wasm_runtime, WallClock))
1269 }
1270
1271 pub async fn connect(
1272 config: &Database::Config,
1273 namespace: &str,
1274 wasm_runtime: Option<WasmRuntime>,
1275 ) -> Result<Self, Database::Error> {
1276 let database = Database::connect(config, namespace).await?;
1277 Ok(Self::new(database, wasm_runtime, WallClock))
1278 }
1279}
1280
1281#[cfg(with_testing)]
1282impl<Database> DbStorage<Database, TestClock>
1283where
1284 Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
1285 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
1286 Database::Error: Send + Sync,
1287{
1288 pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
1289 let config = Database::new_test_config().await.unwrap();
1290 let namespace = generate_test_namespace();
1291 DbStorage::<Database, TestClock>::new_for_testing(
1292 config,
1293 &namespace,
1294 wasm_runtime,
1295 TestClock::new(),
1296 )
1297 .await
1298 .unwrap()
1299 }
1300
1301 pub async fn new_for_testing(
1302 config: Database::Config,
1303 namespace: &str,
1304 wasm_runtime: Option<WasmRuntime>,
1305 clock: TestClock,
1306 ) -> Result<Self, Database::Error> {
1307 let database = Database::recreate_and_connect(&config, namespace).await?;
1308 Ok(Self::new(database, wasm_runtime, clock))
1309 }
1310}
1311
1312#[cfg(test)]
1313mod tests {
1314 use linera_base::{
1315 crypto::{CryptoHash, TestString},
1316 data_types::{BlockHeight, Epoch, Round, Timestamp},
1317 identifiers::{
1318 ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
1319 StreamName,
1320 },
1321 };
1322 use linera_chain::{
1323 block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
1324 types::ConfirmedBlockCertificate,
1325 };
1326 use linera_views::{
1327 memory::MemoryDatabase,
1328 store::{KeyValueDatabase, ReadableKeyValueStore as _},
1329 };
1330
1331 use crate::{
1332 db_storage::{
1333 to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
1334 EVENT_ID_TAG,
1335 },
1336 DbStorage, Storage, TestClock,
1337 };
1338
1339 #[test]
1346 fn test_root_key_blob_serialization() {
1347 let hash = CryptoHash::default();
1348 let blob_type = BlobType::default();
1349 let blob_id = BlobId::new(hash, blob_type);
1350 let root_key = RootKey::BlobId(blob_id).bytes();
1351 assert_eq!(root_key[0], BLOB_ID_TAG);
1352 assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
1353 }
1354
1355 #[test]
1358 fn test_root_key_chainstate_serialization() {
1359 let hash = CryptoHash::default();
1360 let chain_id = ChainId(hash);
1361 let root_key = RootKey::ChainState(chain_id).bytes();
1362 assert_eq!(root_key[0], CHAIN_ID_TAG);
1363 assert_eq!(
1364 bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
1365 chain_id
1366 );
1367 }
1368
1369 #[test]
1372 fn test_root_key_event_serialization() {
1373 let hash = CryptoHash::test_hash("49");
1374 let chain_id = ChainId(hash);
1375 let application_description_hash = CryptoHash::test_hash("42");
1376 let application_id = ApplicationId::new(application_description_hash);
1377 let application_id = GenericApplicationId::User(application_id);
1378 let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
1379 let stream_id = StreamId {
1380 application_id,
1381 stream_name,
1382 };
1383 let prefix = bcs::to_bytes(&stream_id).unwrap();
1384
1385 let index = 1567;
1386 let event_id = EventId {
1387 chain_id,
1388 stream_id,
1389 index,
1390 };
1391 let root_key = RootKey::Event(chain_id).bytes();
1392 assert_eq!(root_key[0], EVENT_ID_TAG);
1393 let key = to_event_key(&event_id);
1394 assert!(key.starts_with(&prefix));
1395 }
1396
1397 #[test]
1400 fn test_root_key_block_by_height_serialization() {
1401 use linera_base::data_types::BlockHeight;
1402
1403 let hash = CryptoHash::default();
1404 let chain_id = ChainId(hash);
1405 let height = BlockHeight(42);
1406
1407 let root_key = RootKey::BlockByHeight(chain_id).bytes();
1409 let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
1410 assert_eq!(deserialized_chain_id, chain_id);
1411
1412 let height_key = to_height_key(height);
1414 let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
1415 assert_eq!(deserialized_height, height);
1416 }
1417
1418 #[cfg(with_testing)]
1419 #[tokio::test]
1420 async fn test_add_certificate_creates_height_index() {
1421 let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1423
1424 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1426 let height = BlockHeight(5);
1427 let block = Block {
1428 header: BlockHeader {
1429 chain_id,
1430 epoch: Epoch::ZERO,
1431 height,
1432 timestamp: Timestamp::from(0),
1433 state_hash: CryptoHash::new(&TestString::new("state_hash")),
1434 previous_block_hash: None,
1435 authenticated_owner: None,
1436 transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
1437 messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
1438 previous_message_blocks_hash: CryptoHash::new(&TestString::new(
1439 "prev_msg_blocks_hash",
1440 )),
1441 previous_event_blocks_hash: CryptoHash::new(&TestString::new(
1442 "prev_event_blocks_hash",
1443 )),
1444 oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
1445 events_hash: CryptoHash::new(&TestString::new("events_hash")),
1446 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
1447 operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
1448 },
1449 body: BlockBody {
1450 transactions: vec![],
1451 messages: vec![],
1452 previous_message_blocks: Default::default(),
1453 previous_event_blocks: Default::default(),
1454 oracle_responses: vec![],
1455 events: vec![],
1456 blobs: vec![],
1457 operation_results: vec![],
1458 },
1459 };
1460 let confirmed_block = ConfirmedBlock::new(block);
1461 let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1462
1463 let mut batch = MultiPartitionBatch::new();
1465 batch.add_certificate(&certificate).unwrap();
1466 storage.write_batch(batch).await.unwrap();
1467
1468 let hash = certificate.hash();
1470 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1471 let store = storage.database.open_shared(&index_root_key).unwrap();
1472 let height_key = to_height_key(height);
1473 let value_bytes = store.read_value_bytes(&height_key).await.unwrap();
1474
1475 assert!(value_bytes.is_some(), "Height index was not created");
1476 let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
1477 assert_eq!(stored_hash, hash, "Height index contains wrong hash");
1478 }
1479
1480 #[cfg(with_testing)]
1481 #[tokio::test]
1482 async fn test_read_certificates_by_heights() {
1483 let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1484 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1485
1486 let mut batch = MultiPartitionBatch::new();
1488 let mut expected_certs = vec![];
1489
1490 for height in [1, 3, 5] {
1491 let block = Block {
1492 header: BlockHeader {
1493 chain_id,
1494 epoch: Epoch::ZERO,
1495 height: BlockHeight(height),
1496 timestamp: Timestamp::from(0),
1497 state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
1498 previous_block_hash: None,
1499 authenticated_owner: None,
1500 transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
1501 messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
1502 previous_message_blocks_hash: CryptoHash::new(&TestString::new(
1503 "pmb_hash_{height}",
1504 )),
1505 previous_event_blocks_hash: CryptoHash::new(&TestString::new(
1506 "peb_hash_{height}",
1507 )),
1508 oracle_responses_hash: CryptoHash::new(&TestString::new(
1509 "oracle_hash_{height}",
1510 )),
1511 events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
1512 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
1513 operation_results_hash: CryptoHash::new(&TestString::new(
1514 "op_results_hash_{height}",
1515 )),
1516 },
1517 body: BlockBody {
1518 transactions: vec![],
1519 messages: vec![],
1520 previous_message_blocks: Default::default(),
1521 previous_event_blocks: Default::default(),
1522 oracle_responses: vec![],
1523 events: vec![],
1524 blobs: vec![],
1525 operation_results: vec![],
1526 },
1527 };
1528 let confirmed_block = ConfirmedBlock::new(block);
1529 let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1530 expected_certs.push((height, cert.clone()));
1531 batch.add_certificate(&cert).unwrap();
1532 }
1533 storage.write_batch(batch).await.unwrap();
1534
1535 let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
1537 let result = storage
1538 .read_certificates_by_heights(chain_id, &heights)
1539 .await
1540 .unwrap();
1541 assert_eq!(result.len(), 3);
1542 assert_eq!(
1543 result[0].as_ref().unwrap().hash(),
1544 expected_certs[0].1.hash()
1545 );
1546 assert_eq!(
1547 result[1].as_ref().unwrap().hash(),
1548 expected_certs[1].1.hash()
1549 );
1550 assert_eq!(
1551 result[2].as_ref().unwrap().hash(),
1552 expected_certs[2].1.hash()
1553 );
1554
1555 let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
1557 let result = storage
1558 .read_certificates_by_heights(chain_id, &heights)
1559 .await
1560 .unwrap();
1561 assert_eq!(result.len(), 3);
1562 assert_eq!(
1563 result[0].as_ref().unwrap().hash(),
1564 expected_certs[2].1.hash()
1565 );
1566 assert_eq!(
1567 result[1].as_ref().unwrap().hash(),
1568 expected_certs[0].1.hash()
1569 );
1570 assert_eq!(
1571 result[2].as_ref().unwrap().hash(),
1572 expected_certs[1].1.hash()
1573 );
1574
1575 let heights = vec![
1577 BlockHeight(1),
1578 BlockHeight(2),
1579 BlockHeight(3),
1580 BlockHeight(3),
1581 ];
1582 let result = storage
1583 .read_certificates_by_heights(chain_id, &heights)
1584 .await
1585 .unwrap();
1586 assert_eq!(result.len(), 4); assert!(result[0].is_some());
1588 assert!(result[1].is_none()); assert!(result[2].is_some());
1590 assert!(result[3].is_some());
1591 assert_eq!(
1592 result[2].as_ref().unwrap().hash(),
1593 result[3].as_ref().unwrap().hash()
1594 ); let heights = vec![];
1598 let result = storage
1599 .read_certificates_by_heights(chain_id, &heights)
1600 .await
1601 .unwrap();
1602 assert_eq!(result.len(), 0);
1603 }
1604
1605 #[cfg(with_testing)]
1606 #[tokio::test]
1607 async fn test_read_certificates_by_heights_multiple_chains() {
1608 let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1609
1610 let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
1612 let chain_b = ChainId(CryptoHash::test_hash("chain_b"));
1613
1614 let mut batch = MultiPartitionBatch::new();
1615
1616 let block_a = Block {
1617 header: BlockHeader {
1618 chain_id: chain_a,
1619 epoch: Epoch::ZERO,
1620 height: BlockHeight(10),
1621 timestamp: Timestamp::from(0),
1622 state_hash: CryptoHash::new(&TestString::new("state_hash_a")),
1623 previous_block_hash: None,
1624 authenticated_owner: None,
1625 transactions_hash: CryptoHash::new(&TestString::new("tx_hash_a")),
1626 messages_hash: CryptoHash::new(&TestString::new("msg_hash_a")),
1627 previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_a")),
1628 previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_a")),
1629 oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_a")),
1630 events_hash: CryptoHash::new(&TestString::new("events_hash_a")),
1631 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_a")),
1632 operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_a")),
1633 },
1634 body: BlockBody {
1635 transactions: vec![],
1636 messages: vec![],
1637 previous_message_blocks: Default::default(),
1638 previous_event_blocks: Default::default(),
1639 oracle_responses: vec![],
1640 events: vec![],
1641 blobs: vec![],
1642 operation_results: vec![],
1643 },
1644 };
1645 let confirmed_block_a = ConfirmedBlock::new(block_a);
1646 let cert_a = ConfirmedBlockCertificate::new(confirmed_block_a, Round::Fast, vec![]);
1647 batch.add_certificate(&cert_a).unwrap();
1648
1649 let block_b = Block {
1650 header: BlockHeader {
1651 chain_id: chain_b,
1652 epoch: Epoch::ZERO,
1653 height: BlockHeight(10),
1654 timestamp: Timestamp::from(0),
1655 state_hash: CryptoHash::new(&TestString::new("state_hash_b")),
1656 previous_block_hash: None,
1657 authenticated_owner: None,
1658 transactions_hash: CryptoHash::new(&TestString::new("tx_hash_b")),
1659 messages_hash: CryptoHash::new(&TestString::new("msg_hash_b")),
1660 previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_b")),
1661 previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_b")),
1662 oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_b")),
1663 events_hash: CryptoHash::new(&TestString::new("events_hash_b")),
1664 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_b")),
1665 operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_b")),
1666 },
1667 body: BlockBody {
1668 transactions: vec![],
1669 messages: vec![],
1670 previous_message_blocks: Default::default(),
1671 previous_event_blocks: Default::default(),
1672 oracle_responses: vec![],
1673 events: vec![],
1674 blobs: vec![],
1675 operation_results: vec![],
1676 },
1677 };
1678 let confirmed_block_b = ConfirmedBlock::new(block_b);
1679 let cert_b = ConfirmedBlockCertificate::new(confirmed_block_b, Round::Fast, vec![]);
1680 batch.add_certificate(&cert_b).unwrap();
1681
1682 storage.write_batch(batch).await.unwrap();
1683
1684 let result = storage
1686 .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
1687 .await
1688 .unwrap();
1689 assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());
1690
1691 let result = storage
1693 .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
1694 .await
1695 .unwrap();
1696 assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());
1697
1698 let result = storage
1700 .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
1701 .await
1702 .unwrap();
1703 assert!(result[0].is_none());
1704 }
1705
1706 #[cfg(with_testing)]
1707 #[tokio::test]
1708 async fn test_read_certificates_by_heights_consistency() {
1709 let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
1710 let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
1711
1712 let mut batch = MultiPartitionBatch::new();
1714 let block = Block {
1715 header: BlockHeader {
1716 chain_id,
1717 epoch: Epoch::ZERO,
1718 height: BlockHeight(7),
1719 timestamp: Timestamp::from(0),
1720 state_hash: CryptoHash::new(&TestString::new("state_hash")),
1721 previous_block_hash: None,
1722 authenticated_owner: None,
1723 transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
1724 messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
1725 previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
1726 previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
1727 oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
1728 events_hash: CryptoHash::new(&TestString::new("events_hash")),
1729 blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
1730 operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
1731 },
1732 body: BlockBody {
1733 transactions: vec![],
1734 messages: vec![],
1735 previous_message_blocks: Default::default(),
1736 previous_event_blocks: Default::default(),
1737 oracle_responses: vec![],
1738 events: vec![],
1739 blobs: vec![],
1740 operation_results: vec![],
1741 },
1742 };
1743 let confirmed_block = ConfirmedBlock::new(block);
1744 let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
1745 let hash = cert.hash();
1746 batch.add_certificate(&cert).unwrap();
1747 storage.write_batch(batch).await.unwrap();
1748
1749 let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();
1751
1752 let certs_by_height = storage
1754 .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
1755 .await
1756 .unwrap();
1757 let cert_by_height = certs_by_height[0].as_ref().unwrap();
1758
1759 assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
1761 assert_eq!(
1762 cert_by_hash.value().block().header,
1763 cert_by_height.value().block().header
1764 );
1765 }
1766}