1use std::{fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10 crypto::CryptoHash,
11 data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
12 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_chain::{
15 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
16 ChainStateView,
17};
18use linera_execution::{
19 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
20};
21use linera_views::{
22 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
23 context::ViewContext,
24 store::{
25 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
26 },
27 views::View,
28 ViewError,
29};
30use serde::{Deserialize, Serialize};
31#[cfg(with_testing)]
32use {
33 futures::channel::oneshot::{self, Receiver},
34 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
35 std::{cmp::Reverse, collections::BTreeMap},
36};
37
38use crate::{ChainRuntimeContext, Clock, Storage};
39
40#[cfg(with_metrics)]
41pub mod metrics {
42 use std::sync::LazyLock;
43
44 use linera_base::prometheus_util::{
45 exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
46 };
47 use prometheus::{HistogramVec, IntCounterVec};
48
49 pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
51 register_int_counter_vec(
52 "contains_blob",
53 "The metric counting how often a blob is tested for existence from storage",
54 &[],
55 )
56 });
57
58 pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
60 register_int_counter_vec(
61 "contains_blobs",
62 "The metric counting how often multiple blobs are tested for existence from storage",
63 &[],
64 )
65 });
66
67 pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
69 register_int_counter_vec(
70 "contains_blob_state",
71 "The metric counting how often a blob state is tested for existence from storage",
72 &[],
73 )
74 });
75
76 pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
78 register_int_counter_vec(
79 "contains_certificate",
80 "The metric counting how often a certificate is tested for existence from storage",
81 &[],
82 )
83 });
84
85 #[doc(hidden)]
87 pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
88 register_int_counter_vec(
89 "read_confirmed_block",
90 "The metric counting how often a hashed confirmed block is read from storage",
91 &[],
92 )
93 });
94
95 #[doc(hidden)]
97 pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
98 register_int_counter_vec(
99 "read_blob",
100 "The metric counting how often a blob is read from storage",
101 &[],
102 )
103 });
104
105 #[doc(hidden)]
107 pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
108 register_int_counter_vec(
109 "read_blob_state",
110 "The metric counting how often a blob state is read from storage",
111 &[],
112 )
113 });
114
115 #[doc(hidden)]
117 pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
118 register_int_counter_vec(
119 "read_blob_states",
120 "The metric counting how often blob states are read from storage",
121 &[],
122 )
123 });
124
125 #[doc(hidden)]
127 pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
128 register_int_counter_vec(
129 "write_blob",
130 "The metric counting how often a blob is written to storage",
131 &[],
132 )
133 });
134
135 #[doc(hidden)]
137 pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
138 register_int_counter_vec(
139 "read_certificate",
140 "The metric counting how often a certificate is read from storage",
141 &[],
142 )
143 });
144
145 #[doc(hidden)]
147 pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
148 register_int_counter_vec(
149 "read_certificates",
150 "The metric counting how often certificate are read from storage",
151 &[],
152 )
153 });
154
155 #[doc(hidden)]
157 pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
158 register_int_counter_vec(
159 "write_certificate",
160 "The metric counting how often a certificate is written to storage",
161 &[],
162 )
163 });
164
165 #[doc(hidden)]
167 pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
168 register_histogram_vec(
169 "load_chain_latency",
170 "The latency to load a chain state",
171 &[],
172 exponential_bucket_latencies(10.0),
173 )
174 });
175
176 #[doc(hidden)]
178 pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
179 register_int_counter_vec(
180 "read_event",
181 "The metric counting how often an event is read from storage",
182 &[],
183 )
184 });
185
186 pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
188 register_int_counter_vec(
189 "contains_event",
190 "The metric counting how often an event is tested for existence from storage",
191 &[],
192 )
193 });
194
195 #[doc(hidden)]
197 pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
198 register_int_counter_vec(
199 "write_event",
200 "The metric counting how often an event is written to storage",
201 &[],
202 )
203 });
204
205 #[doc(hidden)]
207 pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
208 register_int_counter_vec(
209 "network_description",
210 "The metric counting how often the network description is read from storage",
211 &[],
212 )
213 });
214
215 #[doc(hidden)]
217 pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
218 register_int_counter_vec(
219 "write_network_description",
220 "The metric counting how often the network description is written to storage",
221 &[],
222 )
223 });
224}
225
226#[derive(Default)]
227struct Batch {
228 key_value_bytes: Vec<(Vec<u8>, Vec<u8>)>,
229}
230
231impl Batch {
232 fn new() -> Self {
233 Self::default()
234 }
235
236 fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
237 self.key_value_bytes.push((key, value));
238 }
239
240 fn put_key_value<T: Serialize>(&mut self, key: Vec<u8>, value: &T) -> Result<(), ViewError> {
241 let bytes = bcs::to_bytes(value)?;
242 self.key_value_bytes.push((key, bytes));
243 Ok(())
244 }
245
246 fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
247 #[cfg(with_metrics)]
248 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
249 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob.id()))?;
250 self.put_key_value_bytes(blob_key.to_vec(), blob.bytes().to_vec());
251 Ok(())
252 }
253
254 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
255 let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
256 self.put_key_value(blob_state_key.to_vec(), blob_state)?;
257 Ok(())
258 }
259
260 fn add_certificate(
261 &mut self,
262 certificate: &ConfirmedBlockCertificate,
263 ) -> Result<(), ViewError> {
264 #[cfg(with_metrics)]
265 metrics::WRITE_CERTIFICATE_COUNTER
266 .with_label_values(&[])
267 .inc();
268 let hash = certificate.hash();
269 let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
270 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
271 self.put_key_value(cert_key.to_vec(), &certificate.lite_certificate())?;
272 self.put_key_value(block_key.to_vec(), certificate.value())?;
273 Ok(())
274 }
275
276 fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
277 #[cfg(with_metrics)]
278 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
279 let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
280 self.put_key_value_bytes(event_key.to_vec(), value);
281 Ok(())
282 }
283
284 fn add_network_description(
285 &mut self,
286 information: &NetworkDescription,
287 ) -> Result<(), ViewError> {
288 #[cfg(with_metrics)]
289 metrics::WRITE_NETWORK_DESCRIPTION
290 .with_label_values(&[])
291 .inc();
292 let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
293 self.put_key_value(key, information)?;
294 Ok(())
295 }
296}
297
298#[derive(Clone)]
300pub struct DbStorage<Database, Clock = WallClock> {
301 database: Arc<Database>,
302 clock: Clock,
303 wasm_runtime: Option<WasmRuntime>,
304 user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
305 user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
306 execution_runtime_config: ExecutionRuntimeConfig,
307}
308
309#[derive(Debug, Serialize, Deserialize)]
310enum BaseKey {
311 ChainState(ChainId),
312 Certificate(CryptoHash),
313 ConfirmedBlock(CryptoHash),
314 Blob(BlobId),
315 BlobState(BlobId),
316 Event(EventId),
317 BlockExporterState(u32),
318 NetworkDescription,
319}
320
321const INDEX_CHAIN_ID: u8 = 0;
322const INDEX_BLOB_ID: u8 = 3;
323const INDEX_EVENT_ID: u8 = 5;
324const CHAIN_ID_LENGTH: usize = std::mem::size_of::<ChainId>();
325const BLOB_ID_LENGTH: usize = std::mem::size_of::<BlobId>();
326
327#[cfg(test)]
328mod tests {
329 use linera_base::{
330 crypto::CryptoHash,
331 identifiers::{
332 ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
333 StreamName,
334 },
335 };
336
337 use crate::db_storage::{
338 BaseKey, BLOB_ID_LENGTH, CHAIN_ID_LENGTH, INDEX_BLOB_ID, INDEX_CHAIN_ID, INDEX_EVENT_ID,
339 };
340
341 #[test]
348 fn test_basekey_blob_serialization() {
349 let hash = CryptoHash::default();
350 let blob_type = BlobType::default();
351 let blob_id = BlobId::new(hash, blob_type);
352 let base_key = BaseKey::Blob(blob_id);
353 let key = bcs::to_bytes(&base_key).expect("a key");
354 assert_eq!(key[0], INDEX_BLOB_ID);
355 assert_eq!(key.len(), 1 + BLOB_ID_LENGTH);
356 }
357
358 #[test]
361 fn test_basekey_chainstate_serialization() {
362 let hash = CryptoHash::default();
363 let chain_id = ChainId(hash);
364 let base_key = BaseKey::ChainState(chain_id);
365 let key = bcs::to_bytes(&base_key).expect("a key");
366 assert_eq!(key[0], INDEX_CHAIN_ID);
367 assert_eq!(key.len(), 1 + CHAIN_ID_LENGTH);
368 }
369
370 #[test]
373 fn test_basekey_event_serialization() {
374 let hash = CryptoHash::test_hash("49");
375 let chain_id = ChainId(hash);
376 let application_description_hash = CryptoHash::test_hash("42");
377 let application_id = ApplicationId::new(application_description_hash);
378 let application_id = GenericApplicationId::User(application_id);
379 let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
380 let stream_id = StreamId {
381 application_id,
382 stream_name,
383 };
384 let mut prefix = vec![INDEX_EVENT_ID];
385 prefix.extend(bcs::to_bytes(&chain_id).unwrap());
386 prefix.extend(bcs::to_bytes(&stream_id).unwrap());
387
388 let index = 1567;
389 let event_id = EventId {
390 chain_id,
391 stream_id,
392 index,
393 };
394 let base_key = BaseKey::Event(event_id);
395 let key = bcs::to_bytes(&base_key).unwrap();
396 assert!(key.starts_with(&prefix));
397 }
398}
399
400#[derive(Clone, Copy)]
403pub struct ChainStatesFirstAssignment;
404
405impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
406 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
407 if root_key.is_empty() {
408 return Ok(StoreInUse::Second);
409 }
410 let store = match bcs::from_bytes(root_key)? {
411 BaseKey::ChainState(_) => StoreInUse::First,
412 _ => StoreInUse::Second,
413 };
414 Ok(store)
415 }
416}
417
418#[derive(Clone)]
420pub struct WallClock;
421
422#[cfg_attr(not(web), async_trait)]
423#[cfg_attr(web, async_trait(?Send))]
424impl Clock for WallClock {
425 fn current_time(&self) -> Timestamp {
426 Timestamp::now()
427 }
428
429 async fn sleep(&self, delta: TimeDelta) {
430 linera_base::time::timer::sleep(delta.as_duration()).await
431 }
432
433 async fn sleep_until(&self, timestamp: Timestamp) {
434 let delta = timestamp.delta_since(Timestamp::now());
435 if delta > TimeDelta::ZERO {
436 self.sleep(delta).await
437 }
438 }
439}
440
441#[cfg(with_testing)]
442#[derive(Default)]
443struct TestClockInner {
444 time: Timestamp,
445 sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
446}
447
448#[cfg(with_testing)]
449impl TestClockInner {
450 fn set(&mut self, time: Timestamp) {
451 self.time = time;
452 let senders = self.sleeps.split_off(&Reverse(time));
453 for sender in senders.into_values().flatten() {
454 let _ = sender.send(());
455 }
456 }
457
458 fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
459 self.add_sleep_until(self.time.saturating_add(delta))
460 }
461
462 fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
463 let (sender, receiver) = oneshot::channel();
464 if self.time >= time {
465 let _ = sender.send(());
466 } else {
467 self.sleeps.entry(Reverse(time)).or_default().push(sender);
468 }
469 receiver
470 }
471}
472
473#[cfg(with_testing)]
476#[derive(Clone, Default)]
477pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
478
479#[cfg(with_testing)]
480#[cfg_attr(not(web), async_trait)]
481#[cfg_attr(web, async_trait(?Send))]
482impl Clock for TestClock {
483 fn current_time(&self) -> Timestamp {
484 self.lock().time
485 }
486
487 async fn sleep(&self, delta: TimeDelta) {
488 if delta == TimeDelta::ZERO {
489 return;
490 }
491 let receiver = self.lock().add_sleep(delta);
492 let _ = receiver.await;
493 }
494
495 async fn sleep_until(&self, timestamp: Timestamp) {
496 let receiver = self.lock().add_sleep_until(timestamp);
497 let _ = receiver.await;
498 }
499}
500
501#[cfg(with_testing)]
502impl TestClock {
503 pub fn new() -> Self {
505 TestClock(Arc::default())
506 }
507
508 pub fn set(&self, time: Timestamp) {
510 self.lock().set(time);
511 }
512
513 pub fn add(&self, delta: TimeDelta) {
515 let mut guard = self.lock();
516 let time = guard.time.saturating_add(delta);
517 guard.set(time);
518 }
519
520 pub fn current_time(&self) -> Timestamp {
522 self.lock().time
523 }
524
525 fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
526 self.0.lock().expect("poisoned TestClock mutex")
527 }
528}
529
530#[cfg_attr(not(web), async_trait)]
531#[cfg_attr(web, async_trait(?Send))]
532impl<Database, C> Storage for DbStorage<Database, C>
533where
534 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
535 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
536 C: Clock + Clone + Send + Sync + 'static,
537 Database::Error: Send + Sync,
538{
539 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
540 type Clock = C;
541 type BlockExporterContext = ViewContext<u32, Database::Store>;
542
543 fn clock(&self) -> &C {
544 &self.clock
545 }
546
547 async fn load_chain(
548 &self,
549 chain_id: ChainId,
550 ) -> Result<ChainStateView<Self::Context>, ViewError> {
551 #[cfg(with_metrics)]
552 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
553 let runtime_context = ChainRuntimeContext {
554 storage: self.clone(),
555 chain_id,
556 execution_runtime_config: self.execution_runtime_config,
557 user_contracts: self.user_contracts.clone(),
558 user_services: self.user_services.clone(),
559 };
560 let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
561 let store = self.database.open_exclusive(&root_key)?;
562 let context = ViewContext::create_root_context(store, runtime_context).await?;
563 ChainStateView::load(context).await
564 }
565
566 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
567 let store = self.database.open_shared(&[])?;
568 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
569 let test = store.contains_key(&blob_key).await?;
570 #[cfg(with_metrics)]
571 metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
572 Ok(test)
573 }
574
575 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
576 let store = self.database.open_shared(&[])?;
577 let mut keys = Vec::new();
578 for blob_id in blob_ids {
579 let key = bcs::to_bytes(&BaseKey::Blob(*blob_id))?;
580 keys.push(key);
581 }
582 let results = store.contains_keys(keys).await?;
583 let mut missing_blobs = Vec::new();
584 for (blob_id, result) in blob_ids.iter().zip(results) {
585 if !result {
586 missing_blobs.push(*blob_id);
587 }
588 }
589 #[cfg(with_metrics)]
590 metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
591 Ok(missing_blobs)
592 }
593
594 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
595 let store = self.database.open_shared(&[])?;
596 let blob_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
597 let test = store.contains_key(&blob_key).await?;
598 #[cfg(with_metrics)]
599 metrics::CONTAINS_BLOB_STATE_COUNTER
600 .with_label_values(&[])
601 .inc();
602 Ok(test)
603 }
604
605 async fn read_confirmed_block(
606 &self,
607 hash: CryptoHash,
608 ) -> Result<Option<ConfirmedBlock>, ViewError> {
609 let store = self.database.open_shared(&[])?;
610 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
611 let value = store.read_value(&block_key).await?;
612 #[cfg(with_metrics)]
613 metrics::READ_CONFIRMED_BLOCK_COUNTER
614 .with_label_values(&[])
615 .inc();
616 Ok(value)
617 }
618
619 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
620 let store = self.database.open_shared(&[])?;
621 let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
622 let maybe_blob_bytes = store.read_value_bytes(&blob_key).await?;
623 #[cfg(with_metrics)]
624 metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
625 Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
626 }
627
628 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
629 if blob_ids.is_empty() {
630 return Ok(Vec::new());
631 }
632 let blob_keys = blob_ids
633 .iter()
634 .map(|blob_id| bcs::to_bytes(&BaseKey::Blob(*blob_id)))
635 .collect::<Result<Vec<_>, _>>()?;
636 let store = self.database.open_shared(&[])?;
637 let maybe_blob_bytes = store.read_multi_values_bytes(blob_keys).await?;
638 #[cfg(with_metrics)]
639 metrics::READ_BLOB_COUNTER
640 .with_label_values(&[])
641 .inc_by(blob_ids.len() as u64);
642
643 Ok(blob_ids
644 .iter()
645 .zip(maybe_blob_bytes)
646 .map(|(blob_id, maybe_blob_bytes)| {
647 maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(*blob_id, blob_bytes))
648 })
649 .collect())
650 }
651
652 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
653 let store = self.database.open_shared(&[])?;
654 let blob_state_key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
655 let blob_state = store.read_value::<BlobState>(&blob_state_key).await?;
656 #[cfg(with_metrics)]
657 metrics::READ_BLOB_STATE_COUNTER
658 .with_label_values(&[])
659 .inc();
660 Ok(blob_state)
661 }
662
663 async fn read_blob_states(
664 &self,
665 blob_ids: &[BlobId],
666 ) -> Result<Vec<Option<BlobState>>, ViewError> {
667 if blob_ids.is_empty() {
668 return Ok(Vec::new());
669 }
670 let blob_state_keys = blob_ids
671 .iter()
672 .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
673 .collect::<Result<_, _>>()?;
674 let store = self.database.open_shared(&[])?;
675 let blob_states = store
676 .read_multi_values::<BlobState>(blob_state_keys)
677 .await?;
678 #[cfg(with_metrics)]
679 metrics::READ_BLOB_STATES_COUNTER
680 .with_label_values(&[])
681 .inc_by(blob_ids.len() as u64);
682 Ok(blob_states)
683 }
684
685 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
686 let mut batch = Batch::new();
687 batch.add_blob(blob)?;
688 self.write_batch(batch).await?;
689 Ok(())
690 }
691
692 async fn maybe_write_blob_states(
693 &self,
694 blob_ids: &[BlobId],
695 blob_state: BlobState,
696 ) -> Result<(), ViewError> {
697 if blob_ids.is_empty() {
698 return Ok(());
699 }
700 let blob_state_keys = blob_ids
701 .iter()
702 .map(|blob_id| bcs::to_bytes(&BaseKey::BlobState(*blob_id)))
703 .collect::<Result<_, _>>()?;
704 let store = self.database.open_shared(&[])?;
705 let maybe_blob_states = store
706 .read_multi_values::<BlobState>(blob_state_keys)
707 .await?;
708 let mut batch = Batch::new();
709 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
710 match maybe_blob_state {
711 None => {
712 batch.add_blob_state(*blob_id, &blob_state)?;
713 }
714 Some(state) => {
715 if state.epoch < blob_state.epoch {
716 batch.add_blob_state(*blob_id, &blob_state)?;
717 }
718 }
719 }
720 }
721 self.write_batch(batch).await?;
725 Ok(())
726 }
727
728 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
729 if blobs.is_empty() {
730 return Ok(Vec::new());
731 }
732 let blob_state_keys = blobs
733 .iter()
734 .map(|blob| bcs::to_bytes(&BaseKey::BlobState(blob.id())))
735 .collect::<Result<_, _>>()?;
736 let store = self.database.open_shared(&[])?;
737 let blob_states = store.contains_keys(blob_state_keys).await?;
738 let mut batch = Batch::new();
739 for (blob, has_state) in blobs.iter().zip(&blob_states) {
740 if *has_state {
741 batch.add_blob(blob)?;
742 }
743 }
744 self.write_batch(batch).await?;
745 Ok(blob_states)
746 }
747
748 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
749 if blobs.is_empty() {
750 return Ok(());
751 }
752 let mut batch = Batch::new();
753 for blob in blobs {
754 batch.add_blob(blob)?;
755 }
756 self.write_batch(batch).await
757 }
758
759 async fn write_blobs_and_certificate(
760 &self,
761 blobs: &[Blob],
762 certificate: &ConfirmedBlockCertificate,
763 ) -> Result<(), ViewError> {
764 let mut batch = Batch::new();
765 for blob in blobs {
766 batch.add_blob(blob)?;
767 }
768 batch.add_certificate(certificate)?;
769 self.write_batch(batch).await
770 }
771
772 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
773 let keys = Self::get_keys_for_certificates(&[hash])?;
774 let store = self.database.open_shared(&[])?;
775 let results = store.contains_keys(keys).await?;
776 #[cfg(with_metrics)]
777 metrics::CONTAINS_CERTIFICATE_COUNTER
778 .with_label_values(&[])
779 .inc();
780 Ok(results[0] && results[1])
781 }
782
783 async fn read_certificate(
784 &self,
785 hash: CryptoHash,
786 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
787 let store = self.database.open_shared(&[])?;
788 let keys = Self::get_keys_for_certificates(&[hash])?;
789 let values = store.read_multi_values_bytes(keys).await;
790 if values.is_ok() {
791 #[cfg(with_metrics)]
792 metrics::READ_CERTIFICATE_COUNTER
793 .with_label_values(&[])
794 .inc();
795 }
796 let values = values?;
797 Self::deserialize_certificate(&values, hash)
798 }
799
800 async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
801 &self,
802 hashes: I,
803 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
804 let hashes = hashes.into_iter().collect::<Vec<_>>();
805 if hashes.is_empty() {
806 return Ok(Vec::new());
807 }
808 let keys = Self::get_keys_for_certificates(&hashes)?;
809 let store = self.database.open_shared(&[])?;
810 let values = store.read_multi_values_bytes(keys).await;
811 if values.is_ok() {
812 #[cfg(with_metrics)]
813 metrics::READ_CERTIFICATES_COUNTER
814 .with_label_values(&[])
815 .inc_by(hashes.len() as u64);
816 }
817 let values = values?;
818 let mut certificates = Vec::new();
819 for (pair, hash) in values.chunks_exact(2).zip(hashes) {
820 let certificate = Self::deserialize_certificate(pair, hash)?;
821 certificates.push(certificate);
822 }
823 Ok(certificates)
824 }
825
826 async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
833 &self,
834 hashes: I,
835 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
836 let hashes = hashes.into_iter().collect::<Vec<_>>();
837 if hashes.is_empty() {
838 return Ok(Vec::new());
839 }
840 let keys = Self::get_keys_for_certificates(&hashes)?;
841 let store = self.database.open_shared(&[])?;
842 let values = store.read_multi_values_bytes(keys).await?;
843 #[cfg(with_metrics)]
844 metrics::READ_CERTIFICATES_COUNTER
845 .with_label_values(&[])
846 .inc_by(hashes.len() as u64);
847 Ok(values
848 .chunks_exact(2)
849 .filter_map(|chunk| {
850 let lite_cert_bytes = chunk[0].as_ref()?;
851 let confirmed_block_bytes = chunk[1].as_ref()?;
852 Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
853 })
854 .collect())
855 }
856
857 async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
858 let store = self.database.open_shared(&[])?;
859 let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
860 let event = store.read_value_bytes(&event_key).await?;
861 #[cfg(with_metrics)]
862 metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
863 Ok(event)
864 }
865
866 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
867 let store = self.database.open_shared(&[])?;
868 let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
869 let exists = store.contains_key(&event_key).await?;
870 #[cfg(with_metrics)]
871 metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
872 Ok(exists)
873 }
874
875 async fn read_events_from_index(
876 &self,
877 chain_id: &ChainId,
878 stream_id: &StreamId,
879 start_index: u32,
880 ) -> Result<Vec<IndexAndEvent>, ViewError> {
881 let mut prefix = vec![INDEX_EVENT_ID];
882 prefix.extend(bcs::to_bytes(chain_id).unwrap());
883 prefix.extend(bcs::to_bytes(stream_id).unwrap());
884 let mut keys = Vec::new();
885 let mut indices = Vec::new();
886 let store = self.database.open_shared(&[])?;
887 for short_key in store.find_keys_by_prefix(&prefix).await? {
888 let index = bcs::from_bytes::<u32>(&short_key)?;
889 if index >= start_index {
890 let mut key = prefix.clone();
891 key.extend(short_key);
892 keys.push(key);
893 indices.push(index);
894 }
895 }
896 let values = store.read_multi_values_bytes(keys).await?;
897 let mut returned_values = Vec::new();
898 for (index, value) in indices.into_iter().zip(values) {
899 let event = value.unwrap();
900 returned_values.push(IndexAndEvent { index, event });
901 }
902 Ok(returned_values)
903 }
904
905 async fn write_events(
906 &self,
907 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
908 ) -> Result<(), ViewError> {
909 let mut batch = Batch::new();
910 for (event_id, value) in events {
911 batch.add_event(event_id, value)?;
912 }
913 self.write_batch(batch).await
914 }
915
916 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
917 let store = self.database.open_shared(&[])?;
918 let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
919 let maybe_value = store.read_value(&key).await?;
920 #[cfg(with_metrics)]
921 metrics::READ_NETWORK_DESCRIPTION
922 .with_label_values(&[])
923 .inc();
924 Ok(maybe_value)
925 }
926
927 async fn write_network_description(
928 &self,
929 information: &NetworkDescription,
930 ) -> Result<(), ViewError> {
931 let mut batch = Batch::new();
932 batch.add_network_description(information)?;
933 self.write_batch(batch).await?;
934 Ok(())
935 }
936
937 fn wasm_runtime(&self) -> Option<WasmRuntime> {
938 self.wasm_runtime
939 }
940
941 async fn block_exporter_context(
942 &self,
943 block_exporter_id: u32,
944 ) -> Result<Self::BlockExporterContext, ViewError> {
945 let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(block_exporter_id))?;
946 let store = self.database.open_exclusive(&root_key)?;
947 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
948 }
949}
950
951impl<Database, C> DbStorage<Database, C>
952where
953 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
954 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
955 C: Clock,
956 Database::Error: Send + Sync,
957{
958 fn get_keys_for_certificates(hashes: &[CryptoHash]) -> Result<Vec<Vec<u8>>, ViewError> {
959 Ok(hashes
960 .iter()
961 .flat_map(|hash| {
962 let cert_key = bcs::to_bytes(&BaseKey::Certificate(*hash));
963 let block_key = bcs::to_bytes(&BaseKey::ConfirmedBlock(*hash));
964 vec![cert_key, block_key]
965 })
966 .collect::<Result<_, _>>()?)
967 }
968
969 fn deserialize_certificate(
970 pair: &[Option<Vec<u8>>],
971 hash: CryptoHash,
972 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
973 let Some(cert_bytes) = pair[0].as_ref() else {
974 return Ok(None);
975 };
976 let Some(value_bytes) = pair[1].as_ref() else {
977 return Ok(None);
978 };
979 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
980 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
981 assert_eq!(value.hash(), hash);
982 let certificate = cert
983 .with_value(value)
984 .ok_or(ViewError::InconsistentEntries)?;
985 Ok(Some(certificate))
986 }
987
988 async fn write_entry(
989 store: &Database::Store,
990 key: Vec<u8>,
991 bytes: Vec<u8>,
992 ) -> Result<(), ViewError> {
993 let mut batch = linera_views::batch::Batch::new();
994 batch.put_key_value_bytes(key, bytes);
995 store.write_batch(batch).await?;
996 Ok(())
997 }
998
999 async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
1000 if batch.key_value_bytes.is_empty() {
1001 return Ok(());
1002 }
1003 let mut futures = Vec::new();
1004 for (key, bytes) in batch.key_value_bytes {
1005 let store = self.database.open_shared(&[])?;
1006 futures.push(async move { Self::write_entry(&store, key, bytes).await });
1007 }
1008 futures::future::try_join_all(futures).await?;
1009 Ok(())
1010 }
1011}
1012
1013impl<Database, C> DbStorage<Database, C> {
1014 fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
1015 Self {
1016 database: Arc::new(database),
1017 clock,
1018 wasm_runtime,
1019 user_contracts: Arc::new(papaya::HashMap::new()),
1020 user_services: Arc::new(papaya::HashMap::new()),
1021 execution_runtime_config: ExecutionRuntimeConfig::default(),
1022 }
1023 }
1024}
1025
1026impl<Database> DbStorage<Database, WallClock>
1027where
1028 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1029 Database::Error: Send + Sync,
1030 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
1031{
1032 pub async fn maybe_create_and_connect(
1033 config: &Database::Config,
1034 namespace: &str,
1035 wasm_runtime: Option<WasmRuntime>,
1036 ) -> Result<Self, Database::Error> {
1037 let database = Database::maybe_create_and_connect(config, namespace).await?;
1038 Ok(Self::new(database, wasm_runtime, WallClock))
1039 }
1040
1041 pub async fn connect(
1042 config: &Database::Config,
1043 namespace: &str,
1044 wasm_runtime: Option<WasmRuntime>,
1045 ) -> Result<Self, Database::Error> {
1046 let database = Database::connect(config, namespace).await?;
1047 Ok(Self::new(database, wasm_runtime, WallClock))
1048 }
1049
1050 pub async fn list_blob_ids(
1052 config: &Database::Config,
1053 namespace: &str,
1054 ) -> Result<Vec<BlobId>, ViewError> {
1055 let database = Database::maybe_create_and_connect(config, namespace).await?;
1056 let store = database.open_shared(&[])?;
1057 let prefix = &[INDEX_BLOB_ID];
1058 let keys = store.find_keys_by_prefix(prefix).await?;
1059 let mut blob_ids = Vec::new();
1060 for key in keys {
1061 let key_red = &key[..BLOB_ID_LENGTH];
1062 let blob_id = bcs::from_bytes(key_red)?;
1063 blob_ids.push(blob_id);
1064 }
1065 Ok(blob_ids)
1066 }
1067}
1068
1069impl<Database> DbStorage<Database, WallClock>
1070where
1071 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1072 Database::Error: Send + Sync,
1073{
1074 pub async fn list_chain_ids(
1076 config: &Database::Config,
1077 namespace: &str,
1078 ) -> Result<Vec<ChainId>, ViewError> {
1079 let root_keys = Database::list_root_keys(config, namespace).await?;
1080 let mut chain_ids = Vec::new();
1081 for root_key in root_keys {
1082 if root_key.len() == 1 + CHAIN_ID_LENGTH && root_key[0] == INDEX_CHAIN_ID {
1083 let root_key_red = &root_key[1..=CHAIN_ID_LENGTH];
1084 let chain_id = bcs::from_bytes(root_key_red)?;
1085 chain_ids.push(chain_id);
1086 }
1087 }
1088 Ok(chain_ids)
1089 }
1090}
1091
1092#[cfg(with_testing)]
1093impl<Database> DbStorage<Database, TestClock>
1094where
1095 Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
1096 Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
1097 Database::Error: Send + Sync,
1098{
1099 pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
1100 let config = Database::new_test_config().await.unwrap();
1101 let namespace = generate_test_namespace();
1102 DbStorage::<Database, TestClock>::new_for_testing(
1103 config,
1104 &namespace,
1105 wasm_runtime,
1106 TestClock::new(),
1107 )
1108 .await
1109 .unwrap()
1110 }
1111
1112 pub async fn new_for_testing(
1113 config: Database::Config,
1114 namespace: &str,
1115 wasm_runtime: Option<WasmRuntime>,
1116 clock: TestClock,
1117 ) -> Result<Self, Database::Error> {
1118 let database = Database::recreate_and_connect(&config, namespace).await?;
1119 Ok(Self::new(database, wasm_runtime, clock))
1120 }
1121}