Skip to main content

linera_storage_service/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    mem,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::{ensure, util::future::FutureSyncExt as _};
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20    batch::{Batch, WriteOperation},
21    lru_caching::LruCachingDatabase,
22    store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23};
24use serde::de::DeserializeOwned;
25use tonic::transport::{Channel, Endpoint};
26
27#[cfg(with_testing)]
28use crate::common::storage_service_test_endpoint;
29use crate::{
30    common::{
31        KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
32    },
33    key_value_store::{
34        statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
35        KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
36        ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
37        ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
38        RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
39        RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
40        RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
41        RequestWriteBatchExtended, Statement,
42    },
43};
44
// Upper bound on the length (in bytes) of a key accepted by this client.
// The value of one million was picked rather arbitrarily.
const MAX_KEY_SIZE: usize = 1_000_000;
47
48// The shared store client.
49// * Interior mutability is required for the client because
50// accessing requires mutability while the KeyValueStore
51// does not allow it.
52// * The semaphore and max_stream_queries work as other
53// stores.
54//
55// The encoding of namespaces is done by taking their
56// serialization. This works because the set of serialization
57// of strings is prefix free.
58// The data is stored in the following way.
59// * A `key` in a `namespace` is stored as
60//   [`KeyPrefix::Key`] + namespace + key
// * An additional key with empty value is stored at
//   [`KeyPrefix::Namespace`] + namespace
//   to indicate the existence of a namespace.
64// * A key with empty value is stored at
65//   [`KeyPrefix::RootKey`] + namespace + root_key
66//   to indicate the existence of a root key.
/// A database handle connected to a remote storage service.
///
/// The handle is bound to one namespace. Stores for individual root keys are
/// created from it with `open_shared` / `open_exclusive`.
#[derive(Clone)]
pub struct StorageServiceDatabaseInternal {
    /// Channel to the storage service; it is cloned whenever a client is built.
    channel: Channel,
    /// Optional semaphore created from the configured `max_concurrent_queries`;
    /// `None` when the configuration sets no limit. Passed on to every opened store.
    semaphore: Option<Arc<Semaphore>>,
    /// The `max_stream_queries` value copied from the configuration.
    max_stream_queries: usize,
    /// BCS serialization of the namespace name.
    namespace: Vec<u8>,
}
75
/// A store within a namespace and root key, backed by a remote storage service.
///
/// Every user key is sent to the service as `start_key` followed by the key bytes.
#[derive(Clone)]
pub struct StorageServiceStoreInternal {
    /// Channel to the storage service; it is cloned to build a client for each request.
    channel: Channel,
    /// Optional semaphore acquired before each request that goes through `acquire`;
    /// `None` means no limit on concurrent queries.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Length of the `KeyPrefix` byte plus the serialized namespace, i.e. the offset
    /// in `start_key` at which the BCS-serialized root key begins.
    prefix_len: usize,
    /// `[KeyPrefix::Key]` + serialized namespace + BCS-serialized root key.
    start_key: Vec<u8>,
    /// Flag consulted by `write_batch` to decide whether the root-key marker entry
    /// still has to be written. It lives in an `Arc`, so clones of this store share it.
    root_key_written: Arc<AtomicBool>,
}
86
// Database-level operations report failures as `StorageServiceStoreError`.
impl WithError for StorageServiceDatabaseInternal {
    type Error = StorageServiceStoreError;
}
90
// Store-level operations report failures as `StorageServiceStoreError`.
impl WithError for StorageServiceStoreInternal {
    type Error = StorageServiceStoreError;
}
94
95impl ReadableKeyValueStore for StorageServiceStoreInternal {
96    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
97
98    fn max_stream_queries(&self) -> usize {
99        self.max_stream_queries
100    }
101
102    fn root_key(&self) -> Result<Vec<u8>, StorageServiceStoreError> {
103        let root_key = bcs::from_bytes(&self.start_key[self.prefix_len..])?;
104        Ok(root_key)
105    }
106
107    async fn read_value_bytes(
108        &self,
109        key: &[u8],
110    ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
111        ensure!(
112            key.len() <= MAX_KEY_SIZE,
113            StorageServiceStoreError::KeyTooLong
114        );
115        let mut full_key = self.start_key.clone();
116        full_key.extend(key);
117        let query = RequestReadValue { key: full_key };
118        let request = tonic::Request::new(query);
119        let channel = self.channel.clone();
120        let mut client = StorageServiceClient::new(channel);
121        let _guard = self.acquire().await;
122        let response = client.process_read_value(request).make_sync().await?;
123        let response = response.into_inner();
124        let ReplyReadValue {
125            value,
126            message_index,
127            num_chunks,
128        } = response;
129        if num_chunks == 0 {
130            Ok(value)
131        } else {
132            self.read_entries(message_index, num_chunks).await
133        }
134    }
135
136    async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
137        ensure!(
138            key.len() <= MAX_KEY_SIZE,
139            StorageServiceStoreError::KeyTooLong
140        );
141        let mut full_key = self.start_key.clone();
142        full_key.extend(key);
143        let query = RequestContainsKey { key: full_key };
144        let request = tonic::Request::new(query);
145        let channel = self.channel.clone();
146        let mut client = StorageServiceClient::new(channel);
147        let _guard = self.acquire().await;
148        let response = client.process_contains_key(request).make_sync().await?;
149        let response = response.into_inner();
150        let ReplyContainsKey { test } = response;
151        Ok(test)
152    }
153
154    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, StorageServiceStoreError> {
155        let mut full_keys = Vec::new();
156        for key in keys {
157            ensure!(
158                key.len() <= MAX_KEY_SIZE,
159                StorageServiceStoreError::KeyTooLong
160            );
161            let mut full_key = self.start_key.clone();
162            full_key.extend(key);
163            full_keys.push(full_key);
164        }
165        let query = RequestContainsKeys { keys: full_keys };
166        let request = tonic::Request::new(query);
167        let channel = self.channel.clone();
168        let mut client = StorageServiceClient::new(channel);
169        let _guard = self.acquire().await;
170        let response = client.process_contains_keys(request).make_sync().await?;
171        let response = response.into_inner();
172        let ReplyContainsKeys { tests } = response;
173        Ok(tests)
174    }
175
176    async fn read_multi_values_bytes(
177        &self,
178        keys: &[Vec<u8>],
179    ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
180        let mut full_keys = Vec::new();
181        for key in keys {
182            ensure!(
183                key.len() <= MAX_KEY_SIZE,
184                StorageServiceStoreError::KeyTooLong
185            );
186            let mut full_key = self.start_key.clone();
187            full_key.extend(key);
188            full_keys.push(full_key);
189        }
190        let query = RequestReadMultiValues { keys: full_keys };
191        let request = tonic::Request::new(query);
192        let channel = self.channel.clone();
193        let mut client = StorageServiceClient::new(channel);
194        let _guard = self.acquire().await;
195        let response = client
196            .process_read_multi_values(request)
197            .make_sync()
198            .await?;
199        let response = response.into_inner();
200        let ReplyReadMultiValues {
201            values,
202            message_index,
203            num_chunks,
204        } = response;
205        if num_chunks == 0 {
206            let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
207            Ok(values)
208        } else {
209            self.read_entries(message_index, num_chunks).await
210        }
211    }
212
213    async fn find_keys_by_prefix(
214        &self,
215        key_prefix: &[u8],
216    ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
217        ensure!(
218            key_prefix.len() <= MAX_KEY_SIZE,
219            StorageServiceStoreError::KeyTooLong
220        );
221        let mut full_key_prefix = self.start_key.clone();
222        full_key_prefix.extend(key_prefix);
223        let query = RequestFindKeysByPrefix {
224            key_prefix: full_key_prefix,
225        };
226        let request = tonic::Request::new(query);
227        let channel = self.channel.clone();
228        let mut client = StorageServiceClient::new(channel);
229        let _guard = self.acquire().await;
230        let response = client
231            .process_find_keys_by_prefix(request)
232            .make_sync()
233            .await?;
234        let response = response.into_inner();
235        let ReplyFindKeysByPrefix {
236            keys,
237            message_index,
238            num_chunks,
239        } = response;
240        if num_chunks == 0 {
241            Ok(keys)
242        } else {
243            self.read_entries(message_index, num_chunks).await
244        }
245    }
246
247    async fn find_key_values_by_prefix(
248        &self,
249        key_prefix: &[u8],
250    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
251        ensure!(
252            key_prefix.len() <= MAX_KEY_SIZE,
253            StorageServiceStoreError::KeyTooLong
254        );
255        let mut full_key_prefix = self.start_key.clone();
256        full_key_prefix.extend(key_prefix);
257        let query = RequestFindKeyValuesByPrefix {
258            key_prefix: full_key_prefix,
259        };
260        let request = tonic::Request::new(query);
261        let channel = self.channel.clone();
262        let mut client = StorageServiceClient::new(channel);
263        let _guard = self.acquire().await;
264        let response = client
265            .process_find_key_values_by_prefix(request)
266            .make_sync()
267            .await?;
268        let response = response.into_inner();
269        let ReplyFindKeyValuesByPrefix {
270            key_values,
271            message_index,
272            num_chunks,
273        } = response;
274        if num_chunks == 0 {
275            let key_values = key_values
276                .into_iter()
277                .map(|x| (x.key, x.value))
278                .collect::<Vec<_>>();
279            Ok(key_values)
280        } else {
281            self.read_entries(message_index, num_chunks).await
282        }
283    }
284}
285
286impl WritableKeyValueStore for StorageServiceStoreInternal {
287    const MAX_VALUE_SIZE: usize = usize::MAX;
288
289    async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
290        if batch.operations.is_empty() {
291            return Ok(());
292        }
293        let mut statements = Vec::new();
294        let mut chunk_size = 0;
295
296        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
297            let mut full_key = self.start_key.clone();
298            full_key[0] = KeyPrefix::RootKey as u8;
299            let operation = Operation::Put(KeyValue {
300                key: full_key,
301                value: vec![],
302            });
303            let statement = Statement {
304                operation: Some(operation),
305            };
306            statements.push(statement);
307            chunk_size += self.start_key.len();
308        }
309
310        let bcs_root_key_len = self.start_key.len() - self.prefix_len;
311        for operation in batch.operations {
312            let (key_len, value_len) = match &operation {
313                WriteOperation::Delete { key } => (key.len(), 0),
314                WriteOperation::Put { key, value } => (key.len(), value.len()),
315                WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
316            };
317            let operation_size = key_len + value_len + bcs_root_key_len;
318            ensure!(
319                key_len <= MAX_KEY_SIZE,
320                StorageServiceStoreError::KeyTooLong
321            );
322            if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
323                let statement = self.get_statement(operation);
324                statements.push(statement);
325                chunk_size += operation_size;
326            } else {
327                self.submit_statements(mem::take(&mut statements)).await?;
328                chunk_size = 0;
329                if operation_size > MAX_PAYLOAD_SIZE {
330                    // One single operation is especially big. So split it in chunks.
331                    let WriteOperation::Put { key, value } = operation else {
332                        // Only the put can go over the limit
333                        unreachable!();
334                    };
335                    let mut full_key = self.start_key.clone();
336                    full_key.extend(key);
337                    let value_chunks = value
338                        .chunks(MAX_PAYLOAD_SIZE)
339                        .map(|x| x.to_vec())
340                        .collect::<Vec<_>>();
341                    let num_chunks = value_chunks.len();
342                    for (i_chunk, value) in value_chunks.into_iter().enumerate() {
343                        let last = i_chunk + 1 == num_chunks;
344                        let operation = Operation::Append(KeyValueAppend {
345                            key: full_key.clone(),
346                            value,
347                            last,
348                        });
349                        statements = vec![Statement {
350                            operation: Some(operation),
351                        }];
352                        self.submit_statements(mem::take(&mut statements)).await?;
353                    }
354                } else {
355                    // The operation is small enough, it is just that we have many so we need to split.
356                    let statement = self.get_statement(operation);
357                    statements.push(statement);
358                    chunk_size = operation_size;
359                }
360            }
361        }
362        self.submit_statements(mem::take(&mut statements)).await
363    }
364
365    async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
366        Ok(())
367    }
368}
369
370impl StorageServiceStoreInternal {
371    /// Obtains the semaphore lock on the database if needed.
372    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
373        match &self.semaphore {
374            None => None,
375            Some(count) => Some(count.acquire().await),
376        }
377    }
378
379    async fn submit_statements(
380        &self,
381        statements: Vec<Statement>,
382    ) -> Result<(), StorageServiceStoreError> {
383        if !statements.is_empty() {
384            let query = RequestWriteBatchExtended { statements };
385            let request = tonic::Request::new(query);
386            let channel = self.channel.clone();
387            let mut client = StorageServiceClient::new(channel);
388            let _guard = self.acquire().await;
389            let _response = client
390                .process_write_batch_extended(request)
391                .make_sync()
392                .await?;
393        }
394        Ok(())
395    }
396
397    fn get_statement(&self, operation: WriteOperation) -> Statement {
398        let operation = match operation {
399            WriteOperation::Delete { key } => {
400                let mut full_key = self.start_key.clone();
401                full_key.extend(key);
402                Operation::Delete(full_key)
403            }
404            WriteOperation::Put { key, value } => {
405                let mut full_key = self.start_key.clone();
406                full_key.extend(key);
407                Operation::Put(KeyValue {
408                    key: full_key,
409                    value,
410                })
411            }
412            WriteOperation::DeletePrefix { key_prefix } => {
413                let mut full_key_prefix = self.start_key.clone();
414                full_key_prefix.extend(key_prefix);
415                Operation::DeletePrefix(full_key_prefix)
416            }
417        };
418        Statement {
419            operation: Some(operation),
420        }
421    }
422
423    async fn read_single_entry(
424        &self,
425        message_index: i64,
426        index: i32,
427    ) -> Result<Vec<u8>, StorageServiceStoreError> {
428        let channel = self.channel.clone();
429        let query = RequestSpecificChunk {
430            message_index,
431            index,
432        };
433        let request = tonic::Request::new(query);
434        let mut client = StorageServiceClient::new(channel);
435        let response = client.process_specific_chunk(request).make_sync().await?;
436        let response = response.into_inner();
437        let ReplySpecificChunk { chunk } = response;
438        Ok(chunk)
439    }
440
441    async fn read_entries<S: DeserializeOwned>(
442        &self,
443        message_index: i64,
444        num_chunks: i32,
445    ) -> Result<S, StorageServiceStoreError> {
446        let mut handles = Vec::new();
447        for index in 0..num_chunks {
448            let handle = self.read_single_entry(message_index, index);
449            handles.push(handle);
450        }
451        let mut value = Vec::new();
452        for chunk in join_all(handles).await {
453            let chunk = chunk?;
454            value.extend(chunk);
455        }
456        Ok(bcs::from_bytes(&value)?)
457    }
458}
459
460impl KeyValueDatabase for StorageServiceDatabaseInternal {
461    type Config = StorageServiceStoreInternalConfig;
462    type Store = StorageServiceStoreInternal;
463
464    fn get_name() -> String {
465        "service store".to_string()
466    }
467
468    async fn connect(
469        config: &Self::Config,
470        namespace: &str,
471    ) -> Result<Self, StorageServiceStoreError> {
472        let semaphore = config
473            .max_concurrent_queries
474            .map(|n| Arc::new(Semaphore::new(n)));
475        let namespace = bcs::to_bytes(namespace)?;
476        let max_stream_queries = config.max_stream_queries;
477        let endpoint = config.http_address();
478        let endpoint = Endpoint::from_shared(endpoint)?;
479        let channel = endpoint.connect_lazy();
480        Ok(Self {
481            channel,
482            semaphore,
483            max_stream_queries,
484            namespace,
485        })
486    }
487
488    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
489        let channel = self.channel.clone();
490        let semaphore = self.semaphore.clone();
491        let max_stream_queries = self.max_stream_queries;
492        let mut start_key = vec![KeyPrefix::Key as u8];
493        start_key.extend(&self.namespace);
494        start_key.extend(bcs::to_bytes(root_key)?);
495        let prefix_len = self.namespace.len() + 1;
496        Ok(StorageServiceStoreInternal {
497            channel,
498            semaphore,
499            max_stream_queries,
500            prefix_len,
501            start_key,
502            root_key_written: Arc::new(AtomicBool::new(false)),
503        })
504    }
505
506    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
507        self.open_shared(root_key)
508    }
509
510    async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
511        let endpoint = config.http_address();
512        let endpoint = Endpoint::from_shared(endpoint)?;
513        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
514        let response = client.process_list_all(()).make_sync().await?;
515        let response = response.into_inner();
516        let ReplyListAll { namespaces } = response;
517        let namespaces = namespaces
518            .into_iter()
519            .map(|x| bcs::from_bytes(&x))
520            .collect::<Result<_, _>>()?;
521        Ok(namespaces)
522    }
523
524    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
525        let query = RequestListRootKeys {
526            namespace: self.namespace.clone(),
527        };
528        let request = tonic::Request::new(query);
529        let mut client = StorageServiceClient::new(self.channel.clone());
530        let response = client.process_list_root_keys(request).make_sync().await?;
531        let response = response.into_inner();
532        let ReplyListRootKeys { root_keys } = response;
533        Ok(root_keys)
534    }
535
536    async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
537        let endpoint = config.http_address();
538        let endpoint = Endpoint::from_shared(endpoint)?;
539        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
540        let _response = client.process_delete_all(()).make_sync().await?;
541        Ok(())
542    }
543
544    async fn exists(
545        config: &Self::Config,
546        namespace: &str,
547    ) -> Result<bool, StorageServiceStoreError> {
548        let namespace = bcs::to_bytes(namespace)?;
549        let query = RequestExistsNamespace { namespace };
550        let request = tonic::Request::new(query);
551        let endpoint = config.http_address();
552        let endpoint = Endpoint::from_shared(endpoint)?;
553        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
554        let response = client.process_exists_namespace(request).make_sync().await?;
555        let response = response.into_inner();
556        let ReplyExistsNamespace { exists } = response;
557        Ok(exists)
558    }
559
560    async fn create(
561        config: &Self::Config,
562        namespace: &str,
563    ) -> Result<(), StorageServiceStoreError> {
564        if StorageServiceDatabaseInternal::exists(config, namespace).await? {
565            return Err(StorageServiceStoreError::StoreAlreadyExists);
566        }
567        let namespace = bcs::to_bytes(namespace)?;
568        let query = RequestCreateNamespace { namespace };
569        let request = tonic::Request::new(query);
570        let endpoint = config.http_address();
571        let endpoint = Endpoint::from_shared(endpoint)?;
572        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
573        let _response = client.process_create_namespace(request).make_sync().await?;
574        Ok(())
575    }
576
577    async fn delete(
578        config: &Self::Config,
579        namespace: &str,
580    ) -> Result<(), StorageServiceStoreError> {
581        let namespace = bcs::to_bytes(namespace)?;
582        let query = RequestDeleteNamespace { namespace };
583        let request = tonic::Request::new(query);
584        let endpoint = config.http_address();
585        let endpoint = Endpoint::from_shared(endpoint)?;
586        let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
587        let _response = client.process_delete_namespace(request).make_sync().await?;
588        Ok(())
589    }
590}
591
#[cfg(with_testing)]
impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
    /// Builds the test configuration from the endpoint returned by
    /// `storage_service_test_endpoint`.
    async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
    {
        let test_endpoint = storage_service_test_endpoint()?;
        service_config_from_endpoint(&test_endpoint)
    }
}
600
601/// Creates a `StorageServiceStoreConfig` from an endpoint.
602pub fn service_config_from_endpoint(
603    endpoint: &str,
604) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
605    Ok(StorageServiceStoreInternalConfig {
606        endpoint: endpoint.to_string(),
607        max_concurrent_queries: None,
608        max_stream_queries: 100,
609    })
610}
611
612/// Checks that endpoint is truly absent.
613pub async fn storage_service_check_absence(
614    endpoint: &str,
615) -> Result<bool, StorageServiceStoreError> {
616    let endpoint = Endpoint::from_shared(endpoint.to_string())?;
617    let result = StorageServiceClient::connect(endpoint).await;
618    Ok(result.is_err())
619}
620
621/// Checks whether an endpoint is valid or not.
622pub async fn storage_service_check_validity(
623    endpoint: &str,
624) -> Result<(), StorageServiceStoreError> {
625    let config = service_config_from_endpoint(endpoint).unwrap();
626    let namespace = "namespace";
627    let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
628    let store = database.open_shared(&[])?;
629    let _value = store.read_value_bytes(&[42]).await?;
630    Ok(())
631}
632
/// The service database client with metrics.
///
/// The internal database is wrapped in an LRU caching layer, with a metering
/// layer both below and above the cache.
#[cfg(with_metrics)]
pub type StorageServiceDatabase =
    MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;

/// The service database client without metrics.
///
/// The internal database is wrapped in an LRU caching layer only.
#[cfg(not(with_metrics))]
pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;