1use std::sync::{Arc, Mutex};
7
8use serde::{Deserialize, Serialize};
9
10#[cfg(with_testing)]
11use crate::memory::MemoryDatabase;
12#[cfg(with_testing)]
13use crate::store::TestKeyValueDatabase;
14use crate::{
15 batch::{Batch, WriteOperation},
16 lru_prefix_cache::{LruPrefixCache, StorageCacheConfig},
17 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
18};
19
20#[cfg(with_metrics)]
21mod metrics {
22 use std::sync::LazyLock;
23
24 use linera_base::prometheus_util::register_int_counter_vec;
25 use prometheus::IntCounterVec;
26
27 pub static READ_VALUE_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
29 register_int_counter_vec(
30 "num_read_value_cache_miss",
31 "Number of read value cache misses",
32 &[],
33 )
34 });
35
36 pub static READ_VALUE_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
38 register_int_counter_vec(
39 "num_read_value_cache_hits",
40 "Number of read value cache hits",
41 &[],
42 )
43 });
44
45 pub static CONTAINS_KEY_CACHE_MISS_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
47 register_int_counter_vec(
48 "num_contains_key_cache_miss",
49 "Number of contains key cache misses",
50 &[],
51 )
52 });
53
54 pub static CONTAINS_KEY_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
56 register_int_counter_vec(
57 "num_contains_key_cache_hit",
58 "Number of contains key cache hits",
59 &[],
60 )
61 });
62
63 pub static FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
65 LazyLock::new(|| {
66 register_int_counter_vec(
67 "num_find_keys_by_prefix_cache_miss",
68 "Number of find keys by prefix cache misses",
69 &[],
70 )
71 });
72
73 pub static FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
75 register_int_counter_vec(
76 "num_find_keys_by_prefix_cache_hit",
77 "Number of find keys by prefix cache hits",
78 &[],
79 )
80 });
81
82 pub static FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT: LazyLock<IntCounterVec> =
84 LazyLock::new(|| {
85 register_int_counter_vec(
86 "num_find_key_values_by_prefix_cache_miss",
87 "Number of find key values by prefix cache misses",
88 &[],
89 )
90 });
91
92 pub static FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT: LazyLock<IntCounterVec> =
94 LazyLock::new(|| {
95 register_int_counter_vec(
96 "num_find_key_values_by_prefix_cache_hit",
97 "Number of find key values by prefix cache hits",
98 &[],
99 )
100 });
101}
102
103pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
107 max_cache_size: 10000000,
108 max_value_entry_size: 1000000,
109 max_find_keys_entry_size: 1000000,
110 max_find_key_values_entry_size: 1000000,
111 max_cache_entries: 1000,
112 max_cache_value_size: 10000000,
113 max_cache_find_keys_size: 10000000,
114 max_cache_find_key_values_size: 10000000,
115};
116
117#[derive(Clone)]
119pub struct LruCachingDatabase<D> {
120 database: D,
122 config: StorageCacheConfig,
124}
125
126#[derive(Clone)]
128pub struct LruCachingStore<S> {
129 store: S,
131 cache: Option<Arc<Mutex<LruPrefixCache>>>,
133}
134
135impl<D> WithError for LruCachingDatabase<D>
136where
137 D: WithError,
138{
139 type Error = D::Error;
140}
141
142impl<S> WithError for LruCachingStore<S>
143where
144 S: WithError,
145{
146 type Error = S::Error;
147}
148
149impl<K> ReadableKeyValueStore for LruCachingStore<K>
150where
151 K: ReadableKeyValueStore,
152{
153 const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
155
156 fn max_stream_queries(&self) -> usize {
157 self.store.max_stream_queries()
158 }
159
160 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
161 let Some(cache) = &self.cache else {
162 return self.store.read_value_bytes(key).await;
163 };
164 {
166 let mut cache = cache.lock().unwrap();
167 if let Some(value) = cache.query_read_value(key) {
168 #[cfg(with_metrics)]
169 metrics::READ_VALUE_CACHE_HIT_COUNT
170 .with_label_values(&[])
171 .inc();
172 return Ok(value);
173 }
174 }
175 #[cfg(with_metrics)]
176 metrics::READ_VALUE_CACHE_MISS_COUNT
177 .with_label_values(&[])
178 .inc();
179 let value = self.store.read_value_bytes(key).await?;
180 let mut cache = cache.lock().unwrap();
181 cache.insert_read_value(key, &value);
182 Ok(value)
183 }
184
185 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
186 let Some(cache) = &self.cache else {
187 return self.store.contains_key(key).await;
188 };
189 {
190 let mut cache = cache.lock().unwrap();
191 if let Some(value) = cache.query_contains_key(key) {
192 #[cfg(with_metrics)]
193 metrics::CONTAINS_KEY_CACHE_HIT_COUNT
194 .with_label_values(&[])
195 .inc();
196 return Ok(value);
197 }
198 }
199 #[cfg(with_metrics)]
200 metrics::CONTAINS_KEY_CACHE_MISS_COUNT
201 .with_label_values(&[])
202 .inc();
203 let result = self.store.contains_key(key).await?;
204 let mut cache = cache.lock().unwrap();
205 cache.insert_contains_key(key, result);
206 Ok(result)
207 }
208
209 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
210 let Some(cache) = &self.cache else {
211 return self.store.contains_keys(keys).await;
212 };
213 let size = keys.len();
214 let mut results = vec![false; size];
215 let mut indices = Vec::new();
216 let mut key_requests = Vec::new();
217 {
218 let mut cache = cache.lock().unwrap();
219 for i in 0..size {
220 if let Some(value) = cache.query_contains_key(&keys[i]) {
221 #[cfg(with_metrics)]
222 metrics::CONTAINS_KEY_CACHE_HIT_COUNT
223 .with_label_values(&[])
224 .inc();
225 results[i] = value;
226 } else {
227 #[cfg(with_metrics)]
228 metrics::CONTAINS_KEY_CACHE_MISS_COUNT
229 .with_label_values(&[])
230 .inc();
231 indices.push(i);
232 key_requests.push(keys[i].clone());
233 }
234 }
235 }
236 if !key_requests.is_empty() {
237 let key_results = self.store.contains_keys(key_requests.clone()).await?;
238 let mut cache = cache.lock().unwrap();
239 for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) {
240 results[index] = result;
241 cache.insert_contains_key(&key, result);
242 }
243 }
244 Ok(results)
245 }
246
247 async fn read_multi_values_bytes(
248 &self,
249 keys: Vec<Vec<u8>>,
250 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
251 let Some(cache) = &self.cache else {
252 return self.store.read_multi_values_bytes(keys).await;
253 };
254
255 let mut result = Vec::with_capacity(keys.len());
256 let mut cache_miss_indices = Vec::new();
257 let mut miss_keys = Vec::new();
258 {
259 let mut cache = cache.lock().unwrap();
260 for (i, key) in keys.into_iter().enumerate() {
261 if let Some(value) = cache.query_read_value(&key) {
262 #[cfg(with_metrics)]
263 metrics::READ_VALUE_CACHE_HIT_COUNT
264 .with_label_values(&[])
265 .inc();
266 result.push(value);
267 } else {
268 #[cfg(with_metrics)]
269 metrics::READ_VALUE_CACHE_MISS_COUNT
270 .with_label_values(&[])
271 .inc();
272 result.push(None);
273 cache_miss_indices.push(i);
274 miss_keys.push(key);
275 }
276 }
277 }
278 if !miss_keys.is_empty() {
279 let values = self
280 .store
281 .read_multi_values_bytes(miss_keys.clone())
282 .await?;
283 let mut cache = cache.lock().unwrap();
284 for (i, (key, value)) in cache_miss_indices
285 .into_iter()
286 .zip(miss_keys.into_iter().zip(values))
287 {
288 cache.insert_read_value(&key, &value);
289 result[i] = value;
290 }
291 }
292 Ok(result)
293 }
294
295 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
296 let Some(cache) = self.get_exclusive_cache() else {
297 return self.store.find_keys_by_prefix(key_prefix).await;
298 };
299 {
300 let mut cache = cache.lock().unwrap();
301 if let Some(value) = cache.query_find_keys(key_prefix) {
302 #[cfg(with_metrics)]
303 metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT
304 .with_label_values(&[])
305 .inc();
306 return Ok(value);
307 }
308 }
309 #[cfg(with_metrics)]
310 metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT
311 .with_label_values(&[])
312 .inc();
313 let keys = self.store.find_keys_by_prefix(key_prefix).await?;
314 let mut cache = cache.lock().unwrap();
315 cache.insert_find_keys(key_prefix.to_vec(), &keys);
316 Ok(keys)
317 }
318
319 async fn find_key_values_by_prefix(
320 &self,
321 key_prefix: &[u8],
322 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
323 let Some(cache) = self.get_exclusive_cache() else {
324 return self.store.find_key_values_by_prefix(key_prefix).await;
325 };
326 {
327 let mut cache = cache.lock().unwrap();
328 if let Some(value) = cache.query_find_key_values(key_prefix) {
329 #[cfg(with_metrics)]
330 metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT
331 .with_label_values(&[])
332 .inc();
333 return Ok(value);
334 }
335 }
336 #[cfg(with_metrics)]
337 metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT
338 .with_label_values(&[])
339 .inc();
340 let key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
341 let mut cache = cache.lock().unwrap();
342 cache.insert_find_key_values(key_prefix.to_vec(), &key_values);
343 Ok(key_values)
344 }
345}
346
347impl<K> WritableKeyValueStore for LruCachingStore<K>
348where
349 K: WritableKeyValueStore,
350{
351 const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
353
354 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
355 let Some(cache) = &self.cache else {
356 return self.store.write_batch(batch).await;
357 };
358
359 {
360 let mut cache = cache.lock().unwrap();
361 for operation in &batch.operations {
362 match operation {
363 WriteOperation::Put { key, value } => {
364 cache.put_key_value(key, value);
365 }
366 WriteOperation::Delete { key } => {
367 cache.delete_key(key);
368 }
369 WriteOperation::DeletePrefix { key_prefix } => {
370 cache.delete_prefix(key_prefix);
371 }
372 }
373 }
374 }
375 self.store.write_batch(batch).await
376 }
377
378 async fn clear_journal(&self) -> Result<(), Self::Error> {
379 self.store.clear_journal().await
380 }
381}
382
383#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct LruCachingConfig<C> {
386 pub inner_config: C,
388 pub storage_cache_config: StorageCacheConfig,
390}
391
392impl<D> KeyValueDatabase for LruCachingDatabase<D>
393where
394 D: KeyValueDatabase,
395{
396 type Config = LruCachingConfig<D::Config>;
397
398 type Store = LruCachingStore<D::Store>;
399
400 fn get_name() -> String {
401 format!("lru caching {}", D::get_name())
402 }
403
404 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
405 let database = D::connect(&config.inner_config, namespace).await?;
406 Ok(LruCachingDatabase {
407 database,
408 config: config.storage_cache_config.clone(),
409 })
410 }
411
412 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
413 let store = self.database.open_shared(root_key)?;
414 let store = LruCachingStore::new(
415 store,
416 self.config.clone(),
417 false,
418 );
419 Ok(store)
420 }
421
422 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
423 let store = self.database.open_exclusive(root_key)?;
424 let store = LruCachingStore::new(
425 store,
426 self.config.clone(),
427 true,
428 );
429 Ok(store)
430 }
431
432 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
433 D::list_all(&config.inner_config).await
434 }
435
436 async fn list_root_keys(
437 config: &Self::Config,
438 namespace: &str,
439 ) -> Result<Vec<Vec<u8>>, Self::Error> {
440 D::list_root_keys(&config.inner_config, namespace).await
441 }
442
443 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
444 D::delete_all(&config.inner_config).await
445 }
446
447 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
448 D::exists(&config.inner_config, namespace).await
449 }
450
451 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
452 D::create(&config.inner_config, namespace).await
453 }
454
455 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
456 D::delete(&config.inner_config, namespace).await
457 }
458}
459
460impl<S> LruCachingStore<S> {
461 fn new(store: S, config: StorageCacheConfig, has_exclusive_access: bool) -> Self {
463 let cache = {
464 if config.max_cache_entries == 0 {
465 None
466 } else {
467 Some(Arc::new(Mutex::new(LruPrefixCache::new(
468 config,
469 has_exclusive_access,
470 ))))
471 }
472 };
473 Self { store, cache }
474 }
475
476 fn get_exclusive_cache(&self) -> Option<&Arc<Mutex<LruPrefixCache>>> {
478 let Some(cache) = &self.cache else {
479 return None;
480 };
481 let has_exclusive_access = {
482 let cache = cache.lock().unwrap();
483 cache.has_exclusive_access()
484 };
485 if has_exclusive_access {
486 Some(cache)
487 } else {
488 None
489 }
490 }
491}
492
493#[cfg(with_testing)]
495pub type LruCachingMemoryDatabase = LruCachingDatabase<MemoryDatabase>;
496
497#[cfg(with_testing)]
498impl<D> TestKeyValueDatabase for LruCachingDatabase<D>
499where
500 D: TestKeyValueDatabase,
501{
502 async fn new_test_config() -> Result<LruCachingConfig<D::Config>, D::Error> {
503 let inner_config = D::new_test_config().await?;
504 let storage_cache_config = DEFAULT_STORAGE_CACHE_CONFIG;
505 Ok(LruCachingConfig {
506 inner_config,
507 storage_cache_config,
508 })
509 }
510}