linera_views/backends/
metering.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds metrics to a key-value store.
5
6use std::{
7    collections::{btree_map::Entry, BTreeMap},
8    sync::{Arc, LazyLock, Mutex},
9};
10
11use convert_case::{Case, Casing};
12use linera_base::prometheus_util::{
13    register_histogram_vec, register_int_counter_vec, MeasureLatency as _,
14};
15use prometheus::{HistogramVec, IntCounterVec};
16
17#[cfg(with_testing)]
18use crate::store::TestKeyValueStore;
19use crate::{
20    batch::Batch,
21    store::{AdminKeyValueStore, ReadableKeyValueStore, WithError, WritableKeyValueStore},
22};
23
24#[derive(Clone)]
25/// The implementation of the `KeyValueStoreMetrics` for the `KeyValueStore`.
26pub struct KeyValueStoreMetrics {
27    read_value_bytes_latency: HistogramVec,
28    contains_key_latency: HistogramVec,
29    contains_keys_latency: HistogramVec,
30    read_multi_values_bytes_latency: HistogramVec,
31    find_keys_by_prefix_latency: HistogramVec,
32    find_key_values_by_prefix_latency: HistogramVec,
33    write_batch_latency: HistogramVec,
34    clear_journal_latency: HistogramVec,
35    connect_latency: HistogramVec,
36    open_exclusive_latency: HistogramVec,
37    list_all_latency: HistogramVec,
38    list_root_keys_latency: HistogramVec,
39    delete_all_latency: HistogramVec,
40    exists_latency: HistogramVec,
41    create_latency: HistogramVec,
42    delete_latency: HistogramVec,
43    read_value_none_cases: IntCounterVec,
44    read_value_key_size: HistogramVec,
45    read_value_value_size: HistogramVec,
46    read_multi_values_num_entries: HistogramVec,
47    read_multi_values_key_sizes: HistogramVec,
48    contains_keys_num_entries: HistogramVec,
49    contains_keys_key_sizes: HistogramVec,
50    contains_key_key_size: HistogramVec,
51    find_keys_by_prefix_prefix_size: HistogramVec,
52    find_keys_by_prefix_num_keys: HistogramVec,
53    find_keys_by_prefix_keys_size: HistogramVec,
54    find_key_values_by_prefix_prefix_size: HistogramVec,
55    find_key_values_by_prefix_num_keys: HistogramVec,
56    find_key_values_by_prefix_key_values_size: HistogramVec,
57    write_batch_size: HistogramVec,
58    list_all_sizes: HistogramVec,
59    exists_true_cases: IntCounterVec,
60}
61
/// The metrics created so far, indexed by the name of the store they belong to.
#[derive(Default)]
struct StoreMetrics {
    /// Maps a store name to the metrics handed out for that name by `get_counter`.
    stores: BTreeMap<String, Arc<KeyValueStoreMetrics>>,
}
66
67/// The global variables of the RocksDB stores
68static STORE_COUNTERS: LazyLock<Mutex<StoreMetrics>> =
69    LazyLock::new(|| Mutex::new(StoreMetrics::default()));
70
71fn get_counter(name: &str) -> Arc<KeyValueStoreMetrics> {
72    let mut store_metrics = STORE_COUNTERS.lock().unwrap();
73    let key = name.to_string();
74    match store_metrics.stores.entry(key) {
75        Entry::Occupied(entry) => {
76            let entry = entry.into_mut();
77            entry.clone()
78        }
79        Entry::Vacant(entry) => {
80            let store_metric = Arc::new(KeyValueStoreMetrics::new(name.to_string()));
81            entry.insert(store_metric.clone());
82            store_metric
83        }
84    }
85}
86
87impl KeyValueStoreMetrics {
88    /// Creation of a named Metered counter.
89    pub fn new(name: String) -> Self {
90        // name can be "rocks db". Then var_name = "rocks_db" and title_name = "RocksDb"
91        let var_name = name.replace(' ', "_");
92        let title_name = name.to_case(Case::Snake);
93
94        let entry1 = format!("{}_read_value_bytes_latency", var_name);
95        let entry2 = format!("{} read value bytes latency", title_name);
96        let read_value_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
97
98        let entry1 = format!("{}_contains_key_latency", var_name);
99        let entry2 = format!("{} contains key latency", title_name);
100        let contains_key_latency = register_histogram_vec(&entry1, &entry2, &[], None);
101
102        let entry1 = format!("{}_contains_keys_latency", var_name);
103        let entry2 = format!("{} contains keys latency", title_name);
104        let contains_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
105
106        let entry1 = format!("{}_read_multi_value_bytes_latency", var_name);
107        let entry2 = format!("{} read multi value bytes latency", title_name);
108        let read_multi_values_bytes_latency = register_histogram_vec(&entry1, &entry2, &[], None);
109
110        let entry1 = format!("{}_find_keys_by_prefix_latency", var_name);
111        let entry2 = format!("{} find keys by prefix latency", title_name);
112        let find_keys_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
113
114        let entry1 = format!("{}_find_key_values_by_prefix_latency", var_name);
115        let entry2 = format!("{} find key values by prefix latency", title_name);
116        let find_key_values_by_prefix_latency = register_histogram_vec(&entry1, &entry2, &[], None);
117
118        let entry1 = format!("{}_write_batch_latency", var_name);
119        let entry2 = format!("{} write batch latency", title_name);
120        let write_batch_latency = register_histogram_vec(&entry1, &entry2, &[], None);
121
122        let entry1 = format!("{}_clear_journal_latency", var_name);
123        let entry2 = format!("{} clear journal latency", title_name);
124        let clear_journal_latency = register_histogram_vec(&entry1, &entry2, &[], None);
125
126        let entry1 = format!("{}_connect_latency", var_name);
127        let entry2 = format!("{} connect latency", title_name);
128        let connect_latency = register_histogram_vec(&entry1, &entry2, &[], None);
129
130        let entry1 = format!("{}_open_exclusive_latency", var_name);
131        let entry2 = format!("{} clone with root key latency", title_name);
132        let open_exclusive_latency = register_histogram_vec(&entry1, &entry2, &[], None);
133
134        let entry1 = format!("{}_list_all_latency", var_name);
135        let entry2 = format!("{} list all latency", title_name);
136        let list_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
137
138        let entry1 = format!("{}_list_root_keys_latency", var_name);
139        let entry2 = format!("{} list root keys latency", title_name);
140        let list_root_keys_latency = register_histogram_vec(&entry1, &entry2, &[], None);
141
142        let entry1 = format!("{}_delete_all_latency", var_name);
143        let entry2 = format!("{} delete all latency", title_name);
144        let delete_all_latency = register_histogram_vec(&entry1, &entry2, &[], None);
145
146        let entry1 = format!("{}_exists_latency", var_name);
147        let entry2 = format!("{} exists latency", title_name);
148        let exists_latency = register_histogram_vec(&entry1, &entry2, &[], None);
149
150        let entry1 = format!("{}_create_latency", var_name);
151        let entry2 = format!("{} create latency", title_name);
152        let create_latency = register_histogram_vec(&entry1, &entry2, &[], None);
153
154        let entry1 = format!("{}_delete_latency", var_name);
155        let entry2 = format!("{} delete latency", title_name);
156        let delete_latency = register_histogram_vec(&entry1, &entry2, &[], None);
157
158        let entry1 = format!("{}_read_value_none_cases", var_name);
159        let entry2 = format!("{} read value none cases", title_name);
160        let read_value_none_cases = register_int_counter_vec(&entry1, &entry2, &[]);
161
162        let entry1 = format!("{}_read_value_key_size", var_name);
163        let entry2 = format!("{} read value key size", title_name);
164        let read_value_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
165
166        let entry1 = format!("{}_read_value_value_size", var_name);
167        let entry2 = format!("{} read value value size", title_name);
168        let read_value_value_size = register_histogram_vec(&entry1, &entry2, &[], None);
169
170        let entry1 = format!("{}_read_multi_values_num_entries", var_name);
171        let entry2 = format!("{} read multi values num entries", title_name);
172        let read_multi_values_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
173
174        let entry1 = format!("{}_read_multi_values_key_sizes", var_name);
175        let entry2 = format!("{} read multi values key sizes", title_name);
176        let read_multi_values_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
177
178        let entry1 = format!("{}_contains_keys_num_entries", var_name);
179        let entry2 = format!("{} contains keys num entries", title_name);
180        let contains_keys_num_entries = register_histogram_vec(&entry1, &entry2, &[], None);
181
182        let entry1 = format!("{}_contains_keys_key_sizes", var_name);
183        let entry2 = format!("{} contains keys key sizes", title_name);
184        let contains_keys_key_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
185
186        let entry1 = format!("{}_contains_key_key_size", var_name);
187        let entry2 = format!("{} contains key key size", title_name);
188        let contains_key_key_size = register_histogram_vec(&entry1, &entry2, &[], None);
189
190        let entry1 = format!("{}_find_keys_by_prefix_prefix_size", var_name);
191        let entry2 = format!("{} find keys by prefix prefix size", title_name);
192        let find_keys_by_prefix_prefix_size = register_histogram_vec(&entry1, &entry2, &[], None);
193
194        let entry1 = format!("{}_find_keys_by_prefix_num_keys", var_name);
195        let entry2 = format!("{} find keys by prefix num keys", title_name);
196        let find_keys_by_prefix_num_keys = register_histogram_vec(&entry1, &entry2, &[], None);
197
198        let entry1 = format!("{}_find_keys_by_prefix_keys_size", var_name);
199        let entry2 = format!("{} find keys by prefix keys size", title_name);
200        let find_keys_by_prefix_keys_size = register_histogram_vec(&entry1, &entry2, &[], None);
201
202        let entry1 = format!("{}_find_key_values_by_prefix_prefix_size", var_name);
203        let entry2 = format!("{} find key values by prefix prefix size", title_name);
204        let find_key_values_by_prefix_prefix_size =
205            register_histogram_vec(&entry1, &entry2, &[], None);
206
207        let entry1 = format!("{}_find_key_values_by_prefix_num_keys", var_name);
208        let entry2 = format!("{} find key values by prefix num keys", title_name);
209        let find_key_values_by_prefix_num_keys =
210            register_histogram_vec(&entry1, &entry2, &[], None);
211
212        let entry1 = format!("{}_find_key_values_by_prefix_key_values_size", var_name);
213        let entry2 = format!("{} find key values by prefix key values size", title_name);
214        let find_key_values_by_prefix_key_values_size =
215            register_histogram_vec(&entry1, &entry2, &[], None);
216
217        let entry1 = format!("{}_write_batch_size", var_name);
218        let entry2 = format!("{} write batch size", title_name);
219        let write_batch_size = register_histogram_vec(&entry1, &entry2, &[], None);
220
221        let entry1 = format!("{}_list_all_sizes", var_name);
222        let entry2 = format!("{} list all sizes", title_name);
223        let list_all_sizes = register_histogram_vec(&entry1, &entry2, &[], None);
224
225        let entry1 = format!("{}_exists_true_cases", var_name);
226        let entry2 = format!("{} exists true cases", title_name);
227        let exists_true_cases = register_int_counter_vec(&entry1, &entry2, &[]);
228
229        KeyValueStoreMetrics {
230            read_value_bytes_latency,
231            contains_key_latency,
232            contains_keys_latency,
233            read_multi_values_bytes_latency,
234            find_keys_by_prefix_latency,
235            find_key_values_by_prefix_latency,
236            write_batch_latency,
237            clear_journal_latency,
238            connect_latency,
239            open_exclusive_latency,
240            list_all_latency,
241            list_root_keys_latency,
242            delete_all_latency,
243            exists_latency,
244            create_latency,
245            delete_latency,
246            read_value_none_cases,
247            read_value_key_size,
248            read_value_value_size,
249            read_multi_values_num_entries,
250            read_multi_values_key_sizes,
251            contains_keys_num_entries,
252            contains_keys_key_sizes,
253            contains_key_key_size,
254            find_keys_by_prefix_prefix_size,
255            find_keys_by_prefix_num_keys,
256            find_keys_by_prefix_keys_size,
257            find_key_values_by_prefix_prefix_size,
258            find_key_values_by_prefix_num_keys,
259            find_key_values_by_prefix_key_values_size,
260            write_batch_size,
261            list_all_sizes,
262            exists_true_cases,
263        }
264    }
265}
266
/// A wrapper around a key-value store `K` that records metrics about every
/// operation and forwards the operation to the wrapped store.
#[derive(Clone)]
pub struct MeteredStore<K> {
    /// The metrics being updated; obtained from `get_counter` under the name of `K`
    /// when connecting, and cloned by `open_exclusive`.
    counter: Arc<KeyValueStoreMetrics>,
    /// The underlying store of the metered store
    store: K,
}
275
276impl<K> WithError for MeteredStore<K>
277where
278    K: WithError,
279{
280    type Error = K::Error;
281}
282
283impl<K> ReadableKeyValueStore for MeteredStore<K>
284where
285    K: ReadableKeyValueStore,
286{
287    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
288
289    fn max_stream_queries(&self) -> usize {
290        self.store.max_stream_queries()
291    }
292
293    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
294        let _latency = self.counter.read_value_bytes_latency.measure_latency();
295        self.counter
296            .read_value_key_size
297            .with_label_values(&[])
298            .observe(key.len() as f64);
299        let result = self.store.read_value_bytes(key).await?;
300        match &result {
301            None => self
302                .counter
303                .read_value_none_cases
304                .with_label_values(&[])
305                .inc(),
306            Some(value) => self
307                .counter
308                .read_value_value_size
309                .with_label_values(&[])
310                .observe(value.len() as f64),
311        }
312        Ok(result)
313    }
314
315    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
316        let _latency = self.counter.contains_key_latency.measure_latency();
317        self.counter
318            .contains_key_key_size
319            .with_label_values(&[])
320            .observe(key.len() as f64);
321        self.store.contains_key(key).await
322    }
323
324    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
325        let _latency = self.counter.contains_keys_latency.measure_latency();
326        self.counter
327            .contains_keys_num_entries
328            .with_label_values(&[])
329            .observe(keys.len() as f64);
330        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
331        self.counter
332            .contains_keys_key_sizes
333            .with_label_values(&[])
334            .observe(key_sizes as f64);
335        self.store.contains_keys(keys).await
336    }
337
338    async fn read_multi_values_bytes(
339        &self,
340        keys: Vec<Vec<u8>>,
341    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
342        let _latency = self
343            .counter
344            .read_multi_values_bytes_latency
345            .measure_latency();
346        self.counter
347            .read_multi_values_num_entries
348            .with_label_values(&[])
349            .observe(keys.len() as f64);
350        let key_sizes = keys.iter().map(|k| k.len()).sum::<usize>();
351        self.counter
352            .read_multi_values_key_sizes
353            .with_label_values(&[])
354            .observe(key_sizes as f64);
355        self.store.read_multi_values_bytes(keys).await
356    }
357
358    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
359        let _latency = self.counter.find_keys_by_prefix_latency.measure_latency();
360        self.counter
361            .find_keys_by_prefix_prefix_size
362            .with_label_values(&[])
363            .observe(key_prefix.len() as f64);
364        let result = self.store.find_keys_by_prefix(key_prefix).await?;
365        let (num_keys, keys_size) = result
366            .iter()
367            .map(|key| key.len())
368            .fold((0, 0), |(count, size), len| (count + 1, size + len));
369        self.counter
370            .find_keys_by_prefix_num_keys
371            .with_label_values(&[])
372            .observe(num_keys as f64);
373        self.counter
374            .find_keys_by_prefix_keys_size
375            .with_label_values(&[])
376            .observe(keys_size as f64);
377        Ok(result)
378    }
379
380    async fn find_key_values_by_prefix(
381        &self,
382        key_prefix: &[u8],
383    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
384        let _latency = self
385            .counter
386            .find_key_values_by_prefix_latency
387            .measure_latency();
388        self.counter
389            .find_key_values_by_prefix_prefix_size
390            .with_label_values(&[])
391            .observe(key_prefix.len() as f64);
392        let result = self.store.find_key_values_by_prefix(key_prefix).await?;
393        let (num_keys, key_values_size) = result
394            .iter()
395            .map(|(key, value)| key.len() + value.len())
396            .fold((0, 0), |(count, size), len| (count + 1, size + len));
397        self.counter
398            .find_key_values_by_prefix_num_keys
399            .with_label_values(&[])
400            .observe(num_keys as f64);
401        self.counter
402            .find_key_values_by_prefix_key_values_size
403            .with_label_values(&[])
404            .observe(key_values_size as f64);
405        Ok(result)
406    }
407}
408
409impl<K> WritableKeyValueStore for MeteredStore<K>
410where
411    K: WritableKeyValueStore,
412{
413    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
414
415    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
416        let _latency = self.counter.write_batch_latency.measure_latency();
417        self.counter
418            .write_batch_size
419            .with_label_values(&[])
420            .observe(batch.size() as f64);
421        self.store.write_batch(batch).await
422    }
423
424    async fn clear_journal(&self) -> Result<(), Self::Error> {
425        let _metric = self.counter.clear_journal_latency.measure_latency();
426        self.store.clear_journal().await
427    }
428}
429
430impl<K> AdminKeyValueStore for MeteredStore<K>
431where
432    K: AdminKeyValueStore,
433{
434    type Config = K::Config;
435
436    fn get_name() -> String {
437        K::get_name()
438    }
439
440    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
441        let name = K::get_name();
442        let counter = get_counter(&name);
443        let _latency = counter.connect_latency.measure_latency();
444        let store = K::connect(config, namespace).await?;
445        let counter = get_counter(&name);
446        Ok(Self { counter, store })
447    }
448
449    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
450        let _latency = self.counter.open_exclusive_latency.measure_latency();
451        let store = self.store.open_exclusive(root_key)?;
452        let counter = self.counter.clone();
453        Ok(Self { counter, store })
454    }
455
456    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
457        let name = K::get_name();
458        let counter = get_counter(&name);
459        let _latency = counter.list_all_latency.measure_latency();
460        let namespaces = K::list_all(config).await?;
461        let counter = get_counter(&name);
462        counter
463            .list_all_sizes
464            .with_label_values(&[])
465            .observe(namespaces.len() as f64);
466        Ok(namespaces)
467    }
468
469    async fn list_root_keys(
470        config: &Self::Config,
471        namespace: &str,
472    ) -> Result<Vec<Vec<u8>>, Self::Error> {
473        let name = K::get_name();
474        let counter = get_counter(&name);
475        let _latency = counter.list_root_keys_latency.measure_latency();
476        K::list_root_keys(config, namespace).await
477    }
478
479    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
480        let name = K::get_name();
481        let counter = get_counter(&name);
482        let _latency = counter.delete_all_latency.measure_latency();
483        K::delete_all(config).await
484    }
485
486    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
487        let name = K::get_name();
488        let counter = get_counter(&name);
489        let _latency = counter.exists_latency.measure_latency();
490        let result = K::exists(config, namespace).await?;
491        if result {
492            let counter = get_counter(&name);
493            counter.exists_true_cases.with_label_values(&[]).inc();
494        }
495        Ok(result)
496    }
497
498    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
499        let name = K::get_name();
500        let counter = get_counter(&name);
501        let _latency = counter.create_latency.measure_latency();
502        K::create(config, namespace).await
503    }
504
505    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
506        let name = K::get_name();
507        let counter = get_counter(&name);
508        let _latency = counter.delete_latency.measure_latency();
509        K::delete(config, namespace).await
510    }
511}
512
#[cfg(with_testing)]
impl<K: TestKeyValueStore> TestKeyValueStore for MeteredStore<K> {
    /// Returns the test configuration of the wrapped store, which is also the
    /// configuration type of the metered store.
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        K::new_test_config().await
    }
}
521}