1use std::{
7 collections::{btree_map, hash_map::RandomState, BTreeMap},
8 sync::{Arc, Mutex},
9};
10
11use linked_hash_map::LinkedHashMap;
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 batch::{Batch, WriteOperation},
16 common::get_interval,
17 store::{AdminKeyValueStore, ReadableKeyValueStore, WithError, WritableKeyValueStore},
18};
19#[cfg(with_testing)]
20use crate::{memory::MemoryStore, store::TestKeyValueStore};
21
/// Prometheus counters recording cache hits and misses for the LRU caching store.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Registers a counter that carries no labels.
    fn unlabelled_counter(name: &str, description: &str) -> IntCounterVec {
        register_int_counter_vec(name, description, &[])
    }

    /// Counts value reads that the cache could not answer.
    pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabelled_counter(
            "num_read_value_cache_miss",
            "Number of read value cache misses",
        )
    });

    /// Counts value reads answered directly by the cache.
    pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabelled_counter(
            "num_read_value_cache_hits",
            "Number of read value cache hits",
        )
    });

    /// Counts key-existence checks that the cache could not answer.
    pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabelled_counter(
            "num_contains_key_cache_miss",
            "Number of contains key cache misses",
        )
    });

    /// Counts key-existence checks answered directly by the cache.
    pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabelled_counter(
            "num_contains_key_cache_hit",
            "Number of contains key cache hits",
        )
    });
}
65
/// Limits governing the LRU cache placed in front of a key-value store.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StorageCacheConfig {
    /// Upper bound on the summed size (key length plus cached value length) of all
    /// cached entries; the oldest entries are evicted once it is exceeded.
    pub max_cache_size: usize,
    /// Largest size (key length plus value length) a single entry may have and
    /// still be cached.
    pub max_entry_size: usize,
    /// Upper bound on the number of cached entries; `0` disables caching entirely.
    pub max_cache_entries: usize,
}
76
77pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
81 max_cache_size: 10000000,
82 max_entry_size: 1000000,
83 max_cache_entries: 1000,
84};
85
/// What the cache knows about a single key.
enum CacheEntry {
    /// The key is known to be absent.
    DoesNotExist,
    /// The key is known to be present, but its value is not cached.
    Exists,
    /// The key is present and this is its value.
    Value(Vec<u8>),
}

impl CacheEntry {
    /// Returns the number of value bytes held by this entry.
    ///
    /// The payload-free markers count as zero.
    fn size(&self) -> usize {
        match self {
            Self::DoesNotExist | Self::Exists => 0,
            Self::Value(bytes) => bytes.len(),
        }
    }
}
100
/// In-memory LRU cache of per-key states, bounded by entry count and total size.
struct LruPrefixCache {
    /// Cached state for each key; the ordered map lets all keys sharing a prefix
    /// be visited as a range.
    map: BTreeMap<Vec<u8>, CacheEntry>,
    /// Recency queue mapping each cached key to its accounted size; the front is
    /// the next entry to be evicted.
    queue: LinkedHashMap<Vec<u8>, usize, RandomState>,
    /// Limits applied when inserting and trimming.
    storage_cache_config: StorageCacheConfig,
    /// Sum of the sizes recorded in `queue`.
    total_size: usize,
    /// Whether the cache may remember that a key is absent
    /// (`CacheEntry::DoesNotExist`).
    has_exclusive_access: bool,
}
113
114impl LruPrefixCache {
115 pub fn new(storage_cache_config: StorageCacheConfig) -> Self {
117 Self {
118 map: BTreeMap::new(),
119 queue: LinkedHashMap::new(),
120 storage_cache_config,
121 total_size: 0,
122 has_exclusive_access: false,
123 }
124 }
125
126 fn trim_cache(&mut self) {
128 while self.total_size > self.storage_cache_config.max_cache_size
129 || self.queue.len() > self.storage_cache_config.max_cache_entries
130 {
131 let Some((key, key_value_size)) = self.queue.pop_front() else {
132 break;
133 };
134 self.map.remove(&key);
135 self.total_size -= key_value_size;
136 }
137 }
138
139 pub fn insert(&mut self, key: Vec<u8>, cache_entry: CacheEntry) {
141 let key_value_size = key.len() + cache_entry.size();
142 if (matches!(cache_entry, CacheEntry::DoesNotExist) && !self.has_exclusive_access)
143 || key_value_size > self.storage_cache_config.max_entry_size
144 {
145 if let Some(old_key_value_size) = self.queue.remove(&key) {
147 self.total_size -= old_key_value_size;
148 self.map.remove(&key);
149 };
150 return;
151 }
152 match self.map.entry(key.clone()) {
153 btree_map::Entry::Occupied(mut entry) => {
154 entry.insert(cache_entry);
155 let old_key_value_size = self.queue.remove(&key).expect("old_key_value_size");
157 self.total_size -= old_key_value_size;
158 self.queue.insert(key, key_value_size);
159 self.total_size += key_value_size;
160 }
161 btree_map::Entry::Vacant(entry) => {
162 entry.insert(cache_entry);
163 self.queue.insert(key, key_value_size);
164 self.total_size += key_value_size;
165 }
166 }
167 self.trim_cache();
168 }
169
170 pub fn insert_read_value(&mut self, key: Vec<u8>, value: &Option<Vec<u8>>) {
172 let cache_entry = match value {
173 None => CacheEntry::DoesNotExist,
174 Some(vec) => CacheEntry::Value(vec.to_vec()),
175 };
176 self.insert(key, cache_entry)
177 }
178
179 pub fn insert_contains_key(&mut self, key: Vec<u8>, result: bool) {
181 let cache_entry = match result {
182 false => CacheEntry::DoesNotExist,
183 true => CacheEntry::Exists,
184 };
185 self.insert(key, cache_entry)
186 }
187
188 pub fn delete_prefix(&mut self, key_prefix: &[u8]) {
191 if self.has_exclusive_access {
192 for (key, value) in self.map.range_mut(get_interval(key_prefix.to_vec())) {
193 *self.queue.get_mut(key).unwrap() = key.len();
194 self.total_size -= value.size();
195 *value = CacheEntry::DoesNotExist;
196 }
197 } else {
198 let mut keys = Vec::new();
200 for (key, _) in self.map.range(get_interval(key_prefix.to_vec())) {
201 keys.push(key.to_vec());
202 }
203 for key in keys {
204 self.map.remove(&key);
205 let Some(key_value_size) = self.queue.remove(&key) else {
206 unreachable!("The key should be in the queue");
207 };
208 self.total_size -= key_value_size;
209 }
210 }
211 }
212
213 pub fn query_read_value(&mut self, key: &[u8]) -> Option<Option<Vec<u8>>> {
217 let result = match self.map.get(key) {
218 None => None,
219 Some(entry) => match entry {
220 CacheEntry::DoesNotExist => Some(None),
221 CacheEntry::Exists => None,
222 CacheEntry::Value(vec) => Some(Some(vec.clone())),
223 },
224 };
225 if result.is_some() {
226 let key_value_size = self.queue.remove(key).expect("key_value_size");
228 self.queue.insert(key.to_vec(), key_value_size);
229 }
230 result
231 }
232
233 pub fn query_contains_key(&mut self, key: &[u8]) -> Option<bool> {
236 let result = self
237 .map
238 .get(key)
239 .map(|entry| !matches!(entry, CacheEntry::DoesNotExist));
240 if result.is_some() {
241 let key_value_size = self.queue.remove(key).expect("key_value_size");
243 self.queue.insert(key.to_vec(), key_value_size);
244 }
245 result
246 }
247}
248
/// A key-value store wrapper that answers reads from an LRU cache when it can.
#[derive(Clone)]
pub struct LruCachingStore<K> {
    /// The wrapped store that operations are forwarded to.
    store: K,
    /// The cache, shared between clones of this store; `None` when caching is
    /// disabled.
    cache: Option<Arc<Mutex<LruPrefixCache>>>,
}
257
impl<K> WithError for LruCachingStore<K>
where
    K: WithError,
{
    /// The caching layer reuses the error type of the wrapped store.
    type Error = K::Error;
}
264
265impl<K> ReadableKeyValueStore for LruCachingStore<K>
266where
267 K: ReadableKeyValueStore,
268{
269 const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
271 type Keys = K::Keys;
272 type KeyValues = K::KeyValues;
273
274 fn max_stream_queries(&self) -> usize {
275 self.store.max_stream_queries()
276 }
277
278 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
279 let Some(cache) = &self.cache else {
280 return self.store.read_value_bytes(key).await;
281 };
282 {
284 let mut cache = cache.lock().unwrap();
285 if let Some(value) = cache.query_read_value(key) {
286 #[cfg(with_metrics)]
287 metrics::READ_VALUE_CACHE_HIT_COUNT
288 .with_label_values(&[])
289 .inc();
290 return Ok(value);
291 }
292 }
293 #[cfg(with_metrics)]
294 metrics::READ_VALUE_CACHE_MISS_COUNT
295 .with_label_values(&[])
296 .inc();
297 let value = self.store.read_value_bytes(key).await?;
298 let mut cache = cache.lock().unwrap();
299 cache.insert_read_value(key.to_vec(), &value);
300 Ok(value)
301 }
302
303 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
304 let Some(cache) = &self.cache else {
305 return self.store.contains_key(key).await;
306 };
307 {
308 let mut cache = cache.lock().unwrap();
309 if let Some(value) = cache.query_contains_key(key) {
310 #[cfg(with_metrics)]
311 metrics::CONTAINS_KEY_CACHE_HIT_COUNT
312 .with_label_values(&[])
313 .inc();
314 return Ok(value);
315 }
316 }
317 #[cfg(with_metrics)]
318 metrics::CONTAINS_KEY_CACHE_MISS_COUNT
319 .with_label_values(&[])
320 .inc();
321 let result = self.store.contains_key(key).await?;
322 let mut cache = cache.lock().unwrap();
323 cache.insert_contains_key(key.to_vec(), result);
324 Ok(result)
325 }
326
327 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
328 let Some(cache) = &self.cache else {
329 return self.store.contains_keys(keys).await;
330 };
331 let size = keys.len();
332 let mut results = vec![false; size];
333 let mut indices = Vec::new();
334 let mut key_requests = Vec::new();
335 {
336 let mut cache = cache.lock().unwrap();
337 for i in 0..size {
338 if let Some(value) = cache.query_contains_key(&keys[i]) {
339 #[cfg(with_metrics)]
340 metrics::CONTAINS_KEY_CACHE_HIT_COUNT
341 .with_label_values(&[])
342 .inc();
343 results[i] = value;
344 } else {
345 #[cfg(with_metrics)]
346 metrics::CONTAINS_KEY_CACHE_MISS_COUNT
347 .with_label_values(&[])
348 .inc();
349 indices.push(i);
350 key_requests.push(keys[i].clone());
351 }
352 }
353 }
354 if !key_requests.is_empty() {
355 let key_results = self.store.contains_keys(key_requests.clone()).await?;
356 let mut cache = cache.lock().unwrap();
357 for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
358 results[index] = result;
359 cache.insert_contains_key(key, result);
360 }
361 }
362 Ok(results)
363 }
364
365 async fn read_multi_values_bytes(
366 &self,
367 keys: Vec<Vec<u8>>,
368 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
369 let Some(cache) = &self.cache else {
370 return self.store.read_multi_values_bytes(keys).await;
371 };
372
373 let mut result = Vec::with_capacity(keys.len());
374 let mut cache_miss_indices = Vec::new();
375 let mut miss_keys = Vec::new();
376 {
377 let mut cache = cache.lock().unwrap();
378 for (i, key) in keys.into_iter().enumerate() {
379 if let Some(value) = cache.query_read_value(&key) {
380 #[cfg(with_metrics)]
381 metrics::READ_VALUE_CACHE_HIT_COUNT
382 .with_label_values(&[])
383 .inc();
384 result.push(value);
385 } else {
386 #[cfg(with_metrics)]
387 metrics::READ_VALUE_CACHE_MISS_COUNT
388 .with_label_values(&[])
389 .inc();
390 result.push(None);
391 cache_miss_indices.push(i);
392 miss_keys.push(key);
393 }
394 }
395 }
396 if !miss_keys.is_empty() {
397 let values = self
398 .store
399 .read_multi_values_bytes(miss_keys.clone())
400 .await?;
401 let mut cache = cache.lock().unwrap();
402 for (i, (key, value)) in cache_miss_indices
403 .into_iter()
404 .zip(miss_keys.into_iter().zip(values))
405 {
406 cache.insert_read_value(key, &value);
407 result[i] = value;
408 }
409 }
410 Ok(result)
411 }
412
413 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
414 self.store.find_keys_by_prefix(key_prefix).await
415 }
416
417 async fn find_key_values_by_prefix(
418 &self,
419 key_prefix: &[u8],
420 ) -> Result<Self::KeyValues, Self::Error> {
421 self.store.find_key_values_by_prefix(key_prefix).await
422 }
423}
424
425impl<K> WritableKeyValueStore for LruCachingStore<K>
426where
427 K: WritableKeyValueStore,
428{
429 const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
431
432 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
433 let Some(cache) = &self.cache else {
434 return self.store.write_batch(batch).await;
435 };
436
437 {
438 let mut cache = cache.lock().unwrap();
439 for operation in &batch.operations {
440 match operation {
441 WriteOperation::Put { key, value } => {
442 let cache_entry = CacheEntry::Value(value.to_vec());
443 cache.insert(key.to_vec(), cache_entry);
444 }
445 WriteOperation::Delete { key } => {
446 let cache_entry = CacheEntry::DoesNotExist;
447 cache.insert(key.to_vec(), cache_entry);
448 }
449 WriteOperation::DeletePrefix { key_prefix } => {
450 cache.delete_prefix(key_prefix);
451 }
452 }
453 }
454 }
455 self.store.write_batch(batch).await
456 }
457
458 async fn clear_journal(&self) -> Result<(), Self::Error> {
459 self.store.clear_journal().await
460 }
461}
462
/// Configuration of an `LruCachingStore`: the wrapped store's configuration plus
/// the cache limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruCachingConfig<C> {
    /// Configuration handed to the wrapped store.
    pub inner_config: C,
    /// Limits of the cache placed in front of the wrapped store.
    pub storage_cache_config: StorageCacheConfig,
}
471
472impl<K> AdminKeyValueStore for LruCachingStore<K>
473where
474 K: AdminKeyValueStore,
475{
476 type Config = LruCachingConfig<K::Config>;
477
478 fn get_name() -> String {
479 format!("lru caching {}", K::get_name())
480 }
481
482 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
483 let store = K::connect(&config.inner_config, namespace).await?;
484 Ok(LruCachingStore::new(
485 store,
486 config.storage_cache_config.clone(),
487 ))
488 }
489
490 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
491 let store = self.store.open_exclusive(root_key)?;
492 let store = LruCachingStore::new(store, self.storage_cache_config());
493 store.enable_exclusive_access();
494 Ok(store)
495 }
496
497 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
498 K::list_all(&config.inner_config).await
499 }
500
501 async fn list_root_keys(
502 config: &Self::Config,
503 namespace: &str,
504 ) -> Result<Vec<Vec<u8>>, Self::Error> {
505 K::list_root_keys(&config.inner_config, namespace).await
506 }
507
508 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
509 K::delete_all(&config.inner_config).await
510 }
511
512 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
513 K::exists(&config.inner_config, namespace).await
514 }
515
516 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
517 K::create(&config.inner_config, namespace).await
518 }
519
520 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
521 K::delete(&config.inner_config, namespace).await
522 }
523}
524
#[cfg(with_testing)]
impl<K> TestKeyValueStore for LruCachingStore<K>
where
    K: TestKeyValueStore,
{
    /// Builds a test configuration from the wrapped store's test configuration and
    /// the default cache limits.
    async fn new_test_config() -> Result<LruCachingConfig<K::Config>, K::Error> {
        Ok(LruCachingConfig {
            inner_config: K::new_test_config().await?,
            storage_cache_config: DEFAULT_STORAGE_CACHE_CONFIG,
        })
    }
}
539
540impl<K> LruCachingStore<K> {
541 pub fn new(store: K, storage_cache_config: StorageCacheConfig) -> Self {
543 let cache = {
544 if storage_cache_config.max_cache_entries == 0 {
545 None
546 } else {
547 Some(Arc::new(Mutex::new(LruPrefixCache::new(
548 storage_cache_config,
549 ))))
550 }
551 };
552 Self { store, cache }
553 }
554
555 pub fn storage_cache_config(&self) -> StorageCacheConfig {
557 match &self.cache {
558 None => StorageCacheConfig {
559 max_cache_size: 0,
560 max_entry_size: 0,
561 max_cache_entries: 0,
562 },
563 Some(cache) => {
564 let cache = cache.lock().unwrap();
565 cache.storage_cache_config.clone()
566 }
567 }
568 }
569
570 pub fn enable_exclusive_access(&self) {
572 if let Some(cache) = &self.cache {
573 let mut cache = cache.lock().unwrap();
574 cache.has_exclusive_access = true;
575 }
576 }
577}
578
/// An `LruCachingStore` wrapped around a `MemoryStore`; only available with testing
/// enabled.
#[cfg(with_testing)]
pub type LruCachingMemoryStore = LruCachingStore<MemoryStore>;