linera_storage_service/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    mem,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::ensure;
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20    batch::{Batch, WriteOperation},
21    lru_caching::LruCachingDatabase,
22    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23    FutureSyncExt as _,
24};
25use serde::de::DeserializeOwned;
26use tonic::transport::{Channel, Endpoint};
27
28#[cfg(with_testing)]
29use crate::common::storage_service_test_endpoint;
30use crate::{
31    common::{
32        KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
33    },
34    key_value_store::{
35        statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
36        KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
37        ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
38        ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
39        RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
40        RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
41        RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
42        RequestWriteBatchExtended, Statement,
43    },
44};
45
// Upper bound, in bytes, on the length of a key accepted by the store.
// The value of 1M was chosen rather arbitrarily.
const MAX_KEY_SIZE: usize = 1_000_000;
48
// The shared store client.
// * Interior mutability is required for the client because
//   accessing it requires mutability while the KeyValueStore
//   does not allow it.
// * The semaphore and `max_stream_queries` work as in other
//   stores.
//
// Namespaces are encoded by taking their serialization.
// This works because the set of serializations of strings
// is prefix-free.
// The data is stored in the following way:
// * A `key` under a `root_key` in a `namespace` is stored as
//   [`KeyPrefix::Key`] + namespace + root_key + key
//   (the root key being BCS-serialized).
// * An additional key with an empty value is stored at
//   [`KeyPrefix::Namespace`] + namespace
//   to indicate the existence of a namespace.
// * A key with an empty value is stored at
//   [`KeyPrefix::RootKey`] + namespace + root_key
//   to indicate the existence of a root key.
#[derive(Clone)]
pub struct StorageServiceDatabaseInternal {
    // Channel to the storage service; it is created lazily in `connect`
    // and cloned into every store opened from this database.
    channel: Channel,
    // Semaphore bounding the number of concurrent queries.
    // `None` when the configuration sets no `max_concurrent_queries`.
    semaphore: Option<Arc<Semaphore>>,
    // Copied from the configuration and handed to the opened stores.
    max_stream_queries: usize,
    // BCS serialization of the namespace name.
    namespace: Vec<u8>,
}
75
// A store giving access to the keys located under one root key of a namespace.
#[derive(Clone)]
pub struct StorageServiceStoreInternal {
    // Channel to the storage service; a client is built from a clone of it
    // for every request.
    channel: Channel,
    // Semaphore bounding the number of concurrent queries, if any.
    semaphore: Option<Arc<Semaphore>>,
    // Value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    // Length of the leading part of `start_key` made of the `KeyPrefix` byte
    // and the serialized namespace. What follows it is the serialized root key.
    prefix_len: usize,
    // `KeyPrefix::Key` byte + serialized namespace + BCS-serialized root key.
    // It is prepended to every key sent to the service.
    start_key: Vec<u8>,
    // Flag used by `write_batch` to avoid rewriting the root key marker on
    // every batch. It is shared between the clones of a store.
    root_key_written: Arc<AtomicBool>,
}
85
// Database-level operations report their failures as `StorageServiceStoreError`.
impl WithError for StorageServiceDatabaseInternal {
    type Error = StorageServiceStoreError;
}
89
// Store-level operations report their failures as `StorageServiceStoreError`.
impl WithError for StorageServiceStoreInternal {
    type Error = StorageServiceStoreError;
}
93
94impl ReadableKeyValueStore for StorageServiceStoreInternal {
95    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
96
97    fn max_stream_queries(&self) -> usize {
98        self.max_stream_queries
99    }
100
101    fn root_key(&self) -> Result<Vec<u8>, StorageServiceStoreError> {
102        let root_key = bcs::from_bytes(&self.start_key[self.prefix_len..])?;
103        Ok(root_key)
104    }
105
106    async fn read_value_bytes(
107        &self,
108        key: &[u8],
109    ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
110        ensure!(
111            key.len() <= MAX_KEY_SIZE,
112            StorageServiceStoreError::KeyTooLong
113        );
114        let mut full_key = self.start_key.clone();
115        full_key.extend(key);
116        let query = RequestReadValue { key: full_key };
117        let request = tonic::Request::new(query);
118        let channel = self.channel.clone();
119        let mut client = StorageServiceClient::new(channel);
120        let _guard = self.acquire().await;
121        let response = client.process_read_value(request).make_sync().await?;
122        let response = response.into_inner();
123        let ReplyReadValue {
124            value,
125            message_index,
126            num_chunks,
127        } = response;
128        if num_chunks == 0 {
129            Ok(value)
130        } else {
131            self.read_entries(message_index, num_chunks).await
132        }
133    }
134
135    async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
136        ensure!(
137            key.len() <= MAX_KEY_SIZE,
138            StorageServiceStoreError::KeyTooLong
139        );
140        let mut full_key = self.start_key.clone();
141        full_key.extend(key);
142        let query = RequestContainsKey { key: full_key };
143        let request = tonic::Request::new(query);
144        let channel = self.channel.clone();
145        let mut client = StorageServiceClient::new(channel);
146        let _guard = self.acquire().await;
147        let response = client.process_contains_key(request).make_sync().await?;
148        let response = response.into_inner();
149        let ReplyContainsKey { test } = response;
150        Ok(test)
151    }
152
153    async fn contains_keys(
154        &self,
155        keys: Vec<Vec<u8>>,
156    ) -> Result<Vec<bool>, StorageServiceStoreError> {
157        let mut full_keys = Vec::new();
158        for key in keys {
159            ensure!(
160                key.len() <= MAX_KEY_SIZE,
161                StorageServiceStoreError::KeyTooLong
162            );
163            let mut full_key = self.start_key.clone();
164            full_key.extend(&key);
165            full_keys.push(full_key);
166        }
167        let query = RequestContainsKeys { keys: full_keys };
168        let request = tonic::Request::new(query);
169        let channel = self.channel.clone();
170        let mut client = StorageServiceClient::new(channel);
171        let _guard = self.acquire().await;
172        let response = client.process_contains_keys(request).make_sync().await?;
173        let response = response.into_inner();
174        let ReplyContainsKeys { tests } = response;
175        Ok(tests)
176    }
177
178    async fn read_multi_values_bytes(
179        &self,
180        keys: Vec<Vec<u8>>,
181    ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
182        let mut full_keys = Vec::new();
183        for key in keys {
184            ensure!(
185                key.len() <= MAX_KEY_SIZE,
186                StorageServiceStoreError::KeyTooLong
187            );
188            let mut full_key = self.start_key.clone();
189            full_key.extend(&key);
190            full_keys.push(full_key);
191        }
192        let query = RequestReadMultiValues { keys: full_keys };
193        let request = tonic::Request::new(query);
194        let channel = self.channel.clone();
195        let mut client = StorageServiceClient::new(channel);
196        let _guard = self.acquire().await;
197        let response = client
198            .process_read_multi_values(request)
199            .make_sync()
200            .await?;
201        let response = response.into_inner();
202        let ReplyReadMultiValues {
203            values,
204            message_index,
205            num_chunks,
206        } = response;
207        if num_chunks == 0 {
208            let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
209            Ok(values)
210        } else {
211            self.read_entries(message_index, num_chunks).await
212        }
213    }
214
215    async fn find_keys_by_prefix(
216        &self,
217        key_prefix: &[u8],
218    ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
219        ensure!(
220            key_prefix.len() <= MAX_KEY_SIZE,
221            StorageServiceStoreError::KeyTooLong
222        );
223        let mut full_key_prefix = self.start_key.clone();
224        full_key_prefix.extend(key_prefix);
225        let query = RequestFindKeysByPrefix {
226            key_prefix: full_key_prefix,
227        };
228        let request = tonic::Request::new(query);
229        let channel = self.channel.clone();
230        let mut client = StorageServiceClient::new(channel);
231        let _guard = self.acquire().await;
232        let response = client
233            .process_find_keys_by_prefix(request)
234            .make_sync()
235            .await?;
236        let response = response.into_inner();
237        let ReplyFindKeysByPrefix {
238            keys,
239            message_index,
240            num_chunks,
241        } = response;
242        if num_chunks == 0 {
243            Ok(keys)
244        } else {
245            self.read_entries(message_index, num_chunks).await
246        }
247    }
248
249    async fn find_key_values_by_prefix(
250        &self,
251        key_prefix: &[u8],
252    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
253        ensure!(
254            key_prefix.len() <= MAX_KEY_SIZE,
255            StorageServiceStoreError::KeyTooLong
256        );
257        let mut full_key_prefix = self.start_key.clone();
258        full_key_prefix.extend(key_prefix);
259        let query = RequestFindKeyValuesByPrefix {
260            key_prefix: full_key_prefix,
261        };
262        let request = tonic::Request::new(query);
263        let channel = self.channel.clone();
264        let mut client = StorageServiceClient::new(channel);
265        let _guard = self.acquire().await;
266        let response = client
267            .process_find_key_values_by_prefix(request)
268            .make_sync()
269            .await?;
270        let response = response.into_inner();
271        let ReplyFindKeyValuesByPrefix {
272            key_values,
273            message_index,
274            num_chunks,
275        } = response;
276        if num_chunks == 0 {
277            let key_values = key_values
278                .into_iter()
279                .map(|x| (x.key, x.value))
280                .collect::<Vec<_>>();
281            Ok(key_values)
282        } else {
283            self.read_entries(message_index, num_chunks).await
284        }
285    }
286}
287
288impl WritableKeyValueStore for StorageServiceStoreInternal {
289    const MAX_VALUE_SIZE: usize = usize::MAX;
290
291    async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
292        if batch.operations.is_empty() {
293            return Ok(());
294        }
295        let mut statements = Vec::new();
296        let mut chunk_size = 0;
297
298        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
299            let mut full_key = self.start_key.clone();
300            full_key[0] = KeyPrefix::RootKey as u8;
301            let operation = Operation::Put(KeyValue {
302                key: full_key,
303                value: vec![],
304            });
305            let statement = Statement {
306                operation: Some(operation),
307            };
308            statements.push(statement);
309            chunk_size += self.start_key.len();
310        }
311
312        let bcs_root_key_len = self.start_key.len() - self.prefix_len;
313        for operation in batch.operations {
314            let (key_len, value_len) = match &operation {
315                WriteOperation::Delete { key } => (key.len(), 0),
316                WriteOperation::Put { key, value } => (key.len(), value.len()),
317                WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
318            };
319            let operation_size = key_len + value_len + bcs_root_key_len;
320            ensure!(
321                key_len <= MAX_KEY_SIZE,
322                StorageServiceStoreError::KeyTooLong
323            );
324            if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
325                let statement = self.get_statement(operation);
326                statements.push(statement);
327                chunk_size += operation_size;
328            } else {
329                self.submit_statements(mem::take(&mut statements)).await?;
330                chunk_size = 0;
331                if operation_size > MAX_PAYLOAD_SIZE {
332                    // One single operation is especially big. So split it in chunks.
333                    let WriteOperation::Put { key, value } = operation else {
334                        // Only the put can go over the limit
335                        unreachable!();
336                    };
337                    let mut full_key = self.start_key.clone();
338                    full_key.extend(key);
339                    let value_chunks = value
340                        .chunks(MAX_PAYLOAD_SIZE)
341                        .map(|x| x.to_vec())
342                        .collect::<Vec<_>>();
343                    let num_chunks = value_chunks.len();
344                    for (i_chunk, value) in value_chunks.into_iter().enumerate() {
345                        let last = i_chunk + 1 == num_chunks;
346                        let operation = Operation::Append(KeyValueAppend {
347                            key: full_key.clone(),
348                            value,
349                            last,
350                        });
351                        statements = vec![Statement {
352                            operation: Some(operation),
353                        }];
354                        self.submit_statements(mem::take(&mut statements)).await?;
355                    }
356                } else {
357                    // The operation is small enough, it is just that we have many so we need to split.
358                    let statement = self.get_statement(operation);
359                    statements.push(statement);
360                    chunk_size = operation_size;
361                }
362            }
363        }
364        self.submit_statements(mem::take(&mut statements)).await
365    }
366
367    async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
368        Ok(())
369    }
370}
371
372impl StorageServiceStoreInternal {
373    /// Obtains the semaphore lock on the database if needed.
374    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
375        match &self.semaphore {
376            None => None,
377            Some(count) => Some(count.acquire().await),
378        }
379    }
380
381    async fn submit_statements(
382        &self,
383        statements: Vec<Statement>,
384    ) -> Result<(), StorageServiceStoreError> {
385        if !statements.is_empty() {
386            let query = RequestWriteBatchExtended { statements };
387            let request = tonic::Request::new(query);
388            let channel = self.channel.clone();
389            let mut client = StorageServiceClient::new(channel);
390            let _guard = self.acquire().await;
391            let _response = client
392                .process_write_batch_extended(request)
393                .make_sync()
394                .await?;
395        }
396        Ok(())
397    }
398
399    fn get_statement(&self, operation: WriteOperation) -> Statement {
400        let operation = match operation {
401            WriteOperation::Delete { key } => {
402                let mut full_key = self.start_key.clone();
403                full_key.extend(key);
404                Operation::Delete(full_key)
405            }
406            WriteOperation::Put { key, value } => {
407                let mut full_key = self.start_key.clone();
408                full_key.extend(key);
409                Operation::Put(KeyValue {
410                    key: full_key,
411                    value,
412                })
413            }
414            WriteOperation::DeletePrefix { key_prefix } => {
415                let mut full_key_prefix = self.start_key.clone();
416                full_key_prefix.extend(key_prefix);
417                Operation::DeletePrefix(full_key_prefix)
418            }
419        };
420        Statement {
421            operation: Some(operation),
422        }
423    }
424
425    async fn read_single_entry(
426        &self,
427        message_index: i64,
428        index: i32,
429    ) -> Result<Vec<u8>, StorageServiceStoreError> {
430        let channel = self.channel.clone();
431        let query = RequestSpecificChunk {
432            message_index,
433            index,
434        };
435        let request = tonic::Request::new(query);
436        let mut client = StorageServiceClient::new(channel);
437        let response = client.process_specific_chunk(request).make_sync().await?;
438        let response = response.into_inner();
439        let ReplySpecificChunk { chunk } = response;
440        Ok(chunk)
441    }
442
443    async fn read_entries<S: DeserializeOwned>(
444        &self,
445        message_index: i64,
446        num_chunks: i32,
447    ) -> Result<S, StorageServiceStoreError> {
448        let mut handles = Vec::new();
449        for index in 0..num_chunks {
450            let handle = self.read_single_entry(message_index, index);
451            handles.push(handle);
452        }
453        let mut value = Vec::new();
454        for chunk in join_all(handles).await {
455            let chunk = chunk?;
456            value.extend(chunk);
457        }
458        Ok(bcs::from_bytes(&value)?)
459    }
460}
461
462impl KeyValueDatabase for StorageServiceDatabaseInternal {
463    type Config = StorageServiceStoreInternalConfig;
464    type Store = StorageServiceStoreInternal;
465
466    fn get_name() -> String {
467        "service store".to_string()
468    }
469
470    async fn connect(
471        config: &Self::Config,
472        namespace: &str,
473    ) -> Result<Self, StorageServiceStoreError> {
474        let semaphore = config
475            .max_concurrent_queries
476            .map(|n| Arc::new(Semaphore::new(n)));
477        let namespace = bcs::to_bytes(namespace)?;
478        let max_stream_queries = config.max_stream_queries;
479        let endpoint = config.http_address();
480        let endpoint = Endpoint::from_shared(endpoint)?;
481        let channel = endpoint.connect_lazy();
482        Ok(Self {
483            channel,
484            semaphore,
485            max_stream_queries,
486            namespace,
487        })
488    }
489
490    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
491        let channel = self.channel.clone();
492        let semaphore = self.semaphore.clone();
493        let max_stream_queries = self.max_stream_queries;
494        let mut start_key = vec![KeyPrefix::Key as u8];
495        start_key.extend(&self.namespace);
496        start_key.extend(bcs::to_bytes(root_key)?);
497        let prefix_len = self.namespace.len() + 1;
498        Ok(StorageServiceStoreInternal {
499            channel,
500            semaphore,
501            max_stream_queries,
502            prefix_len,
503            start_key,
504            root_key_written: Arc::new(AtomicBool::new(false)),
505        })
506    }
507
508    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
509        self.open_shared(root_key)
510    }
511
512    async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
513        let endpoint = config.http_address();
514        let endpoint = Endpoint::from_shared(endpoint)?;
515        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
516        let response = client.process_list_all(()).make_sync().await?;
517        let response = response.into_inner();
518        let ReplyListAll { namespaces } = response;
519        let namespaces = namespaces
520            .into_iter()
521            .map(|x| bcs::from_bytes(&x))
522            .collect::<Result<_, _>>()?;
523        Ok(namespaces)
524    }
525
526    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
527        let query = RequestListRootKeys {
528            namespace: self.namespace.clone(),
529        };
530        let request = tonic::Request::new(query);
531        let mut client = StorageServiceClient::new(self.channel.clone());
532        let response = client.process_list_root_keys(request).make_sync().await?;
533        let response = response.into_inner();
534        let ReplyListRootKeys { root_keys } = response;
535        Ok(root_keys)
536    }
537
538    async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
539        let endpoint = config.http_address();
540        let endpoint = Endpoint::from_shared(endpoint)?;
541        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
542        let _response = client.process_delete_all(()).make_sync().await?;
543        Ok(())
544    }
545
546    async fn exists(
547        config: &Self::Config,
548        namespace: &str,
549    ) -> Result<bool, StorageServiceStoreError> {
550        let namespace = bcs::to_bytes(namespace)?;
551        let query = RequestExistsNamespace { namespace };
552        let request = tonic::Request::new(query);
553        let endpoint = config.http_address();
554        let endpoint = Endpoint::from_shared(endpoint)?;
555        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
556        let response = client.process_exists_namespace(request).make_sync().await?;
557        let response = response.into_inner();
558        let ReplyExistsNamespace { exists } = response;
559        Ok(exists)
560    }
561
562    async fn create(
563        config: &Self::Config,
564        namespace: &str,
565    ) -> Result<(), StorageServiceStoreError> {
566        if StorageServiceDatabaseInternal::exists(config, namespace).await? {
567            return Err(StorageServiceStoreError::StoreAlreadyExists);
568        }
569        let namespace = bcs::to_bytes(namespace)?;
570        let query = RequestCreateNamespace { namespace };
571        let request = tonic::Request::new(query);
572        let endpoint = config.http_address();
573        let endpoint = Endpoint::from_shared(endpoint)?;
574        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
575        let _response = client.process_create_namespace(request).make_sync().await?;
576        Ok(())
577    }
578
579    async fn delete(
580        config: &Self::Config,
581        namespace: &str,
582    ) -> Result<(), StorageServiceStoreError> {
583        let namespace = bcs::to_bytes(namespace)?;
584        let query = RequestDeleteNamespace { namespace };
585        let request = tonic::Request::new(query);
586        let endpoint = config.http_address();
587        let endpoint = Endpoint::from_shared(endpoint)?;
588        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
589        let _response = client.process_delete_namespace(request).make_sync().await?;
590        Ok(())
591    }
592}
593
#[cfg(with_testing)]
impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
    /// Builds the test configuration from the endpoint returned by
    /// `storage_service_test_endpoint`.
    async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
    {
        service_config_from_endpoint(&storage_service_test_endpoint()?)
    }
}
602
603/// Creates a `StorageServiceStoreConfig` from an endpoint.
604pub fn service_config_from_endpoint(
605    endpoint: &str,
606) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
607    Ok(StorageServiceStoreInternalConfig {
608        endpoint: endpoint.to_string(),
609        max_concurrent_queries: None,
610        max_stream_queries: 100,
611    })
612}
613
614/// Checks that endpoint is truly absent.
615pub async fn storage_service_check_absence(
616    endpoint: &str,
617) -> Result<bool, StorageServiceStoreError> {
618    let endpoint = Endpoint::from_shared(endpoint.to_string())?;
619    let result = StorageServiceClient::connect(endpoint).await;
620    Ok(result.is_err())
621}
622
623/// Checks whether an endpoint is valid or not.
624pub async fn storage_service_check_validity(
625    endpoint: &str,
626) -> Result<(), StorageServiceStoreError> {
627    let config = service_config_from_endpoint(endpoint).unwrap();
628    let namespace = "namespace";
629    let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
630    let store = database.open_shared(&[])?;
631    let _value = store.read_value_bytes(&[42]).await?;
632    Ok(())
633}
634
/// The service database client with metrics: the internal database wrapped in
/// an LRU cache, with a metering layer both below and above the cache.
#[cfg(with_metrics)]
pub type StorageServiceDatabase =
    MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;
639
/// The service database client without metrics: the internal database wrapped
/// in an LRU cache only.
#[cfg(not(with_metrics))]
pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;