linera_views/backends/lru_caching.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Add LRU (least recently used) caching to a given store.
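//!
//! A minimal usage sketch (`make_inner_store` is a placeholder for constructing
//! any store that implements the key-value store traits):
//! ```ignore
//! let inner = make_inner_store();
//! // Wrap the inner store with the default cache parameters.
//! let store = LruCachingStore::new(inner, DEFAULT_STORAGE_CACHE_CONFIG);
//! // Reads now consult the LRU cache before falling back to `inner`.
//! let value = store.read_value_bytes(b"my_key").await?;
//! ```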

use std::{
    collections::{btree_map, hash_map::RandomState, BTreeMap},
    sync::{Arc, Mutex},
};

use linked_hash_map::LinkedHashMap;
use serde::{Deserialize, Serialize};

use crate::{
    batch::{Batch, WriteOperation},
    common::get_interval,
    store::{AdminKeyValueStore, ReadableKeyValueStore, WithError, WritableKeyValueStore},
};
#[cfg(with_testing)]
use crate::{memory::MemoryStore, store::TestKeyValueStore};

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// The total number of read value cache misses
    pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_read_value_cache_miss",
            "Number of read value cache misses",
            &[],
        )
    });

    /// The total number of read value cache hits
    pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_read_value_cache_hits",
            "Number of read value cache hits",
            &[],
        )
    });

    /// The total number of contains key cache misses
    pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_contains_key_cache_miss",
            "Number of contains key cache misses",
            &[],
        )
    });

    /// The total number of contains key cache hits
    pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "num_contains_key_cache_hit",
            "Number of contains key cache hits",
            &[],
        )
    });
}

/// The parametrization of the cache.
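///
/// # Example
///
/// A sketch of a custom configuration (the numbers are illustrative only):
/// ```ignore
/// let config = StorageCacheConfig {
///     max_cache_size: 1_000_000,   // total bytes of keys plus values
///     max_entry_size: 100_000,     // largest single entry that may be cached
///     max_cache_entries: 100,      // maximum number of entries
/// };
/// ```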
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StorageCacheConfig {
    /// The maximum size of the cache, in bytes (key sizes + value sizes)
    pub max_cache_size: usize,
    /// The maximum size of an entry, in bytes
    pub max_entry_size: usize,
    /// The maximum number of entries in the cache.
    pub max_cache_entries: usize,
}

/// The default storage cache configuration.
///
/// If the number of entries in the cache is too large, the underlying maps
/// become the limiting factor.
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    max_cache_size: 10_000_000,
    max_entry_size: 1_000_000,
    max_cache_entries: 1000,
};

/// A cache entry for a key.
enum CacheEntry {
    /// The key is known not to exist in the database.
    DoesNotExist,
    /// The key is known to exist, but its value is not cached.
    Exists,
    /// The key exists and its value is cached.
    Value(Vec<u8>),
}

impl CacheEntry {
    fn size(&self) -> usize {
        match self {
            CacheEntry::Value(vec) => vec.len(),
            _ => 0,
        }
    }
}

/// Stores the data for simple `read_value` and `contains_key` queries.
///
/// This data structure is inspired by the crate `lru-cache` but was modified to support
/// range deletions.
struct LruPrefixCache {
    /// The cached entries, keyed by their full key.
    map: BTreeMap<Vec<u8>, CacheEntry>,
    /// The LRU queue: the front holds the least recently used key; the value is the
    /// entry's size in bytes (key length plus value length).
    queue: LinkedHashMap<Vec<u8>, usize, RandomState>,
    /// The configuration bounding the size of the cache.
    storage_cache_config: StorageCacheConfig,
    /// The total size (in bytes) of all entries currently in the cache.
    total_size: usize,
    /// Whether we have exclusive R/W access to the keys under the root key of the store.
    has_exclusive_access: bool,
}

impl LruPrefixCache {
    /// Creates an `LruPrefixCache`.
    pub fn new(storage_cache_config: StorageCacheConfig) -> Self {
        Self {
            map: BTreeMap::new(),
            queue: LinkedHashMap::new(),
            storage_cache_config,
            total_size: 0,
            has_exclusive_access: false,
        }
    }

    /// Trims the cache so that it fits within the configured size and entry-count limits.
    fn trim_cache(&mut self) {
        while self.total_size > self.storage_cache_config.max_cache_size
            || self.queue.len() > self.storage_cache_config.max_cache_entries
        {
            let Some((key, key_value_size)) = self.queue.pop_front() else {
                break;
            };
            self.map.remove(&key);
            self.total_size -= key_value_size;
        }
    }

    /// Inserts an entry into the cache.
    pub fn insert(&mut self, key: Vec<u8>, cache_entry: CacheEntry) {
        let key_value_size = key.len() + cache_entry.size();
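        // Entries known to be missing are only cached when we have exclusive access
        // (otherwise another client could create the key behind our back), and entries
        // larger than `max_entry_size` are never cached.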
        if (matches!(cache_entry, CacheEntry::DoesNotExist) && !self.has_exclusive_access)
            || key_value_size > self.storage_cache_config.max_entry_size
        {
            // Just forget about the entry.
            if let Some(old_key_value_size) = self.queue.remove(&key) {
                self.total_size -= old_key_value_size;
                self.map.remove(&key);
            };
            return;
        }
        match self.map.entry(key.clone()) {
            btree_map::Entry::Occupied(mut entry) => {
                entry.insert(cache_entry);
                // Move the key to the most recently used position of the LRU queue.
                let old_key_value_size = self.queue.remove(&key).expect("old_key_value_size");
                self.total_size -= old_key_value_size;
                self.queue.insert(key, key_value_size);
                self.total_size += key_value_size;
            }
            btree_map::Entry::Vacant(entry) => {
                entry.insert(cache_entry);
                self.queue.insert(key, key_value_size);
                self.total_size += key_value_size;
            }
        }
        self.trim_cache();
    }

    /// Inserts the result of a `read_value` query into the cache.
    pub fn insert_read_value(&mut self, key: Vec<u8>, value: &Option<Vec<u8>>) {
        let cache_entry = match value {
            None => CacheEntry::DoesNotExist,
            Some(vec) => CacheEntry::Value(vec.to_vec()),
        };
        self.insert(key, cache_entry)
    }

    /// Inserts the result of a `contains_key` query into the cache.
    pub fn insert_contains_key(&mut self, key: Vec<u8>, result: bool) {
        let cache_entry = match result {
            false => CacheEntry::DoesNotExist,
            true => CacheEntry::Exists,
        };
        self.insert(key, cache_entry)
    }

    /// Marks cached keys that match the prefix as deleted. Importantly, this does not
    /// create new entries in the cache.
    pub fn delete_prefix(&mut self, key_prefix: &[u8]) {
        if self.has_exclusive_access {
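            // With exclusive access, no other writer can recreate these keys, so it is
            // safe to keep them cached as known-missing entries.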
            for (key, value) in self.map.range_mut(get_interval(key_prefix.to_vec())) {
                *self.queue.get_mut(key).unwrap() = key.len();
                self.total_size -= value.size();
                *value = CacheEntry::DoesNotExist;
            }
        } else {
            // Just forget about the entries.
            let mut keys = Vec::new();
            for (key, _) in self.map.range(get_interval(key_prefix.to_vec())) {
                keys.push(key.to_vec());
            }
            for key in keys {
                self.map.remove(&key);
                let Some(key_value_size) = self.queue.remove(&key) else {
                    unreachable!("The key should be in the queue");
                };
                self.total_size -= key_value_size;
            }
        }
    }

    /// Returns the cached value, or `Some(None)` if the entry is known not to exist in
    /// the database. If `None` is returned, the entry might exist in the database but is
    /// not in the cache.
    pub fn query_read_value(&mut self, key: &[u8]) -> Option<Option<Vec<u8>>> {
        let result = match self.map.get(key) {
            None => None,
            Some(entry) => match entry {
                CacheEntry::DoesNotExist => Some(None),
                CacheEntry::Exists => None,
                CacheEntry::Value(vec) => Some(Some(vec.clone())),
            },
        };
        if result.is_some() {
            // Mark the key as most recently used.
            let key_value_size = self.queue.remove(key).expect("key_value_size");
            self.queue.insert(key.to_vec(), key_value_size);
        }
        result
    }

    /// Returns `Some(true)` or `Some(false)` if we know that the entry does or does not
    /// exist in the database. Returns `None` if that information is not in the cache.
    pub fn query_contains_key(&mut self, key: &[u8]) -> Option<bool> {
        let result = self
            .map
            .get(key)
            .map(|entry| !matches!(entry, CacheEntry::DoesNotExist));
        if result.is_some() {
            // Mark the key as most recently used.
            let key_value_size = self.queue.remove(key).expect("key_value_size");
            self.queue.insert(key.to_vec(), key_value_size);
        }
        result
    }
}

/// A key-value store that adds LRU caching on top of a given store.
#[derive(Clone)]
pub struct LruCachingStore<K> {
    /// The inner store, called whenever the cache cannot answer a query.
    store: K,
    /// The LRU cache of values, or `None` if caching is disabled.
    cache: Option<Arc<Mutex<LruPrefixCache>>>,
}

impl<K> WithError for LruCachingStore<K>
where
    K: WithError,
{
    type Error = K::Error;
}

impl<K> ReadableKeyValueStore for LruCachingStore<K>
where
    K: ReadableKeyValueStore,
{
    // The LRU cache does not change the underlying store's size limits.
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
    type Keys = K::Keys;
    type KeyValues = K::KeyValues;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.read_value_bytes(key).await;
        };
        // First, check the LRU cache for the value.
        {
            let mut cache = cache.lock().unwrap();
            if let Some(value) = cache.query_read_value(key) {
                #[cfg(with_metrics)]
                metrics::READ_VALUE_CACHE_HIT_COUNT
                    .with_label_values(&[])
                    .inc();
                return Ok(value);
            }
        }
        #[cfg(with_metrics)]
        metrics::READ_VALUE_CACHE_MISS_COUNT
            .with_label_values(&[])
            .inc();
        let value = self.store.read_value_bytes(key).await?;
        let mut cache = cache.lock().unwrap();
        cache.insert_read_value(key.to_vec(), &value);
        Ok(value)
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.contains_key(key).await;
        };
        {
            let mut cache = cache.lock().unwrap();
            if let Some(value) = cache.query_contains_key(key) {
                #[cfg(with_metrics)]
                metrics::CONTAINS_KEY_CACHE_HIT_COUNT
                    .with_label_values(&[])
                    .inc();
                return Ok(value);
            }
        }
        #[cfg(with_metrics)]
        metrics::CONTAINS_KEY_CACHE_MISS_COUNT
            .with_label_values(&[])
            .inc();
        let result = self.store.contains_key(key).await?;
        let mut cache = cache.lock().unwrap();
        cache.insert_contains_key(key.to_vec(), result);
        Ok(result)
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.contains_keys(keys).await;
        };
        let size = keys.len();
        let mut results = vec![false; size];
        let mut indices = Vec::new();
        let mut key_requests = Vec::new();
        // Answer as many keys as possible from the cache and collect the misses.
        {
            let mut cache = cache.lock().unwrap();
            for i in 0..size {
                if let Some(value) = cache.query_contains_key(&keys[i]) {
                    #[cfg(with_metrics)]
                    metrics::CONTAINS_KEY_CACHE_HIT_COUNT
                        .with_label_values(&[])
                        .inc();
                    results[i] = value;
                } else {
                    #[cfg(with_metrics)]
                    metrics::CONTAINS_KEY_CACHE_MISS_COUNT
                        .with_label_values(&[])
                        .inc();
                    indices.push(i);
                    key_requests.push(keys[i].clone());
                }
            }
        }
        // Resolve the misses with a single batched query and cache the results.
        if !key_requests.is_empty() {
            let key_results = self.store.contains_keys(key_requests.clone()).await?;
            let mut cache = cache.lock().unwrap();
            for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
                results[index] = result;
                cache.insert_contains_key(key, result);
            }
        }
        Ok(results)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.read_multi_values_bytes(keys).await;
        };

        let mut result = Vec::with_capacity(keys.len());
        let mut cache_miss_indices = Vec::new();
        let mut miss_keys = Vec::new();
        // Answer as many keys as possible from the cache; push `None` placeholders
        // for the misses.
        {
            let mut cache = cache.lock().unwrap();
            for (i, key) in keys.into_iter().enumerate() {
                if let Some(value) = cache.query_read_value(&key) {
                    #[cfg(with_metrics)]
                    metrics::READ_VALUE_CACHE_HIT_COUNT
                        .with_label_values(&[])
                        .inc();
                    result.push(value);
                } else {
                    #[cfg(with_metrics)]
                    metrics::READ_VALUE_CACHE_MISS_COUNT
                        .with_label_values(&[])
                        .inc();
                    result.push(None);
                    cache_miss_indices.push(i);
                    miss_keys.push(key);
                }
            }
        }
        // Fetch the misses from the inner store in one batched query, cache them,
        // and fill in the placeholders.
        if !miss_keys.is_empty() {
            let values = self
                .store
                .read_multi_values_bytes(miss_keys.clone())
                .await?;
            let mut cache = cache.lock().unwrap();
            for (i, (key, value)) in cache_miss_indices
                .into_iter()
                .zip(miss_keys.into_iter().zip(values))
            {
                cache.insert_read_value(key, &value);
                result[i] = value;
            }
        }
        Ok(result)
    }

    // Prefix searches are not cached; they are forwarded directly to the inner store.
    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, Self::Error> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}

impl<K> WritableKeyValueStore for LruCachingStore<K>
where
    K: WritableKeyValueStore,
{
    // The LRU cache does not change the underlying store's size limits.
    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let Some(cache) = &self.cache else {
            return self.store.write_batch(batch).await;
        };

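        // Apply the batch to the cache first, then forward it to the inner store.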
        {
            let mut cache = cache.lock().unwrap();
            for operation in &batch.operations {
                match operation {
                    WriteOperation::Put { key, value } => {
                        let cache_entry = CacheEntry::Value(value.to_vec());
                        cache.insert(key.to_vec(), cache_entry);
                    }
                    WriteOperation::Delete { key } => {
                        let cache_entry = CacheEntry::DoesNotExist;
                        cache.insert(key.to_vec(), cache_entry);
                    }
                    WriteOperation::DeletePrefix { key_prefix } => {
                        cache.delete_prefix(key_prefix);
                    }
                }
            }
        }
        self.store.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        self.store.clear_journal().await
    }
}

/// The configuration type for the `LruCachingStore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruCachingConfig<C> {
    /// The configuration of the inner store.
    pub inner_config: C,
    /// The storage cache configuration being used.
    pub storage_cache_config: StorageCacheConfig,
}

impl<K> AdminKeyValueStore for LruCachingStore<K>
where
    K: AdminKeyValueStore,
{
    type Config = LruCachingConfig<K::Config>;

    fn get_name() -> String {
        format!("lru caching {}", K::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let store = K::connect(&config.inner_config, namespace).await?;
        Ok(LruCachingStore::new(
            store,
            config.storage_cache_config.clone(),
        ))
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
        let store = self.store.open_exclusive(root_key)?;
        let store = LruCachingStore::new(store, self.storage_cache_config());
        store.enable_exclusive_access();
        Ok(store)
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        K::list_all(&config.inner_config).await
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        K::list_root_keys(&config.inner_config, namespace).await
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        K::delete_all(&config.inner_config).await
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        K::exists(&config.inner_config, namespace).await
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        K::create(&config.inner_config, namespace).await
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        K::delete(&config.inner_config, namespace).await
    }
}

#[cfg(with_testing)]
impl<K> TestKeyValueStore for LruCachingStore<K>
where
    K: TestKeyValueStore,
{
    async fn new_test_config() -> Result<LruCachingConfig<K::Config>, K::Error> {
        let inner_config = K::new_test_config().await?;
        let storage_cache_config = DEFAULT_STORAGE_CACHE_CONFIG;
        Ok(LruCachingConfig {
            inner_config,
            storage_cache_config,
        })
    }
}

impl<K> LruCachingStore<K> {
    /// Creates a new key-value store that provides LRU caching on top of the given store.
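    ///
    /// If `storage_cache_config.max_cache_entries` is zero, caching is disabled and
    /// all calls go directly to the inner store.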
    pub fn new(store: K, storage_cache_config: StorageCacheConfig) -> Self {
        let cache = {
            if storage_cache_config.max_cache_entries == 0 {
                None
            } else {
                Some(Arc::new(Mutex::new(LruPrefixCache::new(
                    storage_cache_config,
                ))))
            }
        };
        Self { store, cache }
    }

    /// Returns the `StorageCacheConfig` in use, or an all-zero configuration if
    /// caching is disabled.
    pub fn storage_cache_config(&self) -> StorageCacheConfig {
        match &self.cache {
            None => StorageCacheConfig {
                max_cache_size: 0,
                max_entry_size: 0,
                max_cache_entries: 0,
            },
            Some(cache) => {
                let cache = cache.lock().unwrap();
                cache.storage_cache_config.clone()
            }
        }
    }

    /// Sets the value `has_exclusive_access` to `true`, if applicable.
    pub fn enable_exclusive_access(&self) {
        if let Some(cache) = &self.cache {
            let mut cache = cache.lock().unwrap();
            cache.has_exclusive_access = true;
        }
    }
}

/// A memory store with caching.
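///
/// A test-only sketch, assuming the `new_test_store` helper provided by the
/// `TestKeyValueStore` trait:
/// ```ignore
/// let store = LruCachingMemoryStore::new_test_store().await?;
/// let mut batch = Batch::new();
/// batch.put_key_value_bytes(b"key".to_vec(), b"value".to_vec());
/// store.write_batch(batch).await?;
/// // The written value is now in the cache, so this read does not hit the inner store.
/// assert_eq!(store.read_value_bytes(b"key").await?, Some(b"value".to_vec()));
/// ```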
#[cfg(with_testing)]
pub type LruCachingMemoryStore = LruCachingStore<MemoryStore>;