1#[cfg(with_metrics)]
14use std::any::type_name;
15use std::{
16 borrow::Cow,
17 hash::Hash,
18 sync::{Arc, Weak},
19};
20
21use linera_base::{crypto::CryptoHash, hashed::Hashed};
22use papaya::{Compute, Operation};
23use quick_cache::sync::Cache;
24
/// Default value, in seconds, for the `cleanup_interval_secs` argument of
/// `ValueCache::new`, i.e. the period at which dead entries are pruned from the
/// cache's weak index.
pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 30;
27
28pub struct ValueCache<K, V> {
37 cache: Cache<K, Arc<V>>,
38 weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
39}
40
41impl<K, V> ValueCache<K, V>
42where
43 K: Hash + Eq + Clone + Send + Sync + 'static,
44 V: Send + Sync + 'static,
45{
46 #[cfg(not(web))]
49 pub fn new(size: usize, cleanup_interval_secs: u64) -> Self {
50 let weak_index = Arc::new(papaya::HashMap::new());
51 Self::spawn_cleanup_task(
52 Arc::clone(&weak_index),
53 std::time::Duration::from_secs(cleanup_interval_secs),
54 );
55 ValueCache {
56 cache: Cache::new(size),
57 weak_index,
58 }
59 }
60
61 #[cfg(web)]
63 pub fn new(size: usize, _cleanup_interval_secs: u64) -> Self {
64 ValueCache {
65 cache: Cache::new(size),
66 weak_index: Arc::new(papaya::HashMap::new()),
67 }
68 }
69
70 pub fn insert(&self, key: &K, value: V) -> Arc<V> {
76 self.dedup_insert(key, Arc::new(value))
77 }
78
79 pub fn insert_arc(&self, key: &K, value: Arc<V>) -> Arc<V> {
81 self.dedup_insert(key, value)
82 }
83
84 pub fn remove(&self, key: &K) -> Option<Arc<V>> {
91 let value = self.cache.peek(key);
92 if value.is_some() {
93 self.cache.remove(key);
94 }
95 Self::track_cache_usage(value)
96 }
97
98 pub fn get(&self, key: &K) -> Option<Arc<V>> {
101 if let Some(arc) = self.cache.get(key) {
103 return Self::track_cache_usage(Some(arc));
104 }
105
106 let guard = self.weak_index.guard();
108 if let Some(weak) = self.weak_index.get(key, &guard) {
109 if let Some(arc) = weak.upgrade() {
110 self.cache.insert(key.clone(), arc.clone());
112 return Self::track_cache_usage(Some(arc));
113 }
114 }
115
116 Self::track_cache_usage(None)
117 }
118
119 pub fn contains(&self, key: &K) -> bool {
122 if self.cache.peek(key).is_some() {
123 return true;
124 }
125 let guard = self.weak_index.guard();
126 self.weak_index
127 .get(key, &guard)
128 .is_some_and(|weak| weak.strong_count() > 0)
129 }
130
131 #[cfg(with_testing)]
133 pub fn cleanup_dead_entries(&self) {
134 let guard = self.weak_index.guard();
135 self.weak_index
136 .retain(|_, weak| weak.strong_count() > 0, &guard);
137 }
138
139 #[cfg(not(web))]
143 fn spawn_cleanup_task(
144 weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
145 cleanup_interval: std::time::Duration,
146 ) {
147 if tokio::runtime::Handle::try_current().is_err() {
148 return;
149 }
150 tokio::spawn(async move {
151 let mut interval = tokio::time::interval(cleanup_interval);
152 loop {
153 interval.tick().await;
154 let guard = weak_index.guard();
155 weak_index.retain(|_, weak| weak.strong_count() > 0, &guard);
156 }
157 });
158 }
159
160 fn dedup_insert(&self, key: &K, new_arc: Arc<V>) -> Arc<V> {
164 let guard = self.weak_index.guard();
165 let weak = Arc::downgrade(&new_arc);
166
167 let result = self.weak_index.compute(
168 key.clone(),
169 |entry| match entry {
170 Some((_k, existing_weak)) => match existing_weak.upgrade() {
171 Some(existing_arc) => Operation::Abort(existing_arc),
172 None => Operation::Insert(weak.clone()),
173 },
174 None => Operation::Insert(weak.clone()),
175 },
176 &guard,
177 );
178
179 let canonical_arc = match result {
180 Compute::Inserted(..) | Compute::Updated { .. } => new_arc,
181 Compute::Aborted(existing_arc) => existing_arc,
182 _ => unreachable!(),
183 };
184
185 self.cache.insert(key.clone(), canonical_arc.clone());
186 canonical_arc
187 }
188
189 fn track_cache_usage(maybe_value: Option<Arc<V>>) -> Option<Arc<V>> {
190 #[cfg(with_metrics)]
191 {
192 let metric = if maybe_value.is_some() {
193 &metrics::CACHE_HIT_COUNT
194 } else {
195 &metrics::CACHE_MISS_COUNT
196 };
197
198 metric
199 .with_label_values(&[type_name::<K>(), type_name::<V>()])
200 .inc();
201 }
202 maybe_value
203 }
204}
205
206impl<V: Clone + Send + Sync + 'static> ValueCache<CryptoHash, V> {
207 pub fn insert_hashed<T>(&self, value: Cow<Hashed<T>>) -> Arc<V>
213 where
214 T: Clone,
215 V: From<Hashed<T>>,
216 {
217 let hash = (*value).hash();
218 if let Some(arc) = self.cache.peek(&hash) {
220 return arc;
221 }
222 let guard = self.weak_index.guard();
224 if let Some(weak) = self.weak_index.get(&hash, &guard) {
225 if let Some(arc) = weak.upgrade() {
226 self.cache.insert(hash, arc.clone());
227 return arc;
228 }
229 }
230 drop(guard);
231 self.dedup_insert(&hash, Arc::new(value.into_owned().into()))
232 }
233
234 #[cfg(with_testing)]
236 pub fn insert_all_hashed<'a, T>(&self, values: impl IntoIterator<Item = Cow<'a, Hashed<T>>>)
237 where
238 T: Clone + 'a,
239 V: From<Hashed<T>>,
240 {
241 for value in values {
242 self.insert_hashed(value);
243 }
244 }
245}
246
/// Prometheus counters for `ValueCache` lookups.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Label names shared by both counters; the values supplied at the call
    /// site are the key and value type names.
    const LABEL_NAMES: [&str; 2] = ["key_type", "value_type"];

    /// Number of lookups that found a value.
    pub static CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "value_cache_hit",
            "Cache hits in `ValueCache`",
            &LABEL_NAMES,
        )
    });

    /// Number of lookups that found nothing.
    pub static CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "value_cache_miss",
            "Cache misses in `ValueCache`",
            &LABEL_NAMES,
        )
    });
}
270
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, sync::Arc};

    use linera_base::{crypto::CryptoHash, hashed::Hashed};
    use serde::{Deserialize, Serialize};

    use super::{ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};

    /// Capacity used by most of the tests below.
    const TEST_CACHE_SIZE: usize = 10;

    /// Minimal hashable payload for the hash-keyed cache tests.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestValue(u64);

    impl linera_base::crypto::BcsHashable<'_> for TestValue {}

    impl From<Hashed<TestValue>> for TestValue {
        fn from(hashed: Hashed<TestValue>) -> Self {
            hashed.into_inner()
        }
    }

    /// Builds the hashed test value wrapping `n`.
    fn create_test_value(n: u64) -> Hashed<TestValue> {
        Hashed::new(TestValue(n))
    }

    /// Builds one hashed test value per number yielded by `iter`.
    fn create_test_values(iter: impl IntoIterator<Item = u64>) -> Vec<Hashed<TestValue>> {
        let mut values = Vec::new();
        values.extend(iter.into_iter().map(create_test_value));
        values
    }

    /// Hash-keyed cache with the default cleanup interval.
    fn new_hashed_cache(size: usize) -> ValueCache<CryptoHash, TestValue> {
        ValueCache::new(size, DEFAULT_CLEANUP_INTERVAL_SECS)
    }

    /// Integer-keyed cache of strings with the default cleanup interval.
    fn new_string_cache(size: usize) -> ValueCache<u64, String> {
        ValueCache::new(size, DEFAULT_CLEANUP_INTERVAL_SECS)
    }

    #[test]
    fn test_retrieve_missing_value() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let absent = CryptoHash::test_hash("Missing value");

        assert!(cache.get(&absent).is_none());
        assert!(!cache.contains(&absent));
    }

    #[test]
    fn test_insert_and_get() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let hashed = create_test_value(0);
        let key = hashed.hash();

        cache.insert_hashed(Cow::Borrowed(&hashed));

        assert!(cache.contains(&key));
        assert_eq!(cache.get(&key).as_deref(), Some(hashed.inner()));
    }

    #[test]
    fn test_insert_many_values() {
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        // One-by-one insertion.
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        values.iter().map(Cow::Borrowed).for_each(|value| {
            cache.insert_hashed(value);
        });
        for value in &values {
            let key = value.hash();
            assert!(cache.contains(&key));
            assert_eq!(cache.get(&key).as_deref(), Some(value.inner()));
        }

        // Bulk insertion into a fresh cache.
        let cache2 = new_hashed_cache(TEST_CACHE_SIZE);
        cache2.insert_all_hashed(values.iter().map(Cow::Borrowed));
        for value in &values {
            assert_eq!(cache2.get(&value.hash()).as_deref(), Some(value.inner()));
        }
    }

    #[test]
    fn test_reinsertion_dedup() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let values = create_test_values(0..TEST_CACHE_SIZE as u64);

        let mut first_arcs = Vec::new();
        for value in &values {
            first_arcs.push(cache.insert_hashed(Cow::Borrowed(value)));
        }

        // Inserting again must hand back the very same allocation.
        for (first_arc, value) in first_arcs.iter().zip(&values) {
            let second_arc = cache.insert_hashed(Cow::Borrowed(value));
            assert!(Arc::ptr_eq(&second_arc, first_arc));
        }
    }

    #[test]
    fn test_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let total = TEST_CACHE_SIZE * 3;
        let values = create_test_values(0..total as u64);

        // The returned `Arc`s are discarded, so only the cache keeps values alive.
        values.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        let present_count = values
            .iter()
            .map(|value| value.hash())
            .filter(|key| cache.contains(key))
            .count();
        assert!(
            present_count <= TEST_CACHE_SIZE + 1,
            "cache should not hold significantly more than its capacity, \
             but has {present_count} entries for capacity {TEST_CACHE_SIZE}"
        );
        assert!(present_count > 0, "cache should still hold some entries");
    }

    #[test]
    fn test_accessed_entry_survives_eviction() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        cache.insert_hashed(Cow::Borrowed(&promoted));
        // Access the entry once before flooding the cache.
        cache.get(&promoted_hash);

        let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
        extras.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        assert!(
            cache.contains(&promoted_hash),
            "recently accessed entry should survive eviction"
        );
    }

    #[test]
    fn test_promotion_of_reinsertion() {
        let cache = new_hashed_cache(TEST_CACHE_SIZE);
        let promoted = create_test_value(0);
        let promoted_hash = promoted.hash();

        let first = cache.insert_hashed(Cow::Borrowed(&promoted));
        let second = cache.insert_hashed(Cow::Borrowed(&promoted));
        assert!(Arc::ptr_eq(&first, &second));

        let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
        extras.iter().for_each(|value| {
            cache.insert_hashed(Cow::Borrowed(value));
        });

        assert!(
            cache.contains(&promoted_hash),
            "re-inserted entry should survive eviction"
        );
    }

    #[test]
    fn test_weak_index_dedup_after_eviction() {
        let cache = new_string_cache(2);

        // Keep a strong reference to the first value for the whole test.
        let held = cache.insert(&1, "hello".to_string());

        // Insert more entries than the cache capacity.
        for (key, text) in [(2, "world"), (3, "foo"), (4, "bar")] {
            cache.insert(&key, text.to_string());
        }

        let retrieved = cache
            .get(&1)
            .expect("held Arc should keep entry findable via weak index");
        assert!(
            Arc::ptr_eq(&retrieved, &held),
            "must return same allocation, not a duplicate"
        );

        // A competing insertion under the same key must not replace the live value.
        let reinserted = cache.insert(&1, "replacement".to_string());
        assert!(Arc::ptr_eq(&reinserted, &held));
        assert_eq!(&*reinserted, "hello");
    }

    #[test]
    fn test_remove_preserves_weak_for_held_arcs() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        let held = cache.insert(&1, "hello".to_string());

        cache.remove(&1);

        let retrieved = cache.get(&1).expect("weak index should find held Arc");
        assert!(Arc::ptr_eq(&retrieved, &held));
    }

    #[test]
    fn test_remove_without_holder() {
        let cache = new_string_cache(TEST_CACHE_SIZE);

        // The returned `Arc` is dropped right away: nobody else holds the value.
        cache.insert(&1, "hello".to_string());
        cache.remove(&1);

        assert!(!cache.contains(&1));
        assert!(cache.get(&1).is_none());
    }

    #[test]
    fn test_cleanup_dead_entries() {
        let cache = new_string_cache(2);

        cache.insert(&1, "alive".to_string());
        let _held = cache.get(&1).expect("just inserted");

        // None of the following values is held by the test.
        for (key, text) in [(2, "dead"), (3, "a"), (4, "b"), (5, "c")] {
            cache.insert(&key, text.to_string());
        }

        cache.cleanup_dead_entries();

        assert!(cache.contains(&1));
    }

    #[test]
    fn test_insert_arc_dedup() {
        let cache = new_string_cache(TEST_CACHE_SIZE);
        let value = Arc::new("hello".to_string());

        let first = cache.insert_arc(&1, Arc::clone(&value));
        assert!(Arc::ptr_eq(&first, &value));

        let second = cache.insert_arc(&1, Arc::new("other".to_string()));
        assert!(Arc::ptr_eq(&second, &value));

        assert_eq!(&*cache.get(&1).expect("just inserted"), "hello");
    }
}