1use std::{fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10 crypto::CryptoHash,
11 data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
12 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_chain::{
15 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
16 ChainStateView,
17};
18use linera_execution::{
19 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
20};
21use linera_views::{
22 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
23 batch::Batch,
24 context::ViewContext,
25 store::{
26 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
27 },
28 views::View,
29 ViewError,
30};
31use serde::{Deserialize, Serialize};
32use tracing::instrument;
33#[cfg(with_testing)]
34use {
35 futures::channel::oneshot::{self, Receiver},
36 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
37 std::{cmp::Reverse, collections::BTreeMap},
38};
39
40use crate::{ChainRuntimeContext, Clock, Storage};
41
#[cfg(with_metrics)]
pub mod metrics {
    //! Prometheus metrics instrumenting the storage layer. Each counter is
    //! incremented by the corresponding `Storage` method or batch helper.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counts existence checks for a single blob.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// Counts existence checks for several blobs at once (`missing_blobs`).
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// Counts existence checks for a blob state.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// Counts existence checks for a certificate.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// Counts reads of a hashed confirmed block.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// Counts reads of a single blob (also bumped per blob in `read_blobs`).
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// Counts reads of a single blob state.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// Counts bulk reads of blob states.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// Counts blob writes.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// Counts reads of a single certificate.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// Counts bulk certificate reads (bumped per requested hash).
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// Counts certificate writes.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// Latency histogram for loading a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(10.0),
        )
    });

    /// Counts event reads.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// Counts existence checks for an event.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// Counts event writes.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// Counts reads of the network description.
    // NOTE(review): the metric name lacks the `read_` prefix, unlike its
    // `write_network_description` counterpart; renaming would break existing
    // dashboards, so it is kept as-is.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// Counts writes of the network description.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
227
/// Key under which the primary value of a partition is stored
/// (e.g. blob bytes, a lite certificate, or the network description).
const DEFAULT_KEY: &[u8] = &[0];

/// Key under which the secondary value of a partition is stored
/// (e.g. a blob state, or the confirmed block of a certificate).
const ONE_KEY: &[u8] = &[1];
233
/// Returns both per-partition keys (`[0]` and `[1]`), in that order, for
/// batched reads of a partition's primary and secondary values.
fn get_01_keys() -> Vec<Vec<u8>> {
    (0u8..=1).map(|byte| vec![byte]).collect()
}
237
/// A batch of writes spanning several storage partitions.
#[derive(Default)]
struct MultiPartitionBatch {
    // Queued writes as (partition root key, key within the partition, value bytes).
    keys_value_bytes: Vec<(Vec<u8>, Vec<u8>, Vec<u8>)>,
}
242
243impl MultiPartitionBatch {
244 fn new() -> Self {
245 Self::default()
246 }
247
248 fn put_key_value_bytes(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
249 self.keys_value_bytes.push((root_key, key, value));
250 }
251
252 fn put_key_value<T: Serialize>(
253 &mut self,
254 root_key: Vec<u8>,
255 value: &T,
256 ) -> Result<(), ViewError> {
257 let bytes = bcs::to_bytes(value)?;
258 let key = DEFAULT_KEY.to_vec();
259 self.keys_value_bytes.push((root_key, key, bytes));
260 Ok(())
261 }
262
263 fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
264 #[cfg(with_metrics)]
265 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
266 let root_key = RootKey::Blob(blob.id()).bytes();
267 let key = DEFAULT_KEY.to_vec();
268 self.put_key_value_bytes(root_key, key, blob.bytes().to_vec());
269 Ok(())
270 }
271
272 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
273 let root_key = RootKey::Blob(blob_id).bytes();
274 let key = ONE_KEY.to_vec();
275 let value = bcs::to_bytes(blob_state)?;
276 self.put_key_value_bytes(root_key, key, value);
277 Ok(())
278 }
279
280 fn add_certificate(
281 &mut self,
282 certificate: &ConfirmedBlockCertificate,
283 ) -> Result<(), ViewError> {
284 #[cfg(with_metrics)]
285 metrics::WRITE_CERTIFICATE_COUNTER
286 .with_label_values(&[])
287 .inc();
288 let hash = certificate.hash();
289 let root_key = RootKey::CryptoHash(hash).bytes();
290 self.put_key_value(root_key.clone(), &certificate.lite_certificate())?;
291 let value = bcs::to_bytes(&certificate.value())?;
292 self.put_key_value_bytes(root_key, ONE_KEY.to_vec(), value);
293 Ok(())
294 }
295
296 fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
297 #[cfg(with_metrics)]
298 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
299 let key = event_key(&event_id);
300 let root_key = RootKey::Event(event_id.chain_id).bytes();
301 self.put_key_value_bytes(root_key, key, value);
302 Ok(())
303 }
304
305 fn add_network_description(
306 &mut self,
307 information: &NetworkDescription,
308 ) -> Result<(), ViewError> {
309 #[cfg(with_metrics)]
310 metrics::WRITE_NETWORK_DESCRIPTION
311 .with_label_values(&[])
312 .inc();
313 let root_key = RootKey::NetworkDescription.bytes();
314 self.put_key_value(root_key, information)?;
315 Ok(())
316 }
317}
318
/// Main implementation of the [`Storage`] trait, backed by a key-value
/// database partitioned by [`RootKey`].
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    // Shared handle to the underlying key-value database.
    database: Arc<Database>,
    // Time source; `WallClock` in production, `TestClock` in tests.
    clock: Clock,
    // Optional Wasm runtime selection, exposed via `Storage::wasm_runtime`.
    wasm_runtime: Option<WasmRuntime>,
    // Shared cache of user contract code, keyed by application.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    // Shared cache of user service code, keyed by application.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    // Configuration passed into every `ChainRuntimeContext`.
    execution_runtime_config: ExecutionRuntimeConfig,
}
329
/// Identifies the storage partition that a piece of data lives in.
///
/// The first byte of the BCS encoding is the variant index, so the variant
/// order must stay in sync with `CHAIN_ID_TAG` (0, `ChainState`) and
/// `BLOB_ID_TAG` (2, `Blob`); reordering variants would corrupt the layout
/// of existing databases.
#[derive(Debug, Serialize, Deserialize)]
enum RootKey {
    // State of a single microchain (routed to the first store by
    // `ChainStatesFirstAssignment`).
    ChainState(ChainId),
    // Certificate data stored under the certificate hash.
    CryptoHash(CryptoHash),
    // A blob and its blob state.
    Blob(BlobId),
    // All events emitted by a chain.
    Event(ChainId),
    // State of a block exporter, keyed by exporter id.
    BlockExporterState(u32),
    // Singleton partition for the network description.
    NetworkDescription,
}
339
340impl RootKey {
341 fn bytes(&self) -> Vec<u8> {
342 bcs::to_bytes(self).unwrap()
343 }
344}
345
346fn event_key(event_id: &EventId) -> Vec<u8> {
347 let mut key = bcs::to_bytes(&event_id.stream_id).unwrap();
348 key.extend(bcs::to_bytes(&event_id.index).unwrap());
349 key
350}
351
352fn is_chain_state(root_key: &[u8]) -> bool {
353 if root_key.is_empty() {
354 return false;
355 }
356 root_key[0] == CHAIN_ID_TAG
357}
358
/// Tag byte of the BCS encoding of `RootKey::ChainState` (variant index 0).
const CHAIN_ID_TAG: u8 = 0;
/// Tag byte of the BCS encoding of `RootKey::Blob` (variant index 2).
const BLOB_ID_TAG: u8 = 2;
/// Serialized length of a `ChainId` in a root key. This assumes the BCS
/// encoding has the same size as the in-memory representation — the unit
/// tests below verify this.
const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
/// Serialized length of a `BlobId` in a root key; same size assumption as
/// `CHAIN_ID_LENGTH`, also checked by the unit tests.
const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();
363
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };

    use crate::db_storage::{
        event_key, RootKey, BLOB_ID_LENGTH, BLOB_ID_TAG, CHAIN_ID_LENGTH, CHAIN_ID_TAG,
    };

    // Checks that a blob root key starts with the expected tag byte and that
    // the serialized `BlobId` has the same size as the in-memory type.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::Blob(blob_id).bytes();
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(root_key.len(), 1 + BLOB_ID_LENGTH);
    }

    // Same check for chain-state root keys (tag byte and serialized length).
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(root_key.len(), 1 + CHAIN_ID_LENGTH);
    }

    // Checks that an event key is prefixed by the BCS encoding of its stream
    // id, which is what `read_events_from_index` relies on for prefix scans.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let key = event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }
}
431
/// Root-key assignment policy for a dual store: chain-state partitions go to
/// the first store, everything else to the second.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
436
437impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
438 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
439 if root_key.is_empty() {
440 return Ok(StoreInUse::Second);
441 }
442 let store = match is_chain_state(root_key) {
443 true => StoreInUse::First,
444 false => StoreInUse::Second,
445 };
446 Ok(store)
447 }
448}
449
/// A [`Clock`] implementation that reads the real system time.
#[derive(Clone)]
pub struct WallClock;
453
454#[cfg_attr(not(web), async_trait)]
455#[cfg_attr(web, async_trait(?Send))]
456impl Clock for WallClock {
457 fn current_time(&self) -> Timestamp {
458 Timestamp::now()
459 }
460
461 async fn sleep(&self, delta: TimeDelta) {
462 linera_base::time::timer::sleep(delta.as_duration()).await
463 }
464
465 async fn sleep_until(&self, timestamp: Timestamp) {
466 let delta = timestamp.delta_since(Timestamp::now());
467 if delta > TimeDelta::ZERO {
468 self.sleep(delta).await
469 }
470 }
471}
472
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    // Current virtual time.
    time: Timestamp,
    // Pending sleepers keyed by `Reverse(deadline)`, so that `split_off` in
    // `set` extracts exactly the entries whose deadline is <= the new time.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
}
479
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the virtual time and wakes all sleepers whose deadline has been
    /// reached. Because keys are `Reverse(deadline)`, `split_off(&Reverse(time))`
    /// removes every entry with deadline <= `time`, ties included.
    /// NOTE(review): the time is set unconditionally, so it can move backwards.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // Ignore send errors: the sleeping task may have been dropped.
            let _ = sender.send(());
        }
    }

    /// Registers a sleeper that wakes once the clock has advanced by `delta`
    /// from the current virtual time (saturating at the maximum timestamp).
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        self.add_sleep_until(self.time.saturating_add(delta))
    }

    /// Registers a sleeper that wakes once the clock reaches `time`.
    /// If the deadline is already met, the receiver is completed immediately.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        if self.time >= time {
            let _ = sender.send(());
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
504
/// A [`Clock`] for testing whose time only advances when `set` or `add` is
/// called; cloning shares the same underlying virtual time.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
510
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the current virtual time.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the virtual clock has advanced by at least `delta`.
    /// A zero delta completes immediately without registering a sleeper.
    async fn sleep(&self, delta: TimeDelta) {
        if delta == TimeDelta::ZERO {
            return;
        }
        // The mutex guard is a temporary and drops before the await, so
        // `set`/`add` can run while this task is suspended.
        let waiter = self.lock().add_sleep(delta);
        let _ = waiter.await;
    }

    /// Waits until the virtual clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        let waiter = self.lock().add_sleep_until(timestamp);
        let _ = waiter.await;
    }
}
532
#[cfg(with_testing)]
impl TestClock {
    /// Creates a test clock starting at the default timestamp.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Jumps the virtual time to `time`, waking every sleeper whose deadline
    /// has been reached.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the virtual time by `delta` (saturating), waking sleepers.
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let target = inner.time.saturating_add(delta);
        inner.set(target);
    }

    /// Returns the current virtual time.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Acquires the shared inner state, panicking on a poisoned mutex.
    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
561
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl<Database, C> Storage for DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    C: Clock + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
    type Clock = C;
    type BlockExporterContext = ViewContext<u32, Database::Store>;

    // Returns the clock used by this storage.
    fn clock(&self) -> &C {
        &self.clock
    }

    // Loads a chain state view from the chain's exclusive partition,
    // wrapping this storage in the runtime context handed to execution.
    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
    async fn load_chain(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, ViewError> {
        #[cfg(with_metrics)]
        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
        let runtime_context = ChainRuntimeContext {
            storage: self.clone(),
            chain_id,
            execution_runtime_config: self.execution_runtime_config,
            user_contracts: self.user_contracts.clone(),
            user_services: self.user_services.clone(),
        };
        let root_key = RootKey::ChainState(chain_id).bytes();
        // Exclusive access: only one view of the chain state may be open.
        let store = self.database.open_exclusive(&root_key)?;
        let context = ViewContext::create_root_context(store, runtime_context).await?;
        ChainStateView::load(context).await
    }

    // Checks whether the blob's bytes (default key) are present.
    #[instrument(level = "trace", skip_all, fields(blob_id = %blob_id))]
    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        let root_key = RootKey::Blob(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let test = store.contains_key(DEFAULT_KEY).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
        Ok(test)
    }

    // Returns the subset of `blob_ids` whose bytes are not in storage,
    // checking each blob's partition sequentially.
    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
        let mut missing_blobs = Vec::new();
        for blob_id in blob_ids {
            let root_key = RootKey::Blob(*blob_id).bytes();
            let store = self.database.open_shared(&root_key)?;
            let test = store.contains_key(DEFAULT_KEY).await?;
            if !test {
                missing_blobs.push(*blob_id);
            }
        }
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
        Ok(missing_blobs)
    }

    // Checks whether the blob's state (key `[1]`) is present.
    #[instrument(skip_all, fields(blob_id = %blob_id))]
    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
        let root_key = RootKey::Blob(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let test = store.contains_key(ONE_KEY).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_BLOB_STATE_COUNTER
            .with_label_values(&[])
            .inc();
        Ok(test)
    }

    // Reads the confirmed block stored under key `[1]` of the certificate
    // partition for `hash`, if any.
    #[instrument(skip_all, fields(hash = %hash))]
    async fn read_confirmed_block(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<ConfirmedBlock>, ViewError> {
        let root_key = RootKey::CryptoHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let value = store.read_value(ONE_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_CONFIRMED_BLOCK_COUNTER
            .with_label_values(&[])
            .inc();
        Ok(value)
    }

    // Reads a blob's bytes and reconstructs the `Blob` without re-checking
    // that the bytes hash to `blob_id` (trusted storage).
    #[instrument(skip_all, fields(blob_id = %blob_id))]
    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
        let root_key = RootKey::Blob(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let maybe_blob_bytes = store.read_value_bytes(DEFAULT_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
    }

    // Reads several blobs sequentially; the result aligns with `blob_ids`.
    // Note: `read_blob` already bumps the counter once per blob, and the
    // bulk increment below adds `len` more.
    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
        if blob_ids.is_empty() {
            return Ok(Vec::new());
        }
        let mut blobs = Vec::new();
        for blob_id in blob_ids {
            blobs.push(self.read_blob(*blob_id).await?);
        }
        #[cfg(with_metrics)]
        metrics::READ_BLOB_COUNTER
            .with_label_values(&[])
            .inc_by(blob_ids.len() as u64);
        Ok(blobs)
    }

    // Reads a blob's state from key `[1]` of its partition.
    #[instrument(skip_all, fields(blob_id = %blob_id))]
    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
        let root_key = RootKey::Blob(blob_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let blob_state = store.read_value::<BlobState>(ONE_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_BLOB_STATE_COUNTER
            .with_label_values(&[])
            .inc();
        Ok(blob_state)
    }

    // Reads several blob states sequentially; result aligns with `blob_ids`.
    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
    async fn read_blob_states(
        &self,
        blob_ids: &[BlobId],
    ) -> Result<Vec<Option<BlobState>>, ViewError> {
        if blob_ids.is_empty() {
            return Ok(Vec::new());
        }
        let mut blob_states = Vec::new();
        for blob_id in blob_ids {
            blob_states.push(self.read_blob_state(*blob_id).await?);
        }
        #[cfg(with_metrics)]
        metrics::READ_BLOB_STATES_COUNTER
            .with_label_values(&[])
            .inc_by(blob_ids.len() as u64);
        Ok(blob_states)
    }

    // Writes a single blob's bytes.
    #[instrument(skip_all, fields(blob_id = %blob.id()))]
    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
        let mut batch = MultiPartitionBatch::new();
        batch.add_blob(blob)?;
        self.write_batch(batch).await?;
        Ok(())
    }

    // For each blob id, writes `blob_state` unless an existing state with an
    // epoch that is not older is already stored. Note: read-then-write is not
    // atomic with respect to concurrent writers.
    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
    async fn maybe_write_blob_states(
        &self,
        blob_ids: &[BlobId],
        blob_state: BlobState,
    ) -> Result<(), ViewError> {
        if blob_ids.is_empty() {
            return Ok(());
        }
        let mut maybe_blob_states = Vec::new();
        for blob_id in blob_ids {
            let root_key = RootKey::Blob(*blob_id).bytes();
            let store = self.database.open_shared(&root_key)?;
            let maybe_blob_state = store.read_value::<BlobState>(ONE_KEY).await?;
            maybe_blob_states.push(maybe_blob_state);
        }
        let mut batch = MultiPartitionBatch::new();
        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
            match maybe_blob_state {
                None => {
                    batch.add_blob_state(*blob_id, &blob_state)?;
                }
                Some(state) => {
                    // Only overwrite a state from a strictly older epoch.
                    if state.epoch < blob_state.epoch {
                        batch.add_blob_state(*blob_id, &blob_state)?;
                    }
                }
            }
        }
        self.write_batch(batch).await?;
        Ok(())
    }

    // Writes each blob only if its blob state already exists (i.e. the blob
    // has been certified); returns, per blob, whether the state was present.
    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
        if blobs.is_empty() {
            return Ok(Vec::new());
        }
        let mut batch = MultiPartitionBatch::new();
        let mut blob_states = Vec::new();
        for blob in blobs {
            let root_key = RootKey::Blob(blob.id()).bytes();
            let store = self.database.open_shared(&root_key)?;
            let has_state = store.contains_key(ONE_KEY).await?;
            blob_states.push(has_state);
            if has_state {
                batch.add_blob(blob)?;
            }
        }
        self.write_batch(batch).await?;
        Ok(blob_states)
    }

    // Unconditionally writes all given blobs in one batch.
    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
        if blobs.is_empty() {
            return Ok(());
        }
        let mut batch = MultiPartitionBatch::new();
        for blob in blobs {
            batch.add_blob(blob)?;
        }
        self.write_batch(batch).await
    }

    // Writes the blobs and the certificate together in one batch.
    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
    async fn write_blobs_and_certificate(
        &self,
        blobs: &[Blob],
        certificate: &ConfirmedBlockCertificate,
    ) -> Result<(), ViewError> {
        let mut batch = MultiPartitionBatch::new();
        for blob in blobs {
            batch.add_blob(blob)?;
        }
        batch.add_certificate(certificate)?;
        self.write_batch(batch).await
    }

    // A certificate counts as present only if both its parts exist: the lite
    // certificate (key `[0]`) and the block value (key `[1]`).
    #[instrument(skip_all, fields(hash = %hash))]
    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
        let root_key = RootKey::CryptoHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let results = store.contains_keys(get_01_keys()).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_CERTIFICATE_COUNTER
            .with_label_values(&[])
            .inc();
        Ok(results[0] && results[1])
    }

    // Reads both parts of a certificate and reassembles them.
    #[instrument(skip_all, fields(hash = %hash))]
    async fn read_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
        let root_key = RootKey::CryptoHash(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let values = store.read_multi_values_bytes(get_01_keys()).await?;
        #[cfg(with_metrics)]
        metrics::READ_CERTIFICATE_COUNTER
            .with_label_values(&[])
            .inc();
        Self::deserialize_certificate(&values, hash)
    }

    // Reads many certificates; `values` holds two entries per hash, in the
    // same order as `hashes`, so chunking by 2 realigns them.
    #[instrument(skip_all)]
    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let root_keys = Self::get_root_keys_for_certificates(&hashes);
        let mut values = Vec::new();
        for root_key in root_keys {
            let store = self.database.open_shared(&root_key)?;
            values.extend(store.read_multi_values_bytes(get_01_keys()).await?);
        }
        #[cfg(with_metrics)]
        metrics::READ_CERTIFICATES_COUNTER
            .with_label_values(&[])
            .inc_by(hashes.len() as u64);
        let mut certificates = Vec::new();
        for (pair, hash) in values.chunks_exact(2).zip(hashes) {
            let certificate = Self::deserialize_certificate(pair, hash)?;
            certificates.push(certificate);
        }
        Ok(certificates)
    }

    // Like `read_certificates` but returns the raw serialized pairs; hashes
    // with a missing part are silently dropped from the result.
    #[instrument(skip_all)]
    async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let root_keys = Self::get_root_keys_for_certificates(&hashes);
        let mut values = Vec::new();
        for root_key in root_keys {
            let store = self.database.open_shared(&root_key)?;
            values.extend(store.read_multi_values_bytes(get_01_keys()).await?);
        }
        #[cfg(with_metrics)]
        metrics::READ_CERTIFICATES_COUNTER
            .with_label_values(&[])
            .inc_by(hashes.len() as u64);
        Ok(values
            .chunks_exact(2)
            .filter_map(|chunk| {
                let lite_cert_bytes = chunk[0].as_ref()?;
                let confirmed_block_bytes = chunk[1].as_ref()?;
                Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
            })
            .collect())
    }

    // Reads a single event's raw payload from its chain's event partition.
    #[instrument(skip_all, fields(event_id = ?event_id))]
    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
        let event_key = event_key(&event_id);
        let root_key = RootKey::Event(event_id.chain_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let event = store.read_value_bytes(&event_key).await?;
        #[cfg(with_metrics)]
        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
        Ok(event)
    }

    // Checks whether an event exists.
    #[instrument(skip_all, fields(event_id = ?event_id))]
    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
        let event_key = event_key(&event_id);
        let root_key = RootKey::Event(event_id.chain_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let exists = store.contains_key(&event_key).await?;
        #[cfg(with_metrics)]
        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
        Ok(exists)
    }

    // Scans a stream's events by key prefix and returns those with index >=
    // `start_index`. The short key suffix is the BCS-encoded index (see
    // `event_key`).
    #[instrument(skip_all, fields(chain_id = %chain_id, stream_id = %stream_id, start_index = %start_index))]
    async fn read_events_from_index(
        &self,
        chain_id: &ChainId,
        stream_id: &StreamId,
        start_index: u32,
    ) -> Result<Vec<IndexAndEvent>, ViewError> {
        let root_key = RootKey::Event(*chain_id).bytes();
        let store = self.database.open_shared(&root_key)?;
        let mut keys = Vec::new();
        let mut indices = Vec::new();
        let prefix = bcs::to_bytes(stream_id).unwrap();
        for short_key in store.find_keys_by_prefix(&prefix).await? {
            let index = bcs::from_bytes::<u32>(&short_key)?;
            if index >= start_index {
                let mut key = prefix.clone();
                key.extend(short_key);
                keys.push(key);
                indices.push(index);
            }
        }
        let values = store.read_multi_values_bytes(keys).await?;
        let mut returned_values = Vec::new();
        for (index, value) in indices.into_iter().zip(values) {
            // The key was just listed by prefix, so its value is expected to
            // exist. NOTE(review): this would panic if the entry were deleted
            // between the listing and this read — confirm that cannot happen.
            let event = value.unwrap();
            returned_values.push(IndexAndEvent { index, event });
        }
        Ok(returned_values)
    }

    // Writes a set of events in a single batch.
    #[instrument(skip_all)]
    async fn write_events(
        &self,
        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
    ) -> Result<(), ViewError> {
        let mut batch = MultiPartitionBatch::new();
        for (event_id, value) in events {
            batch.add_event(event_id, value)?;
        }
        self.write_batch(batch).await
    }

    // Reads the network description singleton, if it was ever written.
    #[instrument(skip_all)]
    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
        let root_key = RootKey::NetworkDescription.bytes();
        let store = self.database.open_shared(&root_key)?;
        let maybe_value = store.read_value(DEFAULT_KEY).await?;
        #[cfg(with_metrics)]
        metrics::READ_NETWORK_DESCRIPTION
            .with_label_values(&[])
            .inc();
        Ok(maybe_value)
    }

    // Writes (or overwrites) the network description singleton.
    #[instrument(skip_all)]
    async fn write_network_description(
        &self,
        information: &NetworkDescription,
    ) -> Result<(), ViewError> {
        let mut batch = MultiPartitionBatch::new();
        batch.add_network_description(information)?;
        self.write_batch(batch).await?;
        Ok(())
    }

    // Returns the configured Wasm runtime selection.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }

    // Opens the exclusive partition of a block exporter and builds a root
    // view context for it.
    #[instrument(skip_all)]
    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError> {
        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
        let store = self.database.open_exclusive(&root_key)?;
        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
    }
}
989
990impl<Database, C> DbStorage<Database, C>
991where
992 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
993 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
994 C: Clock,
995 Database::Error: Send + Sync,
996{
997 #[instrument(skip_all)]
998 fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
999 hashes
1000 .iter()
1001 .map(|hash| RootKey::CryptoHash(*hash).bytes())
1002 .collect()
1003 }
1004
1005 #[instrument(skip_all)]
1006 fn deserialize_certificate(
1007 pair: &[Option<Vec<u8>>],
1008 hash: CryptoHash,
1009 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1010 let Some(cert_bytes) = pair[0].as_ref() else {
1011 return Ok(None);
1012 };
1013 let Some(value_bytes) = pair[1].as_ref() else {
1014 return Ok(None);
1015 };
1016 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1017 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1018 assert_eq!(value.hash(), hash);
1019 let certificate = cert
1020 .with_value(value)
1021 .ok_or(ViewError::InconsistentEntries)?;
1022 Ok(Some(certificate))
1023 }
1024
1025 #[instrument(skip_all)]
1026 async fn write_entry(
1027 store: &Database::Store,
1028 key: Vec<u8>,
1029 bytes: Vec<u8>,
1030 ) -> Result<(), ViewError> {
1031 let mut batch = Batch::new();
1032 batch.put_key_value_bytes(key, bytes);
1033 store.write_batch(batch).await?;
1034 Ok(())
1035 }
1036
1037 #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1038 async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1039 if batch.keys_value_bytes.is_empty() {
1040 return Ok(());
1041 }
1042 let mut futures = Vec::new();
1043 for (root_key, key, bytes) in batch.keys_value_bytes {
1044 let store = self.database.open_shared(&root_key)?;
1045 futures.push(async move { Self::write_entry(&store, key, bytes).await });
1046 }
1047 futures::future::try_join_all(futures).await?;
1048 Ok(())
1049 }
1050}
1051
1052impl<Database, C> DbStorage<Database, C> {
1053 fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
1054 Self {
1055 database: Arc::new(database),
1056 clock,
1057 wasm_runtime,
1058 user_contracts: Arc::new(papaya::HashMap::new()),
1059 user_services: Arc::new(papaya::HashMap::new()),
1060 execution_runtime_config: ExecutionRuntimeConfig::default(),
1061 }
1062 }
1063}
1064
1065impl<Database> DbStorage<Database, WallClock>
1066where
1067 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1068 Database::Error: Send + Sync,
1069 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
1070{
1071 pub async fn maybe_create_and_connect(
1072 config: &Database::Config,
1073 namespace: &str,
1074 wasm_runtime: Option<WasmRuntime>,
1075 ) -> Result<Self, Database::Error> {
1076 let database = Database::maybe_create_and_connect(config, namespace).await?;
1077 Ok(Self::new(database, wasm_runtime, WallClock))
1078 }
1079
1080 pub async fn connect(
1081 config: &Database::Config,
1082 namespace: &str,
1083 wasm_runtime: Option<WasmRuntime>,
1084 ) -> Result<Self, Database::Error> {
1085 let database = Database::connect(config, namespace).await?;
1086 Ok(Self::new(database, wasm_runtime, WallClock))
1087 }
1088
1089 pub async fn list_blob_ids(
1091 config: &Database::Config,
1092 namespace: &str,
1093 ) -> Result<Vec<BlobId>, ViewError> {
1094 let database = Database::connect(config, namespace).await?;
1095 let root_keys = database.list_root_keys().await?;
1096 let mut blob_ids = Vec::new();
1097 for root_key in root_keys {
1098 if root_key.len() == 1 + BLOB_ID_LENGTH && root_key[0] == BLOB_ID_TAG {
1099 let root_key_red = &root_key[1..=BLOB_ID_LENGTH];
1100 let blob_id = bcs::from_bytes(root_key_red)?;
1101 blob_ids.push(blob_id);
1102 }
1103 }
1104 Ok(blob_ids)
1105 }
1106}
1107
1108impl<Database> DbStorage<Database, WallClock>
1109where
1110 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1111 Database::Error: Send + Sync,
1112{
1113 pub async fn list_chain_ids(
1115 config: &Database::Config,
1116 namespace: &str,
1117 ) -> Result<Vec<ChainId>, ViewError> {
1118 let database = Database::connect(config, namespace).await?;
1119 let root_keys = database.list_root_keys().await?;
1120 let mut chain_ids = Vec::new();
1121 for root_key in root_keys {
1122 if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == CHAIN_ID_TAG {
1123 let root_key_red = &root_key[1..=CHAIN_ID_LENGTH];
1124 let chain_id = bcs::from_bytes(root_key_red)?;
1125 chain_ids.push(chain_id);
1126 }
1127 }
1128 Ok(chain_ids)
1129 }
1130}
1131
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Builds a storage backed by a freshly generated test namespace and a
    /// new `TestClock`. Panics on any setup failure (test-only code).
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        Self::new_for_testing(config, &namespace, wasm_runtime, TestClock::new())
            .await
            .unwrap()
    }

    /// Recreates `namespace` from scratch and wraps the resulting database
    /// with the given test clock.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Database::Error> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        Ok(Self::new(database, wasm_runtime, clock))
    }
}