linera_views/backends/lru_caching.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Add LRU (least recently used) caching to a given store.
5
6use std::sync::{Arc, Mutex};
7
8use serde::{Deserialize, Serialize};
9
10#[cfg(with_testing)]
11use crate::memory::MemoryDatabase;
12#[cfg(with_testing)]
13use crate::store::TestKeyValueDatabase;
14use crate::{
15    batch::{Batch, WriteOperation},
16    lru_prefix_cache::{LruPrefixCache, StorageCacheConfig},
17    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
18};
19
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Registers a counter vector that carries no labels.
    ///
    /// Every cache counter in this module is unlabeled, so this only saves
    /// repeating the empty label list at each definition.
    fn unlabeled_counter(name: &str, help: &str) -> IntCounterVec {
        register_int_counter_vec(name, help, &[])
    }

    // NOTE(review): the metric names are not fully consistent (e.g. `..._hits`
    // vs `..._hit`). They are kept verbatim because renaming a metric would
    // presumably break existing dashboards/alerts.

    /// Counts `read_value_bytes`-style lookups that were not found in the cache.
    pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter("num_read_value_cache_miss", "Number of read value cache misses")
    });

    /// Counts `read_value_bytes`-style lookups answered by the cache.
    pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter("num_read_value_cache_hits", "Number of read value cache hits")
    });

    /// Counts `contains_key`-style lookups that were not found in the cache.
    pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "num_contains_key_cache_miss",
            "Number of contains key cache misses",
        )
    });

    /// Counts `contains_key`-style lookups answered by the cache.
    pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "num_contains_key_cache_hit",
            "Number of contains key cache hits",
        )
    });

    /// Counts `find_keys_by_prefix` calls that were not found in the cache.
    pub static FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_keys_by_prefix_cache_miss",
                "Number of find keys by prefix cache misses",
            )
        });

    /// Counts `find_keys_by_prefix` calls answered by the cache.
    pub static FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_keys_by_prefix_cache_hit",
                "Number of find keys by prefix cache hits",
            )
        });

    /// Counts `find_key_values_by_prefix` calls that were not found in the cache.
    pub static FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_key_values_by_prefix_cache_miss",
                "Number of find key values by prefix cache misses",
            )
        });

    /// Counts `find_key_values_by_prefix` calls answered by the cache.
    pub static FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_key_values_by_prefix_cache_hit",
                "Number of find key values by prefix cache hits",
            )
        });
}
102
103/// The maximum number of entries in the cache.
104/// If the number of entries in the cache is too large then the underlying maps
105/// become the limiting factor.
106pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
107    max_cache_size: 10000000,
108    max_value_entry_size: 1000000,
109    max_find_keys_entry_size: 1000000,
110    max_find_key_values_entry_size: 1000000,
111    max_cache_entries: 1000,
112    max_cache_value_size: 10000000,
113    max_cache_find_keys_size: 10000000,
114    max_cache_find_key_values_size: 10000000,
115};
116
117/// A key-value database with added LRU caching.
118#[derive(Clone)]
119pub struct LruCachingDatabase<D> {
120    /// The inner store that is called by the LRU cache one.
121    database: D,
122    /// The configuration.
123    config: StorageCacheConfig,
124}
125
126/// A key-value store with added LRU caching.
127#[derive(Clone)]
128pub struct LruCachingStore<S> {
129    /// The inner store that is called by the LRU cache one.
130    store: S,
131    /// The LRU cache of values.
132    cache: Option<Arc<Mutex<LruPrefixCache>>>,
133}
134
135impl<D> WithError for LruCachingDatabase<D>
136where
137    D: WithError,
138{
139    type Error = D::Error;
140}
141
142impl<S> WithError for LruCachingStore<S>
143where
144    S: WithError,
145{
146    type Error = S::Error;
147}
148
149impl<K> ReadableKeyValueStore for LruCachingStore<K>
150where
151    K: ReadableKeyValueStore,
152{
153    // The LRU cache does not change the underlying store's size limits.
154    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
155
156    fn max_stream_queries(&self) -> usize {
157        self.store.max_stream_queries()
158    }
159
160    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
161        self.store.root_key()
162    }
163
164    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
165        let Some(cache) = &self.cache else {
166            return self.store.read_value_bytes(key).await;
167        };
168        // First inquiring in the read_value_bytes LRU
169        {
170            let mut cache = cache.lock().unwrap();
171            if let Some(value) = cache.query_read_value(key) {
172                #[cfg(with_metrics)]
173                metrics::READ_VALUE_CACHE_HIT_COUNT
174                    .with_label_values(&[])
175                    .inc();
176                return Ok(value);
177            }
178        }
179        #[cfg(with_metrics)]
180        metrics::READ_VALUE_CACHE_MISS_COUNT
181            .with_label_values(&[])
182            .inc();
183        let value = self.store.read_value_bytes(key).await?;
184        let mut cache = cache.lock().unwrap();
185        cache.insert_read_value(key, &value);
186        Ok(value)
187    }
188
189    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
190        let Some(cache) = &self.cache else {
191            return self.store.contains_key(key).await;
192        };
193        {
194            let mut cache = cache.lock().unwrap();
195            if let Some(value) = cache.query_contains_key(key) {
196                #[cfg(with_metrics)]
197                metrics::CONTAINS_KEY_CACHE_HIT_COUNT
198                    .with_label_values(&[])
199                    .inc();
200                return Ok(value);
201            }
202        }
203        #[cfg(with_metrics)]
204        metrics::CONTAINS_KEY_CACHE_MISS_COUNT
205            .with_label_values(&[])
206            .inc();
207        let result = self.store.contains_key(key).await?;
208        let mut cache = cache.lock().unwrap();
209        cache.insert_contains_key(key, result);
210        Ok(result)
211    }
212
213    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
214        let Some(cache) = &self.cache else {
215            return self.store.contains_keys(keys).await;
216        };
217        let size = keys.len();
218        let mut results = vec![false; size];
219        let mut indices = Vec::new();
220        let mut key_requests = Vec::new();
221        {
222            let mut cache = cache.lock().unwrap();
223            for i in 0..size {
224                if let Some(value) = cache.query_contains_key(&keys[i]) {
225                    #[cfg(with_metrics)]
226                    metrics::CONTAINS_KEY_CACHE_HIT_COUNT
227                        .with_label_values(&[])
228                        .inc();
229                    results[i] = value;
230                } else {
231                    #[cfg(with_metrics)]
232                    metrics::CONTAINS_KEY_CACHE_MISS_COUNT
233                        .with_label_values(&[])
234                        .inc();
235                    indices.push(i);
236                    key_requests.push(keys[i].clone());
237                }
238            }
239        }
240        if !key_requests.is_empty() {
241            let key_results = self.store.contains_keys(key_requests.clone()).await?;
242            let mut cache = cache.lock().unwrap();
243            for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
244                results[index] = result;
245                cache.insert_contains_key(&key, result);
246            }
247        }
248        Ok(results)
249    }
250
251    async fn read_multi_values_bytes(
252        &self,
253        keys: Vec<Vec<u8>>,
254    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
255        let Some(cache) = &self.cache else {
256            return self.store.read_multi_values_bytes(keys).await;
257        };
258
259        let mut result = Vec::with_capacity(keys.len());
260        let mut cache_miss_indices = Vec::new();
261        let mut miss_keys = Vec::new();
262        {
263            let mut cache = cache.lock().unwrap();
264            for (i, key) in keys.into_iter().enumerate() {
265                if let Some(value) = cache.query_read_value(&key) {
266                    #[cfg(with_metrics)]
267                    metrics::READ_VALUE_CACHE_HIT_COUNT
268                        .with_label_values(&[])
269                        .inc();
270                    result.push(value);
271                } else {
272                    #[cfg(with_metrics)]
273                    metrics::READ_VALUE_CACHE_MISS_COUNT
274                        .with_label_values(&[])
275                        .inc();
276                    result.push(None);
277                    cache_miss_indices.push(i);
278                    miss_keys.push(key);
279                }
280            }
281        }
282        if !miss_keys.is_empty() {
283            let values = self
284                .store
285                .read_multi_values_bytes(miss_keys.clone())
286                .await?;
287            let mut cache = cache.lock().unwrap();
288            for (i, (key, value)) in cache_miss_indices
289                .into_iter()
290                .zip(miss_keys.into_iter().zip(values))
291            {
292                cache.insert_read_value(&key, &value);
293                result[i] = value;
294            }
295        }
296        Ok(result)
297    }
298
299    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
300        let Some(cache) = self.get_exclusive_cache() else {
301            return self.store.find_keys_by_prefix(key_prefix).await;
302        };
303        {
304            let mut cache = cache.lock().unwrap();
305            if let Some(value) = cache.query_find_keys(key_prefix) {
306                #[cfg(with_metrics)]
307                metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT
308                    .with_label_values(&[])
309                    .inc();
310                return Ok(value);
311            }
312        }
313        #[cfg(with_metrics)]
314        metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT
315            .with_label_values(&[])
316            .inc();
317        let keys = self.store.find_keys_by_prefix(key_prefix).await?;
318        let mut cache = cache.lock().unwrap();
319        cache.insert_find_keys(key_prefix.to_vec(), &keys);
320        Ok(keys)
321    }
322
323    async fn find_key_values_by_prefix(
324        &self,
325        key_prefix: &[u8],
326    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
327        let Some(cache) = self.get_exclusive_cache() else {
328            return self.store.find_key_values_by_prefix(key_prefix).await;
329        };
330        {
331            let mut cache = cache.lock().unwrap();
332            if let Some(value) = cache.query_find_key_values(key_prefix) {
333                #[cfg(with_metrics)]
334                metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT
335                    .with_label_values(&[])
336                    .inc();
337                return Ok(value);
338            }
339        }
340        #[cfg(with_metrics)]
341        metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT
342            .with_label_values(&[])
343            .inc();
344        let key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
345        let mut cache = cache.lock().unwrap();
346        cache.insert_find_key_values(key_prefix.to_vec(), &key_values);
347        Ok(key_values)
348    }
349}
350
351impl<K> WritableKeyValueStore for LruCachingStore<K>
352where
353    K: WritableKeyValueStore,
354{
355    // The LRU cache does not change the underlying store's size limits.
356    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
357
358    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
359        let Some(cache) = &self.cache else {
360            return self.store.write_batch(batch).await;
361        };
362
363        {
364            let mut cache = cache.lock().unwrap();
365            for operation in &batch.operations {
366                match operation {
367                    WriteOperation::Put { key, value } => {
368                        cache.put_key_value(key, value);
369                    }
370                    WriteOperation::Delete { key } => {
371                        cache.delete_key(key);
372                    }
373                    WriteOperation::DeletePrefix { key_prefix } => {
374                        cache.delete_prefix(key_prefix);
375                    }
376                }
377            }
378        }
379        self.store.write_batch(batch).await
380    }
381
382    async fn clear_journal(&self) -> Result<(), Self::Error> {
383        self.store.clear_journal().await
384    }
385}
386
387/// The configuration type for the `LruCachingStore`.
388#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct LruCachingConfig<C> {
390    /// The inner configuration of the `LruCachingStore`.
391    pub inner_config: C,
392    /// The cache size being used.
393    pub storage_cache_config: StorageCacheConfig,
394}
395
396impl<D> KeyValueDatabase for LruCachingDatabase<D>
397where
398    D: KeyValueDatabase,
399{
400    type Config = LruCachingConfig<D::Config>;
401
402    type Store = LruCachingStore<D::Store>;
403
404    fn get_name() -> String {
405        format!("lru caching {}", D::get_name())
406    }
407
408    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
409        let database = D::connect(&config.inner_config, namespace).await?;
410        Ok(LruCachingDatabase {
411            database,
412            config: config.storage_cache_config.clone(),
413        })
414    }
415
416    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
417        let store = self.database.open_shared(root_key)?;
418        let store = LruCachingStore::new(
419            store,
420            self.config.clone(),
421            /* has_exclusive_access */ false,
422        );
423        Ok(store)
424    }
425
426    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
427        let store = self.database.open_exclusive(root_key)?;
428        let store = LruCachingStore::new(
429            store,
430            self.config.clone(),
431            /* has_exclusive_access */ true,
432        );
433        Ok(store)
434    }
435
436    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
437        D::list_all(&config.inner_config).await
438    }
439
440    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
441        self.database.list_root_keys().await
442    }
443
444    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
445        D::delete_all(&config.inner_config).await
446    }
447
448    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
449        D::exists(&config.inner_config, namespace).await
450    }
451
452    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
453        D::create(&config.inner_config, namespace).await
454    }
455
456    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
457        D::delete(&config.inner_config, namespace).await
458    }
459}
460
461impl<S> LruCachingStore<S> {
462    /// Creates a new key-value store that provides LRU caching at top of the given store.
463    fn new(store: S, config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
464        let cache = {
465            if config.max_cache_entries == 0 {
466                None
467            } else {
468                Some(Arc::new(Mutex::new(LruPrefixCache::new(
469                    config,
470                    has_exclusive_access,
471                ))))
472            }
473        };
474        Self { store, cache }
475    }
476
477    /// Returns a cache with exclusive access if one exists.
478    fn get_exclusive_cache(&self) -> Option<&Arc<Mutex<LruPrefixCache>>> {
479        let Some(cache) = &self.cache else {
480            return None;
481        };
482        let has_exclusive_access = {
483            let cache = cache.lock().unwrap();
484            cache.has_exclusive_access()
485        };
486        if has_exclusive_access {
487            Some(cache)
488        } else {
489            None
490        }
491    }
492}
493
/// An in-memory database wrapped in the LRU caching layer (test builds only).
#[cfg(with_testing)]
pub type LruCachingMemoryDatabase = LruCachingDatabase<MemoryDatabase>;
497
/// Test configurations wrap the inner database's test configuration together
/// with `DEFAULT_STORAGE_CACHE_CONFIG`.
#[cfg(with_testing)]
impl<D: TestKeyValueDatabase> TestKeyValueDatabase for LruCachingDatabase<D> {
    async fn new_test_config() -> Result<LruCachingConfig<D::Config>, D::Error> {
        D::new_test_config()
            .await
            .map(|inner_config| LruCachingConfig {
                inner_config,
                storage_cache_config: DEFAULT_STORAGE_CACHE_CONFIG,
            })
    }
}