1use std::{
7 collections::{btree_map::Entry, BTreeMap},
8 sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13 register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueDatabase;
19use crate::{
20 batch::Batch,
21 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
22};
23
24#[derive(Clone)]
25pub struct KeyValueStoreMetrics {
27 read_value_bytes_latency: HistogramVec,
28 contains_key_latency: HistogramVec,
29 contains_keys_latency: HistogramVec,
30 read_multi_values_bytes_latency: HistogramVec,
31 find_keys_by_prefix_latency: HistogramVec,
32 find_key_values_by_prefix_latency: HistogramVec,
33 write_batch_latency: HistogramVec,
34 clear_journal_latency: HistogramVec,
35 connect_latency: HistogramVec,
36 open_shared_latency: HistogramVec,
37 open_exclusive_latency: HistogramVec,
38 list_all_latency: HistogramVec,
39 list_root_keys_latency: HistogramVec,
40 delete_all_latency: HistogramVec,
41 exists_latency: HistogramVec,
42 create_latency: HistogramVec,
43 delete_latency: HistogramVec,
44 read_value_none_cases: IntCounterVec,
45 read_value_key_size: HistogramVec,
46 read_value_value_size: HistogramVec,
47 read_multi_values_num_entries: HistogramVec,
48 read_multi_values_key_sizes: HistogramVec,
49 contains_keys_num_entries: HistogramVec,
50 contains_keys_key_sizes: HistogramVec,
51 contains_key_key_size: HistogramVec,
52 find_keys_by_prefix_prefix_size: HistogramVec,
53 find_keys_by_prefix_num_keys: HistogramVec,
54 find_keys_by_prefix_keys_size: HistogramVec,
55 find_key_values_by_prefix_prefix_size: HistogramVec,
56 find_key_values_by_prefix_num_keys: HistogramVec,
57 find_key_values_by_prefix_key_values_size: HistogramVec,
58 write_batch_size: HistogramVec,
59 list_all_sizes: HistogramVec,
60 exists_true_cases: IntCounterVec,
61}
62
63#[derive(Default)]
64struct StoreMetrics {
65 stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
66}
67
68static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
70 LazyLock::new(|| Mutex::new(StoreMetrics::default()));
71
72fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
73 let mut store_metrics = STORE_COUNTERS.lock().unwrap();
74 let key = name.to_string();
75 match store_metrics.stores.entry(key) {
76 Entry::Occupied(entry) => {
77 let entry = entry.into_mut();
78 entry.clone()
79 }
80 Entry::Vacant(entry) => {
81 let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
82 entry.insert(store_metric.clone());
83 store_metric
84 }
85 }
86}
87
88impl KeyValueStoreMetrics {
89 pub fn new(name: String) -> Self {
91 let var_name = name.replace(' ', "_");
93 let title_name = name.to_case(Case::Snake);
94
95 let entry1 = format!("{}_read_value_bytes_latency", var_name);
96 let entry2 = format!("{} read value bytes latency", title_name);
97 let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
98
99 let entry1 = format!("{}_contains_key_latency", var_name);
100 let entry2 = format!("{} contains key latency", title_name);
101 let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
102
103 let entry1 = format!("{}_contains_keys_latency", var_name);
104 let entry2 = format!("{} contains keys latency", title_name);
105 let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
106
107 let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
108 let entry2 = format!("{} read multi value bytes latency", title_name);
109 let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
110
111 let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
112 let entry2 = format!("{} find keys by prefix latency", title_name);
113 let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
114
115 let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
116 let entry2 = format!("{} find key values by prefix latency", title_name);
117 let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
118
119 let entry1 = format!("{}_write_batch_latency", var_name);
120 let entry2 = format!("{} write batch latency", title_name);
121 let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
122
123 let entry1 = format!("{}_clear_journal_latency", var_name);
124 let entry2 = format!("{} clear journal latency", title_name);
125 let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
126
127 let entry1 = format!("{}_connect_latency", var_name);
128 let entry2 = format!("{} connect latency", title_name);
129 let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
130
131 let entry1 = format!("{}_open_shared_latency", var_name);
132 let entry2 = format!("{} open shared partition", title_name);
133 let open_shared_latency = register_histogram_vec(&entry1, &entry2, &[], None);
134
135 let entry1 = format!("{}_open_exclusive_latency", var_name);
136 let entry2 = format!("{} open exclusive partition", title_name);
137 let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
138
139 let entry1 = format!("{}_list_all_latency", var_name);
140 let entry2 = format!("{} list all latency", title_name);
141 let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
142
143 let entry1 = format!("{}_list_root_keys_latency", var_name);
144 let entry2 = format!("{} list root keys latency", title_name);
145 let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
146
147 let entry1 = format!("{}_delete_all_latency", var_name);
148 let entry2 = format!("{} delete all latency", title_name);
149 let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
150
151 let entry1 = format!("{}_exists_latency", var_name);
152 let entry2 = format!("{} exists latency", title_name);
153 let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
154
155 let entry1 = format!("{}_create_latency", var_name);
156 let entry2 = format!("{} create latency", title_name);
157 let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
158
159 let entry1 = format!("{}_delete_latency", var_name);
160 let entry2 = format!("{} delete latency", title_name);
161 let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
162
163 let entry1 = format!("{}_read_value_none_cases", var_name);
164 let entry2 = format!("{} read value none cases", title_name);
165 let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
166
167 let entry1 = format!("{}_read_value_key_size", var_name);
168 let entry2 = format!("{} read value key size", title_name);
169 let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
170
171 let entry1 = format!("{}_read_value_value_size", var_name);
172 let entry2 = format!("{} read value value size", title_name);
173 let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
174
175 let entry1 = format!("{}_read_multi_values_num_entries", var_name);
176 let entry2 = format!("{} read multi values num entries", title_name);
177 let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
178
179 let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
180 let entry2 = format!("{} read multi values key sizes", title_name);
181 let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
182
183 let entry1 = format!("{}_contains_keys_num_entries", var_name);
184 let entry2 = format!("{} contains keys num entries", title_name);
185 let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
186
187 let entry1 = format!("{}_contains_keys_key_sizes", var_name);
188 let entry2 = format!("{} contains keys key sizes", title_name);
189 let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
190
191 let entry1 = format!("{}_contains_key_key_size", var_name);
192 let entry2 = format!("{} contains key key size", title_name);
193 let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
194
195 let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
196 let entry2 = format!("{} find keys by prefix prefix size", title_name);
197 let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
198
199 let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
200 let entry2 = format!("{} find keys by prefix num keys", title_name);
201 let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
202
203 let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
204 let entry2 = format!("{} find keys by prefix keys size", title_name);
205 let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
206
207 let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
208 let entry2 = format!("{} find key values by prefix prefix size", title_name);
209 let find_key_values_by_prefix_prefix_size =
210 register_histogram_vec(&entry1, &entry2, &[], None);
211
212 let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
213 let entry2 = format!("{} find key values by prefix num keys", title_name);
214 let find_key_values_by_prefix_num_keys =
215 register_histogram_vec(&entry1, &entry2, &[], None);
216
217 let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
218 let entry2 = format!("{} find key values by prefix key values size", title_name);
219 let find_key_values_by_prefix_key_values_size =
220 register_histogram_vec(&entry1, &entry2, &[], None);
221
222 let entry1 = format!("{}_write_batch_size", var_name);
223 let entry2 = format!("{} write batch size", title_name);
224 let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
225
226 let entry1 = format!("{}_list_all_sizes", var_name);
227 let entry2 = format!("{} list all sizes", title_name);
228 let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
229
230 let entry1 = format!("{}_exists_true_cases", var_name);
231 let entry2 = format!("{} exists true cases", title_name);
232 let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
233
234 KeyValueStoreMetrics {
235 read_value_bytes_latency,
236 contains_key_latency,
237 contains_keys_latency,
238 read_multi_values_bytes_latency,
239 find_keys_by_prefix_latency,
240 find_key_values_by_prefix_latency,
241 write_batch_latency,
242 clear_journal_latency,
243 connect_latency,
244 open_shared_latency,
245 open_exclusive_latency,
246 list_all_latency,
247 list_root_keys_latency,
248 delete_all_latency,
249 exists_latency,
250 create_latency,
251 delete_latency,
252 read_value_none_cases,
253 read_value_key_size,
254 read_value_value_size,
255 read_multi_values_num_entries,
256 read_multi_values_key_sizes,
257 contains_keys_num_entries,
258 contains_keys_key_sizes,
259 contains_key_key_size,
260 find_keys_by_prefix_prefix_size,
261 find_keys_by_prefix_num_keys,
262 find_keys_by_prefix_keys_size,
263 find_key_values_by_prefix_prefix_size,
264 find_key_values_by_prefix_num_keys,
265 find_key_values_by_prefix_key_values_size,
266 write_batch_size,
267 list_all_sizes,
268 exists_true_cases,
269 }
270 }
271}
272
273#[derive(Clone)]
275pub struct MeteredDatabase<D> {
276 counter: Arc<KeyValueStoreMetrics>,
278 database: D,
280}
281
282#[derive(Clone)]
284pub struct MeteredStore<S> {
285 counter: Arc<KeyValueStoreMetrics>,
287 store: S,
289}
290
291impl<D> WithError for MeteredDatabase<D>
292where
293 D: WithError,
294{
295 type Error = D::Error;
296}
297
298impl<S> WithError for MeteredStore<S>
299where
300 S: WithError,
301{
302 type Error = S::Error;
303}
304
305impl<S> ReadableKeyValueStore for MeteredStore<S>
306where
307 S: ReadableKeyValueStore,
308{
309 const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE;
310
311 fn max_stream_queries(&self) -> usize {
312 self.store.max_stream_queries()
313 }
314
315 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
316 let _latency = self.counter.read_value_bytes_latency.measure_latency();
317 self.counter
318 .read_value_key_size
319 .with_label_values(&[])
320 .observe(key.len() as f64);
321 let result = self.store.read_value_bytes(key).await?;
322 match &result {
323 None => self
324 .counter
325 .read_value_none_cases
326 .with_label_values(&[])
327 .inc(),
328 Some(value) => self
329 .counter
330 .read_value_value_size
331 .with_label_values(&[])
332 .observe(value.len() as f64),
333 }
334 Ok(result)
335 }
336
337 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
338 let _latency = self.counter.contains_key_latency.measure_latency();
339 self.counter
340 .contains_key_key_size
341 .with_label_values(&[])
342 .observe(key.len() as f64);
343 self.store.contains_key(key).await
344 }
345
346 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
347 let _latency = self.counter.contains_keys_latency.measure_latency();
348 self.counter
349 .contains_keys_num_entries
350 .with_label_values(&[])
351 .observe(keys.len() as f64);
352 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
353 self.counter
354 .contains_keys_key_sizes
355 .with_label_values(&[])
356 .observe(key_sizes as f64);
357 self.store.contains_keys(keys).await
358 }
359
360 async fn read_multi_values_bytes(
361 &self,
362 keys: Vec<Vec<u8>>,
363 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
364 let _latency = self
365 .counter
366 .read_multi_values_bytes_latency
367 .measure_latency();
368 self.counter
369 .read_multi_values_num_entries
370 .with_label_values(&[])
371 .observe(keys.len() as f64);
372 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
373 self.counter
374 .read_multi_values_key_sizes
375 .with_label_values(&[])
376 .observe(key_sizes as f64);
377 self.store.read_multi_values_bytes(keys).await
378 }
379
380 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
381 let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
382 self.counter
383 .find_keys_by_prefix_prefix_size
384 .with_label_values(&[])
385 .observe(key_prefix.len() as f64);
386 let result = self.store.find_keys_by_prefix(key_prefix).await?;
387 let (num_keys, keys_size) = result
388 .iter()
389 .map(|key| key.len())
390 .fold((0, 0), |(count, size), len| (count + 1, size + len));
391 self.counter
392 .find_keys_by_prefix_num_keys
393 .with_label_values(&[])
394 .observe(num_keys as f64);
395 self.counter
396 .find_keys_by_prefix_keys_size
397 .with_label_values(&[])
398 .observe(keys_size as f64);
399 Ok(result)
400 }
401
402 async fn find_key_values_by_prefix(
403 &self,
404 key_prefix: &[u8],
405 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
406 let _latency = self
407 .counter
408 .find_key_values_by_prefix_latency
409 .measure_latency();
410 self.counter
411 .find_key_values_by_prefix_prefix_size
412 .with_label_values(&[])
413 .observe(key_prefix.len() as f64);
414 let result = self.store.find_key_values_by_prefix(key_prefix).await?;
415 let (num_keys, key_values_size) = result
416 .iter()
417 .map(|(key, value)| key.len() + value.len())
418 .fold((0, 0), |(count, size), len| (count + 1, size + len));
419 self.counter
420 .find_key_values_by_prefix_num_keys
421 .with_label_values(&[])
422 .observe(num_keys as f64);
423 self.counter
424 .find_key_values_by_prefix_key_values_size
425 .with_label_values(&[])
426 .observe(key_values_size as f64);
427 Ok(result)
428 }
429}
430
431impl<S> WritableKeyValueStore for MeteredStore<S>
432where
433 S: WritableKeyValueStore,
434{
435 const MAX_VALUE_SIZE: usize = S::MAX_VALUE_SIZE;
436
437 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
438 let _latency = self.counter.write_batch_latency.measure_latency();
439 self.counter
440 .write_batch_size
441 .with_label_values(&[])
442 .observe(batch.size() as f64);
443 self.store.write_batch(batch).await
444 }
445
446 async fn clear_journal(&self) -> Result<(), Self::Error> {
447 let _metric = self.counter.clear_journal_latency.measure_latency();
448 self.store.clear_journal().await
449 }
450}
451
452impl<D> KeyValueDatabase for MeteredDatabase<D>
453where
454 D: KeyValueDatabase,
455{
456 type Config = D::Config;
457 type Store = MeteredStore<D::Store>;
458
459 fn get_name() -> String {
460 D::get_name()
461 }
462
463 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
464 let name = D::get_name();
465 let counter = get_counter(&name);
466 let _latency = counter.connect_latency.measure_latency();
467 let database = D::connect(config, namespace).await?;
468 let counter = get_counter(&name);
469 Ok(Self { counter, database })
470 }
471
472 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
473 let _latency = self.counter.open_shared_latency.measure_latency();
474 let store = self.database.open_shared(root_key)?;
475 let counter = self.counter.clone();
476 Ok(MeteredStore { counter, store })
477 }
478
479 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
480 let _latency = self.counter.open_exclusive_latency.measure_latency();
481 let store = self.database.open_exclusive(root_key)?;
482 let counter = self.counter.clone();
483 Ok(MeteredStore { counter, store })
484 }
485
486 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
487 let name = D::get_name();
488 let counter = get_counter(&name);
489 let _latency = counter.list_all_latency.measure_latency();
490 let namespaces = D::list_all(config).await?;
491 let counter = get_counter(&name);
492 counter
493 .list_all_sizes
494 .with_label_values(&[])
495 .observe(namespaces.len() as f64);
496 Ok(namespaces)
497 }
498
499 async fn list_root_keys(
500 config: &Self::Config,
501 namespace: &str,
502 ) -> Result<Vec<Vec<u8>>, Self::Error> {
503 let name = D::get_name();
504 let counter = get_counter(&name);
505 let _latency = counter.list_root_keys_latency.measure_latency();
506 D::list_root_keys(config, namespace).await
507 }
508
509 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
510 let name = D::get_name();
511 let counter = get_counter(&name);
512 let _latency = counter.delete_all_latency.measure_latency();
513 D::delete_all(config).await
514 }
515
516 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
517 let name = D::get_name();
518 let counter = get_counter(&name);
519 let _latency = counter.exists_latency.measure_latency();
520 let result = D::exists(config, namespace).await?;
521 if result {
522 let counter = get_counter(&name);
523 counter.exists_true_cases.with_label_values(&[]).inc();
524 }
525 Ok(result)
526 }
527
528 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
529 let name = D::get_name();
530 let counter = get_counter(&name);
531 let _latency = counter.create_latency.measure_latency();
532 D::create(config, namespace).await
533 }
534
535 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
536 let name = D::get_name();
537 let counter = get_counter(&name);
538 let _latency = counter.delete_latency.measure_latency();
539 D::delete(config, namespace).await
540 }
541}
542
543#[cfg(with_testing)]
544impl<D> TestKeyValueDatabase for MeteredDatabase<D>
545where
546 D: TestKeyValueDatabase,
547{
548 async fn new_test_config() -> Result<D::Config, Self::Error> {
549 D::new_test_config().await
550 }
551}