linera_cache/
value_cache.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A concurrent cache with efficient eviction and single-allocation guarantees.
5//!
6//! Values are stored internally as `Arc<V>`, so cache hits return a cheap
7//! `Arc` clone instead of cloning the underlying data. A secondary weak
8//! index (`papaya::HashMap<K, Weak<V>>`) ensures that at most one allocation
9//! exists per key: if the bounded cache evicts an entry while a consumer
10//! still holds an `Arc`, re-requesting the same key returns the same
11//! allocation instead of creating a duplicate.
12
13#[cfg(with_metrics)]
14use std::any::type_name;
15use std::{
16    borrow::Cow,
17    hash::Hash,
18    sync::{Arc, Weak},
19};
20
21use linera_base::{crypto::CryptoHash, hashed::Hashed};
22use papaya::{Compute, Operation};
23use quick_cache::sync::Cache;
24
25/// Default interval between dead-entry cleanup sweeps of the weak index.
26pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 30;
27
28/// A concurrent cache with efficient eviction and single-allocation guarantees.
29///
30/// Backed by `quick_cache` (S3-FIFO eviction) for bounded hot-path caching, plus
31/// a lock-free `papaya::HashMap` weak index for deduplication. Together they
32/// guarantee that at most one `Arc<V>` allocation exists per key at any time.
33///
34/// A background task periodically sweeps dead `Weak` entries from the index
35/// to prevent unbounded memory growth.
36pub struct ValueCache<K, V> {
37    cache: Cache<K, Arc<V>>,
38    weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
39}
40
41impl<K, V> ValueCache<K, V>
42where
43    K: Hash + Eq + Clone + Send + Sync + 'static,
44    V: Send + Sync + 'static,
45{
46    /// Creates a new `ValueCache` with the given bounded-cache capacity
47    /// and cleanup interval for the weak-reference index.
48    #[cfg(not(web))]
49    pub fn new(size: usize, cleanup_interval_secs: u64) -> Self {
50        let weak_index = Arc::new(papaya::HashMap::new());
51        Self::spawn_cleanup_task(
52            Arc::clone(&weak_index),
53            std::time::Duration::from_secs(cleanup_interval_secs),
54        );
55        ValueCache {
56            cache: Cache::new(size),
57            weak_index,
58        }
59    }
60
61    /// Creates a new `ValueCache` (web variant, no background cleanup task).
62    #[cfg(web)]
63    pub fn new(size: usize, _cleanup_interval_secs: u64) -> Self {
64        ValueCache {
65            cache: Cache::new(size),
66            weak_index: Arc::new(papaya::HashMap::new()),
67        }
68    }
69
70    /// Inserts a value into the cache, returning the canonical `Arc`.
71    ///
72    /// The value is wrapped in `Arc` internally. If a live `Arc` for this key
73    /// already exists (held by another consumer), the existing allocation is
74    /// reused and the new value is dropped.
75    pub fn insert(&self, key: &K, value: V) -> Arc<V> {
76        self.dedup_insert(key, Arc::new(value))
77    }
78
79    /// Inserts a pre-wrapped `Arc<V>` into the cache, returning the canonical `Arc`.
80    pub fn insert_arc(&self, key: &K, value: Arc<V>) -> Arc<V> {
81        self.dedup_insert(key, value)
82    }
83
84    /// Removes a value from the bounded cache.
85    ///
86    /// The weak index entry is intentionally kept — another consumer may
87    /// still hold an `Arc` to this value, and the weak index must be able
88    /// to deduplicate against it. Dead weak entries are cleaned up by the
89    /// background task.
90    pub fn remove(&self, key: &K) -> Option<Arc<V>> {
91        let value = self.cache.peek(key);
92        if value.is_some() {
93            self.cache.remove(key);
94        }
95        Self::track_cache_usage(value)
96    }
97
98    /// Returns an `Arc` reference to the value, checking both the bounded
99    /// cache and the weak index.
100    pub fn get(&self, key: &K) -> Option<Arc<V>> {
101        // Tier 1: bounded cache (hot path)
102        if let Some(arc) = self.cache.get(key) {
103            return Self::track_cache_usage(Some(arc));
104        }
105
106        // Tier 2: weak index (catches evicted-but-still-held entries)
107        let guard = self.weak_index.guard();
108        if let Some(weak) = self.weak_index.get(key, &guard) {
109            if let Some(arc) = weak.upgrade() {
110                // Re-insert into bounded cache for future fast lookups
111                self.cache.insert(key.clone(), arc.clone());
112                return Self::track_cache_usage(Some(arc));
113            }
114        }
115
116        Self::track_cache_usage(None)
117    }
118
119    /// Returns `true` if the value exists in either the bounded cache or
120    /// the weak index (with a live allocation).
121    pub fn contains(&self, key: &K) -> bool {
122        if self.cache.peek(key).is_some() {
123            return true;
124        }
125        let guard = self.weak_index.guard();
126        self.weak_index
127            .get(key, &guard)
128            .is_some_and(|weak| weak.strong_count() > 0)
129    }
130
131    /// Removes all dead `Weak` entries from the weak index.
132    #[cfg(with_testing)]
133    pub fn cleanup_dead_entries(&self) {
134        let guard = self.weak_index.guard();
135        self.weak_index
136            .retain(|_, weak| weak.strong_count() > 0, &guard);
137    }
138
139    /// Spawns a background task that periodically sweeps dead weak entries.
140    ///
141    /// No-op if no tokio runtime is available (e.g. in unit tests).
142    #[cfg(not(web))]
143    fn spawn_cleanup_task(
144        weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
145        cleanup_interval: std::time::Duration,
146    ) {
147        if tokio::runtime::Handle::try_current().is_err() {
148            return;
149        }
150        tokio::spawn(async move {
151            let mut interval = tokio::time::interval(cleanup_interval);
152            loop {
153                interval.tick().await;
154                let guard = weak_index.guard();
155                weak_index.retain(|_, weak| weak.strong_count() > 0, &guard);
156            }
157        });
158    }
159
160    /// Core dedup logic: atomically checks the weak index for an existing
161    /// live allocation. If found, reuses it. Otherwise inserts the new Arc.
162    /// Returns the canonical `Arc`.
163    fn dedup_insert(&self, key: &K, new_arc: Arc<V>) -> Arc<V> {
164        let guard = self.weak_index.guard();
165        let weak = Arc::downgrade(&new_arc);
166
167        let result = self.weak_index.compute(
168            key.clone(),
169            |entry| match entry {
170                Some((_k, existing_weak)) => match existing_weak.upgrade() {
171                    Some(existing_arc) => Operation::Abort(existing_arc),
172                    None => Operation::Insert(weak.clone()),
173                },
174                None => Operation::Insert(weak.clone()),
175            },
176            &guard,
177        );
178
179        let canonical_arc = match result {
180            Compute::Inserted(..) | Compute::Updated { .. } => new_arc,
181            Compute::Aborted(existing_arc) => existing_arc,
182            _ => unreachable!(),
183        };
184
185        self.cache.insert(key.clone(), canonical_arc.clone());
186        canonical_arc
187    }
188
189    fn track_cache_usage(maybe_value: Option<Arc<V>>) -> Option<Arc<V>> {
190        #[cfg(with_metrics)]
191        {
192            let metric = if maybe_value.is_some() {
193                &metrics::CACHE_HIT_COUNT
194            } else {
195                &metrics::CACHE_MISS_COUNT
196            };
197
198            metric
199                .with_label_values(&[type_name::<K>(), type_name::<V>()])
200                .inc();
201        }
202        maybe_value
203    }
204}
205
206impl<V: Clone + Send + Sync + 'static> ValueCache<CryptoHash, V> {
207    /// Inserts a value constructed from a [`Hashed<T>`] into the cache, keyed
208    /// by its hash, returning the canonical `Arc<V>`.
209    ///
210    /// The `value` is wrapped in a [`Cow`] so that it is only cloned if it
211    /// needs to be inserted in the cache.
212    pub fn insert_hashed<T>(&self, value: Cow<Hashed<T>>) -> Arc<V>
213    where
214        T: Clone,
215        V: From<Hashed<T>>,
216    {
217        let hash = (*value).hash();
218        // Fast path: already in bounded cache
219        if let Some(arc) = self.cache.peek(&hash) {
220            return arc;
221        }
222        // Check weak index before cloning from Cow
223        let guard = self.weak_index.guard();
224        if let Some(weak) = self.weak_index.get(&hash, &guard) {
225            if let Some(arc) = weak.upgrade() {
226                self.cache.insert(hash, arc.clone());
227                return arc;
228            }
229        }
230        drop(guard);
231        self.dedup_insert(&hash, Arc::new(value.into_owned().into()))
232    }
233
234    /// Inserts multiple values constructed from [`Hashed<T>`]s into the cache.
235    #[cfg(with_testing)]
236    pub fn insert_all_hashed<'a, T>(&self, values: impl IntoIterator<Item = Cow<'a, Hashed<T>>>)
237    where
238        T: Clone + 'a,
239        V: From<Hashed<T>>,
240    {
241        for value in values {
242            self.insert_hashed(value);
243        }
244    }
245}
246
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Registers a counter vector labelled with the cache's key and value
    /// type names.
    fn type_labelled_counter(name: &'static str, help: &'static str) -> IntCounterVec {
        register_int_counter_vec(name, help, &["key_type", "value_type"])
    }

    /// Number of `ValueCache` operations (`get`/`remove`) that found a value.
    pub static CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        type_labelled_counter("value_cache_hit", "Cache hits in `ValueCache`")
    });

    /// Number of `ValueCache` operations (`get`/`remove`) that found nothing.
    pub static CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        type_labelled_counter("value_cache_miss", "Cache misses in `ValueCache`")
    });
}
270
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, sync::Arc};

    use linera_base::{crypto::CryptoHash, hashed::Hashed};
    use serde::{Deserialize, Serialize};

    use super::{ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};

    /// Bounded-cache capacity used by most tests.
    const TEST_CACHE_SIZE: usize = 10;

    /// Smallest payload that can be wrapped in `Hashed` and stored in a
    /// `ValueCache<CryptoHash, TestValue>`.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestValue(u64);

    impl linera_base::crypto::BcsHashable<'_> for TestValue {}

    impl From<Hashed<TestValue>> for TestValue {
        fn from(hashed: Hashed<TestValue>) -> Self {
            hashed.into_inner()
        }
    }

    /// Wraps `n` in a `TestValue` and hashes it.
    fn create_test_value(n: u64) -> Hashed<TestValue> {
        Hashed::new(TestValue(n))
    }

    /// Builds one hashed `TestValue` for each number yielded by `numbers`.
    fn create_test_values(numbers: impl IntoIterator<Item = u64>) -> Vec<Hashed<TestValue>> {
        let mut out = Vec::new();
        for n in numbers {
            out.push(create_test_value(n));
        }
        out
    }

    /// A hash-keyed cache with the default cleanup interval.
    fn new_hashed_cache(capacity: usize) -> ValueCache<CryptoHash, TestValue> {
        ValueCache::new(capacity, DEFAULT_CLEANUP_INTERVAL_SECS)
    }

    /// An integer-keyed cache of strings with the default cleanup interval.
    fn new_string_cache(capacity: usize) -> ValueCache<u64, String> {
        ValueCache::new(capacity, DEFAULT_CLEANUP_INTERVAL_SECS)
    }

    #[test]
    fn test_retrieve_missing_value() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let absent = CryptoHash::test_hash("Missing value");

        assert!(cache.get(&absent).is_none());
        assert!(!cache.contains(&absent));
    }

    #[test]
    fn test_insert_and_get() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let hashed = create_test_value(0);
        let key = hashed.hash();

        cache.insert_hashed(Cow::Borrowed(&hashed));

        assert!(cache.contains(&key));
        let fetched = cache.get(&key);
        assert_eq!(fetched.as_deref(), Some(hashed.inner()));
    }

    #[test]
    fn test_insert_many_values() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        values.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        for value in &values {
            let key = value.hash();
            assert!(cache.contains(&key));
            assert_eq!(cache.get(&key).as_deref(), Some(value.inner()));
        }

        // The same values, inserted through the batch API into a fresh cache.
        let batch_cache = new_hashed_cache(TEST_CACHE_SIZE);
        batch_cache.insert_all_hashed(values.iter().map(Cow::Borrowed));
        for value in &values {
            let key = value.hash();
            assert_eq!(batch_cache.get(&key).as_deref(), Some(value.inner()));
        }
    }

    #[test]
    fn test_reinsertion_dedup() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        // Keep every canonical Arc alive for the rest of the test.
        let mut originals = Vec::with_capacity(values.len());
        for value in &values {
            originals.push(cache.insert_hashed(Cow::Borrowed(value)));
        }

        // A second insertion of each value must hand back the same allocation.
        for (value, original) in values.iter().zip(originals.iter()) {
            let again = cache.insert_hashed(Cow::Borrowed(value));
            assert!(Arc::ptr_eq(&again, original));
        }
    }

    #[test]
    fn test_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let total = TEST_CACHE_SIZE * 3;
        let values = create_test_values(0..total as u64);

        // Returned Arcs are dropped immediately so evicted entries really die.
        for value in &values {
            cache.insert_hashed(Cow::Borrowed(value));
        }

        let present_count = values
            .iter()
            .map(|value| value.hash())
            .filter(|key| cache.contains(key))
            .count();

        assert!(present_count > 0, "cache should still hold some entries");
        assert!(
            present_count <= TEST_CACHE_SIZE + 1,
            "cache should not hold significantly more than its capacity, \
             but has {present_count} entries for capacity {TEST_CACHE_SIZE}"
        );
    }

    #[test]
    fn test_accessed_entry_survives_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let hot = create_test_value(0);
        let hot_hash = hot.hash();

        cache.insert_hashed(Cow::Borrowed(&hot));
        // A lookup marks the entry as recently used; the result is discarded.
        let _ = cache.get(&hot_hash);

        for filler in &create_test_values(1..=TEST_CACHE_SIZE as u64 * 2) {
            cache.insert_hashed(Cow::Borrowed(filler));
        }

        assert!(
            cache.contains(&hot_hash),
            "recently accessed entry should survive eviction"
        );
    }

    #[test]
    fn test_promotion_of_reinsertion() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        // Both handles stay alive until the end of the test.
        let initial = cache.insert_hashed(Cow::Borrowed(&promoted));
        let repeated = cache.insert_hashed(Cow::Borrowed(&promoted));
        assert!(Arc::ptr_eq(&initial, &repeated));

        for filler in &create_test_values(1..=TEST_CACHE_SIZE as u64 * 2) {
            cache.insert_hashed(Cow::Borrowed(filler));
        }

        assert!(
            cache.contains(&promoted_hash),
            "re-inserted entry should survive eviction"
        );
    }

    #[test]
    fn test_weak_index_dedup_after_eviction() {
        let cache = new_string_cache(2);

        // This handle outlives the bounded cache's copy.
        let held = cache.insert(&1, "hello".to_string());

        // Overflow the two-slot cache to push key 1 out.
        for (key, text) in [(2, "world"), (3, "foo"), (4, "bar")] {
            cache.insert(&key, text.to_string());
        }

        let retrieved = cache
            .get(&1)
            .expect("held Arc should keep entry findable via weak index");
        assert!(
            Arc::ptr_eq(&retrieved, &held),
            "must return same allocation, not a duplicate"
        );

        // A fresh insert for the same key must also resolve to `held`.
        let reinserted = cache.insert(&1, "replacement".to_string());
        assert!(Arc::ptr_eq(&reinserted, &held));
        assert_eq!(reinserted.as_str(), "hello");
    }

    #[test]
    fn test_remove_preserves_weak_for_held_arcs() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        let held = cache.insert(&1, "hello".to_string());

        // Only the bounded-cache copy goes away; the weak index entry stays.
        let _ = cache.remove(&1);

        let retrieved = cache.get(&1).expect("weak index should find held Arc");
        assert!(Arc::ptr_eq(&retrieved, &held));
    }

    #[test]
    fn test_remove_without_holder() {
        let cache = new_string_cache(TEST_CACHE_SIZE);

        // Nobody keeps the returned Arc, so removal leaves only a dead weak entry.
        let _ = cache.insert(&1, "hello".to_string());
        let _ = cache.remove(&1);

        assert!(!cache.contains(&1));
        assert!(cache.get(&1).is_none());
    }

    #[test]
    fn test_cleanup_dead_entries() {
        let cache = new_string_cache(2);

        cache.insert(&1, "alive".to_string());
        let _pinned = cache.get(&1).expect("just inserted");

        // Key 2 is inserted but nobody keeps a handle to it.
        cache.insert(&2, "dead".to_string());

        // Overflow the two-slot cache so both keys leave it.
        for (key, text) in [(3, "a"), (4, "b"), (5, "c")] {
            cache.insert(&key, text.to_string());
        }

        cache.cleanup_dead_entries();

        // The pinned key must survive the sweep.
        assert!(cache.contains(&1));
    }

    #[test]
    fn test_insert_arc_dedup() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        let value = Arc::new("hello".to_string());

        let first = cache.insert_arc(&1, Arc::clone(&value));
        assert!(Arc::ptr_eq(&first, &value));

        // A different allocation for the same key is discarded in favour of `value`.
        let second = cache.insert_arc(&1, Arc::new("other".to_string()));
        assert!(Arc::ptr_eq(&second, &value));

        let stored = cache.get(&1).expect("just inserted");
        assert_eq!(stored.as_str(), "hello");
    }
}