1use std::{fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7use dashmap::DashMap;
8#[cfg(with_metrics)]
9use linera_base::prometheus_util::MeasureLatency as _;
10use linera_base::{
11 crypto::CryptoHash,
12 data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
13 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
14};
15use linera_chain::{
16 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
17 ChainStateView,
18};
19use linera_execution::{
20 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
21};
22use linera_views::{
23 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
24 context::ViewContext,
25 store::{AdminKeyValueStore, KeyIterable as _, KeyValueStore},
26 views::View,
27 ViewError,
28};
29use serde::{Deserialize, Serialize};
30#[cfg(with_testing)]
31use {
32 futures::channel::oneshot::{self, Receiver},
33 linera_views::{random::generate_test_namespace, store::TestKeyValueStore},
34 std::{cmp::Reverse, collections::BTreeMap},
35};
36
37use crate::{ChainRuntimeContext, Clock, Storage};
38
/// Prometheus metrics recorded by the storage layer.
///
/// Every counter is registered without labels, so callers always pass an empty label slice
/// to `with_label_values`.
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counts how often a single blob is tested for existence in storage.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// Counts how often several blobs are tested for existence in storage at once.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// Counts how often a blob state is tested for existence in storage.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// Counts how often a certificate is tested for existence in storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// Counts how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// Counts how often a blob is read from storage.
    #[doc(hidden)]
    pub static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// Counts how often a blob state is read from storage.
    #[doc(hidden)]
    pub static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// Counts how often blob states are read from storage in bulk.
    #[doc(hidden)]
    pub static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// Counts how often a blob is written to storage.
    #[doc(hidden)]
    pub static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// Counts how often a single certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// Counts how often certificates are read from storage in bulk.
    #[doc(hidden)]
    pub static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// Counts how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// Measures the latency of loading a chain state.
    #[doc(hidden)]
    pub static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(10.0),
        )
    });

    /// Counts how often an event is read from storage.
    #[doc(hidden)]
    pub static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// Counts how often an event is tested for existence in storage.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// Counts how often an event is written to storage.
    #[doc(hidden)]
    pub static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// Counts how often the network description is read from storage.
    ///
    /// The metric is registered under the name `network_description` (without a `read_`
    /// prefix); the name is kept as is so existing dashboards keep working.
    #[doc(hidden)]
    pub static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// Counts how often the network description is written to storage.
    #[doc(hidden)]
    pub static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
224
/// The key-value pairs collected for one write to the underlying store.
#[derive(Default)]
struct Batch {
    /// Pairs of key bytes and value bytes, in the order in which they were added.
    key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
}
229
230impl Batch {
231 fn new() -> Self {
232 Self::default()
233 }
234
235 fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
236 self.key_value_bytes.push((key, value));
237 }
238
239 fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
240 let bytes = bcs::to_bytes(value)?;
241 self.key_value_bytes.push((key, bytes));
242 Ok(())
243 }
244
245 fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
246 #[cfg(with_metrics)]
247 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
248 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob.id()))?;
249 self.put_key_value_bytes(blob_key.to_vec(), blob.bytes().to_vec());
250 Ok(())
251 }
252
253 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
254 let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
255 self.put_key_value(blob_state_key.to_vec(), blob_state)?;
256 Ok(())
257 }
258
259 fn add_certificate(
260 &mut self,
261 certificate: &ConfirmedBlockCertificate,
262 ) -> Result<(), ViewError> {
263 #[cfg(with_metrics)]
264 metrics::WRITE_CERTIFICATE_COUNTER
265 .with_label_values(&[])
266 .inc();
267 let hash = certificate.hash();
268 let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
269 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
270 self.put_key_value(cert_key.to_vec(), &certificate.lite_certificate())?;
271 self.put_key_value(block_key.to_vec(), certificate.value())?;
272 Ok(())
273 }
274
275 fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
276 #[cfg(with_metrics)]
277 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
278 let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
279 self.put_key_value_bytes(event_key.to_vec(), value);
280 Ok(())
281 }
282
283 fn add_network_description(
284 &mut self,
285 information: &NetworkDescription,
286 ) -> Result<(), ViewError> {
287 #[cfg(with_metrics)]
288 metrics::WRITE_NETWORK_DESCRIPTION
289 .with_label_values(&[])
290 .inc();
291 let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
292 self.put_key_value(key, information)?;
293 Ok(())
294 }
295}
296
/// An implementation of the `Storage` trait on top of a key-value store.
///
/// The clock type defaults to [`WallClock`].
#[derive(Clone)]
pub struct DbStorage<Store, Clock = WallClock> {
    /// The underlying key-value store, shared between all clones of this storage.
    store: Arc<Store>,
    /// The clock returned by `Storage::clock`.
    clock: Clock,
    /// The Wasm runtime returned by `Storage::wasm_runtime`, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Contract code per application; handed to every chain runtime context.
    user_contracts: Arc<DashMap<ApplicationId, UserContractCode>>,
    /// Service code per application; handed to every chain runtime context.
    user_services: Arc<DashMap<ApplicationId, UserServiceCode>>,
    /// The execution runtime configuration handed to every chain runtime context.
    execution_runtime_config: ExecutionRuntimeConfig,
}
307
/// The keys (or root keys) under which the storage keeps its data.
///
/// Keys are BCS-serialized, so the first byte of a key is the index of the variant. The
/// order of the variants is therefore part of the storage format and must stay in sync
/// with the `INDEX_*` constants of this file.
#[derive(Debug, Serialize, Deserialize)]
enum BaseKey {
    /// Root key of the state view of a chain.
    ChainState(ChainId),
    /// Key of the lite certificate with the given hash.
    Certificate(CryptoHash),
    /// Key of the confirmed block with the given hash.
    ConfirmedBlock(CryptoHash),
    /// Key of the bytes of a blob.
    Blob(BlobId),
    /// Key of the state of a blob.
    BlobState(BlobId),
    /// Key of the value of an event.
    Event(EventId),
    /// Root key of the state of the block exporter with the given ID.
    BlockExporterState(u32),
    /// Key of the network description.
    NetworkDescription,
}
319
// First byte of a serialized `BaseKey::ChainState` key (the variant index).
const INDEX_CHAIN_ID: u8 = 0;
// First byte of a serialized `BaseKey::Blob` key (the variant index).
const INDEX_BLOB_ID: u8 = 3;
// First byte of a serialized `BaseKey::Event` key (the variant index).
const INDEX_EVENT_ID: u8 = 5;
// In-memory size of a `ChainId`, used as the length of its serialization inside a key.
// The unit tests of this file check that the two agree.
const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
// In-memory size of a `BlobId`, used as the length of its serialization inside a key.
// The unit tests of this file check that the two agree.
const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();
325
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };

    use crate::db_storage::{
        BaseKey, BLOB_ID_LENGTH, CHAIN_ID_LENGTH, INDEX_BLOB_ID, INDEX_CHAIN_ID, INDEX_EVENT_ID,
    };

    /// A serialized `BaseKey::Blob` key must start with `INDEX_BLOB_ID` and be followed by
    /// exactly `BLOB_ID_LENGTH` bytes; listing the blob IDs relies on this layout.
    #[test]
    fn test_basekey_blob_serialization() {
        let blob_id = BlobId::new(CryptoHash::default(), BlobType::default());
        let serialized = bcs::to_bytes(&BaseKey::Blob(blob_id)).expect("a key");
        assert_eq!(serialized[0], INDEX_BLOB_ID);
        assert_eq!(serialized.len(), 1 + BLOB_ID_LENGTH);
    }

    /// A serialized `BaseKey::ChainState` key must start with `INDEX_CHAIN_ID` and be
    /// followed by exactly `CHAIN_ID_LENGTH` bytes; listing the chain IDs relies on this
    /// layout.
    #[test]
    fn test_basekey_chainstate_serialization() {
        let chain_id = ChainId(CryptoHash::default());
        let serialized = bcs::to_bytes(&BaseKey::ChainState(chain_id)).expect("a key");
        assert_eq!(serialized[0], INDEX_CHAIN_ID);
        assert_eq!(serialized.len(), 1 + CHAIN_ID_LENGTH);
    }

    /// A serialized `BaseKey::Event` key must start with `INDEX_EVENT_ID` followed by the
    /// serialized chain ID and stream ID; reading events by prefix relies on this layout.
    #[test]
    fn test_basekey_event_serialization() {
        let chain_id = ChainId(CryptoHash::test_hash("49"));
        let application_id =
            GenericApplicationId::User(ApplicationId::new(CryptoHash::test_hash("42")));
        let stream_id = StreamId {
            application_id,
            stream_name: StreamName(bcs::to_bytes("linera_stream").unwrap()),
        };

        // The prefix is computed before `stream_id` is moved into the event ID.
        let expected_prefix = [
            vec![INDEX_EVENT_ID],
            bcs::to_bytes(&chain_id).unwrap(),
            bcs::to_bytes(&stream_id).unwrap(),
        ]
        .concat();

        let event_id = EventId {
            chain_id,
            stream_id,
            index: 1567,
        };
        let serialized = bcs::to_bytes(&BaseKey::Event(event_id)).unwrap();
        assert!(serialized.starts_with(&expected_prefix));
    }
}
398
399#[derive(Clone, Copy)]
402pub struct ChainStatesFirstAssignment;
403
404impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
405 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
406 if root_key.is_empty() {
407 return Ok(StoreInUse::Second);
408 }
409 let store = match bcs::from_bytes(root_key)? {
410 BaseKey::ChainState(_) => StoreInUse::First,
411 _ => StoreInUse::Second,
412 };
413 Ok(store)
414 }
415}
416
417#[derive(Clone)]
419pub struct WallClock;
420
421#[cfg_attr(not(web), async_trait)]
422#[cfg_attr(web, async_trait(?Send))]
423impl Clock for WallClock {
424 fn current_time(&self) -> Timestamp {
425 Timestamp::now()
426 }
427
428 async fn sleep(&self, delta: TimeDelta) {
429 linera_base::time::timer::sleep(delta.as_duration()).await
430 }
431
432 async fn sleep_until(&self, timestamp: Timestamp) {
433 let delta = timestamp.delta_since(Timestamp::now());
434 if delta > TimeDelta::ZERO {
435 self.sleep(delta).await
436 }
437 }
438}
439
/// The mutable state behind a `TestClock`.
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current time of the clock.
    time: Timestamp,
    /// The pending sleeps, keyed by wake-up time. The keys are wrapped in `Reverse`, so
    /// the map is ordered from the latest to the earliest wake-up time.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
}
446
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the current time to `time` and wakes up every sleep that is due by then.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // The keys are reversed, so everything at or after `Reverse(time)` in map order is
        // a sleep whose wake-up time is at or before `time`.
        let due = self.sleeps.split_off(&Reverse(time));
        due.into_values().flatten().for_each(|sender| {
            // The receiver may already have been dropped; that is not an error.
            let _ = sender.send(());
        });
    }

    /// Registers a sleep that ends `delta` after the current time (saturating).
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        let wake_up_time = self.time.saturating_add(delta);
        self.add_sleep_until(wake_up_time)
    }

    /// Registers a sleep that ends at `time` and returns the receiver that gets notified.
    ///
    /// If `time` is not in the future, the receiver is notified right away.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        if time <= self.time {
            let _ = sender.send(());
            return receiver;
        }
        self.sleeps.entry(Reverse(time)).or_default().push(sender);
        receiver
    }
}
471
/// A clock whose time only changes when it is set explicitly, for use in tests.
///
/// The state lives behind an `Arc`, so all clones observe and update the same time.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
477
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the time the clock is currently set to.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the clock has been advanced by at least `delta`.
    ///
    /// A zero `delta` returns immediately.
    async fn sleep(&self, delta: TimeDelta) {
        if delta != TimeDelta::ZERO {
            // The lock guard is dropped at the end of this statement, before awaiting.
            let wake_up = self.lock().add_sleep(delta);
            let _ = wake_up.await;
        }
    }

    /// Waits until the clock has been set to `timestamp` or later.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // The lock guard is dropped at the end of this statement, before awaiting.
        let wake_up = self.lock().add_sleep_until(timestamp);
        let _ = wake_up.await;
    }
}
499
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock holding the default timestamp.
    pub fn new() -> Self {
        Self(Arc::default())
    }

    /// Sets the current time and wakes up the sleeps that are due.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by `delta` (saturating) and wakes up the sleeps that are
    /// due.
    pub fn add(&self, delta: TimeDelta) {
        // A single guard keeps the read and the update atomic.
        let mut inner = self.lock();
        let advanced = inner.time.saturating_add(delta);
        inner.set(advanced);
    }

    /// Returns the time the clock is currently set to.
    pub fn current_time(&self) -> Timestamp {
        let inner = self.lock();
        inner.time
    }

    /// Locks the inner state.
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        let result = self.0.lock();
        result.expect("poisoned TestClock mutex")
    }
}
528
529#[cfg_attr(not(web), async_trait)]
530#[cfg_attr(web, async_trait(?Send))]
531impl<Store, C> Storage for DbStorage<Store, C>
532where
533 Store: KeyValueStore + Clone + Send + Sync + 'static,
534 C: Clock + Clone + Send + Sync + 'static,
535 Store::Error: Send + Sync,
536{
537 type Context = ViewContext<ChainRuntimeContext<Self>, Store>;
538 type Clock = C;
539 type BlockExporterContext = ViewContext<u32, Store>;
540
541 fn clock(&self) -> &C {
542 &self.clock
543 }
544
545 async fn load_chain(
546 &self,
547 chain_id: ChainId,
548 ) -> Result<ChainStateView<Self::Context>, ViewError> {
549 #[cfg(with_metrics)]
550 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
551 let runtime_context = ChainRuntimeContext {
552 storage: self.clone(),
553 chain_id,
554 execution_runtime_config: self.execution_runtime_config,
555 user_contracts: self.user_contracts.clone(),
556 user_services: self.user_services.clone(),
557 };
558 let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
559 let store = self.store.open_exclusive(&root_key)?;
560 let context = ViewContext::create_root_context(store, runtime_context).await?;
561 ChainStateView::load(context).await
562 }
563
564 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
565 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
566 let test = self.store.contains_key(&blob_key).await?;
567 #[cfg(with_metrics)]
568 metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
569 Ok(test)
570 }
571
572 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
573 let mut keys = Vec::new();
574 for blob_id in blob_ids {
575 let key = bcs::to_bytes(&BaseKey::Blob(*blob_id))?;
576 keys.push(key);
577 }
578 let results = self.store.contains_keys(keys).await?;
579 let mut missing_blobs = Vec::new();
580 for (blob_id, result) in blob_ids.iter().zip(results) {
581 if !result {
582 missing_blobs.push(*blob_id);
583 }
584 }
585 #[cfg(with_metrics)]
586 metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
587 Ok(missing_blobs)
588 }
589
590 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
591 let blob_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
592 let test = self.store.contains_key(&blob_key).await?;
593 #[cfg(with_metrics)]
594 metrics::CONTAINS_BLOB_STATE_COUNTER
595 .with_label_values(&[])
596 .inc();
597 Ok(test)
598 }
599
600 async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, ViewError> {
601 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
602 let maybe_value = self.store.read_value::<ConfirmedBlock>(&block_key).await?;
603 #[cfg(with_metrics)]
604 metrics::READ_CONFIRMED_BLOCK_COUNTER
605 .with_label_values(&[])
606 .inc();
607 let value = maybe_value.ok_or_else(|| ViewError::not_found("value for hash", hash))?;
608 Ok(value)
609 }
610
611 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
612 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
613 let maybe_blob_bytes = self.store.read_value_bytes(&blob_key).await?;
614 #[cfg(with_metrics)]
615 metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
616 Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
617 }
618
619 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
620 if blob_ids.is_empty() {
621 return Ok(Vec::new());
622 }
623 let blob_keys = blob_ids
624 .iter()
625 .map(|blob_id| bcs::to_bytes(&BaseKey::Blob(*blob_id)))
626 .collect::<Result<Vec<_>, _>>()?;
627 let maybe_blob_bytes = self.store.read_multi_values_bytes(blob_keys).await?;
628 #[cfg(with_metrics)]
629 metrics::READ_BLOB_COUNTER
630 .with_label_values(&[])
631 .inc_by(blob_ids.len() as u64);
632
633 Ok(blob_ids
634 .iter()
635 .zip(maybe_blob_bytes)
636 .map(|(blob_id, maybe_blob_bytes)| {
637 maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(*blob_id, blob_bytes))
638 })
639 .collect())
640 }
641
642 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
643 let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
644 let blob_state = self.store.read_value::<BlobState>(&blob_state_key).await?;
645 #[cfg(with_metrics)]
646 metrics::READ_BLOB_STATE_COUNTER
647 .with_label_values(&[])
648 .inc();
649 Ok(blob_state)
650 }
651
652 async fn read_blob_states(
653 &self,
654 blob_ids: &[BlobId],
655 ) -> Result<Vec<Option<BlobState>>, ViewError> {
656 if blob_ids.is_empty() {
657 return Ok(Vec::new());
658 }
659 let blob_state_keys = blob_ids
660 .iter()
661 .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
662 .collect::<Result<_, _>>()?;
663 let blob_states = self
664 .store
665 .read_multi_values::<BlobState>(blob_state_keys)
666 .await?;
667 #[cfg(with_metrics)]
668 metrics::READ_BLOB_STATES_COUNTER
669 .with_label_values(&[])
670 .inc_by(blob_ids.len() as u64);
671 Ok(blob_states)
672 }
673
674 async fn read_confirmed_blocks_downward(
675 &self,
676 from: CryptoHash,
677 limit: u32,
678 ) -> Result<Vec<ConfirmedBlock>, ViewError> {
679 let mut hash = Some(from);
680 let mut values = Vec::new();
681 for _ in 0..limit {
682 let Some(next_hash) = hash else {
683 break;
684 };
685 let value = self.read_confirmed_block(next_hash).await?;
686 hash = value.block().header.previous_block_hash;
687 values.push(value);
688 }
689 Ok(values)
690 }
691
692 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
693 let mut batch = Batch::new();
694 batch.add_blob(blob)?;
695 self.write_batch(batch).await?;
696 Ok(())
697 }
698
699 async fn maybe_write_blob_states(
700 &self,
701 blob_ids: &[BlobId],
702 blob_state: BlobState,
703 ) -> Result<(), ViewError> {
704 if blob_ids.is_empty() {
705 return Ok(());
706 }
707 let blob_state_keys = blob_ids
708 .iter()
709 .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
710 .collect::<Result<_, _>>()?;
711 let maybe_blob_states = self
712 .store
713 .read_multi_values::<BlobState>(blob_state_keys)
714 .await?;
715 let mut batch = Batch::new();
716 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
717 match maybe_blob_state {
718 None => {
719 batch.add_blob_state(*blob_id, &blob_state)?;
720 }
721 Some(state) => {
722 if state.epoch < blob_state.epoch {
723 batch.add_blob_state(*blob_id, &blob_state)?;
724 }
725 }
726 }
727 }
728 self.write_batch(batch).await?;
732 Ok(())
733 }
734
735 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
736 if blobs.is_empty() {
737 return Ok(Vec::new());
738 }
739 let blob_state_keys = blobs
740 .iter()
741 .map(|blob| bcs::to_bytes(&BaseKey::BlobState(blob.id())))
742 .collect::<Result<_, _>>()?;
743 let blob_states = self.store.contains_keys(blob_state_keys).await?;
744 let mut batch = Batch::new();
745 for (blob, has_state) in blobs.iter().zip(&blob_states) {
746 if *has_state {
747 batch.add_blob(blob)?;
748 }
749 }
750 self.write_batch(batch).await?;
751 Ok(blob_states)
752 }
753
754 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
755 if blobs.is_empty() {
756 return Ok(());
757 }
758 let mut batch = Batch::new();
759 for blob in blobs {
760 batch.add_blob(blob)?;
761 }
762 self.write_batch(batch).await
763 }
764
765 async fn write_blobs_and_certificate(
766 &self,
767 blobs: &[Blob],
768 certificate: &ConfirmedBlockCertificate,
769 ) -> Result<(), ViewError> {
770 let mut batch = Batch::new();
771 for blob in blobs {
772 batch.add_blob(blob)?;
773 }
774 batch.add_certificate(certificate)?;
775 self.write_batch(batch).await
776 }
777
778 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
779 let keys = Self::get_keys_for_certificates(&[hash])?;
780 let results = self.store.contains_keys(keys).await?;
781 #[cfg(with_metrics)]
782 metrics::CONTAINS_CERTIFICATE_COUNTER
783 .with_label_values(&[])
784 .inc();
785 Ok(results[0] && results[1])
786 }
787
788 async fn read_certificate(
789 &self,
790 hash: CryptoHash,
791 ) -> Result<ConfirmedBlockCertificate, ViewError> {
792 let keys = Self::get_keys_for_certificates(&[hash])?;
793 let values = self.store.read_multi_values_bytes(keys).await;
794 if values.is_ok() {
795 #[cfg(with_metrics)]
796 metrics::READ_CERTIFICATE_COUNTER
797 .with_label_values(&[])
798 .inc();
799 }
800 let values = values?;
801 Self::deserialize_certificate(&values, hash)
802 }
803
804 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
805 &self,
806 hashes: I,
807 ) -> Result<Vec<ConfirmedBlockCertificate>, ViewError> {
808 let hashes = hashes.into_iter().collect::<Vec<_>>();
809 if hashes.is_empty() {
810 return Ok(Vec::new());
811 }
812 let keys = Self::get_keys_for_certificates(&hashes)?;
813 let values = self.store.read_multi_values_bytes(keys).await;
814 if values.is_ok() {
815 #[cfg(with_metrics)]
816 metrics::READ_CERTIFICATES_COUNTER
817 .with_label_values(&[])
818 .inc_by(hashes.len() as u64);
819 }
820 let values = values?;
821 let mut certificates = Vec::new();
822 for (pair, hash) in values.chunks_exact(2).zip(hashes) {
823 let certificate = Self::deserialize_certificate(pair, hash)?;
824 certificates.push(certificate);
825 }
826 Ok(certificates)
827 }
828
829 async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
830 let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
831 let event = self.store.read_value_bytes(&event_key).await?;
832 #[cfg(with_metrics)]
833 metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
834 Ok(event)
835 }
836
837 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
838 let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
839 let exists = self.store.contains_key(&event_key).await?;
840 #[cfg(with_metrics)]
841 metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
842 Ok(exists)
843 }
844
845 async fn read_events_from_index(
846 &self,
847 chain_id: &ChainId,
848 stream_id: &StreamId,
849 start_index: u32,
850 ) -> Result<Vec<IndexAndEvent>, ViewError> {
851 let mut prefix = vec![INDEX_EVENT_ID];
852 prefix.extend(bcs::to_bytes(chain_id).unwrap());
853 prefix.extend(bcs::to_bytes(stream_id).unwrap());
854 let mut keys = Vec::new();
855 let mut indices = Vec::new();
856 for short_key in self.store.find_keys_by_prefix(&prefix).await?.iterator() {
857 let short_key = short_key?;
858 let index = bcs::from_bytes::<u32>(short_key)?;
859 if index >= start_index {
860 let mut key = prefix.clone();
861 key.extend(short_key);
862 keys.push(key);
863 indices.push(index);
864 }
865 }
866 let values = self.store.read_multi_values_bytes(keys).await?;
867 let mut returned_values = Vec::new();
868 for (index, value) in indices.into_iter().zip(values) {
869 let event = value.unwrap();
870 returned_values.push(IndexAndEvent { index, event });
871 }
872 Ok(returned_values)
873 }
874
875 async fn write_events(
876 &self,
877 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
878 ) -> Result<(), ViewError> {
879 let mut batch = Batch::new();
880 for (event_id, value) in events {
881 batch.add_event(event_id, value)?;
882 }
883 self.write_batch(batch).await
884 }
885
886 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
887 let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
888 let maybe_value = self.store.read_value(&key).await?;
889 #[cfg(with_metrics)]
890 metrics::READ_NETWORK_DESCRIPTION
891 .with_label_values(&[])
892 .inc();
893 Ok(maybe_value)
894 }
895
896 async fn write_network_description(
897 &self,
898 information: &NetworkDescription,
899 ) -> Result<(), ViewError> {
900 let mut batch = Batch::new();
901 batch.add_network_description(information)?;
902 self.write_batch(batch).await?;
903 Ok(())
904 }
905
906 fn wasm_runtime(&self) -> Option<WasmRuntime> {
907 self.wasm_runtime
908 }
909
910 async fn block_exporter_context(
911 &self,
912 block_exporter_id: u32,
913 ) -> Result<Self::BlockExporterContext, ViewError> {
914 let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(block_exporter_id))?;
915 let store = self.store.open_exclusive(&root_key)?;
916 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
917 }
918}
919
920impl<Store, C> DbStorage<Store, C>
921where
922 Store: KeyValueStore + Clone + Send + Sync + 'static,
923 C: Clock,
924 Store::Error: Send + Sync,
925{
926 fn get_keys_for_certificates(hashes: &[CryptoHash]) -> Result<Vec<Vec<u8>>, ViewError> {
927 Ok(hashes
928 .iter()
929 .flat_map(|hash| {
930 let cert_key = bcs::to_bytes(&BaseKey::Certificate(*hash));
931 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(*hash));
932 vec![cert_key, block_key]
933 })
934 .collect::<Result<_, _>>()?)
935 }
936
937 fn deserialize_certificate(
938 pair: &[Option<Vec<u8>>],
939 hash: CryptoHash,
940 ) -> Result<ConfirmedBlockCertificate, ViewError> {
941 let cert_bytes = pair[0]
942 .as_ref()
943 .ok_or_else(|| ViewError::not_found("certificate bytes for hash", hash))?;
944 let value_bytes = pair[1]
945 .as_ref()
946 .ok_or_else(|| ViewError::not_found("value bytes for hash", hash))?;
947 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
948 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
949 assert_eq!(value.hash(), hash);
950 let certificate = cert
951 .with_value(value)
952 .ok_or(ViewError::InconsistentEntries)?;
953 Ok(certificate)
954 }
955
956 async fn write_entry(store: &Store, key: Vec<u8>, bytes: Vec<u8>) -> Result<(), ViewError> {
957 let mut batch = linera_views::batch::Batch::new();
958 batch.put_key_value_bytes(key, bytes);
959 store.write_batch(batch).await?;
960 Ok(())
961 }
962
963 async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
964 if batch.key_value_bytes.is_empty() {
965 return Ok(());
966 }
967 let mut futures = Vec::new();
968 for (key, bytes) in batch.key_value_bytes.into_iter() {
969 let store = self.store.clone();
970 futures.push(async move { Self::write_entry(&store, key, bytes).await });
971 }
972 futures::future::try_join_all(futures).await?;
973 Ok(())
974 }
975
976 fn new(store: Store, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
977 Self {
978 store: Arc::new(store),
979 clock,
980 wasm_runtime,
981 user_contracts: Arc::new(DashMap::new()),
982 user_services: Arc::new(DashMap::new()),
983 execution_runtime_config: ExecutionRuntimeConfig::default(),
984 }
985 }
986}
987
988impl<Store> DbStorage<Store, WallClock>
989where
990 Store: KeyValueStore + Clone + Send + Sync + 'static,
991 Store::Error: Send + Sync,
992{
993 pub async fn maybe_create_and_connect(
994 config: &Store::Config,
995 namespace: &str,
996 wasm_runtime: Option<WasmRuntime>,
997 ) -> Result<Self, Store::Error> {
998 let store = Store::maybe_create_and_connect(config, namespace).await?;
999 Ok(Self::new(store, wasm_runtime, WallClock))
1000 }
1001
1002 pub async fn connect(
1003 config: &Store::Config,
1004 namespace: &str,
1005 wasm_runtime: Option<WasmRuntime>,
1006 ) -> Result<Self, Store::Error> {
1007 let store = Store::connect(config, namespace).await?;
1008 Ok(Self::new(store, wasm_runtime, WallClock))
1009 }
1010
1011 pub async fn list_blob_ids(
1013 config: &Store::Config,
1014 namespace: &str,
1015 ) -> Result<Vec<BlobId>, ViewError> {
1016 let store = Store::maybe_create_and_connect(config, namespace).await?;
1017 let prefix = &[INDEX_BLOB_ID];
1018 let keys = store.find_keys_by_prefix(prefix).await?;
1019 let mut blob_ids = Vec::new();
1020 for key in keys.iterator() {
1021 let key = key?;
1022 let key_red = &key[..BLOB_ID_LENGTH];
1023 let blob_id = bcs::from_bytes(key_red)?;
1024 blob_ids.push(blob_id);
1025 }
1026 Ok(blob_ids)
1027 }
1028}
1029
1030impl<Store> DbStorage<Store, WallClock>
1031where
1032 Store: AdminKeyValueStore + Clone + Send + Sync + 'static,
1033 Store::Error: Send + Sync,
1034{
1035 pub async fn list_chain_ids(
1037 config: &Store::Config,
1038 namespace: &str,
1039 ) -> Result<Vec<ChainId>, ViewError> {
1040 let root_keys = Store::list_root_keys(config, namespace).await?;
1041 let mut chain_ids = Vec::new();
1042 for root_key in root_keys {
1043 if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == INDEX_CHAIN_ID {
1044 let root_key_red = &root_key[1..1 + CHAIN_ID_LENGTH];
1045 let chain_id = bcs::from_bytes(root_key_red)?;
1046 chain_ids.push(chain_id);
1047 }
1048 }
1049 Ok(chain_ids)
1050 }
1051}
1052
#[cfg(with_testing)]
impl<Store> DbStorage<Store, TestClock>
where
    Store: TestKeyValueStore + Clone + Send + Sync + 'static,
    Store::Error: Send + Sync,
{
    /// Creates a storage for tests, using the store's test configuration, a generated
    /// test namespace and a new `TestClock`.
    ///
    /// Panics if the test configuration or the storage cannot be created.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Store::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Creates a storage for tests on the store obtained from
    /// `Store::recreate_and_connect` for `namespace`, using the given clock.
    pub async fn new_for_testing(
        config: Store::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, Store::Error> {
        Store::recreate_and_connect(&config, namespace)
            .await
            .map(|store| Self::new(store, wasm_runtime, clock))
    }
}