1#[cfg(with_metrics)]
14use std::any::type_name;
15use std::{
16 borrow::Cow,
17 hash::Hash,
18 sync::{Arc, Weak},
19};
20
21use linera_base::{crypto::CryptoHash, hashed::Hashed};
22use papaya::{Compute, Operation};
23use quick_cache::sync::Cache;
24
/// Default value, in seconds, for the `cleanup_interval_secs` argument of
/// [`ValueCache::new`].
pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 30;
27
/// A two-layer cache of shared values.
///
/// Values live in a bounded `quick_cache` cache; in addition, a weak handle to
/// every inserted value is recorded in `weak_index`. A value that is no longer
/// in the bounded cache can therefore still be found, and is never duplicated,
/// for as long as some caller holds a strong reference to it.
pub struct ValueCache<K, V> {
    /// Name given at construction; used as the `cache` label of the metrics.
    name: &'static str,
    /// Bounded layer that owns a strong reference to each resident value.
    cache: Arc<Cache<K, crate::Arc<V>>>,
    /// Weak handle per key; entries whose value has no strong reference left
    /// are dead and are removed by the cleanup sweep.
    weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
}
50
51impl<K, V> ValueCache<K, V>
52where
53 K: Hash + Eq + Clone + Send + Sync + 'static,
54 V: Send + Sync + 'static,
55{
56 #[cfg(not(web))]
60 pub fn new(name: &'static str, size: usize, cleanup_interval_secs: u64) -> Self {
61 let cache = Arc::new(Cache::new(size));
62 let weak_index = Arc::new(papaya::HashMap::new());
63 #[cfg(with_metrics)]
66 metrics::CACHE_CAPACITY
67 .with_label_values(&[name, type_name::<K>(), type_name::<V>()])
68 .set(i64::try_from(cache.capacity()).unwrap_or(i64::MAX));
69 Self::spawn_cleanup_task(
70 Arc::clone(&weak_index),
71 #[cfg(with_metrics)]
72 (name, Arc::clone(&cache)),
73 std::time::Duration::from_secs(cleanup_interval_secs),
74 );
75 ValueCache {
76 name,
77 cache,
78 weak_index,
79 }
80 }
81
82 #[cfg(web)]
84 pub fn new(name: &'static str, size: usize, _cleanup_interval_secs: u64) -> Self {
85 ValueCache {
86 name,
87 cache: Arc::new(Cache::new(size)),
88 weak_index: Arc::new(papaya::HashMap::new()),
89 }
90 }
91
92 pub fn name(&self) -> &'static str {
94 self.name
95 }
96
97 pub fn len(&self) -> usize {
103 self.cache.len()
104 }
105
106 pub fn is_empty(&self) -> bool {
108 self.cache.is_empty()
109 }
110
111 pub fn insert(&self, key: &K, value: V) -> crate::Arc<V> {
117 self.dedup_insert(key, crate::Arc(Arc::new(value)))
118 }
119
120 pub fn remove(&self, key: &K) -> Option<crate::Arc<V>> {
127 let value = self.cache.peek(key);
128 if value.is_some() {
129 self.cache.remove(key);
130 }
131 self.track_cache_usage(value)
132 }
133
134 pub fn get(&self, key: &K) -> Option<crate::Arc<V>> {
137 if let Some(arc) = self.cache.get(key) {
139 return self.track_cache_usage(Some(arc));
140 }
141
142 let guard = self.weak_index.guard();
144 if let Some(weak) = self.weak_index.get(key, &guard) {
145 if let Some(arc) = weak.upgrade() {
146 let arc = crate::Arc(arc);
147 self.cache.insert(key.clone(), arc.clone());
149 return self.track_cache_usage(Some(arc));
150 }
151 }
152
153 self.track_cache_usage(None)
154 }
155
156 pub fn contains(&self, key: &K) -> bool {
159 if self.cache.peek(key).is_some() {
160 return true;
161 }
162 let guard = self.weak_index.guard();
163 self.weak_index
164 .get(key, &guard)
165 .is_some_and(|weak| weak.strong_count() > 0)
166 }
167
168 #[cfg(with_testing)]
170 pub fn cleanup_dead_entries(&self) {
171 let guard = self.weak_index.guard();
172 self.weak_index
173 .retain(|_, weak| weak.strong_count() > 0, &guard);
174 }
175
176 #[cfg(not(web))]
180 fn spawn_cleanup_task(
181 weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
182 #[cfg(with_metrics)] metrics_handle: (&'static str, Arc<Cache<K, crate::Arc<V>>>),
183 cleanup_interval: std::time::Duration,
184 ) {
185 if tokio::runtime::Handle::try_current().is_err() {
186 return;
187 }
188 tokio::spawn(async move {
189 let mut interval = tokio::time::interval(cleanup_interval);
190 loop {
191 interval.tick().await;
192 {
193 let guard = weak_index.guard();
194 weak_index.retain(|_, weak| weak.strong_count() > 0, &guard);
195 }
196 #[cfg(with_metrics)]
199 {
200 let (name, cache) = &metrics_handle;
201 metrics::CACHE_ENTRIES
202 .with_label_values(&[name, type_name::<K>(), type_name::<V>()])
203 .set(i64::try_from(cache.len()).unwrap_or(i64::MAX));
204 }
205 }
206 });
207 }
208
209 fn dedup_insert(&self, key: &K, new_arc: crate::Arc<V>) -> crate::Arc<V> {
213 let guard = self.weak_index.guard();
214 let weak = Arc::downgrade(&new_arc.0);
215
216 let result = self.weak_index.compute(
217 key.clone(),
218 |entry| match entry {
219 Some((_k, existing_weak)) => match existing_weak.upgrade() {
220 Some(existing_arc) => Operation::Abort(existing_arc),
221 None => Operation::Insert(weak.clone()),
222 },
223 None => Operation::Insert(weak.clone()),
224 },
225 &guard,
226 );
227
228 let canonical_arc = match result {
229 Compute::Inserted(..) | Compute::Updated { .. } => new_arc,
230 Compute::Aborted(existing_arc) => crate::Arc(existing_arc),
231 _ => unreachable!(),
232 };
233
234 self.cache.insert(key.clone(), canonical_arc.clone());
235 canonical_arc
236 }
237
238 fn track_cache_usage(&self, maybe_value: Option<crate::Arc<V>>) -> Option<crate::Arc<V>> {
239 #[cfg(with_metrics)]
240 {
241 let metric = if maybe_value.is_some() {
242 &metrics::CACHE_HIT_COUNT
243 } else {
244 &metrics::CACHE_MISS_COUNT
245 };
246
247 metric
248 .with_label_values(&[self.name, type_name::<K>(), type_name::<V>()])
249 .inc();
250 }
251 maybe_value
252 }
253}
254
255impl<V: Clone + Send + Sync + 'static> ValueCache<CryptoHash, V> {
256 pub fn insert_hashed<T>(&self, value: Cow<Hashed<T>>) -> crate::Arc<V>
262 where
263 T: Clone,
264 V: From<Hashed<T>>,
265 {
266 let hash = (*value).hash();
267 if let Some(arc) = self.cache.peek(&hash) {
269 return arc;
270 }
271 let guard = self.weak_index.guard();
273 if let Some(weak) = self.weak_index.get(&hash, &guard) {
274 if let Some(arc) = weak.upgrade() {
275 let arc = crate::Arc(arc);
276 self.cache.insert(hash, arc.clone());
277 return arc;
278 }
279 }
280 drop(guard);
281 self.dedup_insert(&hash, crate::Arc(Arc::new(value.into_owned().into())))
282 }
283
284 #[cfg(with_testing)]
286 pub fn insert_all_hashed<'a, T>(&self, values: impl IntoIterator<Item = Cow<'a, Hashed<T>>>)
287 where
288 T: Clone + 'a,
289 V: From<Hashed<T>>,
290 {
291 for value in values {
292 self.insert_hashed(value);
293 }
294 }
295}
296
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics reported by `ValueCache`.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge_vec};
    use prometheus::{IntCounterVec, IntGaugeVec};

    /// Label names shared by every metric in this module.
    const LABEL_NAMES: &[&str] = &["cache", "key_type", "value_type"];

    /// Counts lookups that found a value.
    pub static CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let metric_name = "value_cache_hit";
        let help = "Cache hits in `ValueCache`";
        register_int_counter_vec(metric_name, help, LABEL_NAMES)
    });

    /// Counts lookups that found nothing.
    pub static CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let metric_name = "value_cache_miss";
        let help = "Cache misses in `ValueCache`";
        register_int_counter_vec(metric_name, help, LABEL_NAMES)
    });

    /// Entry count of the bounded cache, as last sampled by the cleanup task.
    pub static CACHE_ENTRIES: LazyLock<IntGaugeVec> = LazyLock::new(|| {
        let metric_name = "value_cache_entries";
        let help = "Number of entries held in the bounded `ValueCache`, sampled at \
                    each cleanup sweep";
        register_int_gauge_vec(metric_name, help, LABEL_NAMES)
    });

    /// Capacity of the bounded cache, set when the cache is created.
    pub static CACHE_CAPACITY: LazyLock<IntGaugeVec> = LazyLock::new(|| {
        let metric_name = "value_cache_capacity";
        let help = "Maximum number of entries of the bounded `ValueCache`";
        register_int_gauge_vec(metric_name, help, LABEL_NAMES)
    });
}
334
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use linera_base::{crypto::CryptoHash, hashed::Hashed};
    use serde::{Deserialize, Serialize};

    use super::{ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
    use crate::Arc as CacheArc;

    /// Capacity given to the bounded layer in most tests.
    const TEST_CACHE_SIZE: usize = 10;

    /// Minimal hashable payload used as the cached value.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestValue(u64);

    impl linera_base::crypto::BcsHashable<'_> for TestValue {}

    impl From<Hashed<TestValue>> for TestValue {
        fn from(hashed: Hashed<TestValue>) -> Self {
            hashed.into_inner()
        }
    }

    /// Builds the hashed test value wrapping `n`.
    fn create_test_value(n: u64) -> Hashed<TestValue> {
        let payload = TestValue(n);
        Hashed::new(payload)
    }

    /// Builds one hashed test value per number yielded by `iter`.
    fn create_test_values(iter: impl IntoIterator<Item = u64>) -> Vec<Hashed<TestValue>> {
        let mut built = Vec::new();
        for n in iter {
            built.push(create_test_value(n));
        }
        built
    }

    /// Creates a hash-keyed cache with the given bounded capacity.
    fn new_hashed_cache(size: usize) -> ValueCache<CryptoHash, TestValue> {
        let cleanup_secs = DEFAULT_CLEANUP_INTERVAL_SECS;
        ValueCache::new("test_hashed", size, cleanup_secs)
    }

    /// Creates a `u64 -> String` cache with the given bounded capacity.
    fn new_string_cache(size: usize) -> ValueCache<u64, String> {
        let cleanup_secs = DEFAULT_CLEANUP_INTERVAL_SECS;
        ValueCache::new("test_string", size, cleanup_secs)
    }

    /// A key that was never inserted is neither retrievable nor contained.
    #[test]
    fn test_retrieve_missing_value() {
        let absent = CryptoHash::test_hash("Missing value");
        let cache = new_hashed_cache(TEST_CACHE_SIZE);

        assert!(cache.get(&absent).is_none());
        assert!(!cache.contains(&absent));
    }

    /// An inserted value can be looked up by its hash.
    #[test]
    fn test_insert_and_get() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let stored = create_test_value(0);
        let key = stored.hash();

        cache.insert_hashed(Cow::Borrowed(&stored));

        assert!(cache.contains(&key));
        let fetched = cache.get(&key);
        assert_eq!(fetched.as_deref(), Some(stored.inner()));
    }

    /// Filling the cache up to its capacity keeps every value retrievable,
    /// both with single inserts and with the bulk helper.
    #[test]
    fn test_insert_many_values() {
        let single_cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        values.iter().for_each(|value| {
            single_cache.insert_hashed(Cow::Borrowed(value));
        });

        values.iter().for_each(|value| {
            assert!(single_cache.contains(&value.hash()));
            assert_eq!(
                single_cache.get(&value.hash()).as_deref(),
                Some(value.inner())
            );
        });

        let bulk_cache = new_hashed_cache(TEST_CACHE_SIZE);
        bulk_cache.insert_all_hashed(values.iter().map(Cow::Borrowed));
        values.iter().for_each(|value| {
            assert_eq!(
                bulk_cache.get(&value.hash()).as_deref(),
                Some(value.inner())
            );
        });
    }

    /// Inserting the same value twice returns the same allocation both times.
    #[test]
    fn test_reinsertion_dedup() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        let mut first_arcs = Vec::with_capacity(values.len());
        for value in &values {
            first_arcs.push(cache.insert_hashed(Cow::Borrowed(value)));
        }

        let mut originals = first_arcs.iter();
        for value in &values {
            let Some(first_arc) = originals.next() else {
                break;
            };
            let second_arc = cache.insert_hashed(Cow::Borrowed(value));
            assert!(CacheArc::ptr_eq(&second_arc, first_arc));
        }
    }

    /// Inserting three times the capacity leaves roughly one capacity's worth
    /// of values findable.
    #[test]
    fn test_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let total = TEST_CACHE_SIZE * 3;
        let values = create_test_values(0..total as u64);

        values.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        let mut present_count = 0;
        for value in &values {
            if cache.contains(&value.hash()) {
                present_count += 1;
            }
        }
        assert!(
            present_count <= TEST_CACHE_SIZE + 1,
            "cache should not hold significantly more than its capacity, \
             but has {present_count} entries for capacity {TEST_CACHE_SIZE}"
        );
        assert!(present_count > 0, "cache should still hold some entries");
    }

    /// `len` and `is_empty` follow the bounded layer and never exceed its capacity.
    #[test]
    fn test_len_and_is_empty() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.insert(&1, "a".to_string());
        cache.insert(&2, "b".to_string());
        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 2);

        let upper = TEST_CACHE_SIZE as u64 * 3;
        (3..upper).for_each(|i| {
            cache.insert(&i, format!("v{i}"));
        });
        assert!(
            cache.len() <= TEST_CACHE_SIZE,
            "bounded-cache occupancy {} must not exceed capacity {TEST_CACHE_SIZE}",
            cache.len(),
        );
    }

    /// An entry that was read after insertion is still present after many
    /// further insertions.
    #[test]
    fn test_accessed_entry_survives_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        cache.insert_hashed(Cow::Borrowed(&promoted));
        // Read it once; the returned handle is dropped immediately.
        cache.get(&promoted_hash);

        let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
        extras.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        assert!(
            cache.contains(&promoted_hash),
            "recently accessed entry should survive eviction"
        );
    }

    /// An entry that was inserted twice is still present after many further
    /// insertions.
    #[test]
    fn test_promotion_of_reinsertion() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        let first = cache.insert_hashed(Cow::Borrowed(&promoted));
        let second = cache.insert_hashed(Cow::Borrowed(&promoted));
        assert!(CacheArc::ptr_eq(&first, &second));

        let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
        extras.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        assert!(
            cache.contains(&promoted_hash),
            "re-inserted entry should survive eviction"
        );
    }

    /// While a caller holds a value, lookups and re-inserts for its key keep
    /// returning that same allocation, even after other keys were inserted
    /// into a tiny cache.
    #[test]
    fn test_weak_index_dedup_after_eviction() {
        let cache = new_string_cache(2);

        let held = cache.insert(&1, "hello".to_string());

        for (key, text) in [(2, "world"), (3, "foo"), (4, "bar")] {
            cache.insert(&key, text.to_string());
        }

        let retrieved = cache
            .get(&1)
            .expect("held Arc should keep entry findable via weak index");
        assert!(
            CacheArc::ptr_eq(&retrieved, &held),
            "must return same allocation, not a duplicate"
        );

        // A different payload for the same key must not replace the held value.
        let reinserted = cache.insert(&1, "replacement".to_string());
        assert!(CacheArc::ptr_eq(&reinserted, &held));
        assert_eq!(&*reinserted, "hello");
    }

    /// `remove` leaves the weak index alone, so a held value is still found.
    #[test]
    fn test_remove_preserves_weak_for_held_arcs() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        let held = cache.insert(&1, "hello".to_string());

        cache.remove(&1);

        let retrieved = cache.get(&1).expect("weak index should find held Arc");
        assert!(CacheArc::ptr_eq(&retrieved, &held));
    }

    /// Without an outside holder, a removed value is gone for good.
    #[test]
    fn test_remove_without_holder() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        // The returned handle is dropped right away.
        cache.insert(&1, "hello".to_string());

        cache.remove(&1);

        assert!(!cache.contains(&1));
        assert!(cache.get(&1).is_none());
    }

    /// The cleanup sweep keeps entries whose value is still held.
    #[test]
    fn test_cleanup_dead_entries() {
        let cache = new_string_cache(2);

        cache.insert(&1, "alive".to_string());
        let _held = cache.get(&1).expect("just inserted");

        for (key, text) in [(2, "dead"), (3, "a"), (4, "b"), (5, "c")] {
            cache.insert(&key, text.to_string());
        }

        cache.cleanup_dead_entries();

        assert!(cache.contains(&1));
    }
}