1use std::sync::{Arc, Mutex};
7
8use serde::{Deserialize, Serialize};
9
10#[cfg(with_testing)]
11use crate::memory::MemoryDatabase;
12#[cfg(with_testing)]
13use crate::store::TestKeyValueDatabase;
14use crate::{
15 batch::{Batch, WriteOperation},
16 lru_prefix_cache::{LruPrefixCache, StorageCacheConfig},
17 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
18};
19
/// Prometheus counters recording cache hits and misses, one hit/miss pair per kind of read.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Registers a counter named `name`, with help text `description` and no labels.
    fn unlabeled_counter(name: &'static str, description: &'static str) -> IntCounterVec {
        register_int_counter_vec(name, description, &[])
    }

    /// Incremented when a value read is not answered by the cache.
    pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "num_read_value_cache_miss",
            "Number of read value cache misses",
        )
    });

    /// Incremented when a value read is answered by the cache.
    // NOTE(review): this is the only metric name ending in the plural `hits`; it is left
    // untouched because renaming it would change the exported metric.
    pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "num_read_value_cache_hits",
            "Number of read value cache hits",
        )
    });

    /// Incremented when a key-existence test is not answered by the cache.
    pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "num_contains_key_cache_miss",
            "Number of contains key cache misses",
        )
    });

    /// Incremented when a key-existence test is answered by the cache.
    pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        unlabeled_counter(
            "num_contains_key_cache_hit",
            "Number of contains key cache hits",
        )
    });

    /// Incremented when a find-keys-by-prefix query is not answered by the cache.
    pub static FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_keys_by_prefix_cache_miss",
                "Number of find keys by prefix cache misses",
            )
        });

    /// Incremented when a find-keys-by-prefix query is answered by the cache.
    pub static FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_keys_by_prefix_cache_hit",
                "Number of find keys by prefix cache hits",
            )
        });

    /// Incremented when a find-key-values-by-prefix query is not answered by the cache.
    pub static FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_key_values_by_prefix_cache_miss",
                "Number of find key values by prefix cache misses",
            )
        });

    /// Incremented when a find-key-values-by-prefix query is answered by the cache.
    pub static FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            unlabeled_counter(
                "num_find_key_values_by_prefix_cache_hit",
                "Number of find key values by prefix cache hits",
            )
        });
}
102
103pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
107 max_cache_size: 10000000,
108 max_value_entry_size: 1000000,
109 max_find_keys_entry_size: 1000000,
110 max_find_key_values_entry_size: 1000000,
111 max_cache_entries: 1000,
112 max_cache_value_size: 10000000,
113 max_cache_find_keys_size: 10000000,
114 max_cache_find_key_values_size: 10000000,
115};
116
/// A database wrapper whose stores are `LruCachingStore`s layered over the stores of `D`.
#[derive(Clone)]
pub struct LruCachingDatabase<D> {
    /// The wrapped database; every database-level operation is delegated to it.
    database: D,
    /// The cache configuration, cloned into each store opened from this database.
    config: StorageCacheConfig,
}
125
/// A key-value store wrapper that consults an `LruPrefixCache` before the wrapped store.
#[derive(Clone)]
pub struct LruCachingStore<S> {
    /// The wrapped store that actually holds the data.
    store: S,
    /// The cache, shared between clones of this store through the `Arc`.
    /// It is `None` when the configuration sets `max_cache_entries` to zero, in which
    /// case every operation goes straight to `store`.
    cache: Option<Arc<Mutex<LruPrefixCache>>>,
}
134
135impl<D> WithError for LruCachingDatabase<D>
136where
137 D: WithError,
138{
139 type Error = D::Error;
140}
141
142impl<S> WithError for LruCachingStore<S>
143where
144 S: WithError,
145{
146 type Error = S::Error;
147}
148
149impl<K> ReadableKeyValueStore for LruCachingStore<K>
150where
151 K: ReadableKeyValueStore,
152{
153 const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
155
156 fn max_stream_queries(&self) -> usize {
157 self.store.max_stream_queries()
158 }
159
160 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
161 self.store.root_key()
162 }
163
164 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
165 let Some(cache) = &self.cache else {
166 return self.store.read_value_bytes(key).await;
167 };
168 {
170 let mut cache = cache.lock().unwrap();
171 if let Some(value) = cache.query_read_value(key) {
172 #[cfg(with_metrics)]
173 metrics::READ_VALUE_CACHE_HIT_COUNT
174 .with_label_values(&[])
175 .inc();
176 return Ok(value);
177 }
178 }
179 #[cfg(with_metrics)]
180 metrics::READ_VALUE_CACHE_MISS_COUNT
181 .with_label_values(&[])
182 .inc();
183 let value = self.store.read_value_bytes(key).await?;
184 let mut cache = cache.lock().unwrap();
185 cache.insert_read_value(key, &value);
186 Ok(value)
187 }
188
189 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
190 let Some(cache) = &self.cache else {
191 return self.store.contains_key(key).await;
192 };
193 {
194 let mut cache = cache.lock().unwrap();
195 if let Some(value) = cache.query_contains_key(key) {
196 #[cfg(with_metrics)]
197 metrics::CONTAINS_KEY_CACHE_HIT_COUNT
198 .with_label_values(&[])
199 .inc();
200 return Ok(value);
201 }
202 }
203 #[cfg(with_metrics)]
204 metrics::CONTAINS_KEY_CACHE_MISS_COUNT
205 .with_label_values(&[])
206 .inc();
207 let result = self.store.contains_key(key).await?;
208 let mut cache = cache.lock().unwrap();
209 cache.insert_contains_key(key, result);
210 Ok(result)
211 }
212
213 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
214 let Some(cache) = &self.cache else {
215 return self.store.contains_keys(keys).await;
216 };
217 let size = keys.len();
218 let mut results = vec![false; size];
219 let mut indices = Vec::new();
220 let mut key_requests = Vec::new();
221 {
222 let mut cache = cache.lock().unwrap();
223 for i in 0..size {
224 if let Some(value) = cache.query_contains_key(&keys[i]) {
225 #[cfg(with_metrics)]
226 metrics::CONTAINS_KEY_CACHE_HIT_COUNT
227 .with_label_values(&[])
228 .inc();
229 results[i] = value;
230 } else {
231 #[cfg(with_metrics)]
232 metrics::CONTAINS_KEY_CACHE_MISS_COUNT
233 .with_label_values(&[])
234 .inc();
235 indices.push(i);
236 key_requests.push(keys[i].clone());
237 }
238 }
239 }
240 if !key_requests.is_empty() {
241 let key_results = self.store.contains_keys(key_requests.clone()).await?;
242 let mut cache = cache.lock().unwrap();
243 for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
244 results[index] = result;
245 cache.insert_contains_key(&key, result);
246 }
247 }
248 Ok(results)
249 }
250
251 async fn read_multi_values_bytes(
252 &self,
253 keys: Vec<Vec<u8>>,
254 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
255 let Some(cache) = &self.cache else {
256 return self.store.read_multi_values_bytes(keys).await;
257 };
258
259 let mut result = Vec::with_capacity(keys.len());
260 let mut cache_miss_indices = Vec::new();
261 let mut miss_keys = Vec::new();
262 {
263 let mut cache = cache.lock().unwrap();
264 for (i, key) in keys.into_iter().enumerate() {
265 if let Some(value) = cache.query_read_value(&key) {
266 #[cfg(with_metrics)]
267 metrics::READ_VALUE_CACHE_HIT_COUNT
268 .with_label_values(&[])
269 .inc();
270 result.push(value);
271 } else {
272 #[cfg(with_metrics)]
273 metrics::READ_VALUE_CACHE_MISS_COUNT
274 .with_label_values(&[])
275 .inc();
276 result.push(None);
277 cache_miss_indices.push(i);
278 miss_keys.push(key);
279 }
280 }
281 }
282 if !miss_keys.is_empty() {
283 let values = self
284 .store
285 .read_multi_values_bytes(miss_keys.clone())
286 .await?;
287 let mut cache = cache.lock().unwrap();
288 for (i, (key, value)) in cache_miss_indices
289 .into_iter()
290 .zip(miss_keys.into_iter().zip(values))
291 {
292 cache.insert_read_value(&key, &value);
293 result[i] = value;
294 }
295 }
296 Ok(result)
297 }
298
299 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
300 let Some(cache) = self.get_exclusive_cache() else {
301 return self.store.find_keys_by_prefix(key_prefix).await;
302 };
303 {
304 let mut cache = cache.lock().unwrap();
305 if let Some(value) = cache.query_find_keys(key_prefix) {
306 #[cfg(with_metrics)]
307 metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT
308 .with_label_values(&[])
309 .inc();
310 return Ok(value);
311 }
312 }
313 #[cfg(with_metrics)]
314 metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT
315 .with_label_values(&[])
316 .inc();
317 let keys = self.store.find_keys_by_prefix(key_prefix).await?;
318 let mut cache = cache.lock().unwrap();
319 cache.insert_find_keys(key_prefix.to_vec(), &keys);
320 Ok(keys)
321 }
322
323 async fn find_key_values_by_prefix(
324 &self,
325 key_prefix: &[u8],
326 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
327 let Some(cache) = self.get_exclusive_cache() else {
328 return self.store.find_key_values_by_prefix(key_prefix).await;
329 };
330 {
331 let mut cache = cache.lock().unwrap();
332 if let Some(value) = cache.query_find_key_values(key_prefix) {
333 #[cfg(with_metrics)]
334 metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT
335 .with_label_values(&[])
336 .inc();
337 return Ok(value);
338 }
339 }
340 #[cfg(with_metrics)]
341 metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT
342 .with_label_values(&[])
343 .inc();
344 let key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
345 let mut cache = cache.lock().unwrap();
346 cache.insert_find_key_values(key_prefix.to_vec(), &key_values);
347 Ok(key_values)
348 }
349}
350
351impl<K> WritableKeyValueStore for LruCachingStore<K>
352where
353 K: WritableKeyValueStore,
354{
355 const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
357
358 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
359 let Some(cache) = &self.cache else {
360 return self.store.write_batch(batch).await;
361 };
362
363 {
364 let mut cache = cache.lock().unwrap();
365 for operation in &batch.operations {
366 match operation {
367 WriteOperation::Put { key, value } => {
368 cache.put_key_value(key, value);
369 }
370 WriteOperation::Delete { key } => {
371 cache.delete_key(key);
372 }
373 WriteOperation::DeletePrefix { key_prefix } => {
374 cache.delete_prefix(key_prefix);
375 }
376 }
377 }
378 }
379 self.store.write_batch(batch).await
380 }
381
382 async fn clear_journal(&self) -> Result<(), Self::Error> {
383 self.store.clear_journal().await
384 }
385}
386
/// The configuration of an `LruCachingDatabase`: the configuration of the wrapped database
/// together with the cache settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruCachingConfig<C> {
    /// The configuration passed on to the wrapped database.
    pub inner_config: C,
    /// The cache settings, cloned into the database when connecting.
    pub storage_cache_config: StorageCacheConfig,
}
395
396impl<D> KeyValueDatabase for LruCachingDatabase<D>
397where
398 D: KeyValueDatabase,
399{
400 type Config = LruCachingConfig<D::Config>;
401
402 type Store = LruCachingStore<D::Store>;
403
404 fn get_name() -> String {
405 format!("lru caching {}", D::get_name())
406 }
407
408 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
409 let database = D::connect(&config.inner_config, namespace).await?;
410 Ok(LruCachingDatabase {
411 database,
412 config: config.storage_cache_config.clone(),
413 })
414 }
415
416 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
417 let store = self.database.open_shared(root_key)?;
418 let store = LruCachingStore::new(
419 store,
420 self.config.clone(),
421 false,
422 );
423 Ok(store)
424 }
425
426 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
427 let store = self.database.open_exclusive(root_key)?;
428 let store = LruCachingStore::new(
429 store,
430 self.config.clone(),
431 true,
432 );
433 Ok(store)
434 }
435
436 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
437 D::list_all(&config.inner_config).await
438 }
439
440 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
441 self.database.list_root_keys().await
442 }
443
444 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
445 D::delete_all(&config.inner_config).await
446 }
447
448 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
449 D::exists(&config.inner_config, namespace).await
450 }
451
452 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
453 D::create(&config.inner_config, namespace).await
454 }
455
456 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
457 D::delete(&config.inner_config, namespace).await
458 }
459}
460
461impl<S> LruCachingStore<S> {
462 fn new(store: S, config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
464 let cache = {
465 if config.max_cache_entries == 0 {
466 None
467 } else {
468 Some(Arc::new(Mutex::new(LruPrefixCache::new(
469 config,
470 has_exclusive_access,
471 ))))
472 }
473 };
474 Self { store, cache }
475 }
476
477 fn get_exclusive_cache(&self) -> Option<&Arc<Mutex<LruPrefixCache>>> {
479 let Some(cache) = &self.cache else {
480 return None;
481 };
482 let has_exclusive_access = {
483 let cache = cache.lock().unwrap();
484 cache.has_exclusive_access()
485 };
486 if has_exclusive_access {
487 Some(cache)
488 } else {
489 None
490 }
491 }
492}
493
/// An `LruCachingDatabase` layered over the `MemoryDatabase`; only built with `with_testing`.
#[cfg(with_testing)]
pub type LruCachingMemoryDatabase = LruCachingDatabase<MemoryDatabase>;
497
/// Test support: pairs the test configuration of `D` with the default cache configuration.
#[cfg(with_testing)]
impl<D: TestKeyValueDatabase> TestKeyValueDatabase for LruCachingDatabase<D> {
    /// Builds a test configuration from `D::new_test_config` and
    /// `DEFAULT_STORAGE_CACHE_CONFIG`, propagating any error of the former.
    async fn new_test_config() -> Result<LruCachingConfig<D::Config>, D::Error> {
        Ok(LruCachingConfig {
            inner_config: D::new_test_config().await?,
            storage_cache_config: DEFAULT_STORAGE_CACHE_CONFIG,
        })
    }
}
511}