1use std::{
7 collections::{btree_map::Entry, BTreeMap},
8 sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13 register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueDatabase;
19use crate::{
20 batch::Batch,
21 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
22};
23
24#[derive(Clone)]
25pub struct KeyValueStoreMetrics {
27 read_value_bytes_latency: HistogramVec,
28 contains_key_latency: HistogramVec,
29 contains_keys_latency: HistogramVec,
30 read_multi_values_bytes_latency: HistogramVec,
31 find_keys_by_prefix_latency: HistogramVec,
32 find_key_values_by_prefix_latency: HistogramVec,
33 write_batch_latency: HistogramVec,
34 clear_journal_latency: HistogramVec,
35 connect_latency: HistogramVec,
36 open_shared_latency: HistogramVec,
37 open_exclusive_latency: HistogramVec,
38 list_all_latency: HistogramVec,
39 list_root_keys_latency: HistogramVec,
40 delete_all_latency: HistogramVec,
41 exists_latency: HistogramVec,
42 create_latency: HistogramVec,
43 delete_latency: HistogramVec,
44 read_value_none_cases: IntCounterVec,
45 read_value_key_size: HistogramVec,
46 read_value_value_size: HistogramVec,
47 read_multi_values_num_entries: HistogramVec,
48 read_multi_values_key_sizes: HistogramVec,
49 contains_keys_num_entries: HistogramVec,
50 contains_keys_key_sizes: HistogramVec,
51 contains_key_key_size: HistogramVec,
52 find_keys_by_prefix_prefix_size: HistogramVec,
53 find_keys_by_prefix_num_keys: HistogramVec,
54 find_keys_by_prefix_keys_size: HistogramVec,
55 find_key_values_by_prefix_prefix_size: HistogramVec,
56 find_key_values_by_prefix_num_keys: HistogramVec,
57 find_key_values_by_prefix_key_values_size: HistogramVec,
58 write_batch_size: HistogramVec,
59 list_all_sizes: HistogramVec,
60 exists_true_cases: IntCounterVec,
61}
62
63#[derive(Default)]
64struct StoreMetrics {
65 stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
66}
67
68static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
70 LazyLock::new(|| Mutex::new(StoreMetrics::default()));
71
72fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
73 let mut store_metrics = STORE_COUNTERS.lock().unwrap();
74 let key = name.to_string();
75 match store_metrics.stores.entry(key) {
76 Entry::Occupied(entry) => {
77 let entry = entry.into_mut();
78 entry.clone()
79 }
80 Entry::Vacant(entry) => {
81 let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
82 entry.insert(store_metric.clone());
83 store_metric
84 }
85 }
86}
87
88impl KeyValueStoreMetrics {
89 pub fn new(name: String) -> Self {
91 let var_name = name.replace(' ', "_");
93 let title_name = name.to_case(Case::Snake);
94
95 let entry1 = format!("{}_read_value_bytes_latency", var_name);
96 let entry2 = format!("{} read value bytes latency", title_name);
97 let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
98
99 let entry1 = format!("{}_contains_key_latency", var_name);
100 let entry2 = format!("{} contains key latency", title_name);
101 let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
102
103 let entry1 = format!("{}_contains_keys_latency", var_name);
104 let entry2 = format!("{} contains keys latency", title_name);
105 let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
106
107 let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
108 let entry2 = format!("{} read multi value bytes latency", title_name);
109 let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
110
111 let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
112 let entry2 = format!("{} find keys by prefix latency", title_name);
113 let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
114
115 let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
116 let entry2 = format!("{} find key values by prefix latency", title_name);
117 let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
118
119 let entry1 = format!("{}_write_batch_latency", var_name);
120 let entry2 = format!("{} write batch latency", title_name);
121 let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
122
123 let entry1 = format!("{}_clear_journal_latency", var_name);
124 let entry2 = format!("{} clear journal latency", title_name);
125 let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
126
127 let entry1 = format!("{}_connect_latency", var_name);
128 let entry2 = format!("{} connect latency", title_name);
129 let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
130
131 let entry1 = format!("{}_open_shared_latency", var_name);
132 let entry2 = format!("{} open shared partition", title_name);
133 let open_shared_latency = register_histogram_vec(&entry1, &entry2, &[], None);
134
135 let entry1 = format!("{}_open_exclusive_latency", var_name);
136 let entry2 = format!("{} open exclusive partition", title_name);
137 let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
138
139 let entry1 = format!("{}_list_all_latency", var_name);
140 let entry2 = format!("{} list all latency", title_name);
141 let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
142
143 let entry1 = format!("{}_list_root_keys_latency", var_name);
144 let entry2 = format!("{} list root keys latency", title_name);
145 let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
146
147 let entry1 = format!("{}_delete_all_latency", var_name);
148 let entry2 = format!("{} delete all latency", title_name);
149 let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
150
151 let entry1 = format!("{}_exists_latency", var_name);
152 let entry2 = format!("{} exists latency", title_name);
153 let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
154
155 let entry1 = format!("{}_create_latency", var_name);
156 let entry2 = format!("{} create latency", title_name);
157 let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
158
159 let entry1 = format!("{}_delete_latency", var_name);
160 let entry2 = format!("{} delete latency", title_name);
161 let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
162
163 let entry1 = format!("{}_read_value_none_cases", var_name);
164 let entry2 = format!("{} read value none cases", title_name);
165 let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
166
167 let entry1 = format!("{}_read_value_key_size", var_name);
168 let entry2 = format!("{} read value key size", title_name);
169 let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
170
171 let entry1 = format!("{}_read_value_value_size", var_name);
172 let entry2 = format!("{} read value value size", title_name);
173 let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
174
175 let entry1 = format!("{}_read_multi_values_num_entries", var_name);
176 let entry2 = format!("{} read multi values num entries", title_name);
177 let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
178
179 let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
180 let entry2 = format!("{} read multi values key sizes", title_name);
181 let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
182
183 let entry1 = format!("{}_contains_keys_num_entries", var_name);
184 let entry2 = format!("{} contains keys num entries", title_name);
185 let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
186
187 let entry1 = format!("{}_contains_keys_key_sizes", var_name);
188 let entry2 = format!("{} contains keys key sizes", title_name);
189 let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
190
191 let entry1 = format!("{}_contains_key_key_size", var_name);
192 let entry2 = format!("{} contains key key size", title_name);
193 let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
194
195 let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
196 let entry2 = format!("{} find keys by prefix prefix size", title_name);
197 let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
198
199 let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
200 let entry2 = format!("{} find keys by prefix num keys", title_name);
201 let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
202
203 let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
204 let entry2 = format!("{} find keys by prefix keys size", title_name);
205 let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
206
207 let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
208 let entry2 = format!("{} find key values by prefix prefix size", title_name);
209 let find_key_values_by_prefix_prefix_size =
210 register_histogram_vec(&entry1, &entry2, &[], None);
211
212 let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
213 let entry2 = format!("{} find key values by prefix num keys", title_name);
214 let find_key_values_by_prefix_num_keys =
215 register_histogram_vec(&entry1, &entry2, &[], None);
216
217 let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
218 let entry2 = format!("{} find key values by prefix key values size", title_name);
219 let find_key_values_by_prefix_key_values_size =
220 register_histogram_vec(&entry1, &entry2, &[], None);
221
222 let entry1 = format!("{}_write_batch_size", var_name);
223 let entry2 = format!("{} write batch size", title_name);
224 let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
225
226 let entry1 = format!("{}_list_all_sizes", var_name);
227 let entry2 = format!("{} list all sizes", title_name);
228 let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
229
230 let entry1 = format!("{}_exists_true_cases", var_name);
231 let entry2 = format!("{} exists true cases", title_name);
232 let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
233
234 KeyValueStoreMetrics {
235 read_value_bytes_latency,
236 contains_key_latency,
237 contains_keys_latency,
238 read_multi_values_bytes_latency,
239 find_keys_by_prefix_latency,
240 find_key_values_by_prefix_latency,
241 write_batch_latency,
242 clear_journal_latency,
243 connect_latency,
244 open_shared_latency,
245 open_exclusive_latency,
246 list_all_latency,
247 list_root_keys_latency,
248 delete_all_latency,
249 exists_latency,
250 create_latency,
251 delete_latency,
252 read_value_none_cases,
253 read_value_key_size,
254 read_value_value_size,
255 read_multi_values_num_entries,
256 read_multi_values_key_sizes,
257 contains_keys_num_entries,
258 contains_keys_key_sizes,
259 contains_key_key_size,
260 find_keys_by_prefix_prefix_size,
261 find_keys_by_prefix_num_keys,
262 find_keys_by_prefix_keys_size,
263 find_key_values_by_prefix_prefix_size,
264 find_key_values_by_prefix_num_keys,
265 find_key_values_by_prefix_key_values_size,
266 write_batch_size,
267 list_all_sizes,
268 exists_true_cases,
269 }
270 }
271}
272
273#[derive(Clone)]
275pub struct MeteredDatabase<D> {
276 counter: Arc<KeyValueStoreMetrics>,
278 database: D,
280}
281
282#[derive(Clone)]
284pub struct MeteredStore<S> {
285 counter: Arc<KeyValueStoreMetrics>,
287 store: S,
289}
290
291impl<D> WithError for MeteredDatabase<D>
292where
293 D: WithError,
294{
295 type Error = D::Error;
296}
297
298impl<S> WithError for MeteredStore<S>
299where
300 S: WithError,
301{
302 type Error = S::Error;
303}
304
305impl<S> ReadableKeyValueStore for MeteredStore<S>
306where
307 S: ReadableKeyValueStore,
308{
309 const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE;
310
311 fn max_stream_queries(&self) -> usize {
312 self.store.max_stream_queries()
313 }
314
315 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
316 self.store.root_key()
317 }
318
319 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
320 let _latency = self.counter.read_value_bytes_latency.measure_latency();
321 self.counter
322 .read_value_key_size
323 .with_label_values(&[])
324 .observe(key.len() as f64);
325 let result = self.store.read_value_bytes(key).await?;
326 match &result {
327 None => self
328 .counter
329 .read_value_none_cases
330 .with_label_values(&[])
331 .inc(),
332 Some(value) => self
333 .counter
334 .read_value_value_size
335 .with_label_values(&[])
336 .observe(value.len() as f64),
337 }
338 Ok(result)
339 }
340
341 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
342 let _latency = self.counter.contains_key_latency.measure_latency();
343 self.counter
344 .contains_key_key_size
345 .with_label_values(&[])
346 .observe(key.len() as f64);
347 self.store.contains_key(key).await
348 }
349
350 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
351 let _latency = self.counter.contains_keys_latency.measure_latency();
352 self.counter
353 .contains_keys_num_entries
354 .with_label_values(&[])
355 .observe(keys.len() as f64);
356 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
357 self.counter
358 .contains_keys_key_sizes
359 .with_label_values(&[])
360 .observe(key_sizes as f64);
361 self.store.contains_keys(keys).await
362 }
363
364 async fn read_multi_values_bytes(
365 &self,
366 keys: &[Vec<u8>],
367 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
368 let _latency = self
369 .counter
370 .read_multi_values_bytes_latency
371 .measure_latency();
372 self.counter
373 .read_multi_values_num_entries
374 .with_label_values(&[])
375 .observe(keys.len() as f64);
376 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
377 self.counter
378 .read_multi_values_key_sizes
379 .with_label_values(&[])
380 .observe(key_sizes as f64);
381 self.store.read_multi_values_bytes(keys).await
382 }
383
384 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
385 let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
386 self.counter
387 .find_keys_by_prefix_prefix_size
388 .with_label_values(&[])
389 .observe(key_prefix.len() as f64);
390 let result = self.store.find_keys_by_prefix(key_prefix).await?;
391 let (num_keys, keys_size) = result
392 .iter()
393 .map(|key| key.len())
394 .fold((0, 0), |(count, size), len| (count + 1, size + len));
395 self.counter
396 .find_keys_by_prefix_num_keys
397 .with_label_values(&[])
398 .observe(num_keys as f64);
399 self.counter
400 .find_keys_by_prefix_keys_size
401 .with_label_values(&[])
402 .observe(keys_size as f64);
403 Ok(result)
404 }
405
406 async fn find_key_values_by_prefix(
407 &self,
408 key_prefix: &[u8],
409 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
410 let _latency = self
411 .counter
412 .find_key_values_by_prefix_latency
413 .measure_latency();
414 self.counter
415 .find_key_values_by_prefix_prefix_size
416 .with_label_values(&[])
417 .observe(key_prefix.len() as f64);
418 let result = self.store.find_key_values_by_prefix(key_prefix).await?;
419 let (num_keys, key_values_size) = result
420 .iter()
421 .map(|(key, value)| key.len() + value.len())
422 .fold((0, 0), |(count, size), len| (count + 1, size + len));
423 self.counter
424 .find_key_values_by_prefix_num_keys
425 .with_label_values(&[])
426 .observe(num_keys as f64);
427 self.counter
428 .find_key_values_by_prefix_key_values_size
429 .with_label_values(&[])
430 .observe(key_values_size as f64);
431 Ok(result)
432 }
433}
434
435impl<S> WritableKeyValueStore for MeteredStore<S>
436where
437 S: WritableKeyValueStore,
438{
439 const MAX_VALUE_SIZE: usize = S::MAX_VALUE_SIZE;
440
441 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
442 let _latency = self.counter.write_batch_latency.measure_latency();
443 self.counter
444 .write_batch_size
445 .with_label_values(&[])
446 .observe(batch.size() as f64);
447 self.store.write_batch(batch).await
448 }
449
450 async fn clear_journal(&self) -> Result<(), Self::Error> {
451 let _metric = self.counter.clear_journal_latency.measure_latency();
452 self.store.clear_journal().await
453 }
454}
455
456impl<D> KeyValueDatabase for MeteredDatabase<D>
457where
458 D: KeyValueDatabase,
459{
460 type Config = D::Config;
461 type Store = MeteredStore<D::Store>;
462
463 fn get_name() -> String {
464 D::get_name()
465 }
466
467 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
468 let name = D::get_name();
469 let counter = get_counter(&name);
470 let _latency = counter.connect_latency.measure_latency();
471 let database = D::connect(config, namespace).await?;
472 let counter = get_counter(&name);
473 Ok(Self { counter, database })
474 }
475
476 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
477 let _latency = self.counter.open_shared_latency.measure_latency();
478 let store = self.database.open_shared(root_key)?;
479 let counter = self.counter.clone();
480 Ok(MeteredStore { counter, store })
481 }
482
483 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
484 let _latency = self.counter.open_exclusive_latency.measure_latency();
485 let store = self.database.open_exclusive(root_key)?;
486 let counter = self.counter.clone();
487 Ok(MeteredStore { counter, store })
488 }
489
490 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
491 let name = D::get_name();
492 let counter = get_counter(&name);
493 let _latency = counter.list_all_latency.measure_latency();
494 let namespaces = D::list_all(config).await?;
495 let counter = get_counter(&name);
496 counter
497 .list_all_sizes
498 .with_label_values(&[])
499 .observe(namespaces.len() as f64);
500 Ok(namespaces)
501 }
502
503 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
504 let _latency = self.counter.list_root_keys_latency.measure_latency();
505 self.database.list_root_keys().await
506 }
507
508 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
509 let name = D::get_name();
510 let counter = get_counter(&name);
511 let _latency = counter.delete_all_latency.measure_latency();
512 D::delete_all(config).await
513 }
514
515 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
516 let name = D::get_name();
517 let counter = get_counter(&name);
518 let _latency = counter.exists_latency.measure_latency();
519 let result = D::exists(config, namespace).await?;
520 if result {
521 let counter = get_counter(&name);
522 counter.exists_true_cases.with_label_values(&[]).inc();
523 }
524 Ok(result)
525 }
526
527 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
528 let name = D::get_name();
529 let counter = get_counter(&name);
530 let _latency = counter.create_latency.measure_latency();
531 D::create(config, namespace).await
532 }
533
534 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
535 let name = D::get_name();
536 let counter = get_counter(&name);
537 let _latency = counter.delete_latency.measure_latency();
538 D::delete(config, namespace).await
539 }
540}
541
/// Test support: the metered database uses the test configuration of the database it wraps.
#[cfg(with_testing)]
impl<D: TestKeyValueDatabase> TestKeyValueDatabase for MeteredDatabase<D> {
    /// Returns the wrapped database's test configuration unchanged; nothing is recorded.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config().await
    }
}