1use std::{fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7use dashmap::DashMap;
8#[cfg(with_metrics)]
9use linera_base::prometheus_util::MeasureLatency as _;
10use linera_base::{
11 crypto::CryptoHash,
12 data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
13 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
14};
15use linera_chain::{
16 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
17 ChainStateView,
18};
19use linera_execution::{
20 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
21};
22use linera_views::{
23 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
24 context::ViewContext,
25 store::{AdminKeyValueStore, KeyValueStore},
26 views::View,
27 ViewError,
28};
29use serde::{Deserialize, Serialize};
30#[cfg(with_testing)]
31use {
32 futures::channel::oneshot::{self, Receiver},
33 linera_views::{random::generate_test_namespace, store::TestKeyValueStore},
34 std::{cmp::Reverse, collections::BTreeMap},
35};
36
37use crate::{ChainRuntimeContext, Clock, Storage};
38
39#[cfg(with_metrics)]
40pub mod metrics {
41 use std::sync::LazyLock;
42
43 use linera_base::prometheus_util::{
44 exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
45 };
46 use prometheus::{HistogramVec, IntCounterVec};
47
48 pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
50 register_int_counter_vec(
51 "contains_blob",
52 "The metric counting how often a blob is tested for existence from storage",
53 &[],
54 )
55 });
56
57 pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
59 register_int_counter_vec(
60 "contains_blobs",
61 "The metric counting how often multiple blobs are tested for existence from storage",
62 &[],
63 )
64 });
65
66 pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
68 register_int_counter_vec(
69 "contains_blob_state",
70 "The metric counting how often a blob state is tested for existence from storage",
71 &[],
72 )
73 });
74
75 pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
77 register_int_counter_vec(
78 "contains_certificate",
79 "The metric counting how often a certificate is tested for existence from storage",
80 &[],
81 )
82 });
83
84 #[doc(hidden)]
86 pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
87 register_int_counter_vec(
88 "read_confirmed_block",
89 "The metric counting how often a hashed confirmed block is read from storage",
90 &[],
91 )
92 });
93
94 #[doc(hidden)]
96 pub static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
97 register_int_counter_vec(
98 "read_blob",
99 "The metric counting how often a blob is read from storage",
100 &[],
101 )
102 });
103
104 #[doc(hidden)]
106 pub static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
107 register_int_counter_vec(
108 "read_blob_state",
109 "The metric counting how often a blob state is read from storage",
110 &[],
111 )
112 });
113
114 #[doc(hidden)]
116 pub static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
117 register_int_counter_vec(
118 "read_blob_states",
119 "The metric counting how often blob states are read from storage",
120 &[],
121 )
122 });
123
124 #[doc(hidden)]
126 pub static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
127 register_int_counter_vec(
128 "write_blob",
129 "The metric counting how often a blob is written to storage",
130 &[],
131 )
132 });
133
134 #[doc(hidden)]
136 pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
137 register_int_counter_vec(
138 "read_certificate",
139 "The metric counting how often a certificate is read from storage",
140 &[],
141 )
142 });
143
144 #[doc(hidden)]
146 pub static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
147 register_int_counter_vec(
148 "read_certificates",
149 "The metric counting how often certificate are read from storage",
150 &[],
151 )
152 });
153
154 #[doc(hidden)]
156 pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
157 register_int_counter_vec(
158 "write_certificate",
159 "The metric counting how often a certificate is written to storage",
160 &[],
161 )
162 });
163
164 #[doc(hidden)]
166 pub static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
167 register_histogram_vec(
168 "load_chain_latency",
169 "The latency to load a chain state",
170 &[],
171 exponential_bucket_latencies(10.0),
172 )
173 });
174
175 #[doc(hidden)]
177 pub static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
178 register_int_counter_vec(
179 "read_event",
180 "The metric counting how often an event is read from storage",
181 &[],
182 )
183 });
184
185 pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
187 register_int_counter_vec(
188 "contains_event",
189 "The metric counting how often an event is tested for existence from storage",
190 &[],
191 )
192 });
193
194 #[doc(hidden)]
196 pub static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
197 register_int_counter_vec(
198 "write_event",
199 "The metric counting how often an event is written to storage",
200 &[],
201 )
202 });
203
204 #[doc(hidden)]
206 pub static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
207 register_int_counter_vec(
208 "network_description",
209 "The metric counting how often the network description is read from storage",
210 &[],
211 )
212 });
213
214 #[doc(hidden)]
216 pub static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
217 register_int_counter_vec(
218 "write_network_description",
219 "The metric counting how often the network description is written to storage",
220 &[],
221 )
222 });
223}
224
225#[derive(Default)]
226struct Batch {
227 key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
228}
229
230impl Batch {
231 fn new() -> Self {
232 Self::default()
233 }
234
235 fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
236 self.key_value_bytes.push((key, value));
237 }
238
239 fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
240 let bytes = bcs::to_bytes(value)?;
241 self.key_value_bytes.push((key, bytes));
242 Ok(())
243 }
244
245 fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
246 #[cfg(with_metrics)]
247 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
248 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob.id()))?;
249 self.put_key_value_bytes(blob_key.to_vec(), blob.bytes().to_vec());
250 Ok(())
251 }
252
253 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
254 let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
255 self.put_key_value(blob_state_key.to_vec(), blob_state)?;
256 Ok(())
257 }
258
259 fn add_certificate(
260 &mut self,
261 certificate: &ConfirmedBlockCertificate,
262 ) -> Result<(), ViewError> {
263 #[cfg(with_metrics)]
264 metrics::WRITE_CERTIFICATE_COUNTER
265 .with_label_values(&[])
266 .inc();
267 let hash = certificate.hash();
268 let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
269 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
270 self.put_key_value(cert_key.to_vec(), &certificate.lite_certificate())?;
271 self.put_key_value(block_key.to_vec(), certificate.value())?;
272 Ok(())
273 }
274
275 fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
276 #[cfg(with_metrics)]
277 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
278 let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
279 self.put_key_value_bytes(event_key.to_vec(), value);
280 Ok(())
281 }
282
283 fn add_network_description(
284 &mut self,
285 information: &NetworkDescription,
286 ) -> Result<(), ViewError> {
287 #[cfg(with_metrics)]
288 metrics::WRITE_NETWORK_DESCRIPTION
289 .with_label_values(&[])
290 .inc();
291 let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
292 self.put_key_value(key, information)?;
293 Ok(())
294 }
295}
296
297#[derive(Clone)]
299pub struct DbStorage<Store, Clock = WallClock> {
300 store: Arc<Store>,
301 clock: Clock,
302 wasm_runtime: Option<WasmRuntime>,
303 user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
304 user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
305 execution_runtime_config: ExecutionRuntimeConfig,
306}
307
308#[derive(Debug, Serialize, Deserialize)]
309enum BaseKey {
310 ChainState(ChainId),
311 Certificate(CryptoHash),
312 ConfirmedBlock(CryptoHash),
313 Blob(BlobId),
314 BlobState(BlobId),
315 Event(EventId),
316 BlockExporterState(u32),
317 NetworkDescription,
318}
319
320const INDEX_CHAIN_ID: u8 = 0;
321const INDEX_BLOB_ID: u8 = 3;
322const INDEX_EVENT_ID: u8 = 5;
323const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
324const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();
325
326#[cfg(test)]
327mod tests {
328 use linera_base::{
329 crypto::CryptoHash,
330 identifiers::{
331 ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
332 StreamName,
333 },
334 };
335
336 use crate::db_storage::{
337 BaseKey, BLOB_ID_LENGTH, CHAIN_ID_LENGTH, INDEX_BLOB_ID, INDEX_CHAIN_ID, INDEX_EVENT_ID,
338 };
339
340 #[test]
347 fn test_basekey_blob_serialization() {
348 let hash = CryptoHash::default();
349 let blob_type = BlobType::default();
350 let blob_id = BlobId::new(hash, blob_type);
351 let base_key = BaseKey::Blob(blob_id);
352 let key = bcs::to_bytes(&base_key).expect("a key");
353 assert_eq!(key[0], INDEX_BLOB_ID);
354 assert_eq!(key.len(), 1 + BLOB_ID_LENGTH);
355 }
356
357 #[test]
360 fn test_basekey_chainstate_serialization() {
361 let hash = CryptoHash::default();
362 let chain_id = ChainId(hash);
363 let base_key = BaseKey::ChainState(chain_id);
364 let key = bcs::to_bytes(&base_key).expect("a key");
365 assert_eq!(key[0], INDEX_CHAIN_ID);
366 assert_eq!(key.len(), 1 + CHAIN_ID_LENGTH);
367 }
368
369 #[test]
372 fn test_basekey_event_serialization() {
373 let hash = CryptoHash::test_hash("49");
374 let chain_id = ChainId(hash);
375 let application_description_hash = CryptoHash::test_hash("42");
376 let application_id = ApplicationId::new(application_description_hash);
377 let application_id = GenericApplicationId::User(application_id);
378 let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
379 let stream_id = StreamId {
380 application_id,
381 stream_name,
382 };
383 let mut prefix = vec![INDEX_EVENT_ID];
384 prefix.extend(bcs::to_bytes(&chain_id).unwrap());
385 prefix.extend(bcs::to_bytes(&stream_id).unwrap());
386
387 let index = 1567;
388 let event_id = EventId {
389 chain_id,
390 stream_id,
391 index,
392 };
393 let base_key = BaseKey::Event(event_id);
394 let key = bcs::to_bytes(&base_key).unwrap();
395 assert!(key.starts_with(&prefix));
396 }
397}
398
399#[derive(Clone, Copy)]
402pub struct ChainStatesFirstAssignment;
403
404impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
405 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
406 if root_key.is_empty() {
407 return Ok(StoreInUse::Second);
408 }
409 let store = match bcs::from_bytes(root_key)? {
410 BaseKey::ChainState(_) => StoreInUse::First,
411 _ => StoreInUse::Second,
412 };
413 Ok(store)
414 }
415}
416
417#[derive(Clone)]
419pub struct WallClock;
420
421#[cfg_attr(not(web), async_trait)]
422#[cfg_attr(web, async_trait(?Send))]
423impl Clock for WallClock {
424 fn current_time(&self) -> Timestamp {
425 Timestamp::now()
426 }
427
428 async fn sleep(&self, delta: TimeDelta) {
429 linera_base::time::timer::sleep(delta.as_duration()).await
430 }
431
432 async fn sleep_until(&self, timestamp: Timestamp) {
433 let delta = timestamp.delta_since(Timestamp::now());
434 if delta > TimeDelta::ZERO {
435 self.sleep(delta).await
436 }
437 }
438}
439
440#[cfg(with_testing)]
441#[derive(Default)]
442struct TestClockInner {
443 time: Timestamp,
444 sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
445}
446
447#[cfg(with_testing)]
448impl TestClockInner {
449 fn set(&mut self, time: Timestamp) {
450 self.time = time;
451 let senders = self.sleeps.split_off(&Reverse(time));
452 for sender in senders.into_values().flatten() {
453 let _ = sender.send(());
454 }
455 }
456
457 fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
458 self.add_sleep_until(self.time.saturating_add(delta))
459 }
460
461 fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
462 let (sender, receiver) = oneshot::channel();
463 if self.time >= time {
464 let _ = sender.send(());
465 } else {
466 self.sleeps.entry(Reverse(time)).or_default().push(sender);
467 }
468 receiver
469 }
470}
471
472#[cfg(with_testing)]
475#[derive(Clone, Default)]
476pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
477
478#[cfg(with_testing)]
479#[cfg_attr(not(web), async_trait)]
480#[cfg_attr(web, async_trait(?Send))]
481impl Clock for TestClock {
482 fn current_time(&self) -> Timestamp {
483 self.lock().time
484 }
485
486 async fn sleep(&self, delta: TimeDelta) {
487 if delta == TimeDelta::ZERO {
488 return;
489 }
490 let receiver = self.lock().add_sleep(delta);
491 let _ = receiver.await;
492 }
493
494 async fn sleep_until(&self, timestamp: Timestamp) {
495 let receiver = self.lock().add_sleep_until(timestamp);
496 let _ = receiver.await;
497 }
498}
499
500#[cfg(with_testing)]
501impl TestClock {
502 pub fn new() -> Self {
504 TestClock(Arc::default())
505 }
506
507 pub fn set(&self, time: Timestamp) {
509 self.lock().set(time);
510 }
511
512 pub fn add(&self, delta: TimeDelta) {
514 let mut guard = self.lock();
515 let time = guard.time.saturating_add(delta);
516 guard.set(time);
517 }
518
519 pub fn current_time(&self) -> Timestamp {
521 self.lock().time
522 }
523
524 fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
525 self.0.lock().expect("poisoned TestClock mutex")
526 }
527}
528
529#[cfg_attr(not(web), async_trait)]
530#[cfg_attr(web, async_trait(?Send))]
531impl<Store, C> Storage for DbStorage<Store, C>
532where
533 Store: KeyValueStore + Clone + Send + Sync + 'static,
534 C: Clock + Clone + Send + Sync + 'static,
535 Store::Error: Send + Sync,
536{
537 type Context = ViewContext<ChainRuntimeContext<Self>, Store>;
538 type Clock = C;
539 type BlockExporterContext = ViewContext<u32, Store>;
540
541 fn clock(&self) -> &C {
542 &self.clock
543 }
544
545 async fn load_chain(
546 &self,
547 chain_id: ChainId,
548 ) -> Result<ChainStateView<Self::Context>, ViewError> {
549 #[cfg(with_metrics)]
550 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
551 let runtime_context = ChainRuntimeContext {
552 storage: self.clone(),
553 chain_id,
554 execution_runtime_config: self.execution_runtime_config,
555 user_contracts: self.user_contracts.clone(),
556 user_services: self.user_services.clone(),
557 };
558 let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
559 let store = self.store.open_exclusive(&root_key)?;
560 let context = ViewContext::create_root_context(store, runtime_context).await?;
561 ChainStateView::load(context).await
562 }
563
564 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
565 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
566 let test = self.store.contains_key(&blob_key).await?;
567 #[cfg(with_metrics)]
568 metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
569 Ok(test)
570 }
571
572 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
573 let mut keys = Vec::new();
574 for blob_id in blob_ids {
575 let key = bcs::to_bytes(&BaseKey::Blob(*blob_id))?;
576 keys.push(key);
577 }
578 let results = self.store.contains_keys(keys).await?;
579 let mut missing_blobs = Vec::new();
580 for (blob_id, result) in blob_ids.iter().zip(results) {
581 if !result {
582 missing_blobs.push(*blob_id);
583 }
584 }
585 #[cfg(with_metrics)]
586 metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
587 Ok(missing_blobs)
588 }
589
590 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
591 let blob_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
592 let test = self.store.contains_key(&blob_key).await?;
593 #[cfg(with_metrics)]
594 metrics::CONTAINS_BLOB_STATE_COUNTER
595 .with_label_values(&[])
596 .inc();
597 Ok(test)
598 }
599
600 async fn read_confirmed_block(
601 &self,
602 hash: CryptoHash,
603 ) -> Result<Option<ConfirmedBlock>, ViewError> {
604 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
605 let value = self.store.read_value(&block_key).await?;
606 #[cfg(with_metrics)]
607 metrics::READ_CONFIRMED_BLOCK_COUNTER
608 .with_label_values(&[])
609 .inc();
610 Ok(value)
611 }
612
613 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
614 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
615 let maybe_blob_bytes = self.store.read_value_bytes(&blob_key).await?;
616 #[cfg(with_metrics)]
617 metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
618 Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
619 }
620
621 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
622 if blob_ids.is_empty() {
623 return Ok(Vec::new());
624 }
625 let blob_keys = blob_ids
626 .iter()
627 .map(|blob_id| bcs::to_bytes(&BaseKey::Blob(*blob_id)))
628 .collect::<Result<Vec<_>, _>>()?;
629 let maybe_blob_bytes = self.store.read_multi_values_bytes(blob_keys).await?;
630 #[cfg(with_metrics)]
631 metrics::READ_BLOB_COUNTER
632 .with_label_values(&[])
633 .inc_by(blob_ids.len() as u64);
634
635 Ok(blob_ids
636 .iter()
637 .zip(maybe_blob_bytes)
638 .map(|(blob_id, maybe_blob_bytes)| {
639 maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(*blob_id, blob_bytes))
640 })
641 .collect())
642 }
643
644 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
645 let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
646 let blob_state = self.store.read_value::<BlobState>(&blob_state_key).await?;
647 #[cfg(with_metrics)]
648 metrics::READ_BLOB_STATE_COUNTER
649 .with_label_values(&[])
650 .inc();
651 Ok(blob_state)
652 }
653
654 async fn read_blob_states(
655 &self,
656 blob_ids: &[BlobId],
657 ) -> Result<Vec<Option<BlobState>>, ViewError> {
658 if blob_ids.is_empty() {
659 return Ok(Vec::new());
660 }
661 let blob_state_keys = blob_ids
662 .iter()
663 .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
664 .collect::<Result<_, _>>()?;
665 let blob_states = self
666 .store
667 .read_multi_values::<BlobState>(blob_state_keys)
668 .await?;
669 #[cfg(with_metrics)]
670 metrics::READ_BLOB_STATES_COUNTER
671 .with_label_values(&[])
672 .inc_by(blob_ids.len() as u64);
673 Ok(blob_states)
674 }
675
676 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
677 let mut batch = Batch::new();
678 batch.add_blob(blob)?;
679 self.write_batch(batch).await?;
680 Ok(())
681 }
682
683 async fn maybe_write_blob_states(
684 &self,
685 blob_ids: &[BlobId],
686 blob_state: BlobState,
687 ) -> Result<(), ViewError> {
688 if blob_ids.is_empty() {
689 return Ok(());
690 }
691 let blob_state_keys = blob_ids
692 .iter()
693 .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
694 .collect::<Result<_, _>>()?;
695 let maybe_blob_states = self
696 .store
697 .read_multi_values::<BlobState>(blob_state_keys)
698 .await?;
699 let mut batch = Batch::new();
700 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
701 match maybe_blob_state {
702 None => {
703 batch.add_blob_state(*blob_id, &blob_state)?;
704 }
705 Some(state) => {
706 if state.epoch < blob_state.epoch {
707 batch.add_blob_state(*blob_id, &blob_state)?;
708 }
709 }
710 }
711 }
712 self.write_batch(batch).await?;
716 Ok(())
717 }
718
719 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
720 if blobs.is_empty() {
721 return Ok(Vec::new());
722 }
723 let blob_state_keys = blobs
724 .iter()
725 .map(|blob| bcs::to_bytes(&BaseKey::BlobState(blob.id())))
726 .collect::<Result<_, _>>()?;
727 let blob_states = self.store.contains_keys(blob_state_keys).await?;
728 let mut batch = Batch::new();
729 for (blob, has_state) in blobs.iter().zip(&blob_states) {
730 if *has_state {
731 batch.add_blob(blob)?;
732 }
733 }
734 self.write_batch(batch).await?;
735 Ok(blob_states)
736 }
737
738 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
739 if blobs.is_empty() {
740 return Ok(());
741 }
742 let mut batch = Batch::new();
743 for blob in blobs {
744 batch.add_blob(blob)?;
745 }
746 self.write_batch(batch).await
747 }
748
749 async fn write_blobs_and_certificate(
750 &self,
751 blobs: &[Blob],
752 certificate: &ConfirmedBlockCertificate,
753 ) -> Result<(), ViewError> {
754 let mut batch = Batch::new();
755 for blob in blobs {
756 batch.add_blob(blob)?;
757 }
758 batch.add_certificate(certificate)?;
759 self.write_batch(batch).await
760 }
761
762 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
763 let keys = Self::get_keys_for_certificates(&[hash])?;
764 let results = self.store.contains_keys(keys).await?;
765 #[cfg(with_metrics)]
766 metrics::CONTAINS_CERTIFICATE_COUNTER
767 .with_label_values(&[])
768 .inc();
769 Ok(results[0] && results[1])
770 }
771
772 async fn read_certificate(
773 &self,
774 hash: CryptoHash,
775 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
776 let keys = Self::get_keys_for_certificates(&[hash])?;
777 let values = self.store.read_multi_values_bytes(keys).await;
778 if values.is_ok() {
779 #[cfg(with_metrics)]
780 metrics::READ_CERTIFICATE_COUNTER
781 .with_label_values(&[])
782 .inc();
783 }
784 let values = values?;
785 Self::deserialize_certificate(&values, hash)
786 }
787
788 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
789 &self,
790 hashes: I,
791 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
792 let hashes = hashes.into_iter().collect::<Vec<_>>();
793 if hashes.is_empty() {
794 return Ok(Vec::new());
795 }
796 let keys = Self::get_keys_for_certificates(&hashes)?;
797 let values = self.store.read_multi_values_bytes(keys).await;
798 if values.is_ok() {
799 #[cfg(with_metrics)]
800 metrics::READ_CERTIFICATES_COUNTER
801 .with_label_values(&[])
802 .inc_by(hashes.len() as u64);
803 }
804 let values = values?;
805 let mut certificates = Vec::new();
806 for (pair, hash) in values.chunks_exact(2).zip(hashes) {
807 let certificate = Self::deserialize_certificate(pair, hash)?;
808 certificates.push(certificate);
809 }
810 Ok(certificates)
811 }
812
813 async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
814 let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
815 let event = self.store.read_value_bytes(&event_key).await?;
816 #[cfg(with_metrics)]
817 metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
818 Ok(event)
819 }
820
821 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
822 let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
823 let exists = self.store.contains_key(&event_key).await?;
824 #[cfg(with_metrics)]
825 metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
826 Ok(exists)
827 }
828
829 async fn read_events_from_index(
830 &self,
831 chain_id: &ChainId,
832 stream_id: &StreamId,
833 start_index: u32,
834 ) -> Result<Vec<IndexAndEvent>, ViewError> {
835 let mut prefix = vec![INDEX_EVENT_ID];
836 prefix.extend(bcs::to_bytes(chain_id).unwrap());
837 prefix.extend(bcs::to_bytes(stream_id).unwrap());
838 let mut keys = Vec::new();
839 let mut indices = Vec::new();
840 for short_key in self.store.find_keys_by_prefix(&prefix).await? {
841 let index = bcs::from_bytes::<u32>(&short_key)?;
842 if index >= start_index {
843 let mut key = prefix.clone();
844 key.extend(short_key);
845 keys.push(key);
846 indices.push(index);
847 }
848 }
849 let values = self.store.read_multi_values_bytes(keys).await?;
850 let mut returned_values = Vec::new();
851 for (index, value) in indices.into_iter().zip(values) {
852 let event = value.unwrap();
853 returned_values.push(IndexAndEvent { index, event });
854 }
855 Ok(returned_values)
856 }
857
858 async fn write_events(
859 &self,
860 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
861 ) -> Result<(), ViewError> {
862 let mut batch = Batch::new();
863 for (event_id, value) in events {
864 batch.add_event(event_id, value)?;
865 }
866 self.write_batch(batch).await
867 }
868
869 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
870 let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
871 let maybe_value = self.store.read_value(&key).await?;
872 #[cfg(with_metrics)]
873 metrics::READ_NETWORK_DESCRIPTION
874 .with_label_values(&[])
875 .inc();
876 Ok(maybe_value)
877 }
878
879 async fn write_network_description(
880 &self,
881 information: &NetworkDescription,
882 ) -> Result<(), ViewError> {
883 let mut batch = Batch::new();
884 batch.add_network_description(information)?;
885 self.write_batch(batch).await?;
886 Ok(())
887 }
888
889 fn wasm_runtime(&self) -> Option<WasmRuntime> {
890 self.wasm_runtime
891 }
892
893 async fn block_exporter_context(
894 &self,
895 block_exporter_id: u32,
896 ) -> Result<Self::BlockExporterContext, ViewError> {
897 let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(block_exporter_id))?;
898 let store = self.store.open_exclusive(&root_key)?;
899 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
900 }
901}
902
903impl<Store, C> DbStorage<Store, C>
904where
905 Store: KeyValueStore + Clone + Send + Sync + 'static,
906 C: Clock,
907 Store::Error: Send + Sync,
908{
909 fn get_keys_for_certificates(hashes: &[CryptoHash]) -> Result<Vec<Vec<u8>>, ViewError> {
910 Ok(hashes
911 .iter()
912 .flat_map(|hash| {
913 let cert_key = bcs::to_bytes(&BaseKey::Certificate(*hash));
914 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(*hash));
915 vec![cert_key, block_key]
916 })
917 .collect::<Result<_, _>>()?)
918 }
919
920 fn deserialize_certificate(
921 pair: &[Option<Vec<u8>>],
922 hash: CryptoHash,
923 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
924 let Some(cert_bytes) = pair[0].as_ref() else {
925 return Ok(None);
926 };
927 let Some(value_bytes) = pair[1].as_ref() else {
928 return Ok(None);
929 };
930 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
931 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
932 assert_eq!(value.hash(), hash);
933 let certificate = cert
934 .with_value(value)
935 .ok_or(ViewError::InconsistentEntries)?;
936 Ok(Some(certificate))
937 }
938
939 async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> {
940 let mut batch = linera_views::batch::Batch::new();
941 batch.put_key_value_bytes(key, bytes);
942 store.write_batch(batch).await?;
943 Ok(())
944 }
945
946 async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
947 if batch.key_value_bytes.is_empty() {
948 return Ok(());
949 }
950 let mut futures = Vec::new();
951 for (key, bytes) in batch.key_value_bytes.into_iter() {
952 let store = self.store.clone();
953 futures.push(async move { Self::write_entry(&store, key, bytes).await });
954 }
955 futures::future::try_join_all(futures).await?;
956 Ok(())
957 }
958
959 fn new(store: Store, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
960 Self {
961 store: Arc::new(store),
962 clock,
963 wasm_runtime,
964 user_contracts: Arc::new(DashMap::new()),
965 user_services: Arc::new(DashMap::new()),
966 execution_runtime_config: ExecutionRuntimeConfig::default(),
967 }
968 }
969}
970
971impl<Store> DbStorage<Store, WallClock>
972where
973 Store: KeyValueStore + Clone + Send + Sync + 'static,
974 Store::Error: Send + Sync,
975{
976 pub async fn maybe_create_and_connect(
977 config: &Store::Config,
978 namespace: &str,
979 wasm_runtime: Option<WasmRuntime>,
980 ) -> Result<Self, Store::Error> {
981 let store = Store::maybe_create_and_connect(config, namespace).await?;
982 Ok(Self::new(store, wasm_runtime, WallClock))
983 }
984
985 pub async fn connect(
986 config: &Store::Config,
987 namespace: &str,
988 wasm_runtime: Option<WasmRuntime>,
989 ) -> Result<Self, Store::Error> {
990 let store = Store::connect(config, namespace).await?;
991 Ok(Self::new(store, wasm_runtime, WallClock))
992 }
993
994 pub async fn list_blob_ids(
996 config: &Store::Config,
997 namespace: &str,
998 ) -> Result<Vec<BlobId>, ViewError> {
999 let store = Store::maybe_create_and_connect(config, namespace).await?;
1000 let prefix = &[INDEX_BLOB_ID];
1001 let keys = store.find_keys_by_prefix(prefix).await?;
1002 let mut blob_ids = Vec::new();
1003 for key in keys {
1004 let key_red = &key[..BLOB_ID_LENGTH];
1005 let blob_id = bcs::from_bytes(key_red)?;
1006 blob_ids.push(blob_id);
1007 }
1008 Ok(blob_ids)
1009 }
1010}
1011
1012impl<Store> DbStorage<Store, WallClock>
1013where
1014 Store: AdminKeyValueStore + Clone + Send + Sync + 'static,
1015 Store::Error: Send + Sync,
1016{
1017 pub async fn list_chain_ids(
1019 config: &Store::Config,
1020 namespace: &str,
1021 ) -> Result<Vec<ChainId>, ViewError> {
1022 let root_keys = Store::list_root_keys(config, namespace).await?;
1023 let mut chain_ids = Vec::new();
1024 for root_key in root_keys {
1025 if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == INDEX_CHAIN_ID {
1026 let root_key_red = &root_key[1..1 + CHAIN_ID_LENGTH];
1027 let chain_id = bcs::from_bytes(root_key_red)?;
1028 chain_ids.push(chain_id);
1029 }
1030 }
1031 Ok(chain_ids)
1032 }
1033}
1034
1035#[cfg(with_testing)]
1036impl<Store> DbStorage<Store, TestClock>
1037where
1038 Store: TestKeyValueStore + Clone + Send + Sync + 'static,
1039 Store::Error: Send + Sync,
1040{
1041 pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
1042 let config = Store::new_test_config().await.unwrap();
1043 let namespace = generate_test_namespace();
1044 DbStorage::<Store, TestClock>::new_for_testing(
1045 config,
1046 &namespace,
1047 wasm_runtime,
1048 TestClock::new(),
1049 )
1050 .await
1051 .unwrap()
1052 }
1053
1054 pub async fn new_for_testing(
1055 config: Store::Config,
1056 namespace: &str,
1057 wasm_runtime: Option<WasmRuntime>,
1058 clock: TestClock,
1059 ) -> Result<Self, Store::Error> {
1060 let store = Store::recreate_and_connect(&config, namespace).await?;
1061 Ok(Self::new(store, wasm_runtime, clock))
1062 }
1063}