//! Adds LRU (least recently used) read caching on top of a key-value store.

use std::{
    collections::{btree_map, hash_map::RandomState, BTreeMap},
    sync::{Arc, Mutex},
};

use linked_hash_map::LinkedHashMap;
use serde::{Deserialize, Serialize};

#[cfg(with_testing)]
use crate::memory::MemoryDatabase;
#[cfg(with_testing)]
use crate::store::TestKeyValueDatabase;
use crate::{
    batch::{Batch, WriteOperation},
    common::get_interval,
    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
};

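/// Cache hit and miss counters, compiled only when the `with_metrics` cfg is set.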
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

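    /// The number of times a value read missed the cache.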
    pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_read_value_cache_miss",
            "Number of read value cache misses",
            &[],
        )
    });

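    /// The number of times a value read was answered from the cache.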
    pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_read_value_cache_hits",
            "Number of read value cache hits",
            &[],
        )
    });

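    /// The number of times a `contains_key` query missed the cache.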
    pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_contains_key_cache_miss",
            "Number of contains key cache misses",
            &[],
        )
    });

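    /// The number of times a `contains_key` query was answered from the cache.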
    pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_contains_key_cache_hit",
            "Number of contains key cache hits",
            &[],
        )
    });
}

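/// The configuration of the storage cache.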
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StorageCacheConfig {
    /// The maximum total size (keys plus values, in bytes) kept in the cache.
    pub max_cache_size: usize,
    /// The maximum size (key plus value, in bytes) of a single cached entry.
    pub max_entry_size: usize,
    /// The maximum number of entries kept in the cache.
    pub max_cache_entries: usize,
}

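/// The default storage cache configuration: at most 1000 entries, 10 MB in total, and 1 MB per entry.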
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    max_cache_size: 10_000_000,
    max_entry_size: 1_000_000,
    max_cache_entries: 1000,
};

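/// What the cache knows about a given key.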
enum CacheEntry {
    /// The key is known not to exist in the store.
    DoesNotExist,
    /// The key is known to exist, but its value is not cached.
    Exists,
    /// The key exists and its value is cached.
    Value(Vec<u8>),
}

impl CacheEntry {
    /// Returns the number of bytes of value data held by this entry.
    fn size(&self) -> usize {
        match self {
            CacheEntry::Value(vec) => vec.len(),
            _ => 0,
        }
    }
}

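/// An LRU cache of values and key-existence results read from (or written to) a store.
///
/// The `map` keeps the cached entries ordered by key so that `delete_prefix` can find all
/// keys under a prefix, while the `queue` tracks the order of last access together with the
/// size charged for each key, so that the least recently used entries can be evicted first.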
struct LruPrefixCache {
    /// The cached entries, ordered by key.
    map: BTreeMap<Vec<u8>, CacheEntry>,
    /// The keys in order of last access, with the size charged for each entry.
    queue: LinkedHashMap<Vec<u8>, usize, RandomState>,
    /// The cache limits.
    config: StorageCacheConfig,
    /// The total size currently charged to the cache (key sizes plus value sizes).
    total_size: usize,
    /// Whether the underlying store was opened with exclusive access. Without it, other
    /// clients may write keys concurrently, so the absence of a key cannot be cached.
    has_exclusive_access: bool,
}

impl LruPrefixCache {
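    /// Creates an empty `LruPrefixCache` with the given configuration.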
    fn new(config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
        Self {
            map: BTreeMap::new(),
            queue: LinkedHashMap::new(),
            config,
            total_size: 0,
            has_exclusive_access,
        }
    }

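    /// Evicts least recently used entries until the size and entry-count limits are respected.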
    fn trim_cache(&mut self) {
        while self.total_size > self.config.max_cache_size
            || self.queue.len() > self.config.max_cache_entries
        {
            let Some((key, key_value_size)) = self.queue.pop_front() else {
                break;
            };
            self.map.remove(&key);
            self.total_size -= key_value_size;
        }
    }

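    /// Inserts a cache entry for `key`, evicting older entries if limits are exceeded.
    ///
    /// Oversized entries, and `DoesNotExist` entries when the store is shared, are not
    /// cached; any existing entry for such a key is removed instead.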
    fn insert(&mut self, key: Vec<u8>, cache_entry: CacheEntry) {
        let key_value_size = key.len() + cache_entry.size();
        if (matches!(cache_entry, CacheEntry::DoesNotExist) && !self.has_exclusive_access)
            || key_value_size > self.config.max_entry_size
        {
            // Do not cache this entry; drop any stale entry for the key instead.
            if let Some(old_key_value_size) = self.queue.remove(&key) {
                self.total_size -= old_key_value_size;
                self.map.remove(&key);
            };
            return;
        }
        match self.map.entry(key.clone()) {
            btree_map::Entry::Occupied(mut entry) => {
                entry.insert(cache_entry);
                // Update the charged size and move the key to the back of the queue.
                let old_key_value_size = self.queue.remove(&key).expect("old_key_value_size");
                self.total_size -= old_key_value_size;
                self.queue.insert(key, key_value_size);
                self.total_size += key_value_size;
            }
            btree_map::Entry::Vacant(entry) => {
                entry.insert(cache_entry);
                self.queue.insert(key, key_value_size);
                self.total_size += key_value_size;
            }
        }
        self.trim_cache();
    }

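    /// Caches the result of a `read_value_bytes` query.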
    fn insert_read_value(&mut self, key: Vec<u8>, value: &Option<Vec<u8>>) {
        let cache_entry = match value {
            None => CacheEntry::DoesNotExist,
            Some(vec) => CacheEntry::Value(vec.to_vec()),
        };
        self.insert(key, cache_entry)
    }

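    /// Caches the result of a `contains_key` query.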
    fn insert_contains_key(&mut self, key: Vec<u8>, result: bool) {
        let cache_entry = match result {
            false => CacheEntry::DoesNotExist,
            true => CacheEntry::Exists,
        };
        self.insert(key, cache_entry)
    }

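    /// Marks all cached keys under `key_prefix` as deleted.
    ///
    /// With exclusive access the entries are kept as `DoesNotExist` markers; otherwise
    /// they are simply dropped from the cache.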
    fn delete_prefix(&mut self, key_prefix: &[u8]) {
        if self.has_exclusive_access {
            for (key, value) in self.map.range_mut(get_interval(key_prefix.to_vec())) {
                *self.queue.get_mut(key).unwrap() = key.len();
                self.total_size -= value.size();
                *value = CacheEntry::DoesNotExist;
            }
        } else {
            // Without exclusive access, `DoesNotExist` cannot be cached, so the entries
            // are removed altogether.
            let mut keys = Vec::new();
            for (key, _) in self.map.range(get_interval(key_prefix.to_vec())) {
                keys.push(key.to_vec());
            }
            for key in keys {
                self.map.remove(&key);
                let Some(key_value_size) = self.queue.remove(&key) else {
                    unreachable!("The key should be in the queue");
                };
                self.total_size -= key_value_size;
            }
        }
    }

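    /// Looks up the cached value for `key`, refreshing its position in the LRU queue on a hit.
    ///
    /// Returns `Some(Some(value))` for a cached value, `Some(None)` if the key is known not
    /// to exist, and `None` if the cache cannot answer the query.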
    fn query_read_value(&mut self, key: &[u8]) -> Option<Option<Vec<u8>>> {
        let result = match self.map.get(key) {
            None => None,
            Some(entry) => match entry {
                CacheEntry::DoesNotExist => Some(None),
                CacheEntry::Exists => None,
                CacheEntry::Value(vec) => Some(Some(vec.clone())),
            },
        };
        if result.is_some() {
            // Move the key to the back of the queue, marking it as most recently used.
            let key_value_size = self.queue.remove(key).expect("key_value_size");
            self.queue.insert(key.to_vec(), key_value_size);
        }
        result
    }

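    /// Looks up whether `key` exists according to the cache, refreshing its position in the
    /// LRU queue on a hit. Returns `None` if the key is not cached.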
    fn query_contains_key(&mut self, key: &[u8]) -> Option<bool> {
        let result = self
            .map
            .get(key)
            .map(|entry| !matches!(entry, CacheEntry::DoesNotExist));
        if result.is_some() {
            // Move the key to the back of the queue, marking it as most recently used.
            let key_value_size = self.queue.remove(key).expect("key_value_size");
            self.queue.insert(key.to_vec(), key_value_size);
        }
        result
    }
}

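/// A key-value database that wraps another database and adds LRU caching to the stores it opens.
///
/// A rough usage sketch, assuming an inner database type `Db` with configuration `db_config`
/// (both hypothetical):
/// ```ignore
/// let config = LruCachingConfig {
///     inner_config: db_config,
///     storage_cache_config: DEFAULT_STORAGE_CACHE_CONFIG,
/// };
/// let database = LruCachingDatabase::<Db>::connect(&config, "my_namespace").await?;
/// let store = database.open_shared(&[])?;
/// ```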
#[derive(Clone)]
pub struct LruCachingDatabase<D> {
    /// The underlying database.
    database: D,
    /// The cache configuration used for the stores it opens.
    config: StorageCacheConfig,
}

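/// A key-value store that wraps another store with an LRU cache of values and key-existence results.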
#[derive(Clone)]
pub struct LruCachingStore<S> {
    /// The underlying store.
    store: S,
    /// The LRU cache, or `None` if caching is disabled.
    cache: Option<Arc<Mutex<LruPrefixCache>>>,
}

impl<D> WithError for LruCachingDatabase<D>
where
    D: WithError,
{
    type Error = D::Error;
}

impl<S> WithError for LruCachingStore<S>
where
    S: WithError,
{
    type Error = S::Error;
}

impl<K> ReadableKeyValueStore for LruCachingStore<K>
where
    K: ReadableKeyValueStore,
{
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.read_value_bytes(key).await;
        };
        {
            // First try to answer the query from the cache.
            let mut cache = cache.lock().unwrap();
            if let Some(value) = cache.query_read_value(key) {
                #[cfg(with_metrics)]
                metrics::READ_VALUE_CACHE_HIT_COUNT
                    .with_label_values(&[])
                    .inc();
                return Ok(value);
            }
        }
        #[cfg(with_metrics)]
        metrics::READ_VALUE_CACHE_MISS_COUNT
            .with_label_values(&[])
            .inc();
        // On a miss, read from the underlying store and cache the result.
        let value = self.store.read_value_bytes(key).await?;
        let mut cache = cache.lock().unwrap();
        cache.insert_read_value(key.to_vec(), &value);
        Ok(value)
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.contains_key(key).await;
        };
        {
            let mut cache = cache.lock().unwrap();
            if let Some(value) = cache.query_contains_key(key) {
                #[cfg(with_metrics)]
                metrics::CONTAINS_KEY_CACHE_HIT_COUNT
                    .with_label_values(&[])
                    .inc();
                return Ok(value);
            }
        }
        #[cfg(with_metrics)]
        metrics::CONTAINS_KEY_CACHE_MISS_COUNT
            .with_label_values(&[])
            .inc();
        let result = self.store.contains_key(key).await?;
        let mut cache = cache.lock().unwrap();
        cache.insert_contains_key(key.to_vec(), result);
        Ok(result)
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.contains_keys(keys).await;
        };
        let size = keys.len();
        let mut results = vec![false; size];
        let mut indices = Vec::new();
        let mut key_requests = Vec::new();
        {
            // Answer as many keys as possible from the cache and collect the misses.
            let mut cache = cache.lock().unwrap();
            for i in 0..size {
                if let Some(value) = cache.query_contains_key(&keys[i]) {
                    #[cfg(with_metrics)]
                    metrics::CONTAINS_KEY_CACHE_HIT_COUNT
                        .with_label_values(&[])
                        .inc();
                    results[i] = value;
                } else {
                    #[cfg(with_metrics)]
                    metrics::CONTAINS_KEY_CACHE_MISS_COUNT
                        .with_label_values(&[])
                        .inc();
                    indices.push(i);
                    key_requests.push(keys[i].clone());
                }
            }
        }
        if !key_requests.is_empty() {
            // Query the underlying store for the misses and cache the results.
            let key_results = self.store.contains_keys(key_requests.clone()).await?;
            let mut cache = cache.lock().unwrap();
            for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
                results[index] = result;
                cache.insert_contains_key(key, result);
            }
        }
        Ok(results)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.read_multi_values_bytes(keys).await;
        };

        let mut result = Vec::with_capacity(keys.len());
        let mut cache_miss_indices = Vec::new();
        let mut miss_keys = Vec::new();
        {
            // Answer as many keys as possible from the cache and collect the misses.
            let mut cache = cache.lock().unwrap();
            for (i, key) in keys.into_iter().enumerate() {
                if let Some(value) = cache.query_read_value(&key) {
                    #[cfg(with_metrics)]
                    metrics::READ_VALUE_CACHE_HIT_COUNT
                        .with_label_values(&[])
                        .inc();
                    result.push(value);
                } else {
                    #[cfg(with_metrics)]
                    metrics::READ_VALUE_CACHE_MISS_COUNT
                        .with_label_values(&[])
                        .inc();
                    // Placeholder, filled in below from the store.
                    result.push(None);
                    cache_miss_indices.push(i);
                    miss_keys.push(key);
                }
            }
        }
        if !miss_keys.is_empty() {
            let values = self
                .store
                .read_multi_values_bytes(miss_keys.clone())
                .await?;
            let mut cache = cache.lock().unwrap();
            for (i, (key, value)) in cache_miss_indices
                .into_iter()
                .zip(miss_keys.into_iter().zip(values))
            {
                cache.insert_read_value(key, &value);
                result[i] = value;
            }
        }
        Ok(result)
    }

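    // Prefix queries are not cached; they always go to the underlying store.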
    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}

impl<K> WritableKeyValueStore for LruCachingStore<K>
where
    K: WritableKeyValueStore,
{
    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.write_batch(batch).await;
        };

        {
            // Update the cache to reflect the operations of the batch.
            let mut cache = cache.lock().unwrap();
            for operation in &batch.operations {
                match operation {
                    WriteOperation::Put { key, value } => {
                        let cache_entry = CacheEntry::Value(value.to_vec());
                        cache.insert(key.to_vec(), cache_entry);
                    }
                    WriteOperation::Delete { key } => {
                        let cache_entry = CacheEntry::DoesNotExist;
                        cache.insert(key.to_vec(), cache_entry);
                    }
                    WriteOperation::DeletePrefix { key_prefix } => {
                        cache.delete_prefix(key_prefix);
                    }
                }
            }
        }
        self.store.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        self.store.clear_journal().await
    }
}

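/// The configuration of `LruCachingDatabase`: the inner database configuration plus the
/// storage-cache settings.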
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruCachingConfig<C> {
    /// The configuration of the underlying database.
    pub inner_config: C,
    /// The configuration of the cache.
    pub storage_cache_config: StorageCacheConfig,
}

impl<D> KeyValueDatabase for LruCachingDatabase<D>
where
    D: KeyValueDatabase,
{
    type Config = LruCachingConfig<D::Config>;

    type Store = LruCachingStore<D::Store>;

    fn get_name() -> String {
        format!("lru caching {}", D::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let database = D::connect(&config.inner_config, namespace).await?;
        Ok(LruCachingDatabase {
            database,
            config: config.storage_cache_config.clone(),
        })
    }

    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_shared(root_key)?;
        // Shared stores do not have exclusive access to their keys.
        let store = LruCachingStore::new(store, self.config.clone(), false);
        Ok(store)
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_exclusive(root_key)?;
        // Exclusive stores may also cache the absence of keys.
        let store = LruCachingStore::new(store, self.config.clone(), true);
        Ok(store)
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        D::list_all(&config.inner_config).await
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        D::list_root_keys(&config.inner_config, namespace).await
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        D::delete_all(&config.inner_config).await
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        D::exists(&config.inner_config, namespace).await
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        D::create(&config.inner_config, namespace).await
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        D::delete(&config.inner_config, namespace).await
    }
}

impl<S> LruCachingStore<S> {
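    /// Creates a new store caching the reads of `store`. Setting `max_cache_entries` to
    /// zero disables caching entirely.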
    fn new(store: S, config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
        let cache = {
            if config.max_cache_entries == 0 {
                None
            } else {
                Some(Arc::new(Mutex::new(LruPrefixCache::new(
                    config,
                    has_exclusive_access,
                ))))
            }
        };
        Self { store, cache }
    }
}

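/// An in-memory database with added LRU caching, for tests.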
#[cfg(with_testing)]
pub type LruCachingMemoryDatabase = LruCachingDatabase<MemoryDatabase>;

#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for LruCachingDatabase<D>
where
    D: TestKeyValueDatabase,
{
    async fn new_test_config() -> Result<LruCachingConfig<D::Config>, D::Error> {
        let inner_config = D::new_test_config().await?;
        let storage_cache_config = DEFAULT_STORAGE_CACHE_CONFIG;
        Ok(LruCachingConfig {
            inner_config,
            storage_cache_config,
        })
    }
}