1use std::{
7 collections::{btree_map::Entry, BTreeMap},
8 sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13 exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
14 MeasureLatency as _,
15};
16use prometheus::{exponential_buckets, HistogramVec, IntCounterVec};
17
18#[cfg(with_testing)]
19use crate::store::TestKeyValueDatabase;
20use crate::{
21 batch::Batch,
22 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23};
24
25#[derive(Clone)]
26pub struct KeyValueStoreMetrics {
28 read_value_bytes_latency: HistogramVec,
29 contains_key_latency: HistogramVec,
30 contains_keys_latency: HistogramVec,
31 read_multi_values_bytes_latency: HistogramVec,
32 find_keys_by_prefix_latency: HistogramVec,
33 find_key_values_by_prefix_latency: HistogramVec,
34 write_batch_latency: HistogramVec,
35 clear_journal_latency: HistogramVec,
36 connect_latency: HistogramVec,
37 open_shared_latency: HistogramVec,
38 open_exclusive_latency: HistogramVec,
39 list_all_latency: HistogramVec,
40 list_root_keys_latency: HistogramVec,
41 delete_all_latency: HistogramVec,
42 exists_latency: HistogramVec,
43 create_latency: HistogramVec,
44 delete_latency: HistogramVec,
45 read_value_none_cases: IntCounterVec,
46 read_value_key_size: HistogramVec,
47 read_value_value_size: HistogramVec,
48 read_multi_values_num_entries: HistogramVec,
49 read_multi_values_key_sizes: HistogramVec,
50 contains_keys_num_entries: HistogramVec,
51 contains_keys_key_sizes: HistogramVec,
52 contains_key_key_size: HistogramVec,
53 find_keys_by_prefix_prefix_size: HistogramVec,
54 find_keys_by_prefix_num_keys: HistogramVec,
55 find_keys_by_prefix_keys_size: HistogramVec,
56 find_key_values_by_prefix_prefix_size: HistogramVec,
57 find_key_values_by_prefix_num_keys: HistogramVec,
58 find_key_values_by_prefix_key_values_size: HistogramVec,
59 write_batch_size: HistogramVec,
60 list_all_sizes: HistogramVec,
61 exists_true_cases: IntCounterVec,
62}
63
64#[derive(Default)]
65struct StoreMetrics {
66 stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
67}
68
69static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
71 LazyLock::new(|| Mutex::new(StoreMetrics::default()));
72
73fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
74 let mut store_metrics = STORE_COUNTERS.lock().unwrap();
75 let key = name.to_string();
76 match store_metrics.stores.entry(key) {
77 Entry::Occupied(entry) => {
78 let entry = entry.into_mut();
79 entry.clone()
80 }
81 Entry::Vacant(entry) => {
82 let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
83 entry.insert(store_metric.clone());
84 store_metric
85 }
86 }
87}
88
89impl KeyValueStoreMetrics {
90 pub fn new(name: String) -> Self {
92 let var_name = name.replace(' ', "_");
94 let title_name = name.to_case(Case::Snake);
95
96 let latency_buckets = exponential_bucket_latencies(10000.0);
98 let size_buckets =
100 Some(exponential_buckets(1.0, 4.0, 12).expect("Size buckets creation should not fail"));
101 let count_buckets = Some(
103 exponential_buckets(1.0, 3.0, 11).expect("Count buckets creation should not fail"),
104 );
105
106 let entry1 = format!("{}_read_value_bytes_latency", var_name);
107 let entry2 = format!("{} read value bytes latency", title_name);
108 let read_value_bytes_latency =
109 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
110
111 let entry1 = format!("{}_contains_key_latency", var_name);
112 let entry2 = format!("{} contains key latency", title_name);
113 let contains_key_latency =
114 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
115
116 let entry1 = format!("{}_contains_keys_latency", var_name);
117 let entry2 = format!("{} contains keys latency", title_name);
118 let contains_keys_latency =
119 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
120
121 let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
122 let entry2 = format!("{} read multi value bytes latency", title_name);
123 let read_multi_values_bytes_latency =
124 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
125
126 let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
127 let entry2 = format!("{} find keys by prefix latency", title_name);
128 let find_keys_by_prefix_latency =
129 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
130
131 let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
132 let entry2 = format!("{} find key values by prefix latency", title_name);
133 let find_key_values_by_prefix_latency =
134 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
135
136 let entry1 = format!("{}_write_batch_latency", var_name);
137 let entry2 = format!("{} write batch latency", title_name);
138 let write_batch_latency =
139 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
140
141 let entry1 = format!("{}_clear_journal_latency", var_name);
142 let entry2 = format!("{} clear journal latency", title_name);
143 let clear_journal_latency =
144 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
145
146 let entry1 = format!("{}_connect_latency", var_name);
147 let entry2 = format!("{} connect latency", title_name);
148 let connect_latency =
149 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
150
151 let entry1 = format!("{}_open_shared_latency", var_name);
152 let entry2 = format!("{} open shared partition", title_name);
153 let open_shared_latency =
154 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
155
156 let entry1 = format!("{}_open_exclusive_latency", var_name);
157 let entry2 = format!("{} open exclusive partition", title_name);
158 let open_exclusive_latency =
159 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
160
161 let entry1 = format!("{}_list_all_latency", var_name);
162 let entry2 = format!("{} list all latency", title_name);
163 let list_all_latency =
164 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
165
166 let entry1 = format!("{}_list_root_keys_latency", var_name);
167 let entry2 = format!("{} list root keys latency", title_name);
168 let list_root_keys_latency =
169 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
170
171 let entry1 = format!("{}_delete_all_latency", var_name);
172 let entry2 = format!("{} delete all latency", title_name);
173 let delete_all_latency =
174 register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
175
176 let entry1 = format!("{}_exists_latency", var_name);
177 let entry2 = format!("{} exists latency", title_name);
178 let exists_latency = register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
179
180 let entry1 = format!("{}_create_latency", var_name);
181 let entry2 = format!("{} create latency", title_name);
182 let create_latency = register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
183
184 let entry1 = format!("{}_delete_latency", var_name);
185 let entry2 = format!("{} delete latency", title_name);
186 let delete_latency = register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
187
188 let entry1 = format!("{}_read_value_none_cases", var_name);
189 let entry2 = format!("{} read value none cases", title_name);
190 let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
191
192 let entry1 = format!("{}_read_value_key_size", var_name);
193 let entry2 = format!("{} read value key size", title_name);
194 let read_value_key_size =
195 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
196
197 let entry1 = format!("{}_read_value_value_size", var_name);
198 let entry2 = format!("{} read value value size", title_name);
199 let read_value_value_size =
200 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
201
202 let entry1 = format!("{}_read_multi_values_num_entries", var_name);
203 let entry2 = format!("{} read multi values num entries", title_name);
204 let read_multi_values_num_entries =
205 register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
206
207 let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
208 let entry2 = format!("{} read multi values key sizes", title_name);
209 let read_multi_values_key_sizes =
210 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
211
212 let entry1 = format!("{}_contains_keys_num_entries", var_name);
213 let entry2 = format!("{} contains keys num entries", title_name);
214 let contains_keys_num_entries =
215 register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
216
217 let entry1 = format!("{}_contains_keys_key_sizes", var_name);
218 let entry2 = format!("{} contains keys key sizes", title_name);
219 let contains_keys_key_sizes =
220 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
221
222 let entry1 = format!("{}_contains_key_key_size", var_name);
223 let entry2 = format!("{} contains key key size", title_name);
224 let contains_key_key_size =
225 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
226
227 let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
228 let entry2 = format!("{} find keys by prefix prefix size", title_name);
229 let find_keys_by_prefix_prefix_size =
230 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
231
232 let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
233 let entry2 = format!("{} find keys by prefix num keys", title_name);
234 let find_keys_by_prefix_num_keys =
235 register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
236
237 let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
238 let entry2 = format!("{} find keys by prefix keys size", title_name);
239 let find_keys_by_prefix_keys_size =
240 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
241
242 let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
243 let entry2 = format!("{} find key values by prefix prefix size", title_name);
244 let find_key_values_by_prefix_prefix_size =
245 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
246
247 let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
248 let entry2 = format!("{} find key values by prefix num keys", title_name);
249 let find_key_values_by_prefix_num_keys =
250 register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
251
252 let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
253 let entry2 = format!("{} find key values by prefix key values size", title_name);
254 let find_key_values_by_prefix_key_values_size =
255 register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
256
257 let entry1 = format!("{}_write_batch_size", var_name);
258 let entry2 = format!("{} write batch size", title_name);
259 let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], size_buckets);
260
261 let entry1 = format!("{}_list_all_sizes", var_name);
262 let entry2 = format!("{} list all sizes", title_name);
263 let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], count_buckets);
264
265 let entry1 = format!("{}_exists_true_cases", var_name);
266 let entry2 = format!("{} exists true cases", title_name);
267 let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
268
269 KeyValueStoreMetrics {
270 read_value_bytes_latency,
271 contains_key_latency,
272 contains_keys_latency,
273 read_multi_values_bytes_latency,
274 find_keys_by_prefix_latency,
275 find_key_values_by_prefix_latency,
276 write_batch_latency,
277 clear_journal_latency,
278 connect_latency,
279 open_shared_latency,
280 open_exclusive_latency,
281 list_all_latency,
282 list_root_keys_latency,
283 delete_all_latency,
284 exists_latency,
285 create_latency,
286 delete_latency,
287 read_value_none_cases,
288 read_value_key_size,
289 read_value_value_size,
290 read_multi_values_num_entries,
291 read_multi_values_key_sizes,
292 contains_keys_num_entries,
293 contains_keys_key_sizes,
294 contains_key_key_size,
295 find_keys_by_prefix_prefix_size,
296 find_keys_by_prefix_num_keys,
297 find_keys_by_prefix_keys_size,
298 find_key_values_by_prefix_prefix_size,
299 find_key_values_by_prefix_num_keys,
300 find_key_values_by_prefix_key_values_size,
301 write_batch_size,
302 list_all_sizes,
303 exists_true_cases,
304 }
305 }
306}
307
308#[derive(Clone)]
310pub struct MeteredDatabase<D> {
311 counter: Arc<KeyValueStoreMetrics>,
313 database: D,
315}
316
317#[derive(Clone)]
319pub struct MeteredStore<S> {
320 counter: Arc<KeyValueStoreMetrics>,
322 store: S,
324}
325
326impl<D> WithError for MeteredDatabase<D>
327where
328 D: WithError,
329{
330 type Error = D::Error;
331}
332
333impl<S> WithError for MeteredStore<S>
334where
335 S: WithError,
336{
337 type Error = S::Error;
338}
339
340impl<S> ReadableKeyValueStore for MeteredStore<S>
341where
342 S: ReadableKeyValueStore,
343{
344 const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE;
345
346 fn max_stream_queries(&self) -> usize {
347 self.store.max_stream_queries()
348 }
349
350 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
351 self.store.root_key()
352 }
353
354 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
355 let _latency = self.counter.read_value_bytes_latency.measure_latency();
356 self.counter
357 .read_value_key_size
358 .with_label_values(&[])
359 .observe(key.len() as f64);
360 let result = self.store.read_value_bytes(key).await?;
361 match &result {
362 None => self
363 .counter
364 .read_value_none_cases
365 .with_label_values(&[])
366 .inc(),
367 Some(value) => self
368 .counter
369 .read_value_value_size
370 .with_label_values(&[])
371 .observe(value.len() as f64),
372 }
373 Ok(result)
374 }
375
376 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
377 let _latency = self.counter.contains_key_latency.measure_latency();
378 self.counter
379 .contains_key_key_size
380 .with_label_values(&[])
381 .observe(key.len() as f64);
382 self.store.contains_key(key).await
383 }
384
385 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
386 let _latency = self.counter.contains_keys_latency.measure_latency();
387 self.counter
388 .contains_keys_num_entries
389 .with_label_values(&[])
390 .observe(keys.len() as f64);
391 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
392 self.counter
393 .contains_keys_key_sizes
394 .with_label_values(&[])
395 .observe(key_sizes as f64);
396 self.store.contains_keys(keys).await
397 }
398
399 async fn read_multi_values_bytes(
400 &self,
401 keys: &[Vec<u8>],
402 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
403 let _latency = self
404 .counter
405 .read_multi_values_bytes_latency
406 .measure_latency();
407 self.counter
408 .read_multi_values_num_entries
409 .with_label_values(&[])
410 .observe(keys.len() as f64);
411 let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
412 self.counter
413 .read_multi_values_key_sizes
414 .with_label_values(&[])
415 .observe(key_sizes as f64);
416 self.store.read_multi_values_bytes(keys).await
417 }
418
419 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
420 let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
421 self.counter
422 .find_keys_by_prefix_prefix_size
423 .with_label_values(&[])
424 .observe(key_prefix.len() as f64);
425 let result = self.store.find_keys_by_prefix(key_prefix).await?;
426 let (num_keys, keys_size) = result
427 .iter()
428 .map(|key| key.len())
429 .fold((0, 0), |(count, size), len| (count + 1, size + len));
430 self.counter
431 .find_keys_by_prefix_num_keys
432 .with_label_values(&[])
433 .observe(num_keys as f64);
434 self.counter
435 .find_keys_by_prefix_keys_size
436 .with_label_values(&[])
437 .observe(keys_size as f64);
438 Ok(result)
439 }
440
441 async fn find_key_values_by_prefix(
442 &self,
443 key_prefix: &[u8],
444 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
445 let _latency = self
446 .counter
447 .find_key_values_by_prefix_latency
448 .measure_latency();
449 self.counter
450 .find_key_values_by_prefix_prefix_size
451 .with_label_values(&[])
452 .observe(key_prefix.len() as f64);
453 let result = self.store.find_key_values_by_prefix(key_prefix).await?;
454 let (num_keys, key_values_size) = result
455 .iter()
456 .map(|(key, value)| key.len() + value.len())
457 .fold((0, 0), |(count, size), len| (count + 1, size + len));
458 self.counter
459 .find_key_values_by_prefix_num_keys
460 .with_label_values(&[])
461 .observe(num_keys as f64);
462 self.counter
463 .find_key_values_by_prefix_key_values_size
464 .with_label_values(&[])
465 .observe(key_values_size as f64);
466 Ok(result)
467 }
468}
469
470impl<S> WritableKeyValueStore for MeteredStore<S>
471where
472 S: WritableKeyValueStore,
473{
474 const MAX_VALUE_SIZE: usize = S::MAX_VALUE_SIZE;
475
476 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
477 let _latency = self.counter.write_batch_latency.measure_latency();
478 self.counter
479 .write_batch_size
480 .with_label_values(&[])
481 .observe(batch.size() as f64);
482 self.store.write_batch(batch).await
483 }
484
485 async fn clear_journal(&self) -> Result<(), Self::Error> {
486 let _metric = self.counter.clear_journal_latency.measure_latency();
487 self.store.clear_journal().await
488 }
489}
490
491impl<D> KeyValueDatabase for MeteredDatabase<D>
492where
493 D: KeyValueDatabase,
494{
495 type Config = D::Config;
496 type Store = MeteredStore<D::Store>;
497
498 fn get_name() -> String {
499 D::get_name()
500 }
501
502 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
503 let name = D::get_name();
504 let counter = get_counter(&name);
505 let _latency = counter.connect_latency.measure_latency();
506 let database = D::connect(config, namespace).await?;
507 let counter = get_counter(&name);
508 Ok(Self { counter, database })
509 }
510
511 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
512 let _latency = self.counter.open_shared_latency.measure_latency();
513 let store = self.database.open_shared(root_key)?;
514 let counter = self.counter.clone();
515 Ok(MeteredStore { counter, store })
516 }
517
518 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
519 let _latency = self.counter.open_exclusive_latency.measure_latency();
520 let store = self.database.open_exclusive(root_key)?;
521 let counter = self.counter.clone();
522 Ok(MeteredStore { counter, store })
523 }
524
525 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
526 let name = D::get_name();
527 let counter = get_counter(&name);
528 let _latency = counter.list_all_latency.measure_latency();
529 let namespaces = D::list_all(config).await?;
530 let counter = get_counter(&name);
531 counter
532 .list_all_sizes
533 .with_label_values(&[])
534 .observe(namespaces.len() as f64);
535 Ok(namespaces)
536 }
537
538 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
539 let _latency = self.counter.list_root_keys_latency.measure_latency();
540 self.database.list_root_keys().await
541 }
542
543 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
544 let name = D::get_name();
545 let counter = get_counter(&name);
546 let _latency = counter.delete_all_latency.measure_latency();
547 D::delete_all(config).await
548 }
549
550 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
551 let name = D::get_name();
552 let counter = get_counter(&name);
553 let _latency = counter.exists_latency.measure_latency();
554 let result = D::exists(config, namespace).await?;
555 if result {
556 let counter = get_counter(&name);
557 counter.exists_true_cases.with_label_values(&[]).inc();
558 }
559 Ok(result)
560 }
561
562 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
563 let name = D::get_name();
564 let counter = get_counter(&name);
565 let _latency = counter.create_latency.measure_latency();
566 D::create(config, namespace).await
567 }
568
569 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
570 let name = D::get_name();
571 let counter = get_counter(&name);
572 let _latency = counter.delete_latency.measure_latency();
573 D::delete(config, namespace).await
574 }
575}
576
/// A metered database is tested with the same configuration as the database it wraps.
#[cfg(with_testing)]
impl<D: TestKeyValueDatabase> TestKeyValueDatabase for MeteredDatabase<D> {
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        let config = D::new_test_config().await?;
        Ok(config)
    }
}