1use std::{
5 collections::{BTreeMap, HashMap},
6 fmt::Debug,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14 crypto::CryptoHash,
15 data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
16 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
17};
18use linera_chain::{
19 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
20 ChainStateView,
21};
22use linera_execution::{
23 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
24};
25use linera_views::{
26 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
27 batch::Batch,
28 context::ViewContext,
29 store::{
30 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
31 },
32 views::View,
33 ViewError,
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37#[cfg(with_testing)]
38use {
39 futures::channel::oneshot::{self, Receiver},
40 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
41 std::cmp::Reverse,
42};
43
44use crate::{ChainRuntimeContext, Clock, Storage};
45
#[cfg(with_metrics)]
pub mod metrics {
    //! Prometheus metrics recorded by the storage layer.
    //!
    //! Every metric in this module is registered without labels and is created lazily the
    //! first time it is accessed.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Registers an integer counter that carries no labels.
    fn unlabeled_counter(name: &'static str, description: &'static str) -> IntCounterVec {
        register_int_counter_vec(name, description, &[])
    }

    /// Counts existence checks for a single blob.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
        )
    });

    /// Counts existence checks covering several blobs at once.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
        )
    });

    /// Counts existence checks for a blob state.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
        )
    });

    /// Counts existence checks for a certificate.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
        )
    });

    /// Counts reads of a single confirmed block.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
        )
    });

    /// Counts confirmed blocks read through the batched API.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
            )
        });

    /// Counts blob reads.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_blob",
            "The metric counting how often a blob is read from storage",
        )
    });

    /// Counts reads of a single blob state.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
        )
    });

    /// Counts blob states read through the batched API.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
        )
    });

    /// Counts blobs queued for writing.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// Counts reads of a single certificate.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
        )
    });

    /// Counts certificates read through the batched API.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_certificates",
            "The metric counting how often certificate are read from storage",
        )
    });

    /// Counts certificates queued for writing.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// Histogram of the time taken to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Counts event reads.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "read_event",
            "The metric counting how often an event is read from storage",
        )
    });

    /// Counts existence checks for an event.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
        )
    });

    /// Counts events queued for writing.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// Counts reads of the network description.
    // NOTE(review): the exported name lacks the `read_` prefix used by its sibling
    // `write_network_description`; renaming would change the exported metric, so confirm
    // with dashboard owners before touching it.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "network_description",
            "The metric counting how often the network description is read from storage",
        )
    });

    /// Counts writes of the network description.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
242
/// Key under which a blob's bytes are stored inside its `RootKey::BlobId` partition.
const BLOB_KEY: &[u8] = &[0];

/// Key under which a blob's `BlobState` is stored inside its `RootKey::BlobId` partition.
const BLOB_STATE_KEY: &[u8] = &[1];

/// Key under which the lite certificate is stored inside a `RootKey::BlockHash` partition.
const LITE_CERTIFICATE_KEY: &[u8] = &[2];

/// Key under which the confirmed block is stored inside a `RootKey::BlockHash` partition.
const BLOCK_KEY: &[u8] = &[3];

/// Key under which the network description is stored inside the
/// `RootKey::NetworkDescription` partition.
const NETWORK_DESCRIPTION_KEY: &[u8] = &[4];
257
258fn get_block_keys() -> Vec<Vec<u8>> {
259 vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
260}
261
/// A set of pending writes, grouped by the root key (partition) they target.
#[derive(Default)]
#[allow(clippy::type_complexity)]
struct MultiPartitionBatch {
    /// Maps each root key to the `(key, value)` pairs queued for that partition.
    keys_value_bytes: BTreeMap<Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>>,
}
267
268impl MultiPartitionBatch {
269 fn new() -> Self {
270 Self::default()
271 }
272
273 fn put_key_values(&mut self, root_key: Vec<u8>, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
274 let entry = self.keys_value_bytes.entry(root_key).or_default();
275 entry.extend(key_values);
276 }
277
278 fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
279 self.put_key_values(root_key, vec![(key, value)]);
280 }
281
282 fn add_blob(&mut self, blob: &Blob) {
283 #[cfg(with_metrics)]
284 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
285 let root_key = RootKey::BlobId(blob.id()).bytes();
286 let key = BLOB_KEY.to_vec();
287 self.put_key_value(root_key, key, blob.bytes().to_vec());
288 }
289
290 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
291 let root_key = RootKey::BlobId(blob_id).bytes();
292 let key = BLOB_STATE_KEY.to_vec();
293 let value = bcs::to_bytes(blob_state)?;
294 self.put_key_value(root_key, key, value);
295 Ok(())
296 }
297
298 fn add_certificate(
307 &mut self,
308 certificate: &ConfirmedBlockCertificate,
309 ) -> Result<(), ViewError> {
310 #[cfg(with_metrics)]
311 metrics::WRITE_CERTIFICATE_COUNTER
312 .with_label_values(&[])
313 .inc();
314 let hash = certificate.hash();
315
316 let root_key = RootKey::BlockHash(hash).bytes();
318 let mut key_values = Vec::new();
319 let key = LITE_CERTIFICATE_KEY.to_vec();
320 let value = bcs::to_bytes(&certificate.lite_certificate())?;
321 key_values.push((key, value));
322 let key = BLOCK_KEY.to_vec();
323 let value = bcs::to_bytes(&certificate.value())?;
324 key_values.push((key, value));
325 self.put_key_values(root_key, key_values);
326
327 let chain_id = certificate.value().block().header.chain_id;
329 let height = certificate.value().block().header.height;
330 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
331 let height_key = to_height_key(height);
332 let index_value = bcs::to_bytes(&hash)?;
333 self.put_key_value(index_root_key, height_key, index_value);
334
335 Ok(())
336 }
337
338 fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
339 #[cfg(with_metrics)]
340 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
341 let key = to_event_key(event_id);
342 let root_key = RootKey::Event(event_id.chain_id).bytes();
343 self.put_key_value(root_key, key, value);
344 }
345
346 fn add_network_description(
347 &mut self,
348 information: &NetworkDescription,
349 ) -> Result<(), ViewError> {
350 #[cfg(with_metrics)]
351 metrics::WRITE_NETWORK_DESCRIPTION
352 .with_label_values(&[])
353 .inc();
354 let root_key = RootKey::NetworkDescription.bytes();
355 let key = NETWORK_DESCRIPTION_KEY.to_vec();
356 let value = bcs::to_bytes(information)?;
357 self.put_key_value(root_key, key, value);
358 Ok(())
359 }
360}
361
/// A `Storage` implementation backed by a key-value database.
///
/// Cloning is cheap for the shared parts: the database, the thread pool and the two
/// application-code maps are all held behind `Arc`.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying database; a store is opened from it for every root key accessed.
    database: Arc<Database>,
    /// The clock handed out by `Storage::clock`.
    clock: Clock,
    /// Thread pool handed to every `ChainRuntimeContext` created by `load_chain`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime returned by `Storage::wasm_runtime`, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Contract code per application, shared with every `ChainRuntimeContext`.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Service code per application, shared with every `ChainRuntimeContext`.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// Execution configuration copied into every `ChainRuntimeContext`.
    execution_runtime_config: ExecutionRuntimeConfig,
}
373
/// The root keys (partitions) used by the storage layer.
///
/// The serialized form produced by `RootKey::bytes` is used as the partition key.
/// NOTE(review): `CHAIN_ID_TAG`, `BLOB_ID_TAG` and `EVENT_ID_TAG` are compared against the
/// first serialized byte, which assumes BCS writes the variant index first; reordering or
/// inserting variants would silently break them — confirm before changing this enum.
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    /// Partition holding the network description.
    NetworkDescription,
    /// Partition holding the state of the block exporter with the given ID.
    BlockExporterState(u32),
    /// Partition holding the chain state of the given chain.
    ChainState(ChainId),
    /// Partition holding the lite certificate and confirmed block for a block hash.
    BlockHash(CryptoHash),
    /// Partition holding the bytes and the state of a blob.
    BlobId(BlobId),
    /// Partition holding the events of the given chain.
    Event(ChainId),
    /// Partition holding the `height -> block hash` index of the given chain.
    BlockByHeight(ChainId),
}
384
/// First byte of a serialized `RootKey::ChainState` (its variant index in `RootKey`).
const CHAIN_ID_TAG: u8 = 2;
/// First byte of a serialized `RootKey::BlobId` (its variant index in `RootKey`).
const BLOB_ID_TAG: u8 = 4;
/// First byte of a serialized `RootKey::Event` (its variant index in `RootKey`).
const EVENT_ID_TAG: u8 = 5;
388
impl RootKey {
    /// Serializes this root key with BCS.
    ///
    /// # Panics
    ///
    /// Panics if BCS serialization fails.
    fn bytes(&self) -> Vec<u8> {
        bcs::to_bytes(self).unwrap()
    }
}
394
/// An `EventId` without its chain ID.
///
/// Events are stored in a per-chain partition (`RootKey::Event`), so only the stream ID
/// and the index need to be part of the key inside that partition.
#[derive(Debug, Serialize, Deserialize)]
struct RestrictedEventId {
    /// The stream the event belongs to.
    pub stream_id: StreamId,
    /// The index of the event within its stream.
    pub index: u32,
}
400
401fn to_event_key(event_id: &EventId) -> Vec<u8> {
402 let restricted_event_id = RestrictedEventId {
403 stream_id: event_id.stream_id.clone(),
404 index: event_id.index,
405 };
406 bcs::to_bytes(&restricted_event_id).unwrap()
407}
408
/// Builds the key of a block height inside a chain's `RootKey::BlockByHeight` partition.
///
/// # Panics
///
/// Panics if BCS serialization fails.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
412
413fn is_chain_state(root_key: &[u8]) -> bool {
414 if root_key.is_empty() {
415 return false;
416 }
417 root_key[0] == CHAIN_ID_TAG
418}
419
/// A `DualStoreRootKeyAssignment` that keeps chain states in the first store and
/// everything else in the second.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
424
425impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
426 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
427 if root_key.is_empty() {
428 return Ok(StoreInUse::Second);
429 }
430 let store = match is_chain_state(root_key) {
431 true => StoreInUse::First,
432 false => StoreInUse::Second,
433 };
434 Ok(store)
435 }
436}
437
/// A `Clock` implementation that reads the real time via `Timestamp::now`.
#[derive(Clone)]
pub struct WallClock;
441
442#[cfg_attr(not(web), async_trait)]
443#[cfg_attr(web, async_trait(?Send))]
444impl Clock for WallClock {
445 fn current_time(&self) -> Timestamp {
446 Timestamp::now()
447 }
448
449 async fn sleep(&self, delta: TimeDelta) {
450 linera_base::time::timer::sleep(delta.as_duration()).await
451 }
452
453 async fn sleep_until(&self, timestamp: Timestamp) {
454 let delta = timestamp.delta_since(Timestamp::now());
455 if delta > TimeDelta::ZERO {
456 self.sleep(delta).await
457 }
458 }
459}
460
/// The mutable state behind a `TestClock`.
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current simulated time.
    time: Timestamp,
    /// Pending sleeps, keyed by wake-up time.
    ///
    /// Keys are wrapped in `Reverse` so that `split_off(&Reverse(t))` yields exactly the
    /// entries whose wake-up time is at or before `t`.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional hook invoked with the target time of every new sleep; when it returns
    /// `true` and the target is in the future, the clock jumps to that time right away.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
470
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the simulated time and wakes every sleeper whose deadline is at or before it.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are reversed, so everything at or after `Reverse(time)` is already due.
        let due = self.sleeps.split_off(&Reverse(time));
        due.into_values().flatten().for_each(|sender| {
            // The receiver may already be gone; that is not an error.
            let _ = sender.send(());
        });
    }

    /// Registers a sleep lasting `delta` from the current simulated time.
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        self.add_sleep_until(self.time.saturating_add(delta))
    }

    /// Registers a sleep until `time` and returns the receiver that resolves on wake-up.
    ///
    /// If a sleep callback is installed it is consulted first; when it approves and `time`
    /// lies in the future, the clock is advanced to `time`. A sleep whose deadline has
    /// been reached is woken immediately, otherwise it is queued.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let auto_advance = match &self.sleep_callback {
            Some(callback) => callback(time),
            None => false,
        };
        if auto_advance && time > self.time {
            self.set(time);
        }
        if self.time >= time {
            let _ = sender.send(());
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
507
/// A clock for tests whose time only changes when it is explicitly set or advanced
/// (or when an installed sleep callback lets it jump forward).
///
/// Clones share the same underlying state.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
513
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the current simulated time.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the simulated time has advanced by `delta`; a zero delta returns
    /// immediately without registering a sleep.
    async fn sleep(&self, delta: TimeDelta) {
        if delta != TimeDelta::ZERO {
            // The lock guard is released at the end of this statement, before awaiting.
            let wake_up = self.lock().add_sleep(delta);
            let _ = wake_up.await;
        }
    }

    /// Waits until the simulated time reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // The lock guard is released at the end of this statement, before awaiting.
        let wake_up = self.lock().add_sleep_until(timestamp);
        let _ = wake_up.await;
    }
}
537
#[cfg(with_testing)]
impl TestClock {
    /// Creates a test clock in its default state.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the simulated time, waking every sleeper whose deadline has been reached.
    pub fn set(&self, time: Timestamp) {
        let mut inner = self.lock();
        inner.set(time);
    }

    /// Advances the simulated time by `delta`, saturating on overflow.
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let advanced = inner.time.saturating_add(delta);
        inner.set(advanced);
    }

    /// Returns the current simulated time.
    pub fn current_time(&self) -> Timestamp {
        let inner = self.lock();
        inner.time
    }

    /// Installs a callback that is invoked with the target time of every new sleep.
    ///
    /// When the callback returns `true` and the target is in the future, the clock jumps
    /// to the target time instead of queueing the sleep. Replaces any previous callback.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        let hook: Box<dyn Fn(Timestamp) -> bool + Send + Sync> = Box::new(callback);
        self.lock().sleep_callback = Some(hook);
    }

    /// Removes the sleep callback, if one is installed.
    pub fn clear_sleep_callback(&self) {
        let mut inner = self.lock();
        inner.sleep_callback = None;
    }

    /// Locks the shared state.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
582
583#[cfg_attr(not(web), async_trait)]
584#[cfg_attr(web, async_trait(?Send))]
585impl<Database, C> Storage for DbStorage<Database, C>
586where
587 Database: KeyValueDatabase<
588 Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
589 Error: Send + Sync,
590 > + Clone
591 + linera_base::util::traits::AutoTraits
592 + 'static,
593 C: Clock + Clone + Send + Sync + 'static,
594{
595 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
596 type Clock = C;
597 type BlockExporterContext = ViewContext<u32, Database::Store>;
598
    /// Returns the clock this storage was configured with.
    fn clock(&self) -> &C {
        &self.clock
    }
602
    /// Returns the shared thread pool that is also handed to every chain runtime context.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
606
607 #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
608 async fn load_chain(
609 &self,
610 chain_id: ChainId,
611 ) -> Result<ChainStateView<Self::Context>, ViewError> {
612 #[cfg(with_metrics)]
613 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
614 let runtime_context = ChainRuntimeContext {
615 storage: self.clone(),
616 thread_pool: self.thread_pool.clone(),
617 chain_id,
618 execution_runtime_config: self.execution_runtime_config,
619 user_contracts: self.user_contracts.clone(),
620 user_services: self.user_services.clone(),
621 };
622 let root_key = RootKey::ChainState(chain_id).bytes();
623 let store = self.database.open_exclusive(&root_key)?;
624 let context = ViewContext::create_root_context(store, runtime_context).await?;
625 ChainStateView::load(context).await
626 }
627
628 #[instrument(level = "trace", skip_all, fields(%blob_id))]
629 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
630 let root_key = RootKey::BlobId(blob_id).bytes();
631 let store = self.database.open_shared(&root_key)?;
632 let test = store.contains_key(BLOB_KEY).await?;
633 #[cfg(with_metrics)]
634 metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
635 Ok(test)
636 }
637
638 #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
639 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
640 let mut missing_blobs = Vec::new();
641 for blob_id in blob_ids {
642 let root_key = RootKey::BlobId(*blob_id).bytes();
643 let store = self.database.open_shared(&root_key)?;
644 if !store.contains_key(BLOB_KEY).await? {
645 missing_blobs.push(*blob_id);
646 }
647 }
648 #[cfg(with_metrics)]
649 metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
650 Ok(missing_blobs)
651 }
652
653 #[instrument(skip_all, fields(%blob_id))]
654 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
655 let root_key = RootKey::BlobId(blob_id).bytes();
656 let store = self.database.open_shared(&root_key)?;
657 let test = store.contains_key(BLOB_STATE_KEY).await?;
658 #[cfg(with_metrics)]
659 metrics::CONTAINS_BLOB_STATE_COUNTER
660 .with_label_values(&[])
661 .inc();
662 Ok(test)
663 }
664
665 #[instrument(skip_all, fields(%hash))]
666 async fn read_confirmed_block(
667 &self,
668 hash: CryptoHash,
669 ) -> Result<Option<ConfirmedBlock>, ViewError> {
670 let root_key = RootKey::BlockHash(hash).bytes();
671 let store = self.database.open_shared(&root_key)?;
672 let value = store.read_value(BLOCK_KEY).await?;
673 #[cfg(with_metrics)]
674 metrics::READ_CONFIRMED_BLOCK_COUNTER
675 .with_label_values(&[])
676 .inc();
677 Ok(value)
678 }
679
680 #[instrument(skip_all)]
681 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
682 &self,
683 hashes: I,
684 ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError> {
685 let hashes = hashes.into_iter().collect::<Vec<_>>();
686 if hashes.is_empty() {
687 return Ok(Vec::new());
688 }
689 let root_keys = Self::get_root_keys_for_certificates(&hashes);
690 let mut blocks = Vec::new();
691 for root_key in root_keys {
692 let store = self.database.open_shared(&root_key)?;
693 blocks.push(store.read_value(BLOCK_KEY).await?);
694 }
695 #[cfg(with_metrics)]
696 metrics::READ_CONFIRMED_BLOCKS_COUNTER
697 .with_label_values(&[])
698 .inc_by(hashes.len() as u64);
699 Ok(blocks)
700 }
701
702 #[instrument(skip_all, fields(%blob_id))]
703 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
704 let root_key = RootKey::BlobId(blob_id).bytes();
705 let store = self.database.open_shared(&root_key)?;
706 let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
707 #[cfg(with_metrics)]
708 metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
709 Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
710 }
711
712 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
713 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
714 if blob_ids.is_empty() {
715 return Ok(Vec::new());
716 }
717 let mut blobs = Vec::new();
718 for blob_id in blob_ids {
719 blobs.push(self.read_blob(*blob_id).await?);
720 }
721 #[cfg(with_metrics)]
722 metrics::READ_BLOB_COUNTER
723 .with_label_values(&[])
724 .inc_by(blob_ids.len() as u64);
725 Ok(blobs)
726 }
727
728 #[instrument(skip_all, fields(%blob_id))]
729 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
730 let root_key = RootKey::BlobId(blob_id).bytes();
731 let store = self.database.open_shared(&root_key)?;
732 let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
733 #[cfg(with_metrics)]
734 metrics::READ_BLOB_STATE_COUNTER
735 .with_label_values(&[])
736 .inc();
737 Ok(blob_state)
738 }
739
740 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
741 async fn read_blob_states(
742 &self,
743 blob_ids: &[BlobId],
744 ) -> Result<Vec<Option<BlobState>>, ViewError> {
745 if blob_ids.is_empty() {
746 return Ok(Vec::new());
747 }
748 let mut blob_states = Vec::new();
749 for blob_id in blob_ids {
750 blob_states.push(self.read_blob_state(*blob_id).await?);
751 }
752 #[cfg(with_metrics)]
753 metrics::READ_BLOB_STATES_COUNTER
754 .with_label_values(&[])
755 .inc_by(blob_ids.len() as u64);
756 Ok(blob_states)
757 }
758
759 #[instrument(skip_all, fields(blob_id = %blob.id()))]
760 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
761 let mut batch = MultiPartitionBatch::new();
762 batch.add_blob(blob);
763 self.write_batch(batch).await?;
764 Ok(())
765 }
766
767 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
768 async fn maybe_write_blob_states(
769 &self,
770 blob_ids: &[BlobId],
771 blob_state: BlobState,
772 ) -> Result<(), ViewError> {
773 if blob_ids.is_empty() {
774 return Ok(());
775 }
776 let mut maybe_blob_states = Vec::new();
777 for blob_id in blob_ids {
778 let root_key = RootKey::BlobId(*blob_id).bytes();
779 let store = self.database.open_shared(&root_key)?;
780 let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
781 maybe_blob_states.push(maybe_blob_state);
782 }
783 let mut batch = MultiPartitionBatch::new();
784 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
785 match maybe_blob_state {
786 None => {
787 batch.add_blob_state(*blob_id, &blob_state)?;
788 }
789 Some(state) => {
790 if state.epoch < blob_state.epoch {
791 batch.add_blob_state(*blob_id, &blob_state)?;
792 }
793 }
794 }
795 }
796 self.write_batch(batch).await?;
800 Ok(())
801 }
802
803 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
804 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
805 if blobs.is_empty() {
806 return Ok(Vec::new());
807 }
808 let mut batch = MultiPartitionBatch::new();
809 let mut blob_states = Vec::new();
810 for blob in blobs {
811 let root_key = RootKey::BlobId(blob.id()).bytes();
812 let store = self.database.open_shared(&root_key)?;
813 let has_state = store.contains_key(BLOB_STATE_KEY).await?;
814 blob_states.push(has_state);
815 if has_state {
816 batch.add_blob(blob);
817 }
818 }
819 self.write_batch(batch).await?;
820 Ok(blob_states)
821 }
822
823 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
824 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
825 if blobs.is_empty() {
826 return Ok(());
827 }
828 let mut batch = MultiPartitionBatch::new();
829 for blob in blobs {
830 batch.add_blob(blob);
831 }
832 self.write_batch(batch).await
833 }
834
835 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
836 async fn write_blobs_and_certificate(
837 &self,
838 blobs: &[Blob],
839 certificate: &ConfirmedBlockCertificate,
840 ) -> Result<(), ViewError> {
841 let mut batch = MultiPartitionBatch::new();
842 for blob in blobs {
843 batch.add_blob(blob);
844 }
845 batch.add_certificate(certificate)?;
846 self.write_batch(batch).await
847 }
848
849 #[instrument(skip_all, fields(%hash))]
850 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
851 let root_key = RootKey::BlockHash(hash).bytes();
852 let store = self.database.open_shared(&root_key)?;
853 let results = store.contains_keys(&get_block_keys()).await?;
854 #[cfg(with_metrics)]
855 metrics::CONTAINS_CERTIFICATE_COUNTER
856 .with_label_values(&[])
857 .inc();
858 Ok(results[0] && results[1])
859 }
860
861 #[instrument(skip_all, fields(%hash))]
862 async fn read_certificate(
863 &self,
864 hash: CryptoHash,
865 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
866 let root_key = RootKey::BlockHash(hash).bytes();
867 let store = self.database.open_shared(&root_key)?;
868 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
869 #[cfg(with_metrics)]
870 metrics::READ_CERTIFICATE_COUNTER
871 .with_label_values(&[])
872 .inc();
873 Self::deserialize_certificate(&values, hash)
874 }
875
876 #[instrument(skip_all)]
877 async fn read_certificates(
878 &self,
879 hashes: &[CryptoHash],
880 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
881 let raw_certs = self.read_certificates_raw(hashes).await?;
882
883 raw_certs
884 .into_iter()
885 .zip(hashes)
886 .map(|(maybe_raw, hash)| {
887 let Some((lite_cert_bytes, confirmed_block_bytes)) = maybe_raw else {
888 return Ok(None);
889 };
890 let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
891 let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
892 assert_eq!(&value.hash(), hash);
893 let certificate = cert
894 .with_value(value)
895 .ok_or(ViewError::InconsistentEntries)?;
896 Ok(Some(certificate))
897 })
898 .collect()
899 }
900
901 #[instrument(skip_all)]
902 async fn read_certificates_raw(
903 &self,
904 hashes: &[CryptoHash],
905 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
906 if hashes.is_empty() {
907 return Ok(Vec::new());
908 }
909 let root_keys = Self::get_root_keys_for_certificates(hashes);
910 let mut values = Vec::new();
911 for root_key in root_keys {
912 let store = self.database.open_shared(&root_key)?;
913 values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
914 }
915 #[cfg(with_metrics)]
916 metrics::READ_CERTIFICATES_COUNTER
917 .with_label_values(&[])
918 .inc_by(hashes.len() as u64);
919 Ok(values
920 .chunks_exact(2)
921 .map(|chunk| {
922 let lite_cert_bytes = chunk[0].as_ref()?;
923 let confirmed_block_bytes = chunk[1].as_ref()?;
924 Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
925 })
926 .collect())
927 }
928
929 async fn read_certificate_hashes_by_heights(
930 &self,
931 chain_id: ChainId,
932 heights: &[BlockHeight],
933 ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
934 if heights.is_empty() {
935 return Ok(Vec::new());
936 }
937
938 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
939 let store = self.database.open_shared(&index_root_key)?;
940 let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
941 let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
942 let hash_options: Vec<Option<CryptoHash>> = hash_bytes
943 .into_iter()
944 .map(|opt| {
945 opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
946 .transpose()
947 })
948 .collect::<Result<_, _>>()?;
949
950 Ok(hash_options)
951 }
952
953 #[instrument(skip_all)]
954 async fn read_certificates_by_heights_raw(
955 &self,
956 chain_id: ChainId,
957 heights: &[BlockHeight],
958 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
959 let hashes: Vec<Option<CryptoHash>> = self
960 .read_certificate_hashes_by_heights(chain_id, heights)
961 .await?;
962
963 let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
965 for (index, maybe_hash) in hashes.iter().enumerate() {
966 if let Some(hash) = maybe_hash {
967 indices.entry(*hash).or_default().push(index);
968 }
969 }
970
971 let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
973
974 let mut result = vec![None; heights.len()];
975
976 for (raw_cert, hash) in self
977 .read_certificates_raw(&unique_hashes)
978 .await?
979 .into_iter()
980 .zip(unique_hashes)
981 {
982 if let Some(idx_list) = indices.get(&hash) {
983 for &index in idx_list {
984 result[index] = raw_cert.clone();
985 }
986 } else {
987 tracing::error!(?hash, "certificate hash not found in indices map",);
989 }
990 }
991
992 Ok(result)
993 }
994
995 #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
996 async fn read_certificates_by_heights(
997 &self,
998 chain_id: ChainId,
999 heights: &[BlockHeight],
1000 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1001 self.read_certificates_by_heights_raw(chain_id, heights)
1002 .await?
1003 .into_iter()
1004 .map(|maybe_raw| match maybe_raw {
1005 None => Ok(None),
1006 Some((lite_cert_bytes, confirmed_block_bytes)) => {
1007 let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1008 let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1009 let certificate = cert
1010 .with_value(value)
1011 .ok_or(ViewError::InconsistentEntries)?;
1012 Ok(Some(certificate))
1013 }
1014 })
1015 .collect()
1016 }
1017
1018 #[instrument(skip_all, fields(event_id = ?event_id))]
1019 async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
1020 let event_key = to_event_key(&event_id);
1021 let root_key = RootKey::Event(event_id.chain_id).bytes();
1022 let store = self.database.open_shared(&root_key)?;
1023 let event = store.read_value_bytes(&event_key).await?;
1024 #[cfg(with_metrics)]
1025 metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
1026 Ok(event)
1027 }
1028
1029 #[instrument(skip_all, fields(event_id = ?event_id))]
1030 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1031 let event_key = to_event_key(&event_id);
1032 let root_key = RootKey::Event(event_id.chain_id).bytes();
1033 let store = self.database.open_shared(&root_key)?;
1034 let exists = store.contains_key(&event_key).await?;
1035 #[cfg(with_metrics)]
1036 metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
1037 Ok(exists)
1038 }
1039
1040 #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
1041 async fn read_events_from_index(
1042 &self,
1043 chain_id: &ChainId,
1044 stream_id: &StreamId,
1045 start_index: u32,
1046 ) -> Result<Vec<IndexAndEvent>, ViewError> {
1047 let root_key = RootKey::Event(*chain_id).bytes();
1048 let store = self.database.open_shared(&root_key)?;
1049 let mut keys = Vec::new();
1050 let mut indices = Vec::new();
1051 let prefix = bcs::to_bytes(stream_id).unwrap();
1052 for short_key in store.find_keys_by_prefix(&prefix).await? {
1053 let index = bcs::from_bytes::<u32>(&short_key)?;
1054 if index >= start_index {
1055 let mut key = prefix.clone();
1056 key.extend(short_key);
1057 keys.push(key);
1058 indices.push(index);
1059 }
1060 }
1061 let values = store.read_multi_values_bytes(&keys).await?;
1062 let mut returned_values = Vec::new();
1063 for (index, value) in indices.into_iter().zip(values) {
1064 let event = value.unwrap();
1065 returned_values.push(IndexAndEvent { index, event });
1066 }
1067 Ok(returned_values)
1068 }
1069
1070 #[instrument(skip_all)]
1071 async fn write_events(
1072 &self,
1073 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1074 ) -> Result<(), ViewError> {
1075 let mut batch = MultiPartitionBatch::new();
1076 for (event_id, value) in events {
1077 batch.add_event(&event_id, value);
1078 }
1079 self.write_batch(batch).await
1080 }
1081
1082 #[instrument(skip_all)]
1083 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1084 let root_key = RootKey::NetworkDescription.bytes();
1085 let store = self.database.open_shared(&root_key)?;
1086 let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1087 #[cfg(with_metrics)]
1088 metrics::READ_NETWORK_DESCRIPTION
1089 .with_label_values(&[])
1090 .inc();
1091 Ok(maybe_value)
1092 }
1093
1094 #[instrument(skip_all)]
1095 async fn write_network_description(
1096 &self,
1097 information: &NetworkDescription,
1098 ) -> Result<(), ViewError> {
1099 let mut batch = MultiPartitionBatch::new();
1100 batch.add_network_description(information)?;
1101 self.write_batch(batch).await?;
1102 Ok(())
1103 }
1104
/// Returns the value of this storage's `wasm_runtime` field: the selected Wasm runtime,
/// or `None` if none was set.
fn wasm_runtime(&self) -> Option<WasmRuntime> {
    self.wasm_runtime
}
1108
1109 #[instrument(skip_all)]
1110 async fn block_exporter_context(
1111 &self,
1112 block_exporter_id: u32,
1113 ) -> Result<Self::BlockExporterContext, ViewError> {
1114 let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1115 let store = self.database.open_exclusive(&root_key)?;
1116 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1117 }
1118
1119 async fn list_blob_ids(&self) -> Result<Vec<BlobId>, ViewError> {
1120 let root_keys = self.database.list_root_keys().await?;
1121 let mut blob_ids = Vec::new();
1122 for root_key in root_keys {
1123 if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1124 let root_key_red = &root_key[1..];
1125 let blob_id = bcs::from_bytes(root_key_red)?;
1126 blob_ids.push(blob_id);
1127 }
1128 }
1129 Ok(blob_ids)
1130 }
1131
1132 async fn list_chain_ids(&self) -> Result<Vec<ChainId>, ViewError> {
1133 let root_keys = self.database.list_root_keys().await?;
1134 let mut chain_ids = Vec::new();
1135 for root_key in root_keys {
1136 if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1137 let root_key_red = &root_key[1..];
1138 let chain_id = bcs::from_bytes(root_key_red)?;
1139 chain_ids.push(chain_id);
1140 }
1141 }
1142 Ok(chain_ids)
1143 }
1144
1145 async fn list_event_ids(&self) -> Result<Vec<EventId>, ViewError> {
1146 let root_keys = self.database.list_root_keys().await?;
1147 let mut event_ids = Vec::new();
1148 for root_key in root_keys {
1149 if !root_key.is_empty() && root_key[0] == EVENT_ID_TAG {
1150 let root_key_red = &root_key[1..];
1151 let chain_id = bcs::from_bytes(root_key_red)?;
1152 let store = self.database.open_shared(&root_key)?;
1153 let keys = store.find_keys_by_prefix(&[]).await?;
1154 for key in keys {
1155 let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
1156 let event_id = EventId {
1157 chain_id,
1158 stream_id: restricted_event_id.stream_id,
1159 index: restricted_event_id.index,
1160 };
1161 event_ids.push(event_id);
1162 }
1163 }
1164 }
1165 Ok(event_ids)
1166 }
1167}
1168
1169impl<Database, C> DbStorage<Database, C>
1170where
1171 Database: KeyValueDatabase + Clone,
1172 Database::Store: KeyValueStore + Clone,
1173 C: Clock,
1174 Database::Error: Send + Sync,
1175{
1176 #[instrument(skip_all)]
1177 fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1178 hashes
1179 .iter()
1180 .map(|hash| RootKey::BlockHash(*hash).bytes())
1181 .collect()
1182 }
1183
1184 #[instrument(skip_all)]
1185 fn deserialize_certificate(
1186 pair: &[Option<Vec<u8>>],
1187 hash: CryptoHash,
1188 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1189 let Some(cert_bytes) = pair[0].as_ref() else {
1190 return Ok(None);
1191 };
1192 let Some(value_bytes) = pair[1].as_ref() else {
1193 return Ok(None);
1194 };
1195 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1196 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1197 assert_eq!(value.hash(), hash);
1198 let certificate = cert
1199 .with_value(value)
1200 .ok_or(ViewError::InconsistentEntries)?;
1201 Ok(Some(certificate))
1202 }
1203
1204 #[instrument(skip_all)]
1205 async fn write_entry(
1206 store: &Database::Store,
1207 key_values: Vec<(Vec<u8>, Vec<u8>)>,
1208 ) -> Result<(), ViewError> {
1209 let mut batch = Batch::new();
1210 for (key, value) in key_values {
1211 batch.put_key_value_bytes(key, value);
1212 }
1213 store.write_batch(batch).await?;
1214 Ok(())
1215 }
1216
1217 #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1218 async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1219 if batch.keys_value_bytes.is_empty() {
1220 return Ok(());
1221 }
1222 let mut futures = Vec::new();
1223 for (root_key, key_values) in batch.keys_value_bytes {
1224 let store = self.database.open_shared(&root_key)?;
1225 futures.push(async move { Self::write_entry(&store, key_values).await });
1226 }
1227 futures::future::try_join_all(futures).await?;
1228 Ok(())
1229 }
1230}
1231
impl<Database, C> DbStorage<Database, C> {
    /// Builds a storage around `database` with the given Wasm runtime and clock.
    ///
    /// The `user_contracts` and `user_services` maps start empty, and the execution
    /// runtime configuration starts from its `Default` value.
    fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            // When the `web` cfg is set, the `clippy::arc_with_non_send_sync` lint is
            // expected on the `Arc` below.
            // NOTE(review): `20` is hard-coded; presumably the thread-pool size —
            // confirm against `ThreadPool::new`.
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets `allow_application_logs` in the execution runtime configuration and
    /// returns the updated storage.
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1253
1254impl<Database> DbStorage<Database, WallClock>
1255where
1256 Database: KeyValueDatabase + Clone + 'static,
1257 Database::Error: Send + Sync,
1258 Database::Store: KeyValueStore + Clone + 'static,
1259{
1260 pub async fn maybe_create_and_connect(
1261 config: &Database::Config,
1262 namespace: &str,
1263 wasm_runtime: Option<WasmRuntime>,
1264 ) -> Result<Self, Database::Error> {
1265 let database = Database::maybe_create_and_connect(config, namespace).await?;
1266 Ok(Self::new(database, wasm_runtime, WallClock))
1267 }
1268
1269 pub async fn connect(
1270 config: &Database::Config,
1271 namespace: &str,
1272 wasm_runtime: Option<WasmRuntime>,
1273 ) -> Result<Self, Database::Error> {
1274 let database = Database::connect(config, namespace).await?;
1275 Ok(Self::new(database, wasm_runtime, WallClock))
1276 }
1277}
1278
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a test storage from the database's test configuration, a generated
    /// test namespace and a new `TestClock`.
    ///
    /// # Panics
    ///
    /// Panics if the test configuration cannot be created or the storage cannot be
    /// set up.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Obtains a database through `Database::recreate_and_connect` for `namespace`
    /// and wraps it in a storage that uses the supplied test clock.
    ///
    /// Any error reported by the database is returned unchanged.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        Database::recreate_and_connect(&config, namespace)
            .await
            .map(|database| Self::new(database, wasm_runtime, clock))
    }
}
1309
1310#[cfg(test)]
1311mod tests {
1312 use linera_base::{
1313 crypto::{CryptoHash, TestString},
1314 data_types::{BlockHeight, Epoch, Round, Timestamp},
1315 identifiers::{
1316 ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
1317 StreamName,
1318 },
1319 };
1320 use linera_chain::{
1321 block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
1322 types::ConfirmedBlockCertificate,
1323 };
1324 use linera_views::{
1325 memory::MemoryDatabase,
1326 store::{KeyValueDatabase, ReadableKeyValueStore as _},
1327 };
1328
1329 use crate::{
1330 db_storage::{
1331 to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
1332 EVENT_ID_TAG,
1333 },
1334 DbStorage, Storage, TestClock,
1335 };
1336
/// Checks that a blob root key is `BLOB_ID_TAG` followed by the BCS-encoded `BlobId`.
#[test]
fn test_root_key_blob_serialization() {
    let blob_id = BlobId::new(CryptoHash::default(), BlobType::default());
    let serialized = RootKey::BlobId(blob_id).bytes();
    assert_eq!(serialized[0], BLOB_ID_TAG);
    let decoded: BlobId = bcs::from_bytes(&serialized[1..]).unwrap();
    assert_eq!(decoded, blob_id);
}
1352
/// Checks that a chain-state root key is `CHAIN_ID_TAG` followed by the BCS-encoded
/// `ChainId`.
#[test]
fn test_root_key_chainstate_serialization() {
    let chain_id = ChainId(CryptoHash::default());
    let serialized = RootKey::ChainState(chain_id).bytes();
    assert_eq!(serialized[0], CHAIN_ID_TAG);
    let decoded: ChainId = bcs::from_bytes(&serialized[1..]).unwrap();
    assert_eq!(decoded, chain_id);
}
1366
/// Checks that an event root key starts with `EVENT_ID_TAG` and that the event's store
/// key begins with the BCS serialization of its stream ID.
#[test]
fn test_root_key_event_serialization() {
    let chain_id = ChainId(CryptoHash::test_hash("49"));
    let user_application = ApplicationId::new(CryptoHash::test_hash("42"));
    let stream_id = StreamId {
        application_id: GenericApplicationId::User(user_application),
        stream_name: StreamName(bcs::to_bytes("linera_stream").unwrap()),
    };
    // Serialize the stream ID before it is moved into the event ID.
    let stream_prefix = bcs::to_bytes(&stream_id).unwrap();

    let event_id = EventId {
        chain_id,
        stream_id,
        index: 1567,
    };
    let tag = RootKey::Event(chain_id).bytes()[0];
    assert_eq!(tag, EVENT_ID_TAG);
    assert!(to_event_key(&event_id).starts_with(&stream_prefix));
}
1394
/// Checks that the bytes after the first byte of a block-by-height root key decode to
/// the chain ID, and that a height key decodes back to the same `BlockHeight`.
#[test]
fn test_root_key_block_by_height_serialization() {
    let chain_id = ChainId(CryptoHash::default());
    let height = BlockHeight(42);

    let root_key = RootKey::BlockByHeight(chain_id).bytes();
    let decoded_chain_id = bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap();
    assert_eq!(decoded_chain_id, chain_id);

    let decoded_height = bcs::from_bytes::<BlockHeight>(&to_height_key(height)).unwrap();
    assert_eq!(decoded_height, height);
}
1415
/// Checks that writing a certificate also stores its hash under the
/// `RootKey::BlockByHeight` root key of its chain, keyed by the block height.
#[cfg(with_testing)]
#[tokio::test]
async fn test_add_certificate_creates_height_index() {
    let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

    let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
    let height = BlockHeight(5);
    // Hashes a labelled test string; the labels only need to be distinct placeholders.
    let digest = |label: &str| CryptoHash::new(&TestString::new(label));
    let header = BlockHeader {
        chain_id,
        epoch: Epoch::ZERO,
        height,
        timestamp: Timestamp::from(0),
        state_hash: digest("state_hash"),
        previous_block_hash: None,
        authenticated_owner: None,
        transactions_hash: digest("transactions_hash"),
        messages_hash: digest("messages_hash"),
        previous_message_blocks_hash: digest("prev_msg_blocks_hash"),
        previous_event_blocks_hash: digest("prev_event_blocks_hash"),
        oracle_responses_hash: digest("oracle_responses_hash"),
        events_hash: digest("events_hash"),
        blobs_hash: digest("blobs_hash"),
        operation_results_hash: digest("operation_results_hash"),
    };
    let body = BlockBody {
        transactions: vec![],
        messages: vec![],
        previous_message_blocks: Default::default(),
        previous_event_blocks: Default::default(),
        oracle_responses: vec![],
        events: vec![],
        blobs: vec![],
        operation_results: vec![],
    };
    let certificate = ConfirmedBlockCertificate::new(
        ConfirmedBlock::new(Block { header, body }),
        Round::Fast,
        vec![],
    );

    let mut batch = MultiPartitionBatch::new();
    batch.add_certificate(&certificate).unwrap();
    storage.write_batch(batch).await.unwrap();

    // Look up the height entry directly in the underlying store.
    let expected_hash = certificate.hash();
    let index_store = storage
        .database
        .open_shared(&RootKey::BlockByHeight(chain_id).bytes())
        .unwrap();
    let stored_bytes = index_store
        .read_value_bytes(&to_height_key(height))
        .await
        .unwrap();

    assert!(stored_bytes.is_some(), "Height index was not created");
    let stored_hash: CryptoHash = bcs::from_bytes(&stored_bytes.unwrap()).unwrap();
    assert_eq!(stored_hash, expected_hash, "Height index contains wrong hash");
}
1477
/// Exercises `read_certificates_by_heights` on a single chain holding certificates at
/// heights 1, 3 and 5.
///
/// Covers in-order and out-of-order requests, a missing height, a duplicated height,
/// and an empty request.
#[cfg(with_testing)]
#[tokio::test]
async fn test_read_certificates_by_heights() {
    let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
    let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

    let mut batch = MultiPartitionBatch::new();
    let mut expected_certs = vec![];

    for height in [1, 3, 5] {
        // A plain string literal does not interpolate `{height}`, so the label is
        // built with `format!` to make each block's hashes depend on its height.
        let digest = |label: &str| {
            let tagged = format!("{label}_{height}");
            CryptoHash::new(&TestString::new(tagged.as_str()))
        };
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(height),
                timestamp: Timestamp::from(0),
                state_hash: digest("state_hash"),
                previous_block_hash: None,
                authenticated_owner: None,
                transactions_hash: digest("tx_hash"),
                messages_hash: digest("msg_hash"),
                previous_message_blocks_hash: digest("pmb_hash"),
                previous_event_blocks_hash: digest("peb_hash"),
                oracle_responses_hash: digest("oracle_hash"),
                events_hash: digest("events_hash"),
                blobs_hash: digest("blobs_hash"),
                operation_results_hash: digest("op_results_hash"),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
        expected_certs.push((height, cert.clone()));
        batch.add_certificate(&cert).unwrap();
    }
    storage.write_batch(batch).await.unwrap();

    // Heights requested in ascending order come back in that order.
    let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
    let result = storage
        .read_certificates_by_heights(chain_id, &heights)
        .await
        .unwrap();
    assert_eq!(result.len(), 3);
    assert_eq!(
        result[0].as_ref().unwrap().hash(),
        expected_certs[0].1.hash()
    );
    assert_eq!(
        result[1].as_ref().unwrap().hash(),
        expected_certs[1].1.hash()
    );
    assert_eq!(
        result[2].as_ref().unwrap().hash(),
        expected_certs[2].1.hash()
    );

    // Results follow the order of the request, not the order of the heights.
    let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
    let result = storage
        .read_certificates_by_heights(chain_id, &heights)
        .await
        .unwrap();
    assert_eq!(result.len(), 3);
    assert_eq!(
        result[0].as_ref().unwrap().hash(),
        expected_certs[2].1.hash()
    );
    assert_eq!(
        result[1].as_ref().unwrap().hash(),
        expected_certs[0].1.hash()
    );
    assert_eq!(
        result[2].as_ref().unwrap().hash(),
        expected_certs[1].1.hash()
    );

    // A missing height yields `None`; a duplicated height yields the same certificate
    // in both positions.
    let heights = vec![
        BlockHeight(1),
        BlockHeight(2),
        BlockHeight(3),
        BlockHeight(3),
    ];
    let result = storage
        .read_certificates_by_heights(chain_id, &heights)
        .await
        .unwrap();
    assert_eq!(result.len(), 4);
    assert!(result[0].is_some());
    assert!(result[1].is_none());
    assert!(result[2].is_some());
    assert!(result[3].is_some());
    assert_eq!(
        result[2].as_ref().unwrap().hash(),
        result[3].as_ref().unwrap().hash()
    );

    // An empty request yields an empty result.
    let heights: Vec<BlockHeight> = vec![];
    let result = storage
        .read_certificates_by_heights(chain_id, &heights)
        .await
        .unwrap();
    assert_eq!(result.len(), 0);
}
1602
/// Checks that certificates stored at the same height on two different chains are
/// looked up independently, and that an unknown height yields `None`.
#[cfg(with_testing)]
#[tokio::test]
async fn test_read_certificates_by_heights_multiple_chains() {
    let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

    let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
    let chain_b = ChainId(CryptoHash::test_hash("chain_b"));
    let shared_height = BlockHeight(10);

    // Hashes a labelled test string; the labels only need to be distinct placeholders.
    let digest = |label: &str| CryptoHash::new(&TestString::new(label));

    let mut batch = MultiPartitionBatch::new();

    let header_a = BlockHeader {
        chain_id: chain_a,
        epoch: Epoch::ZERO,
        height: shared_height,
        timestamp: Timestamp::from(0),
        state_hash: digest("state_hash_a"),
        previous_block_hash: None,
        authenticated_owner: None,
        transactions_hash: digest("tx_hash_a"),
        messages_hash: digest("msg_hash_a"),
        previous_message_blocks_hash: digest("pmb_hash_a"),
        previous_event_blocks_hash: digest("peb_hash_a"),
        oracle_responses_hash: digest("oracle_hash_a"),
        events_hash: digest("events_hash_a"),
        blobs_hash: digest("blobs_hash_a"),
        operation_results_hash: digest("op_results_hash_a"),
    };
    let body_a = BlockBody {
        transactions: vec![],
        messages: vec![],
        previous_message_blocks: Default::default(),
        previous_event_blocks: Default::default(),
        oracle_responses: vec![],
        events: vec![],
        blobs: vec![],
        operation_results: vec![],
    };
    let cert_a = ConfirmedBlockCertificate::new(
        ConfirmedBlock::new(Block {
            header: header_a,
            body: body_a,
        }),
        Round::Fast,
        vec![],
    );
    batch.add_certificate(&cert_a).unwrap();

    let header_b = BlockHeader {
        chain_id: chain_b,
        epoch: Epoch::ZERO,
        height: shared_height,
        timestamp: Timestamp::from(0),
        state_hash: digest("state_hash_b"),
        previous_block_hash: None,
        authenticated_owner: None,
        transactions_hash: digest("tx_hash_b"),
        messages_hash: digest("msg_hash_b"),
        previous_message_blocks_hash: digest("pmb_hash_b"),
        previous_event_blocks_hash: digest("peb_hash_b"),
        oracle_responses_hash: digest("oracle_hash_b"),
        events_hash: digest("events_hash_b"),
        blobs_hash: digest("blobs_hash_b"),
        operation_results_hash: digest("op_results_hash_b"),
    };
    let body_b = BlockBody {
        transactions: vec![],
        messages: vec![],
        previous_message_blocks: Default::default(),
        previous_event_blocks: Default::default(),
        oracle_responses: vec![],
        events: vec![],
        blobs: vec![],
        operation_results: vec![],
    };
    let cert_b = ConfirmedBlockCertificate::new(
        ConfirmedBlock::new(Block {
            header: header_b,
            body: body_b,
        }),
        Round::Fast,
        vec![],
    );
    batch.add_certificate(&cert_b).unwrap();

    storage.write_batch(batch).await.unwrap();

    // The same height on chain A resolves to chain A's certificate.
    let found_on_a = storage
        .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
        .await
        .unwrap();
    assert_eq!(found_on_a[0].as_ref().unwrap().hash(), cert_a.hash());

    // The same height on chain B resolves to chain B's certificate.
    let found_on_b = storage
        .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
        .await
        .unwrap();
    assert_eq!(found_on_b[0].as_ref().unwrap().hash(), cert_b.hash());

    // A height with no stored certificate yields `None`.
    let missing = storage
        .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
        .await
        .unwrap();
    assert!(missing[0].is_none());
}
1703
/// Checks that reading a certificate by hash and reading it by height return the same
/// certificate (same hash and same block header).
#[cfg(with_testing)]
#[tokio::test]
async fn test_read_certificates_by_heights_consistency() {
    let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
    let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

    // Hashes a labelled test string; the labels only need to be distinct placeholders.
    let digest = |label: &str| CryptoHash::new(&TestString::new(label));
    let header = BlockHeader {
        chain_id,
        epoch: Epoch::ZERO,
        height: BlockHeight(7),
        timestamp: Timestamp::from(0),
        state_hash: digest("state_hash"),
        previous_block_hash: None,
        authenticated_owner: None,
        transactions_hash: digest("tx_hash"),
        messages_hash: digest("msg_hash"),
        previous_message_blocks_hash: digest("pmb_hash"),
        previous_event_blocks_hash: digest("peb_hash"),
        oracle_responses_hash: digest("oracle_hash"),
        events_hash: digest("events_hash"),
        blobs_hash: digest("blobs_hash"),
        operation_results_hash: digest("op_results_hash"),
    };
    let body = BlockBody {
        transactions: vec![],
        messages: vec![],
        previous_message_blocks: Default::default(),
        previous_event_blocks: Default::default(),
        oracle_responses: vec![],
        events: vec![],
        blobs: vec![],
        operation_results: vec![],
    };
    let cert = ConfirmedBlockCertificate::new(
        ConfirmedBlock::new(Block { header, body }),
        Round::Fast,
        vec![],
    );
    let hash = cert.hash();

    let mut batch = MultiPartitionBatch::new();
    batch.add_certificate(&cert).unwrap();
    storage.write_batch(batch).await.unwrap();

    // Fetch the same certificate through both lookup paths.
    let via_hash = storage.read_certificate(hash).await.unwrap().unwrap();
    let by_heights = storage
        .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
        .await
        .unwrap();
    let via_height = by_heights[0].as_ref().unwrap();

    assert_eq!(via_hash.hash(), via_height.hash());
    assert_eq!(
        via_hash.value().block().header,
        via_height.value().block().header
    );
}
1764}