linera_views/backends/
metering.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds metrics to a key-value store.
5
6use std::{
7    collections::{btree_map::Entry, BTreeMap},
8    sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13    register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueStore;
19use crate::{
20    batch::Batch,
21    store::{
22        AdminKeyValueStore, KeyIterable as _, KeyValueIterable as _, ReadableKeyValueStore,
23        WithError, WritableKeyValueStore,
24    },
25};
26
27#[derive(Clone)]
28/// The implementation of the `KeyValueStoreMetrics` for the `KeyValueStore`.
29pub struct KeyValueStoreMetrics {
30    read_value_bytes_latency: HistogramVec,
31    contains_key_latency: HistogramVec,
32    contains_keys_latency: HistogramVec,
33    read_multi_values_bytes_latency: HistogramVec,
34    find_keys_by_prefix_latency: HistogramVec,
35    find_key_values_by_prefix_latency: HistogramVec,
36    write_batch_latency: HistogramVec,
37    clear_journal_latency: HistogramVec,
38    connect_latency: HistogramVec,
39    open_exclusive_latency: HistogramVec,
40    list_all_latency: HistogramVec,
41    list_root_keys_latency: HistogramVec,
42    delete_all_latency: HistogramVec,
43    exists_latency: HistogramVec,
44    create_latency: HistogramVec,
45    delete_latency: HistogramVec,
46    read_value_none_cases: IntCounterVec,
47    read_value_key_size: HistogramVec,
48    read_value_value_size: HistogramVec,
49    read_multi_values_num_entries: HistogramVec,
50    read_multi_values_key_sizes: HistogramVec,
51    contains_keys_num_entries: HistogramVec,
52    contains_keys_key_sizes: HistogramVec,
53    contains_key_key_size: HistogramVec,
54    find_keys_by_prefix_prefix_size: HistogramVec,
55    find_keys_by_prefix_num_keys: HistogramVec,
56    find_keys_by_prefix_keys_size: HistogramVec,
57    find_key_values_by_prefix_prefix_size: HistogramVec,
58    find_key_values_by_prefix_num_keys: HistogramVec,
59    find_key_values_by_prefix_key_values_size: HistogramVec,
60    write_batch_size: HistogramVec,
61    list_all_sizes: HistogramVec,
62    exists_true_cases: IntCounterVec,
63}
64
65#[derive(Default)]
66struct StoreMetrics {
67    stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
68}
69
70/// The global variables of the RocksDB stores
71static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
72    LazyLock::new(|| Mutex::new(StoreMetrics::default()));
73
74fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
75    let mut store_metrics = STORE_COUNTERS.lock().unwrap();
76    let key = name.to_string();
77    match store_metrics.stores.entry(key) {
78        Entry::Occupied(entry) => {
79            let entry = entry.into_mut();
80            entry.clone()
81        }
82        Entry::Vacant(entry) => {
83            let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
84            entry.insert(store_metric.clone());
85            store_metric
86        }
87    }
88}
89
90impl KeyValueStoreMetrics {
91    /// Creation of a named Metered counter.
92    pub fn new(name: String) -> Self {
93        // name can be "rocks db". Then var_name = "rocks_db" and title_name = "RocksDb"
94        let var_name = name.replace(' ', "_");
95        let title_name = name.to_case(Case::Snake);
96
97        let entry1 = format!("{}_read_value_bytes_latency", var_name);
98        let entry2 = format!("{} read value bytes latency", title_name);
99        let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
100
101        let entry1 = format!("{}_contains_key_latency", var_name);
102        let entry2 = format!("{} contains key latency", title_name);
103        let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
104
105        let entry1 = format!("{}_contains_keys_latency", var_name);
106        let entry2 = format!("{} contains keys latency", title_name);
107        let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
108
109        let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
110        let entry2 = format!("{} read multi value bytes latency", title_name);
111        let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
112
113        let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
114        let entry2 = format!("{} find keys by prefix latency", title_name);
115        let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
116
117        let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
118        let entry2 = format!("{} find key values by prefix latency", title_name);
119        let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
120
121        let entry1 = format!("{}_write_batch_latency", var_name);
122        let entry2 = format!("{} write batch latency", title_name);
123        let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
124
125        let entry1 = format!("{}_clear_journal_latency", var_name);
126        let entry2 = format!("{} clear journal latency", title_name);
127        let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
128
129        let entry1 = format!("{}_connect_latency", var_name);
130        let entry2 = format!("{} connect latency", title_name);
131        let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
132
133        let entry1 = format!("{}_open_exclusive_latency", var_name);
134        let entry2 = format!("{} clone with root key latency", title_name);
135        let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
136
137        let entry1 = format!("{}_list_all_latency", var_name);
138        let entry2 = format!("{} list all latency", title_name);
139        let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
140
141        let entry1 = format!("{}_list_root_keys_latency", var_name);
142        let entry2 = format!("{} list root keys latency", title_name);
143        let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
144
145        let entry1 = format!("{}_delete_all_latency", var_name);
146        let entry2 = format!("{} delete all latency", title_name);
147        let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
148
149        let entry1 = format!("{}_exists_latency", var_name);
150        let entry2 = format!("{} exists latency", title_name);
151        let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
152
153        let entry1 = format!("{}_create_latency", var_name);
154        let entry2 = format!("{} create latency", title_name);
155        let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
156
157        let entry1 = format!("{}_delete_latency", var_name);
158        let entry2 = format!("{} delete latency", title_name);
159        let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
160
161        let entry1 = format!("{}_read_value_none_cases", var_name);
162        let entry2 = format!("{} read value none cases", title_name);
163        let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
164
165        let entry1 = format!("{}_read_value_key_size", var_name);
166        let entry2 = format!("{} read value key size", title_name);
167        let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
168
169        let entry1 = format!("{}_read_value_value_size", var_name);
170        let entry2 = format!("{} read value value size", title_name);
171        let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
172
173        let entry1 = format!("{}_read_multi_values_num_entries", var_name);
174        let entry2 = format!("{} read multi values num entries", title_name);
175        let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
176
177        let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
178        let entry2 = format!("{} read multi values key sizes", title_name);
179        let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
180
181        let entry1 = format!("{}_contains_keys_num_entries", var_name);
182        let entry2 = format!("{} contains keys num entries", title_name);
183        let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
184
185        let entry1 = format!("{}_contains_keys_key_sizes", var_name);
186        let entry2 = format!("{} contains keys key sizes", title_name);
187        let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
188
189        let entry1 = format!("{}_contains_key_key_size", var_name);
190        let entry2 = format!("{} contains key key size", title_name);
191        let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
192
193        let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
194        let entry2 = format!("{} find keys by prefix prefix size", title_name);
195        let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
196
197        let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
198        let entry2 = format!("{} find keys by prefix num keys", title_name);
199        let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
200
201        let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
202        let entry2 = format!("{} find keys by prefix keys size", title_name);
203        let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
204
205        let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
206        let entry2 = format!("{} find key values by prefix prefix size", title_name);
207        let find_key_values_by_prefix_prefix_size =
208            register_histogram_vec(&entry1, &entry2, &[], None);
209
210        let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
211        let entry2 = format!("{} find key values by prefix num keys", title_name);
212        let find_key_values_by_prefix_num_keys =
213            register_histogram_vec(&entry1, &entry2, &[], None);
214
215        let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
216        let entry2 = format!("{} find key values by prefix key values size", title_name);
217        let find_key_values_by_prefix_key_values_size =
218            register_histogram_vec(&entry1, &entry2, &[], None);
219
220        let entry1 = format!("{}_write_batch_size", var_name);
221        let entry2 = format!("{} write batch size", title_name);
222        let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
223
224        let entry1 = format!("{}_list_all_sizes", var_name);
225        let entry2 = format!("{} list all sizes", title_name);
226        let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
227
228        let entry1 = format!("{}_exists_true_cases", var_name);
229        let entry2 = format!("{} exists true cases", title_name);
230        let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
231
232        KeyValueStoreMetrics {
233            read_value_bytes_latency,
234            contains_key_latency,
235            contains_keys_latency,
236            read_multi_values_bytes_latency,
237            find_keys_by_prefix_latency,
238            find_key_values_by_prefix_latency,
239            write_batch_latency,
240            clear_journal_latency,
241            connect_latency,
242            open_exclusive_latency,
243            list_all_latency,
244            list_root_keys_latency,
245            delete_all_latency,
246            exists_latency,
247            create_latency,
248            delete_latency,
249            read_value_none_cases,
250            read_value_key_size,
251            read_value_value_size,
252            read_multi_values_num_entries,
253            read_multi_values_key_sizes,
254            contains_keys_num_entries,
255            contains_keys_key_sizes,
256            contains_key_key_size,
257            find_keys_by_prefix_prefix_size,
258            find_keys_by_prefix_num_keys,
259            find_keys_by_prefix_keys_size,
260            find_key_values_by_prefix_prefix_size,
261            find_key_values_by_prefix_num_keys,
262            find_key_values_by_prefix_key_values_size,
263            write_batch_size,
264            list_all_sizes,
265            exists_true_cases,
266        }
267    }
268}
269
270/// A metered wrapper that keeps track of every operation
271#[derive(Clone)]
272pub struct MeteredStore<K> {
273    /// the metrics being stored
274    counter: Arc<KeyValueStoreMetrics>,
275    /// The underlying store of the metered store
276    store: K,
277}
278
279impl<K> WithError for MeteredStore<K>
280where
281    K: WithError,
282{
283    type Error = K::Error;
284}
285
286impl<K> ReadableKeyValueStore for MeteredStore<K>
287where
288    K: ReadableKeyValueStore,
289{
290    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
291    type Keys = K::Keys;
292    type KeyValues = K::KeyValues;
293
294    fn max_stream_queries(&self) -> usize {
295        self.store.max_stream_queries()
296    }
297
298    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
299        let _latency = self.counter.read_value_bytes_latency.measure_latency();
300        self.counter
301            .read_value_key_size
302            .with_label_values(&[])
303            .observe(key.len() as f64);
304        let result = self.store.read_value_bytes(key).await?;
305        match &result {
306            None => self
307                .counter
308                .read_value_none_cases
309                .with_label_values(&[])
310                .inc(),
311            Some(value) => self
312                .counter
313                .read_value_value_size
314                .with_label_values(&[])
315                .observe(value.len() as f64),
316        }
317        Ok(result)
318    }
319
320    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
321        let _latency = self.counter.contains_key_latency.measure_latency();
322        self.counter
323            .contains_key_key_size
324            .with_label_values(&[])
325            .observe(key.len() as f64);
326        self.store.contains_key(key).await
327    }
328
329    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
330        let _latency = self.counter.contains_keys_latency.measure_latency();
331        self.counter
332            .contains_keys_num_entries
333            .with_label_values(&[])
334            .observe(keys.len() as f64);
335        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
336        self.counter
337            .contains_keys_key_sizes
338            .with_label_values(&[])
339            .observe(key_sizes as f64);
340        self.store.contains_keys(keys).await
341    }
342
343    async fn read_multi_values_bytes(
344        &self,
345        keys: Vec<Vec<u8>>,
346    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
347        let _latency = self
348            .counter
349            .read_multi_values_bytes_latency
350            .measure_latency();
351        self.counter
352            .read_multi_values_num_entries
353            .with_label_values(&[])
354            .observe(keys.len() as f64);
355        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
356        self.counter
357            .read_multi_values_key_sizes
358            .with_label_values(&[])
359            .observe(key_sizes as f64);
360        self.store.read_multi_values_bytes(keys).await
361    }
362
363    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
364        let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
365        self.counter
366            .find_keys_by_prefix_prefix_size
367            .with_label_values(&[])
368            .observe(key_prefix.len() as f64);
369        let result = self.store.find_keys_by_prefix(key_prefix).await?;
370        let (num_keys, keys_size) = result
371            .iterator()
372            .map(|key| key.map(|k| k.len()))
373            .collect::<Result<Vec<usize>, _>>()?
374            .into_iter()
375            .fold((0, 0), |(count, size), len| (count + 1, size + len));
376        self.counter
377            .find_keys_by_prefix_num_keys
378            .with_label_values(&[])
379            .observe(num_keys as f64);
380        self.counter
381            .find_keys_by_prefix_keys_size
382            .with_label_values(&[])
383            .observe(keys_size as f64);
384        Ok(result)
385    }
386
387    async fn find_key_values_by_prefix(
388        &self,
389        key_prefix: &[u8],
390    ) -> Result<Self::KeyValues, Self::Error> {
391        let _latency = self
392            .counter
393            .find_key_values_by_prefix_latency
394            .measure_latency();
395        self.counter
396            .find_key_values_by_prefix_prefix_size
397            .with_label_values(&[])
398            .observe(key_prefix.len() as f64);
399        let result = self.store.find_key_values_by_prefix(key_prefix).await?;
400        let (num_keys, key_values_size) = result
401            .iterator()
402            .map(|key_value| key_value.map(|(key, value)| key.len() + value.len()))
403            .collect::<Result<Vec<usize>, _>>()?
404            .into_iter()
405            .fold((0, 0), |(count, size), len| (count + 1, size + len));
406        self.counter
407            .find_key_values_by_prefix_num_keys
408            .with_label_values(&[])
409            .observe(num_keys as f64);
410        self.counter
411            .find_key_values_by_prefix_key_values_size
412            .with_label_values(&[])
413            .observe(key_values_size as f64);
414        Ok(result)
415    }
416}
417
418impl<K> WritableKeyValueStore for MeteredStore<K>
419where
420    K: WritableKeyValueStore,
421{
422    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
423
424    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
425        let _latency = self.counter.write_batch_latency.measure_latency();
426        self.counter
427            .write_batch_size
428            .with_label_values(&[])
429            .observe(batch.size() as f64);
430        self.store.write_batch(batch).await
431    }
432
433    async fn clear_journal(&self) -> Result<(), Self::Error> {
434        let _metric = self.counter.clear_journal_latency.measure_latency();
435        self.store.clear_journal().await
436    }
437}
438
439impl<K> AdminKeyValueStore for MeteredStore<K>
440where
441    K: AdminKeyValueStore,
442{
443    type Config = K::Config;
444
445    fn get_name() -> String {
446        K::get_name()
447    }
448
449    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
450        let name = K::get_name();
451        let counter = get_counter(&name);
452        let _latency = counter.connect_latency.measure_latency();
453        let store = K::connect(config, namespace).await?;
454        let counter = get_counter(&name);
455        Ok(Self { counter, store })
456    }
457
458    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
459        let _latency = self.counter.open_exclusive_latency.measure_latency();
460        let store = self.store.open_exclusive(root_key)?;
461        let counter = self.counter.clone();
462        Ok(Self { counter, store })
463    }
464
465    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
466        let name = K::get_name();
467        let counter = get_counter(&name);
468        let _latency = counter.list_all_latency.measure_latency();
469        let namespaces = K::list_all(config).await?;
470        let counter = get_counter(&name);
471        counter
472            .list_all_sizes
473            .with_label_values(&[])
474            .observe(namespaces.len() as f64);
475        Ok(namespaces)
476    }
477
478    async fn list_root_keys(
479        config: &Self::Config,
480        namespace: &str,
481    ) -> Result<Vec<Vec<u8>>, Self::Error> {
482        let name = K::get_name();
483        let counter = get_counter(&name);
484        let _latency = counter.list_root_keys_latency.measure_latency();
485        K::list_root_keys(config, namespace).await
486    }
487
488    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
489        let name = K::get_name();
490        let counter = get_counter(&name);
491        let _latency = counter.delete_all_latency.measure_latency();
492        K::delete_all(config).await
493    }
494
495    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
496        let name = K::get_name();
497        let counter = get_counter(&name);
498        let _latency = counter.exists_latency.measure_latency();
499        let result = K::exists(config, namespace).await?;
500        if result {
501            let counter = get_counter(&name);
502            counter.exists_true_cases.with_label_values(&[]).inc();
503        }
504        Ok(result)
505    }
506
507    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
508        let name = K::get_name();
509        let counter = get_counter(&name);
510        let _latency = counter.create_latency.measure_latency();
511        K::create(config, namespace).await
512    }
513
514    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
515        let name = K::get_name();
516        let counter = get_counter(&name);
517        let _latency = counter.delete_latency.measure_latency();
518        K::delete(config, namespace).await
519    }
520}
521
522#[cfg(with_testing)]
523impl<K> TestKeyValueStore for MeteredStore<K>
524where
525    K: TestKeyValueStore,
526{
527    async fn new_test_config() -> Result<K::Config, Self::Error> {
528        K::new_test_config().await
529    }
530}