linera_views/backends/
metering.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds metrics to a key-value store.
5
6use std::{
7    collections::{btree_map::Entry, BTreeMap},
8    sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13    exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
14    MeasureLatency as _,
15};
16use prometheus::{exponential_buckets, HistogramVec, IntCounterVec};
17
18#[cfg(with_testing)]
19use crate::store::TestKeyValueDatabase;
20use crate::{
21    batch::Batch,
22    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23};
24
25#[derive(Clone)]
26/// The implementation of the `KeyValueStoreMetrics` for the `KeyValueStore`.
27pub struct KeyValueStoreMetrics {
28    read_value_bytes_latency: HistogramVec,
29    contains_key_latency: HistogramVec,
30    contains_keys_latency: HistogramVec,
31    read_multi_values_bytes_latency: HistogramVec,
32    find_keys_by_prefix_latency: HistogramVec,
33    find_key_values_by_prefix_latency: HistogramVec,
34    write_batch_latency: HistogramVec,
35    clear_journal_latency: HistogramVec,
36    connect_latency: HistogramVec,
37    open_shared_latency: HistogramVec,
38    open_exclusive_latency: HistogramVec,
39    list_all_latency: HistogramVec,
40    list_root_keys_latency: HistogramVec,
41    delete_all_latency: HistogramVec,
42    exists_latency: HistogramVec,
43    create_latency: HistogramVec,
44    delete_latency: HistogramVec,
45    read_value_none_cases: IntCounterVec,
46    read_value_key_size: HistogramVec,
47    read_value_value_size: HistogramVec,
48    read_multi_values_num_entries: HistogramVec,
49    read_multi_values_key_sizes: HistogramVec,
50    contains_keys_num_entries: HistogramVec,
51    contains_keys_key_sizes: HistogramVec,
52    contains_key_key_size: HistogramVec,
53    find_keys_by_prefix_prefix_size: HistogramVec,
54    find_keys_by_prefix_num_keys: HistogramVec,
55    find_keys_by_prefix_keys_size: HistogramVec,
56    find_key_values_by_prefix_prefix_size: HistogramVec,
57    find_key_values_by_prefix_num_keys: HistogramVec,
58    find_key_values_by_prefix_key_values_size: HistogramVec,
59    write_batch_size: HistogramVec,
60    list_all_sizes: HistogramVec,
61    exists_true_cases: IntCounterVec,
62}
63
64#[derive(Default)]
65struct StoreMetrics {
66    stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
67}
68
69/// The global variables of the RocksDB stores
70static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
71    LazyLock::new(|| Mutex::new(StoreMetrics::default()));
72
73fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
74    let mut store_metrics = STORE_COUNTERS.lock().unwrap();
75    let key = name.to_string();
76    match store_metrics.stores.entry(key) {
77        Entry::Occupied(entry) => {
78            let entry = entry.into_mut();
79            entry.clone()
80        }
81        Entry::Vacant(entry) => {
82            let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
83            entry.insert(store_metric.clone());
84            store_metric
85        }
86    }
87}
88
89impl KeyValueStoreMetrics {
90    /// Creation of a named Metered counter.
91    pub fn new(name: String) -> Self {
92        // name can be "rocks db". Then var_name = "rocks_db" and title_name = "RocksDb"
93        let var_name = name.replace(' ', "_");
94        let title_name = name.to_case(Case::Snake);
95
96        // Latency buckets in milliseconds: up to 10 seconds
97        let latency_buckets = exponential_bucket_latencies(10000.0);
98        // Size buckets in bytes: 1B to 10MB
99        let size_buckets =
100            Some(exponential_buckets(1.0, 4.0, 12).expect("Size buckets creation should not fail"));
101        // Count buckets: 1 to 100,000
102        let count_buckets = Some(
103            exponential_buckets(1.0, 3.0, 11).expect("Count buckets creation should not fail"),
104        );
105
106        let entry1 = format!("{}_read_value_bytes_latency", var_name);
107        let entry2 = format!("{} read value bytes latency", title_name);
108        let read_value_bytes_latency =
109            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
110
111        let entry1 = format!("{}_contains_key_latency", var_name);
112        let entry2 = format!("{} contains key latency", title_name);
113        let contains_key_latency =
114            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
115
116        let entry1 = format!("{}_contains_keys_latency", var_name);
117        let entry2 = format!("{} contains keys latency", title_name);
118        let contains_keys_latency =
119            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
120
121        let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
122        let entry2 = format!("{} read multi value bytes latency", title_name);
123        let read_multi_values_bytes_latency =
124            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
125
126        let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
127        let entry2 = format!("{} find keys by prefix latency", title_name);
128        let find_keys_by_prefix_latency =
129            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
130
131        let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
132        let entry2 = format!("{} find key values by prefix latency", title_name);
133        let find_key_values_by_prefix_latency =
134            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
135
136        let entry1 = format!("{}_write_batch_latency", var_name);
137        let entry2 = format!("{} write batch latency", title_name);
138        let write_batch_latency =
139            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
140
141        let entry1 = format!("{}_clear_journal_latency", var_name);
142        let entry2 = format!("{} clear journal latency", title_name);
143        let clear_journal_latency =
144            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
145
146        let entry1 = format!("{}_connect_latency", var_name);
147        let entry2 = format!("{} connect latency", title_name);
148        let connect_latency =
149            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
150
151        let entry1 = format!("{}_open_shared_latency", var_name);
152        let entry2 = format!("{} open shared partition", title_name);
153        let open_shared_latency =
154            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
155
156        let entry1 = format!("{}_open_exclusive_latency", var_name);
157        let entry2 = format!("{} open exclusive partition", title_name);
158        let open_exclusive_latency =
159            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
160
161        let entry1 = format!("{}_list_all_latency", var_name);
162        let entry2 = format!("{} list all latency", title_name);
163        let list_all_latency =
164            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
165
166        let entry1 = format!("{}_list_root_keys_latency", var_name);
167        let entry2 = format!("{} list root keys latency", title_name);
168        let list_root_keys_latency =
169            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
170
171        let entry1 = format!("{}_delete_all_latency", var_name);
172        let entry2 = format!("{} delete all latency", title_name);
173        let delete_all_latency =
174            register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
175
176        let entry1 = format!("{}_exists_latency", var_name);
177        let entry2 = format!("{} exists latency", title_name);
178        let exists_latency = register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
179
180        let entry1 = format!("{}_create_latency", var_name);
181        let entry2 = format!("{} create latency", title_name);
182        let create_latency = register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
183
184        let entry1 = format!("{}_delete_latency", var_name);
185        let entry2 = format!("{} delete latency", title_name);
186        let delete_latency = register_histogram_vec(&entry1, &entry2, &[], latency_buckets.clone());
187
188        let entry1 = format!("{}_read_value_none_cases", var_name);
189        let entry2 = format!("{} read value none cases", title_name);
190        let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
191
192        let entry1 = format!("{}_read_value_key_size", var_name);
193        let entry2 = format!("{} read value key size", title_name);
194        let read_value_key_size =
195            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
196
197        let entry1 = format!("{}_read_value_value_size", var_name);
198        let entry2 = format!("{} read value value size", title_name);
199        let read_value_value_size =
200            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
201
202        let entry1 = format!("{}_read_multi_values_num_entries", var_name);
203        let entry2 = format!("{} read multi values num entries", title_name);
204        let read_multi_values_num_entries =
205            register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
206
207        let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
208        let entry2 = format!("{} read multi values key sizes", title_name);
209        let read_multi_values_key_sizes =
210            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
211
212        let entry1 = format!("{}_contains_keys_num_entries", var_name);
213        let entry2 = format!("{} contains keys num entries", title_name);
214        let contains_keys_num_entries =
215            register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
216
217        let entry1 = format!("{}_contains_keys_key_sizes", var_name);
218        let entry2 = format!("{} contains keys key sizes", title_name);
219        let contains_keys_key_sizes =
220            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
221
222        let entry1 = format!("{}_contains_key_key_size", var_name);
223        let entry2 = format!("{} contains key key size", title_name);
224        let contains_key_key_size =
225            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
226
227        let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
228        let entry2 = format!("{} find keys by prefix prefix size", title_name);
229        let find_keys_by_prefix_prefix_size =
230            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
231
232        let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
233        let entry2 = format!("{} find keys by prefix num keys", title_name);
234        let find_keys_by_prefix_num_keys =
235            register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
236
237        let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
238        let entry2 = format!("{} find keys by prefix keys size", title_name);
239        let find_keys_by_prefix_keys_size =
240            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
241
242        let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
243        let entry2 = format!("{} find key values by prefix prefix size", title_name);
244        let find_key_values_by_prefix_prefix_size =
245            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
246
247        let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
248        let entry2 = format!("{} find key values by prefix num keys", title_name);
249        let find_key_values_by_prefix_num_keys =
250            register_histogram_vec(&entry1, &entry2, &[], count_buckets.clone());
251
252        let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
253        let entry2 = format!("{} find key values by prefix key values size", title_name);
254        let find_key_values_by_prefix_key_values_size =
255            register_histogram_vec(&entry1, &entry2, &[], size_buckets.clone());
256
257        let entry1 = format!("{}_write_batch_size", var_name);
258        let entry2 = format!("{} write batch size", title_name);
259        let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], size_buckets);
260
261        let entry1 = format!("{}_list_all_sizes", var_name);
262        let entry2 = format!("{} list all sizes", title_name);
263        let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], count_buckets);
264
265        let entry1 = format!("{}_exists_true_cases", var_name);
266        let entry2 = format!("{} exists true cases", title_name);
267        let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
268
269        KeyValueStoreMetrics {
270            read_value_bytes_latency,
271            contains_key_latency,
272            contains_keys_latency,
273            read_multi_values_bytes_latency,
274            find_keys_by_prefix_latency,
275            find_key_values_by_prefix_latency,
276            write_batch_latency,
277            clear_journal_latency,
278            connect_latency,
279            open_shared_latency,
280            open_exclusive_latency,
281            list_all_latency,
282            list_root_keys_latency,
283            delete_all_latency,
284            exists_latency,
285            create_latency,
286            delete_latency,
287            read_value_none_cases,
288            read_value_key_size,
289            read_value_value_size,
290            read_multi_values_num_entries,
291            read_multi_values_key_sizes,
292            contains_keys_num_entries,
293            contains_keys_key_sizes,
294            contains_key_key_size,
295            find_keys_by_prefix_prefix_size,
296            find_keys_by_prefix_num_keys,
297            find_keys_by_prefix_keys_size,
298            find_key_values_by_prefix_prefix_size,
299            find_key_values_by_prefix_num_keys,
300            find_key_values_by_prefix_key_values_size,
301            write_batch_size,
302            list_all_sizes,
303            exists_true_cases,
304        }
305    }
306}
307
308/// A metered database that keeps track of every operation.
309#[derive(Clone)]
310pub struct MeteredDatabase<D> {
311    /// The metrics being computed.
312    counter: Arc<KeyValueStoreMetrics>,
313    /// The underlying database.
314    database: D,
315}
316
317/// A metered store that keeps track of every operation.
318#[derive(Clone)]
319pub struct MeteredStore<S> {
320    /// The metrics being computed.
321    counter: Arc<KeyValueStoreMetrics>,
322    /// The underlying store.
323    store: S,
324}
325
326impl<D> WithError for MeteredDatabase<D>
327where
328    D: WithError,
329{
330    type Error = D::Error;
331}
332
333impl<S> WithError for MeteredStore<S>
334where
335    S: WithError,
336{
337    type Error = S::Error;
338}
339
340impl<S> ReadableKeyValueStore for MeteredStore<S>
341where
342    S: ReadableKeyValueStore,
343{
344    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE;
345
346    fn max_stream_queries(&self) -> usize {
347        self.store.max_stream_queries()
348    }
349
350    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
351        self.store.root_key()
352    }
353
354    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
355        let _latency = self.counter.read_value_bytes_latency.measure_latency();
356        self.counter
357            .read_value_key_size
358            .with_label_values(&[])
359            .observe(key.len() as f64);
360        let result = self.store.read_value_bytes(key).await?;
361        match &result {
362            None => self
363                .counter
364                .read_value_none_cases
365                .with_label_values(&[])
366                .inc(),
367            Some(value) => self
368                .counter
369                .read_value_value_size
370                .with_label_values(&[])
371                .observe(value.len() as f64),
372        }
373        Ok(result)
374    }
375
376    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
377        let _latency = self.counter.contains_key_latency.measure_latency();
378        self.counter
379            .contains_key_key_size
380            .with_label_values(&[])
381            .observe(key.len() as f64);
382        self.store.contains_key(key).await
383    }
384
385    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
386        let _latency = self.counter.contains_keys_latency.measure_latency();
387        self.counter
388            .contains_keys_num_entries
389            .with_label_values(&[])
390            .observe(keys.len() as f64);
391        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
392        self.counter
393            .contains_keys_key_sizes
394            .with_label_values(&[])
395            .observe(key_sizes as f64);
396        self.store.contains_keys(keys).await
397    }
398
399    async fn read_multi_values_bytes(
400        &self,
401        keys: &[Vec<u8>],
402    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
403        let _latency = self
404            .counter
405            .read_multi_values_bytes_latency
406            .measure_latency();
407        self.counter
408            .read_multi_values_num_entries
409            .with_label_values(&[])
410            .observe(keys.len() as f64);
411        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
412        self.counter
413            .read_multi_values_key_sizes
414            .with_label_values(&[])
415            .observe(key_sizes as f64);
416        self.store.read_multi_values_bytes(keys).await
417    }
418
419    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
420        let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
421        self.counter
422            .find_keys_by_prefix_prefix_size
423            .with_label_values(&[])
424            .observe(key_prefix.len() as f64);
425        let result = self.store.find_keys_by_prefix(key_prefix).await?;
426        let (num_keys, keys_size) = result
427            .iter()
428            .map(|key| key.len())
429            .fold((0, 0), |(count, size), len| (count + 1, size + len));
430        self.counter
431            .find_keys_by_prefix_num_keys
432            .with_label_values(&[])
433            .observe(num_keys as f64);
434        self.counter
435            .find_keys_by_prefix_keys_size
436            .with_label_values(&[])
437            .observe(keys_size as f64);
438        Ok(result)
439    }
440
441    async fn find_key_values_by_prefix(
442        &self,
443        key_prefix: &[u8],
444    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
445        let _latency = self
446            .counter
447            .find_key_values_by_prefix_latency
448            .measure_latency();
449        self.counter
450            .find_key_values_by_prefix_prefix_size
451            .with_label_values(&[])
452            .observe(key_prefix.len() as f64);
453        let result = self.store.find_key_values_by_prefix(key_prefix).await?;
454        let (num_keys, key_values_size) = result
455            .iter()
456            .map(|(key, value)| key.len() + value.len())
457            .fold((0, 0), |(count, size), len| (count + 1, size + len));
458        self.counter
459            .find_key_values_by_prefix_num_keys
460            .with_label_values(&[])
461            .observe(num_keys as f64);
462        self.counter
463            .find_key_values_by_prefix_key_values_size
464            .with_label_values(&[])
465            .observe(key_values_size as f64);
466        Ok(result)
467    }
468}
469
470impl<S> WritableKeyValueStore for MeteredStore<S>
471where
472    S: WritableKeyValueStore,
473{
474    const MAX_VALUE_SIZE: usize = S::MAX_VALUE_SIZE;
475
476    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
477        let _latency = self.counter.write_batch_latency.measure_latency();
478        self.counter
479            .write_batch_size
480            .with_label_values(&[])
481            .observe(batch.size() as f64);
482        self.store.write_batch(batch).await
483    }
484
485    async fn clear_journal(&self) -> Result<(), Self::Error> {
486        let _metric = self.counter.clear_journal_latency.measure_latency();
487        self.store.clear_journal().await
488    }
489}
490
491impl<D> KeyValueDatabase for MeteredDatabase<D>
492where
493    D: KeyValueDatabase,
494{
495    type Config = D::Config;
496    type Store = MeteredStore<D::Store>;
497
498    fn get_name() -> String {
499        D::get_name()
500    }
501
502    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
503        let name = D::get_name();
504        let counter = get_counter(&name);
505        let _latency = counter.connect_latency.measure_latency();
506        let database = D::connect(config, namespace).await?;
507        let counter = get_counter(&name);
508        Ok(Self { counter, database })
509    }
510
511    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
512        let _latency = self.counter.open_shared_latency.measure_latency();
513        let store = self.database.open_shared(root_key)?;
514        let counter = self.counter.clone();
515        Ok(MeteredStore { counter, store })
516    }
517
518    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
519        let _latency = self.counter.open_exclusive_latency.measure_latency();
520        let store = self.database.open_exclusive(root_key)?;
521        let counter = self.counter.clone();
522        Ok(MeteredStore { counter, store })
523    }
524
525    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
526        let name = D::get_name();
527        let counter = get_counter(&name);
528        let _latency = counter.list_all_latency.measure_latency();
529        let namespaces = D::list_all(config).await?;
530        let counter = get_counter(&name);
531        counter
532            .list_all_sizes
533            .with_label_values(&[])
534            .observe(namespaces.len() as f64);
535        Ok(namespaces)
536    }
537
538    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
539        let _latency = self.counter.list_root_keys_latency.measure_latency();
540        self.database.list_root_keys().await
541    }
542
543    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
544        let name = D::get_name();
545        let counter = get_counter(&name);
546        let _latency = counter.delete_all_latency.measure_latency();
547        D::delete_all(config).await
548    }
549
550    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
551        let name = D::get_name();
552        let counter = get_counter(&name);
553        let _latency = counter.exists_latency.measure_latency();
554        let result = D::exists(config, namespace).await?;
555        if result {
556            let counter = get_counter(&name);
557            counter.exists_true_cases.with_label_values(&[]).inc();
558        }
559        Ok(result)
560    }
561
562    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
563        let name = D::get_name();
564        let counter = get_counter(&name);
565        let _latency = counter.create_latency.measure_latency();
566        D::create(config, namespace).await
567    }
568
569    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
570        let name = D::get_name();
571        let counter = get_counter(&name);
572        let _latency = counter.delete_latency.measure_latency();
573        D::delete(config, namespace).await
574    }
575}
576
#[cfg(with_testing)]
impl<D: TestKeyValueDatabase> TestKeyValueDatabase for MeteredDatabase<D> {
    /// Returns the test configuration of the wrapped database unchanged; no
    /// metrics are recorded.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config().await
    }
}
585}