Skip to main content

linera_cache/
value_cache.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A concurrent cache with efficient eviction and single-allocation guarantees.
5//!
6//! Values are stored internally as `Arc<V>`, so cache hits return a cheap
7//! `Arc` clone instead of cloning the underlying data. A secondary weak
8//! index (`papaya::HashMap<K, Weak<V>>`) ensures that at most one allocation
9//! exists per key: if the bounded cache evicts an entry while a consumer
10//! still holds an `Arc`, re-requesting the same key returns the same
11//! allocation instead of creating a duplicate.
12
13#[cfg(with_metrics)]
14use std::any::type_name;
15use std::{
16    borrow::Cow,
17    hash::Hash,
18    sync::{Arc, Weak},
19};
20
21use linera_base::{crypto::CryptoHash, hashed::Hashed};
22use papaya::{Compute, Operation};
23use quick_cache::sync::Cache;
24
/// Default period, in seconds, between two background sweeps that drop dead
/// `Weak` entries from the weak index.
pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 30;
27
28/// A concurrent cache with efficient eviction and single-allocation guarantees.
29///
30/// Backed by `quick_cache` (S3-FIFO eviction) for bounded hot-path caching, plus
31/// a lock-free `papaya::HashMap` weak index for deduplication. Together they
32/// guarantee that at most one `Arc<V>` allocation exists per key at any time.
33///
34/// A background task periodically sweeps dead `Weak` entries from the index
35/// to prevent unbounded memory growth.
36pub struct ValueCache<K, V> {
37    /// Stable instance name distinguishing this cache in metrics; several
38    /// caches may share the same key/value types within one process.
39    ///
40    /// Must be unique among the `ValueCache` instances of a process:
41    /// instances sharing a name and key/value types report into the same
42    /// metric series, so their gauges would overwrite each other. The name
43    /// is used verbatim as the `cache` Prometheus label value; any UTF-8
44    /// string is valid, but prefer a short, stable `snake_case` identifier
45    /// since dashboards and alerts query it.
46    name: &'static str,
47    cache: Arc<Cache<K, crate::Arc<V>>>,
48    weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
49}
50
51impl<K, V> ValueCache<K, V>
52where
53    K: Hash + Eq + Clone + Send + Sync + 'static,
54    V: Send + Sync + 'static,
55{
56    /// Creates a new `ValueCache` with the given instance name (used as the
57    /// `cache` metric label), bounded-cache capacity, and cleanup interval
58    /// for the weak-reference index.
59    #[cfg(not(web))]
60    pub fn new(name: &'static str, size: usize, cleanup_interval_secs: u64) -> Self {
61        let cache = Arc::new(Cache::new(size));
62        let weak_index = Arc::new(papaya::HashMap::new());
63        // Report quick_cache's actual capacity, which may round the requested
64        // size up, so that occupancy (entries / capacity) cannot exceed 1.
65        #[cfg(with_metrics)]
66        metrics::CACHE_CAPACITY
67            .with_label_values(&[name, type_name::<K>(), type_name::<V>()])
68            .set(i64::try_from(cache.capacity()).unwrap_or(i64::MAX));
69        Self::spawn_cleanup_task(
70            Arc::clone(&weak_index),
71            #[cfg(with_metrics)]
72            (name, Arc::clone(&cache)),
73            std::time::Duration::from_secs(cleanup_interval_secs),
74        );
75        ValueCache {
76            name,
77            cache,
78            weak_index,
79        }
80    }
81
82    /// Creates a new `ValueCache` (web variant, no background cleanup task).
83    #[cfg(web)]
84    pub fn new(name: &'static str, size: usize, _cleanup_interval_secs: u64) -> Self {
85        ValueCache {
86            name,
87            cache: Arc::new(Cache::new(size)),
88            weak_index: Arc::new(papaya::HashMap::new()),
89        }
90    }
91
92    /// Returns the instance name of this cache.
93    pub fn name(&self) -> &'static str {
94        self.name
95    }
96
97    /// Returns the number of entries currently held in the bounded cache.
98    ///
99    /// This is the live occupancy (excludes the weak dedup index), and is
100    /// periodically sampled as the `value_cache_entries` gauge for the
101    /// occupancy-vs-capacity diagnosis.
102    pub fn len(&self) -> usize {
103        self.cache.len()
104    }
105
106    /// Returns `true` if the bounded cache currently holds no entries.
107    pub fn is_empty(&self) -> bool {
108        self.cache.is_empty()
109    }
110
111    /// Inserts a value into the cache, returning the canonical [`crate::Arc`].
112    ///
113    /// The value is wrapped in `Arc` internally. If a live `Arc` for this key
114    /// already exists (held by another consumer), the existing allocation is
115    /// reused and the new value is dropped.
116    pub fn insert(&self, key: &K, value: V) -> crate::Arc<V> {
117        self.dedup_insert(key, crate::Arc(Arc::new(value)))
118    }
119
120    /// Removes a value from the bounded cache.
121    ///
122    /// The weak index entry is intentionally kept — another consumer may
123    /// still hold an `Arc` to this value, and the weak index must be able
124    /// to deduplicate against it. Dead weak entries are cleaned up by the
125    /// background task.
126    pub fn remove(&self, key: &K) -> Option<crate::Arc<V>> {
127        let value = self.cache.peek(key);
128        if value.is_some() {
129            self.cache.remove(key);
130        }
131        self.track_cache_usage(value)
132    }
133
134    /// Returns an [`crate::Arc`] to the value, checking both the bounded
135    /// cache and the weak index.
136    pub fn get(&self, key: &K) -> Option<crate::Arc<V>> {
137        // Tier 1: bounded cache (hot path)
138        if let Some(arc) = self.cache.get(key) {
139            return self.track_cache_usage(Some(arc));
140        }
141
142        // Tier 2: weak index (catches evicted-but-still-held entries)
143        let guard = self.weak_index.guard();
144        if let Some(weak) = self.weak_index.get(key, &guard) {
145            if let Some(arc) = weak.upgrade() {
146                let arc = crate::Arc(arc);
147                // Re-insert into bounded cache for future fast lookups
148                self.cache.insert(key.clone(), arc.clone());
149                return self.track_cache_usage(Some(arc));
150            }
151        }
152
153        self.track_cache_usage(None)
154    }
155
156    /// Returns `true` if the value exists in either the bounded cache or
157    /// the weak index (with a live allocation).
158    pub fn contains(&self, key: &K) -> bool {
159        if self.cache.peek(key).is_some() {
160            return true;
161        }
162        let guard = self.weak_index.guard();
163        self.weak_index
164            .get(key, &guard)
165            .is_some_and(|weak| weak.strong_count() > 0)
166    }
167
168    /// Removes all dead `Weak` entries from the weak index.
169    #[cfg(with_testing)]
170    pub fn cleanup_dead_entries(&self) {
171        let guard = self.weak_index.guard();
172        self.weak_index
173            .retain(|_, weak| weak.strong_count() > 0, &guard);
174    }
175
176    /// Spawns a background task that periodically sweeps dead weak entries.
177    ///
178    /// No-op if no tokio runtime is available (e.g. in unit tests).
179    #[cfg(not(web))]
180    fn spawn_cleanup_task(
181        weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
182        #[cfg(with_metrics)] metrics_handle: (&'static str, Arc<Cache<K, crate::Arc<V>>>),
183        cleanup_interval: std::time::Duration,
184    ) {
185        if tokio::runtime::Handle::try_current().is_err() {
186            return;
187        }
188        tokio::spawn(async move {
189            let mut interval = tokio::time::interval(cleanup_interval);
190            loop {
191                interval.tick().await;
192                {
193                    let guard = weak_index.guard();
194                    weak_index.retain(|_, weak| weak.strong_count() > 0, &guard);
195                }
196                // Refresh the occupancy gauge on the same cadence as the sweep
197                // (after the guard is dropped); keeps it off the cache hot path.
198                #[cfg(with_metrics)]
199                {
200                    let (name, cache) = &metrics_handle;
201                    metrics::CACHE_ENTRIES
202                        .with_label_values(&[name, type_name::<K>(), type_name::<V>()])
203                        .set(i64::try_from(cache.len()).unwrap_or(i64::MAX));
204                }
205            }
206        });
207    }
208
209    /// Core dedup logic: atomically checks the weak index for an existing
210    /// live allocation. If found, reuses it. Otherwise inserts the new Arc.
211    /// Returns the canonical `Arc`.
212    fn dedup_insert(&self, key: &K, new_arc: crate::Arc<V>) -> crate::Arc<V> {
213        let guard = self.weak_index.guard();
214        let weak = Arc::downgrade(&new_arc.0);
215
216        let result = self.weak_index.compute(
217            key.clone(),
218            |entry| match entry {
219                Some((_k, existing_weak)) => match existing_weak.upgrade() {
220                    Some(existing_arc) => Operation::Abort(existing_arc),
221                    None => Operation::Insert(weak.clone()),
222                },
223                None => Operation::Insert(weak.clone()),
224            },
225            &guard,
226        );
227
228        let canonical_arc = match result {
229            Compute::Inserted(..) | Compute::Updated { .. } => new_arc,
230            Compute::Aborted(existing_arc) => crate::Arc(existing_arc),
231            _ => unreachable!(),
232        };
233
234        self.cache.insert(key.clone(), canonical_arc.clone());
235        canonical_arc
236    }
237
238    fn track_cache_usage(&self, maybe_value: Option<crate::Arc<V>>) -> Option<crate::Arc<V>> {
239        #[cfg(with_metrics)]
240        {
241            let metric = if maybe_value.is_some() {
242                &metrics::CACHE_HIT_COUNT
243            } else {
244                &metrics::CACHE_MISS_COUNT
245            };
246
247            metric
248                .with_label_values(&[self.name, type_name::<K>(), type_name::<V>()])
249                .inc();
250        }
251        maybe_value
252    }
253}
254
255impl<V: Clone + Send + Sync + 'static> ValueCache<CryptoHash, V> {
256    /// Inserts a value constructed from a [`Hashed<T>`] into the cache, keyed
257    /// by its hash, returning the canonical `Arc<V>`.
258    ///
259    /// The `value` is wrapped in a [`Cow`] so that it is only cloned if it
260    /// needs to be inserted in the cache.
261    pub fn insert_hashed<T>(&self, value: Cow<Hashed<T>>) -> crate::Arc<V>
262    where
263        T: Clone,
264        V: From<Hashed<T>>,
265    {
266        let hash = (*value).hash();
267        // Fast path: already in bounded cache
268        if let Some(arc) = self.cache.peek(&hash) {
269            return arc;
270        }
271        // Check weak index before cloning from Cow
272        let guard = self.weak_index.guard();
273        if let Some(weak) = self.weak_index.get(&hash, &guard) {
274            if let Some(arc) = weak.upgrade() {
275                let arc = crate::Arc(arc);
276                self.cache.insert(hash, arc.clone());
277                return arc;
278            }
279        }
280        drop(guard);
281        self.dedup_insert(&hash, crate::Arc(Arc::new(value.into_owned().into())))
282    }
283
284    /// Inserts multiple values constructed from [`Hashed<T>`]s into the cache.
285    #[cfg(with_testing)]
286    pub fn insert_all_hashed<'a, T>(&self, values: impl IntoIterator<Item = Cow<'a, Hashed<T>>>)
287    where
288        T: Clone + 'a,
289        V: From<Hashed<T>>,
290    {
291        for value in values {
292            self.insert_hashed(value);
293        }
294    }
295}
296
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge_vec};
    use prometheus::{IntCounterVec, IntGaugeVec};

    /// Label names shared by every `ValueCache` metric.
    ///
    /// `cache` holds the instance name passed to [`super::ValueCache::new`].
    /// It is what separates caches that share key/value types within one
    /// process.
    const LABELS: &[&str] = &["cache", "key_type", "value_type"];

    /// Registers an integer counter family labelled with [`LABELS`].
    fn counter_with_labels(name: &'static str, help: &'static str) -> IntCounterVec {
        register_int_counter_vec(name, help, LABELS)
    }

    /// Registers an integer gauge family labelled with [`LABELS`].
    fn gauge_with_labels(name: &'static str, help: &'static str) -> IntGaugeVec {
        register_int_gauge_vec(name, help, LABELS)
    }

    /// Incremented whenever a tracked cache operation finds a value.
    pub static CACHE_HIT_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| counter_with_labels("value_cache_hit", "Cache hits in `ValueCache`"));

    /// Incremented whenever a tracked cache operation finds nothing.
    pub static CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| counter_with_labels("value_cache_miss", "Cache misses in `ValueCache`"));

    /// Occupancy of the bounded cache.
    pub static CACHE_ENTRIES: LazyLock<IntGaugeVec> = LazyLock::new(|| {
        gauge_with_labels(
            "value_cache_entries",
            concat!(
                "Number of entries held in the bounded `ValueCache`, ",
                "sampled at each cleanup sweep"
            ),
        )
    });

    /// Capacity of the bounded cache.
    pub static CACHE_CAPACITY: LazyLock<IntGaugeVec> = LazyLock::new(|| {
        gauge_with_labels(
            "value_cache_capacity",
            "Maximum number of entries of the bounded `ValueCache`",
        )
    });
}
334
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use linera_base::{crypto::CryptoHash, hashed::Hashed};
    use serde::{Deserialize, Serialize};

    use super::{ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
    use crate::Arc as CacheArc;

    /// Test cache size for unit tests.
    const TEST_CACHE_SIZE: usize = 10;

    /// A minimal hashable value for testing `ValueCache<CryptoHash, Hashed<T>>`.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestValue(u64);

    impl linera_base::crypto::BcsHashable<'_> for TestValue {}

    impl From<Hashed<TestValue>> for TestValue {
        fn from(value: Hashed<TestValue>) -> Self {
            value.into_inner()
        }
    }

    fn create_test_value(n: u64) -> Hashed<TestValue> {
        Hashed::new(TestValue(n))
    }

    fn create_test_values(iter: impl IntoIterator<Item = u64>) -> Vec<Hashed<TestValue>> {
        iter.into_iter().map(create_test_value).collect()
    }

    fn new_hashed_cache(size: usize) -> ValueCache<CryptoHash, TestValue> {
        ValueCache::new("test_hashed", size, DEFAULT_CLEANUP_INTERVAL_SECS)
    }

    fn new_string_cache(size: usize) -> ValueCache<u64, String> {
        ValueCache::new("test_string", size, DEFAULT_CLEANUP_INTERVAL_SECS)
    }

    #[test]
    fn test_retrieve_missing_value() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let hash = CryptoHash::test_hash("Missing value");

        assert!(cache.get(&hash).is_none());
        assert!(!cache.contains(&hash));
    }

    #[test]
    fn test_insert_and_get() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let value = create_test_value(0);
        let hash = value.hash();

        cache.insert_hashed(Cow::Borrowed(&value));
        assert!(cache.contains(&hash));
        assert_eq!(cache.get(&hash).as_deref(), Some(value.inner()));
    }

    #[test]
    fn test_insert_many_values() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        for value in &values {
            cache.insert_hashed(Cow::Borrowed(value));
        }

        for value in &values {
            assert!(cache.contains(&value.hash()));
            assert_eq!(cache.get(&value.hash()).as_deref(), Some(value.inner()));
        }

        // Batch insert
        let cache2 = new_hashed_cache(TEST_CACHE_SIZE);
        cache2.insert_all_hashed(values.iter().map(Cow::Borrowed));
        for value in &values {
            assert_eq!(cache2.get(&value.hash()).as_deref(), Some(value.inner()));
        }
    }

    #[test]
    fn test_reinsertion_dedup() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        // First insert
        let first_arcs: Vec<_> = values
            .iter()
            .map(|v| cache.insert_hashed(Cow::Borrowed(v)))
            .collect();

        // Re-inserting should return the same Arc (dedup)
        for (value, first_arc) in values.iter().zip(&first_arcs) {
            let second_arc = cache.insert_hashed(Cow::Borrowed(value));
            assert!(CacheArc::ptr_eq(&second_arc, first_arc));
        }
    }

    #[test]
    fn test_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let total = TEST_CACHE_SIZE * 3;
        let values = create_test_values(0..total as u64);

        for value in &values {
            cache.insert_hashed(Cow::Borrowed(value));
        }

        let present_count = values.iter().filter(|v| cache.contains(&v.hash())).count();
        assert!(
            present_count <= TEST_CACHE_SIZE + 1,
            "cache should not hold significantly more than its capacity, \
             but has {present_count} entries for capacity {TEST_CACHE_SIZE}"
        );
        assert!(present_count > 0, "cache should still hold some entries");
    }

    #[test]
    fn test_len_and_is_empty() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.insert(&1, "a".to_string());
        cache.insert(&2, "b".to_string());
        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 2);

        // Filling well beyond capacity caps the bounded-cache occupancy, which
        // is exactly what the `value_cache_entries` gauge reports. Note that
        // `len()` counts only the bounded cache, not the weak dedup index.
        for i in 3..(TEST_CACHE_SIZE as u64 * 3) {
            cache.insert(&i, format!("v{i}"));
        }
        assert!(
            cache.len() <= TEST_CACHE_SIZE,
            "bounded-cache occupancy {} must not exceed capacity {TEST_CACHE_SIZE}",
            cache.len(),
        );
    }

    #[test]
    fn test_accessed_entry_survives_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        cache.insert_hashed(Cow::Borrowed(&promoted));
        cache.get(&promoted_hash); // mark as hot

        let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
        for value in &extras {
            cache.insert_hashed(Cow::Borrowed(value));
        }

        assert!(
            cache.contains(&promoted_hash),
            "recently accessed entry should survive eviction"
        );
    }

    #[test]
    fn test_promotion_of_reinsertion() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        // NOTE: `first` and `second` stay alive until the end of the test, so
        // the final `contains` can also be satisfied through the weak index.
        let first = cache.insert_hashed(Cow::Borrowed(&promoted));
        let second = cache.insert_hashed(Cow::Borrowed(&promoted));
        assert!(CacheArc::ptr_eq(&first, &second));

        let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
        for value in &extras {
            cache.insert_hashed(Cow::Borrowed(value));
        }

        assert!(
            cache.contains(&promoted_hash),
            "re-inserted entry should survive eviction"
        );
    }

    #[test]
    fn test_weak_index_dedup_after_eviction() {
        let cache = new_string_cache(2);

        // Insert and hold onto the Arc
        let held = cache.insert(&1, "hello".to_string());

        // Force eviction by filling the cache
        cache.insert(&2, "world".to_string());
        cache.insert(&3, "foo".to_string());
        cache.insert(&4, "bar".to_string());

        // Weak index should find it via the held Arc
        let retrieved = cache
            .get(&1)
            .expect("held Arc should keep entry findable via weak index");
        assert!(
            CacheArc::ptr_eq(&retrieved, &held),
            "must return same allocation, not a duplicate"
        );

        // Re-inserting should also return the same Arc
        let reinserted = cache.insert(&1, "replacement".to_string());
        assert!(CacheArc::ptr_eq(&reinserted, &held));
        assert_eq!(&*reinserted, "hello");
    }

    #[test]
    fn test_remove_returns_cached_value() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        let inserted = cache.insert(&1, "hello".to_string());

        let removed = cache
            .remove(&1)
            .expect("value was in the bounded cache");
        assert!(
            CacheArc::ptr_eq(&removed, &inserted),
            "remove must return the cached allocation"
        );
        assert!(cache.remove(&1).is_none(), "second removal must find nothing");
        assert!(cache.is_empty());
    }

    #[test]
    fn test_remove_preserves_weak_for_held_arcs() {
        let cache = new_string_cache(TEST_CACHE_SIZE);

        let held = cache.insert(&1, "hello".to_string());

        // remove() evicts from bounded cache but NOT the weak index
        cache.remove(&1);

        // Still findable via weak index since we hold an Arc
        let retrieved = cache.get(&1).expect("weak index should find held Arc");
        assert!(CacheArc::ptr_eq(&retrieved, &held));
    }

    #[test]
    fn test_remove_without_holder() {
        let cache = new_string_cache(TEST_CACHE_SIZE);

        cache.insert(&1, "hello".to_string());

        // remove() without anyone holding an Arc — weak entry becomes dead
        cache.remove(&1);
        assert!(!cache.contains(&1));
        assert!(cache.get(&1).is_none());
    }

    #[test]
    fn test_cleanup_dead_entries() {
        let cache = new_string_cache(2);

        cache.insert(&1, "alive".to_string());
        let _held = cache.get(&1).expect("just inserted"); // keep alive

        cache.insert(&2, "dead".to_string());
        // Don't hold key 2

        // Force eviction of both by filling the cache
        cache.insert(&3, "a".to_string());
        cache.insert(&4, "b".to_string());
        cache.insert(&5, "c".to_string());

        cache.cleanup_dead_entries();

        // Key 1 still findable (we hold an Arc)
        assert!(cache.contains(&1));

        // Whatever survived the sweep must still point to a live allocation,
        // regardless of which keys the bounded cache happened to evict.
        let guard = cache.weak_index.guard();
        assert!(
            cache
                .weak_index
                .iter(&guard)
                .all(|(_, weak)| weak.strong_count() > 0),
            "cleanup must drop every dead weak entry"
        );
    }
}