linera_views/backends/
lru_caching.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Add LRU (least recently used) caching to a given store.
5
6use std::{
7    collections::{btree_map, hash_map::RandomState, BTreeMap},
8    sync::{Arc, Mutex},
9};
10
11use linked_hash_map::LinkedHashMap;
12use serde::{Deserialize, Serialize};
13
14#[cfg(with_testing)]
15use crate::memory::MemoryDatabase;
16#[cfg(with_testing)]
17use crate::store::TestKeyValueDatabase;
18use crate::{
19    batch::{Batch, WriteOperation},
20    common::get_interval,
21    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
22};
23
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus counters tracking cache effectiveness (hits and misses
    //! for both value reads and key-existence checks).
    // NOTE(review): the metric name strings mix singular and plural
    // ("…cache_miss" vs "…cache_hits") — confirm against dashboards before
    // normalizing, since renaming would break existing queries.
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The total number of cache read value misses
    pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_read_value_cache_miss",
            "Number of read value cache misses",
            &[],
        )
    });

    /// The total number of read value cache hits
    pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_read_value_cache_hits",
            "Number of read value cache hits",
            &[],
        )
    });

    /// The total number of contains key cache misses
    pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_contains_key_cache_miss",
            "Number of contains key cache misses",
            &[],
        )
    });

    /// The total number of contains key cache hits
    pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_contains_key_cache_hit",
            "Number of contains key cache hits",
            &[],
        )
    });
}
67
/// The parametrization of the cache.
///
/// All three bounds are enforced together: an entry larger than
/// `max_entry_size` is never cached, and the cache evicts least-recently-used
/// entries until both `max_cache_size` and `max_cache_entries` hold.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StorageCacheConfig {
    /// The maximum size of the cache, in bytes (keys size + value sizes)
    pub max_cache_size: usize,
    /// The maximum size of an entry size, in bytes
    pub max_entry_size: usize,
    /// The maximum number of entries in the cache.
    pub max_cache_entries: usize,
}
78
/// The default storage cache configuration: 10 MB total, 1 MB per entry,
/// at most 1000 entries.
///
/// If the number of entries in the cache is too large then the underlying maps
/// become the limiting factor, hence the bound on `max_cache_entries`.
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    max_cache_size: 10000000,
    max_entry_size: 1000000,
    max_cache_entries: 1000,
};
87
/// What the cache knows about a single key.
enum CacheEntry {
    /// The key is known to be absent from the database.
    DoesNotExist,
    /// The key is known to exist, but its value is not cached.
    Exists,
    /// The key exists and its value is cached.
    Value(Vec<u8>),
}
93
94impl CacheEntry {
95    fn size(&self) -> usize {
96        match self {
97            CacheEntry::Value(vec) => vec.len(),
98            _ => 0,
99        }
100    }
101}
102
/// Stores the data for simple `read_values` queries.
///
/// This data structure is inspired by the crate `lru-cache` but was modified to support
/// range deletions.
struct LruPrefixCache {
    /// The cached entries, keyed and ordered by raw key bytes so that whole
    /// prefix ranges can be scanned (see `delete_prefix`).
    map: BTreeMap<Vec<u8>, CacheEntry>,
    /// Keys in least-recently-used order (front = oldest), each mapped to the
    /// number of bytes (key + value) it contributes to `total_size`.
    queue: LinkedHashMap<Vec<u8>, usize, RandomState>,
    /// The bounds enforced by `trim_cache`.
    config: StorageCacheConfig,
    /// Current total size in bytes of all cached entries (key sizes + value sizes).
    total_size: usize,
    /// Whether we have exclusive R/W access to the keys under the root key of the store.
    has_exclusive_access: bool,
}
115
impl LruPrefixCache {
    /// Creates an empty `LruPrefixCache` with the given bounds.
    fn new(config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
        Self {
            map: BTreeMap::new(),
            queue: LinkedHashMap::new(),
            config,
            total_size: 0,
            has_exclusive_access,
        }
    }

    /// Trim the cache so that it fits within the constraints.
    fn trim_cache(&mut self) {
        // Evict from the front of the queue (the least recently used entries)
        // until both the byte-size and entry-count bounds hold.
        while self.total_size > self.config.max_cache_size
            || self.queue.len() > self.config.max_cache_entries
        {
            let Some((key, key_value_size)) = self.queue.pop_front() else {
                break;
            };
            self.map.remove(&key);
            self.total_size -= key_value_size;
        }
    }

    /// Inserts an entry into the cache.
    fn insert(&mut self, key: Vec<u8>, cache_entry: CacheEntry) {
        let key_value_size = key.len() + cache_entry.size();
        // A `DoesNotExist` entry is only trustworthy with exclusive access
        // (otherwise another client could create the key behind our back), and
        // entries above `max_entry_size` are never cached.
        if (matches!(cache_entry, CacheEntry::DoesNotExist) && !self.has_exclusive_access)
            || key_value_size > self.config.max_entry_size
        {
            // Just forget about the entry.
            if let Some(old_key_value_size) = self.queue.remove(&key) {
                self.total_size -= old_key_value_size;
                self.map.remove(&key);
            };
            return;
        }
        match self.map.entry(key.clone()) {
            btree_map::Entry::Occupied(mut entry) => {
                entry.insert(cache_entry);
                // Refresh the key's LRU position: remove it from the queue and
                // re-insert it at the back (most recently used), adjusting the
                // running size for the new entry.
                let old_key_value_size = self.queue.remove(&key).expect("old_key_value_size");
                self.total_size -= old_key_value_size;
                self.queue.insert(key, key_value_size);
                self.total_size += key_value_size;
            }
            btree_map::Entry::Vacant(entry) => {
                entry.insert(cache_entry);
                self.queue.insert(key, key_value_size);
                self.total_size += key_value_size;
            }
        }
        self.trim_cache();
    }

    /// Inserts the result of a `read_value` query into the cache.
    fn insert_read_value(&mut self, key: Vec<u8>, value: &Option<Vec<u8>>) {
        let cache_entry = match value {
            None => CacheEntry::DoesNotExist,
            Some(vec) => CacheEntry::Value(vec.to_vec()),
        };
        self.insert(key, cache_entry)
    }

    /// Inserts the result of a `contains_key` query into the cache.
    fn insert_contains_key(&mut self, key: Vec<u8>, result: bool) {
        let cache_entry = match result {
            false => CacheEntry::DoesNotExist,
            true => CacheEntry::Exists,
        };
        self.insert(key, cache_entry)
    }

    /// Marks cached keys that match the prefix as deleted. Importantly, this does not
    /// create new entries in the cache.
    fn delete_prefix(&mut self, key_prefix: &[u8]) {
        if self.has_exclusive_access {
            // With exclusive access the deletion is authoritative: keep the
            // entries but turn them into `DoesNotExist` markers. Each queue
            // weight shrinks to just the key length since the value is gone.
            for (key, value) in self.map.range_mut(get_interval(key_prefix.to_vec())) {
                *self.queue.get_mut(key).unwrap() = key.len();
                self.total_size -= value.size();
                *value = CacheEntry::DoesNotExist;
            }
        } else {
            // Just forget about the entries.
            // Keys are collected first because the map cannot be mutated
            // while the range iterator borrows it.
            let mut keys = Vec::new();
            for (key, _) in self.map.range(get_interval(key_prefix.to_vec())) {
                keys.push(key.to_vec());
            }
            for key in keys {
                self.map.remove(&key);
                let Some(key_value_size) = self.queue.remove(&key) else {
                    unreachable!("The key should be in the queue");
                };
                self.total_size -= key_value_size;
            }
        }
    }

    /// Returns the cached value, or `Some(None)` if the entry does not exist in the
    /// database. If `None` is returned, the entry might exist in the database but is
    /// not in the cache.
    fn query_read_value(&mut self, key: &[u8]) -> Option<Option<Vec<u8>>> {
        let result = match self.map.get(key) {
            None => None,
            Some(entry) => match entry {
                CacheEntry::DoesNotExist => Some(None),
                // `Exists` only says the key is present, not what its value
                // is, so it counts as a miss for a value read.
                CacheEntry::Exists => None,
                CacheEntry::Value(vec) => Some(Some(vec.clone())),
            },
        };
        if result.is_some() {
            // Refresh the key's LRU position by re-inserting it at the back.
            let key_value_size = self.queue.remove(key).expect("key_value_size");
            self.queue.insert(key.to_vec(), key_value_size);
        }
        result
    }

    /// Returns `Some(true)` or `Some(false)` if we know that the entry does or does not
    /// exist in the database. Returns `None` if that information is not in the cache.
    fn query_contains_key(&mut self, key: &[u8]) -> Option<bool> {
        // Both `Exists` and `Value(_)` imply the key is present.
        let result = self
            .map
            .get(key)
            .map(|entry| !matches!(entry, CacheEntry::DoesNotExist));
        if result.is_some() {
            // Refresh the key's LRU position by re-inserting it at the back.
            let key_value_size = self.queue.remove(key).expect("key_value_size");
            self.queue.insert(key.to_vec(), key_value_size);
        }
        result
    }
}
250
/// A key-value database with added LRU caching.
///
/// The database itself holds no cache; it only remembers the cache
/// configuration and hands a fresh cache to every store it opens.
#[derive(Clone)]
pub struct LruCachingDatabase<D> {
    /// The inner store that is called by the LRU cache one
    database: D,
    /// The configuration used for the caches of the stores opened from here.
    config: StorageCacheConfig,
}
259
/// A key-value store with added LRU caching.
#[derive(Clone)]
pub struct LruCachingStore<S> {
    /// The inner store that is called by the LRU cache one
    store: S,
    /// The LRU cache of values. `None` means caching is disabled
    /// (`max_cache_entries == 0`); clones of this store share the same cache.
    cache: Option<Arc<Mutex<LruPrefixCache>>>,
}
268
/// The caching layer introduces no errors of its own: the database reports
/// exactly the wrapped database's error type.
impl<D> WithError for LruCachingDatabase<D>
where
    D: WithError,
{
    type Error = D::Error;
}
275
/// The caching layer introduces no errors of its own: the store reports
/// exactly the wrapped store's error type.
impl<S> WithError for LruCachingStore<S>
where
    S: WithError,
{
    type Error = S::Error;
}
282
impl<K> ReadableKeyValueStore for LruCachingStore<K>
where
    K: ReadableKeyValueStore,
{
    // The LRU cache does not change the underlying store's size limits.
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    /// Reads the value at `key`, serving it from the cache when possible and
    /// caching the result of the inner-store read on a miss.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        let Some(cache) = &self.cache else {
            // Caching is disabled: delegate directly.
            return self.store.read_value_bytes(key).await;
        };
        // First inquiring in the read_value_bytes LRU
        {
            // The guard is scoped so the std mutex is released before
            // awaiting the store below.
            let mut cache = cache.lock().unwrap();
            if let Some(value) = cache.query_read_value(key) {
                #[cfg(with_metrics)]
                metrics::READ_VALUE_CACHE_HIT_COUNT
                    .with_label_values(&[])
                    .inc();
                return Ok(value);
            }
        }
        #[cfg(with_metrics)]
        metrics::READ_VALUE_CACHE_MISS_COUNT
            .with_label_values(&[])
            .inc();
        let value = self.store.read_value_bytes(key).await?;
        let mut cache = cache.lock().unwrap();
        cache.insert_read_value(key.to_vec(), &value);
        Ok(value)
    }

    /// Tests the existence of `key`, using the cache when it has an answer
    /// and caching the store's answer otherwise.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.contains_key(key).await;
        };
        {
            // Guard scoped: released before the await below.
            let mut cache = cache.lock().unwrap();
            if let Some(value) = cache.query_contains_key(key) {
                #[cfg(with_metrics)]
                metrics::CONTAINS_KEY_CACHE_HIT_COUNT
                    .with_label_values(&[])
                    .inc();
                return Ok(value);
            }
        }
        #[cfg(with_metrics)]
        metrics::CONTAINS_KEY_CACHE_MISS_COUNT
            .with_label_values(&[])
            .inc();
        let result = self.store.contains_key(key).await?;
        let mut cache = cache.lock().unwrap();
        cache.insert_contains_key(key.to_vec(), result);
        Ok(result)
    }

    /// Tests the existence of several keys, answering from the cache where
    /// possible and forwarding only the unresolved keys to the store.
    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.contains_keys(keys).await;
        };
        let size = keys.len();
        let mut results = vec![false; size];
        // Positions (and keys) of the cache misses, to be resolved in one
        // batched store query below.
        let mut indices = Vec::new();
        let mut key_requests = Vec::new();
        {
            let mut cache = cache.lock().unwrap();
            for i in 0..size {
                if let Some(value) = cache.query_contains_key(&keys[i]) {
                    #[cfg(with_metrics)]
                    metrics::CONTAINS_KEY_CACHE_HIT_COUNT
                        .with_label_values(&[])
                        .inc();
                    results[i] = value;
                } else {
                    #[cfg(with_metrics)]
                    metrics::CONTAINS_KEY_CACHE_MISS_COUNT
                        .with_label_values(&[])
                        .inc();
                    indices.push(i);
                    key_requests.push(keys[i].clone());
                }
            }
        }
        if !key_requests.is_empty() {
            let key_results = self.store.contains_keys(key_requests.clone()).await?;
            let mut cache = cache.lock().unwrap();
            for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
                results[index] = result;
                cache.insert_contains_key(key, result);
            }
        }
        Ok(results)
    }

    /// Reads several values, answering from the cache where possible and
    /// forwarding only the missing keys to the store in one batched query.
    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.read_multi_values_bytes(keys).await;
        };

        let mut result = Vec::with_capacity(keys.len());
        let mut cache_miss_indices = Vec::new();
        let mut miss_keys = Vec::new();
        {
            let mut cache = cache.lock().unwrap();
            for (i, key) in keys.into_iter().enumerate() {
                if let Some(value) = cache.query_read_value(&key) {
                    #[cfg(with_metrics)]
                    metrics::READ_VALUE_CACHE_HIT_COUNT
                        .with_label_values(&[])
                        .inc();
                    result.push(value);
                } else {
                    #[cfg(with_metrics)]
                    metrics::READ_VALUE_CACHE_MISS_COUNT
                        .with_label_values(&[])
                        .inc();
                    // Placeholder, overwritten with the store's answer below.
                    result.push(None);
                    cache_miss_indices.push(i);
                    miss_keys.push(key);
                }
            }
        }
        if !miss_keys.is_empty() {
            let values = self
                .store
                .read_multi_values_bytes(miss_keys.clone())
                .await?;
            let mut cache = cache.lock().unwrap();
            for (i, (key, value)) in cache_miss_indices
                .into_iter()
                .zip(miss_keys.into_iter().zip(values))
            {
                cache.insert_read_value(key, &value);
                result[i] = value;
            }
        }
        Ok(result)
    }

    /// Prefix searches bypass the cache entirely: the cache stores per-key
    /// entries and cannot answer range queries.
    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    /// Prefix searches bypass the cache entirely (see `find_keys_by_prefix`).
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}
440
impl<K> WritableKeyValueStore for LruCachingStore<K>
where
    K: WritableKeyValueStore,
{
    // The LRU cache does not change the underlying store's size limits.
    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;

    /// Applies `batch` to the cache, then writes it to the inner store.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.write_batch(batch).await;
        };

        {
            // NOTE(review): the cache is updated *before* the store write. If
            // the write below fails, the cache keeps the optimistically applied
            // entries — confirm that callers discard the store on error,
            // otherwise subsequent reads could observe unpersisted data.
            let mut cache = cache.lock().unwrap();
            for operation in &batch.operations {
                match operation {
                    WriteOperation::Put { key, value } => {
                        let cache_entry = CacheEntry::Value(value.to_vec());
                        cache.insert(key.to_vec(), cache_entry);
                    }
                    WriteOperation::Delete { key } => {
                        // `insert` discards this marker when we lack
                        // exclusive access.
                        let cache_entry = CacheEntry::DoesNotExist;
                        cache.insert(key.to_vec(), cache_entry);
                    }
                    WriteOperation::DeletePrefix { key_prefix } => {
                        cache.delete_prefix(key_prefix);
                    }
                }
            }
        }
        self.store.write_batch(batch).await
    }

    /// Journal maintenance does not involve the cache; delegate directly.
    async fn clear_journal(&self) -> Result<(), Self::Error> {
        self.store.clear_journal().await
    }
}
478
/// The configuration type for the `LruCachingStore`.
///
/// Wraps the inner store's configuration and adds the cache parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruCachingConfig<C> {
    /// The inner configuration of the `LruCachingStore`.
    pub inner_config: C,
    /// The cache size being used
    pub storage_cache_config: StorageCacheConfig,
}
487
488impl<D> KeyValueDatabase for LruCachingDatabase<D>
489where
490    D: KeyValueDatabase,
491{
492    type Config = LruCachingConfig<D::Config>;
493
494    type Store = LruCachingStore<D::Store>;
495
496    fn get_name() -> String {
497        format!("lru caching {}", D::get_name())
498    }
499
500    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
501        let database = D::connect(&config.inner_config, namespace).await?;
502        Ok(LruCachingDatabase {
503            database,
504            config: config.storage_cache_config.clone(),
505        })
506    }
507
508    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
509        let store = self.database.open_shared(root_key)?;
510        let store = LruCachingStore::new(
511            store,
512            self.config.clone(),
513            /* has_exclusive_access */ false,
514        );
515        Ok(store)
516    }
517
518    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
519        let store = self.database.open_exclusive(root_key)?;
520        let store = LruCachingStore::new(
521            store,
522            self.config.clone(),
523            /* has_exclusive_access */ true,
524        );
525        Ok(store)
526    }
527
528    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
529        D::list_all(&config.inner_config).await
530    }
531
532    async fn list_root_keys(
533        config: &Self::Config,
534        namespace: &str,
535    ) -> Result<Vec<Vec<u8>>, Self::Error> {
536        D::list_root_keys(&config.inner_config, namespace).await
537    }
538
539    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
540        D::delete_all(&config.inner_config).await
541    }
542
543    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
544        D::exists(&config.inner_config, namespace).await
545    }
546
547    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
548        D::create(&config.inner_config, namespace).await
549    }
550
551    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
552        D::delete(&config.inner_config, namespace).await
553    }
554}
555
556impl<S> LruCachingStore<S> {
557    /// Creates a new key-value store that provides LRU caching at top of the given store.
558    fn new(store: S, config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
559        let cache = {
560            if config.max_cache_entries == 0 {
561                None
562            } else {
563                Some(Arc::new(Mutex::new(LruPrefixCache::new(
564                    config,
565                    has_exclusive_access,
566                ))))
567            }
568        };
569        Self { store, cache }
570    }
571}
572
/// A memory database with caching.
#[cfg(with_testing)]
pub type LruCachingMemoryDatabase = LruCachingDatabase<MemoryDatabase>;
576
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for LruCachingDatabase<D>
where
    D: TestKeyValueDatabase,
{
    /// Builds a test configuration by pairing the inner database's test
    /// configuration with the default storage cache settings.
    async fn new_test_config() -> Result<LruCachingConfig<D::Config>, D::Error> {
        Ok(LruCachingConfig {
            inner_config: D::new_test_config().await?,
            storage_cache_config: DEFAULT_STORAGE_CACHE_CONFIG,
        })
    }
}