linera_storage_service/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    mem,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::ensure;
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredStore;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueStore;
19use linera_views::{
20    batch::{Batch, WriteOperation},
21    lru_caching::LruCachingStore,
22    store::{
23        AdminKeyValueStore, CommonStoreInternalConfig, ReadableKeyValueStore, WithError,
24        WritableKeyValueStore,
25    },
26    FutureSyncExt,
27};
28use serde::de::DeserializeOwned;
29use tonic::transport::{Channel, Endpoint};
30
31#[cfg(with_testing)]
32use crate::common::storage_service_test_endpoint;
33use crate::{
34    common::{KeyPrefix, ServiceStoreError, ServiceStoreInternalConfig, MAX_PAYLOAD_SIZE},
35    key_value_store::{
36        statement::Operation, store_processor_client::StoreProcessorClient, KeyValue,
37        KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
38        ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
39        ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
40        RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
41        RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
42        RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
43        RequestWriteBatchExtended, Statement,
44    },
45};
46
// Upper bound, in bytes, on the length of a key or key prefix accepted by this client.
// The value of one million was chosen rather arbitrarily.
const MAX_KEY_SIZE: usize = 1_000_000;
49
// The shared store client.
// * The generated gRPC client needs mutable access to issue a request, while the
//   key-value store traits only hand out `&self`. Each request therefore clones
//   the channel and builds a fresh client from it.
// * The semaphore and `max_stream_queries` work as in the other stores.
//
// Namespaces are encoded by their BCS serialization. This works because the set
// of serializations of strings is prefix free.
// The data is stored in the following way:
// * A `key` under a `root_key` of a `namespace` is stored at
//   [`KeyPrefix::Key`] + namespace + root_key + key
// * An additional key with an empty value is stored at
//   [`KeyPrefix::Namespace`] + namespace
//   to indicate the existence of a namespace.
// * A key with an empty value is stored at
//   [`KeyPrefix::RootKey`] + namespace + root_key
//   to indicate the existence of a root key.
/// Client of the storage service bound to a namespace and a root key
/// (the root key is empty right after `connect`).
#[derive(Clone)]
pub struct ServiceStoreClientInternal {
    /// gRPC channel to the storage service; cloned for every request.
    channel: Channel,
    /// Optional semaphore limiting concurrent requests, built from `max_concurrent_queries`.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Length of the prefix byte plus the serialized namespace at the front of `start_key`.
    prefix_len: usize,
    /// Prefix byte + serialized namespace + root key; prepended to every user key.
    start_key: Vec<u8>,
    /// Set by `write_batch` once the root key marker has been queued for writing.
    root_key_written: Arc<AtomicBool>,
}
78
// Every store operation of this client reports failures as `ServiceStoreError`.
impl WithError for ServiceStoreClientInternal {
    type Error = ServiceStoreError;
}
82
83impl ReadableKeyValueStore for ServiceStoreClientInternal {
84    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
85    type Keys = Vec<Vec<u8>>;
86    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
87
88    fn max_stream_queries(&self) -> usize {
89        self.max_stream_queries
90    }
91
92    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ServiceStoreError> {
93        ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
94        let mut full_key = self.start_key.clone();
95        full_key.extend(key);
96        let query = RequestReadValue { key: full_key };
97        let request = tonic::Request::new(query);
98        let channel = self.channel.clone();
99        let mut client = StoreProcessorClient::new(channel);
100        let _guard = self.acquire().await;
101        let response = client.process_read_value(request).make_sync().await?;
102        let response = response.into_inner();
103        let ReplyReadValue {
104            value,
105            message_index,
106            num_chunks,
107        } = response;
108        if num_chunks == 0 {
109            Ok(value)
110        } else {
111            self.read_entries(message_index, num_chunks).await
112        }
113    }
114
115    async fn contains_key(&self, key: &[u8]) -> Result<bool, ServiceStoreError> {
116        ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
117        let mut full_key = self.start_key.clone();
118        full_key.extend(key);
119        let query = RequestContainsKey { key: full_key };
120        let request = tonic::Request::new(query);
121        let channel = self.channel.clone();
122        let mut client = StoreProcessorClient::new(channel);
123        let _guard = self.acquire().await;
124        let response = client.process_contains_key(request).make_sync().await?;
125        let response = response.into_inner();
126        let ReplyContainsKey { test } = response;
127        Ok(test)
128    }
129
130    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ServiceStoreError> {
131        let mut full_keys = Vec::new();
132        for key in keys {
133            ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
134            let mut full_key = self.start_key.clone();
135            full_key.extend(&key);
136            full_keys.push(full_key);
137        }
138        let query = RequestContainsKeys { keys: full_keys };
139        let request = tonic::Request::new(query);
140        let channel = self.channel.clone();
141        let mut client = StoreProcessorClient::new(channel);
142        let _guard = self.acquire().await;
143        let response = client.process_contains_keys(request).make_sync().await?;
144        let response = response.into_inner();
145        let ReplyContainsKeys { tests } = response;
146        Ok(tests)
147    }
148
149    async fn read_multi_values_bytes(
150        &self,
151        keys: Vec<Vec<u8>>,
152    ) -> Result<Vec<Option<Vec<u8>>>, ServiceStoreError> {
153        let mut full_keys = Vec::new();
154        for key in keys {
155            ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
156            let mut full_key = self.start_key.clone();
157            full_key.extend(&key);
158            full_keys.push(full_key);
159        }
160        let query = RequestReadMultiValues { keys: full_keys };
161        let request = tonic::Request::new(query);
162        let channel = self.channel.clone();
163        let mut client = StoreProcessorClient::new(channel);
164        let _guard = self.acquire().await;
165        let response = client
166            .process_read_multi_values(request)
167            .make_sync()
168            .await?;
169        let response = response.into_inner();
170        let ReplyReadMultiValues {
171            values,
172            message_index,
173            num_chunks,
174        } = response;
175        if num_chunks == 0 {
176            let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
177            Ok(values)
178        } else {
179            self.read_entries(message_index, num_chunks).await
180        }
181    }
182
183    async fn find_keys_by_prefix(
184        &self,
185        key_prefix: &[u8],
186    ) -> Result<Vec<Vec<u8>>, ServiceStoreError> {
187        ensure!(
188            key_prefix.len() <= MAX_KEY_SIZE,
189            ServiceStoreError::KeyTooLong
190        );
191        let mut full_key_prefix = self.start_key.clone();
192        full_key_prefix.extend(key_prefix);
193        let query = RequestFindKeysByPrefix {
194            key_prefix: full_key_prefix,
195        };
196        let request = tonic::Request::new(query);
197        let channel = self.channel.clone();
198        let mut client = StoreProcessorClient::new(channel);
199        let _guard = self.acquire().await;
200        let response = client
201            .process_find_keys_by_prefix(request)
202            .make_sync()
203            .await?;
204        let response = response.into_inner();
205        let ReplyFindKeysByPrefix {
206            keys,
207            message_index,
208            num_chunks,
209        } = response;
210        if num_chunks == 0 {
211            Ok(keys)
212        } else {
213            self.read_entries(message_index, num_chunks).await
214        }
215    }
216
217    async fn find_key_values_by_prefix(
218        &self,
219        key_prefix: &[u8],
220    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServiceStoreError> {
221        ensure!(
222            key_prefix.len() <= MAX_KEY_SIZE,
223            ServiceStoreError::KeyTooLong
224        );
225        let mut full_key_prefix = self.start_key.clone();
226        full_key_prefix.extend(key_prefix);
227        let query = RequestFindKeyValuesByPrefix {
228            key_prefix: full_key_prefix,
229        };
230        let request = tonic::Request::new(query);
231        let channel = self.channel.clone();
232        let mut client = StoreProcessorClient::new(channel);
233        let _guard = self.acquire().await;
234        let response = client
235            .process_find_key_values_by_prefix(request)
236            .make_sync()
237            .await?;
238        let response = response.into_inner();
239        let ReplyFindKeyValuesByPrefix {
240            key_values,
241            message_index,
242            num_chunks,
243        } = response;
244        if num_chunks == 0 {
245            let key_values = key_values
246                .into_iter()
247                .map(|x| (x.key, x.value))
248                .collect::<Vec<_>>();
249            Ok(key_values)
250        } else {
251            self.read_entries(message_index, num_chunks).await
252        }
253    }
254}
255
256impl WritableKeyValueStore for ServiceStoreClientInternal {
257    const MAX_VALUE_SIZE: usize = usize::MAX;
258
259    async fn write_batch(&self, batch: Batch) -> Result<(), ServiceStoreError> {
260        if batch.operations.is_empty() {
261            return Ok(());
262        }
263        let mut statements = Vec::new();
264        let mut chunk_size = 0;
265
266        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
267            let mut full_key = self.start_key.clone();
268            full_key[0] = KeyPrefix::RootKey as u8;
269            let operation = Operation::Put(KeyValue {
270                key: full_key,
271                value: vec![],
272            });
273            let statement = Statement {
274                operation: Some(operation),
275            };
276            statements.push(statement);
277            chunk_size += self.start_key.len();
278        }
279
280        let root_key_len = self.start_key.len() - self.prefix_len;
281        for operation in batch.operations {
282            let (key_len, value_len) = match &operation {
283                WriteOperation::Delete { key } => (key.len(), 0),
284                WriteOperation::Put { key, value } => (key.len(), value.len()),
285                WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
286            };
287            let operation_size = key_len + value_len + root_key_len;
288            ensure!(key_len <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
289            if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
290                let statement = self.get_statement(operation);
291                statements.push(statement);
292                chunk_size += operation_size;
293            } else {
294                self.submit_statements(mem::take(&mut statements)).await?;
295                chunk_size = 0;
296                if operation_size > MAX_PAYLOAD_SIZE {
297                    // One single operation is especially big. So split it in chunks.
298                    let WriteOperation::Put { key, value } = operation else {
299                        // Only the put can go over the limit
300                        unreachable!();
301                    };
302                    let mut full_key = self.start_key.clone();
303                    full_key.extend(key);
304                    let value_chunks = value
305                        .chunks(MAX_PAYLOAD_SIZE)
306                        .map(|x| x.to_vec())
307                        .collect::<Vec<_>>();
308                    let num_chunks = value_chunks.len();
309                    for (i_chunk, value) in value_chunks.into_iter().enumerate() {
310                        let last = i_chunk + 1 == num_chunks;
311                        let operation = Operation::Append(KeyValueAppend {
312                            key: full_key.clone(),
313                            value,
314                            last,
315                        });
316                        statements = vec![Statement {
317                            operation: Some(operation),
318                        }];
319                        self.submit_statements(mem::take(&mut statements)).await?;
320                    }
321                } else {
322                    // The operation is small enough, it is just that we have many so we need to split.
323                    let statement = self.get_statement(operation);
324                    statements.push(statement);
325                    chunk_size = operation_size;
326                }
327            }
328        }
329        self.submit_statements(mem::take(&mut statements)).await
330    }
331
332    async fn clear_journal(&self) -> Result<(), ServiceStoreError> {
333        Ok(())
334    }
335}
336
337impl ServiceStoreClientInternal {
338    /// Obtains the semaphore lock on the database if needed.
339    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
340        match &self.semaphore {
341            None => None,
342            Some(count) => Some(count.acquire().await),
343        }
344    }
345
346    async fn submit_statements(&self, statements: Vec<Statement>) -> Result<(), ServiceStoreError> {
347        if !statements.is_empty() {
348            let query = RequestWriteBatchExtended { statements };
349            let request = tonic::Request::new(query);
350            let channel = self.channel.clone();
351            let mut client = StoreProcessorClient::new(channel);
352            let _guard = self.acquire().await;
353            let _response = client
354                .process_write_batch_extended(request)
355                .make_sync()
356                .await?;
357        }
358        Ok(())
359    }
360
361    fn get_statement(&self, operation: WriteOperation) -> Statement {
362        let operation = match operation {
363            WriteOperation::Delete { key } => {
364                let mut full_key = self.start_key.clone();
365                full_key.extend(key);
366                Operation::Delete(full_key)
367            }
368            WriteOperation::Put { key, value } => {
369                let mut full_key = self.start_key.clone();
370                full_key.extend(key);
371                Operation::Put(KeyValue {
372                    key: full_key,
373                    value,
374                })
375            }
376            WriteOperation::DeletePrefix { key_prefix } => {
377                let mut full_key_prefix = self.start_key.clone();
378                full_key_prefix.extend(key_prefix);
379                Operation::DeletePrefix(full_key_prefix)
380            }
381        };
382        Statement {
383            operation: Some(operation),
384        }
385    }
386
387    async fn read_single_entry(
388        &self,
389        message_index: i64,
390        index: i32,
391    ) -> Result<Vec<u8>, ServiceStoreError> {
392        let channel = self.channel.clone();
393        let query = RequestSpecificChunk {
394            message_index,
395            index,
396        };
397        let request = tonic::Request::new(query);
398        let mut client = StoreProcessorClient::new(channel);
399        let response = client.process_specific_chunk(request).make_sync().await?;
400        let response = response.into_inner();
401        let ReplySpecificChunk { chunk } = response;
402        Ok(chunk)
403    }
404
405    async fn read_entries<S: DeserializeOwned>(
406        &self,
407        message_index: i64,
408        num_chunks: i32,
409    ) -> Result<S, ServiceStoreError> {
410        let mut handles = Vec::new();
411        for index in 0..num_chunks {
412            let handle = self.read_single_entry(message_index, index);
413            handles.push(handle);
414        }
415        let mut value = Vec::new();
416        for chunk in join_all(handles).await {
417            let chunk = chunk?;
418            value.extend(chunk);
419        }
420        Ok(bcs::from_bytes(&value)?)
421    }
422}
423
424impl AdminKeyValueStore for ServiceStoreClientInternal {
425    type Config = ServiceStoreInternalConfig;
426
427    fn get_name() -> String {
428        "service store".to_string()
429    }
430
431    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, ServiceStoreError> {
432        let semaphore = config
433            .common_config
434            .max_concurrent_queries
435            .map(|n| Arc::new(Semaphore::new(n)));
436        let namespace = bcs::to_bytes(namespace)?;
437        let max_stream_queries = config.common_config.max_stream_queries;
438        let mut start_key = vec![KeyPrefix::Key as u8];
439        start_key.extend(&namespace);
440        let prefix_len = namespace.len() + 1;
441        let endpoint = config.http_address();
442        let endpoint = Endpoint::from_shared(endpoint)?;
443        let channel = endpoint.connect_lazy();
444        Ok(Self {
445            channel,
446            semaphore,
447            max_stream_queries,
448            prefix_len,
449            start_key,
450            root_key_written: Arc::new(AtomicBool::new(false)),
451        })
452    }
453
454    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, ServiceStoreError> {
455        let channel = self.channel.clone();
456        let prefix_len = self.prefix_len;
457        let semaphore = self.semaphore.clone();
458        let max_stream_queries = self.max_stream_queries;
459        let mut start_key = self.start_key[..prefix_len].to_vec();
460        start_key.extend(root_key);
461        Ok(Self {
462            channel,
463            semaphore,
464            max_stream_queries,
465            prefix_len,
466            start_key,
467            root_key_written: Arc::new(AtomicBool::new(false)),
468        })
469    }
470
471    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ServiceStoreError> {
472        let endpoint = config.http_address();
473        let endpoint = Endpoint::from_shared(endpoint)?;
474        let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
475        let response = client.process_list_all(()).make_sync().await?;
476        let response = response.into_inner();
477        let ReplyListAll { namespaces } = response;
478        let namespaces = namespaces
479            .into_iter()
480            .map(|x| bcs::from_bytes(&x))
481            .collect::<Result<_, _>>()?;
482        Ok(namespaces)
483    }
484
485    async fn list_root_keys(
486        config: &Self::Config,
487        namespace: &str,
488    ) -> Result<Vec<Vec<u8>>, ServiceStoreError> {
489        let namespace = bcs::to_bytes(namespace)?;
490        let query = RequestListRootKeys { namespace };
491        let request = tonic::Request::new(query);
492        let endpoint = config.http_address();
493        let endpoint = Endpoint::from_shared(endpoint)?;
494        let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
495        let response = client.process_list_root_keys(request).make_sync().await?;
496        let response = response.into_inner();
497        let ReplyListRootKeys { root_keys } = response;
498        Ok(root_keys)
499    }
500
501    async fn delete_all(config: &Self::Config) -> Result<(), ServiceStoreError> {
502        let endpoint = config.http_address();
503        let endpoint = Endpoint::from_shared(endpoint)?;
504        let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
505        let _response = client.process_delete_all(()).make_sync().await?;
506        Ok(())
507    }
508
509    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, ServiceStoreError> {
510        let namespace = bcs::to_bytes(namespace)?;
511        let query = RequestExistsNamespace { namespace };
512        let request = tonic::Request::new(query);
513        let endpoint = config.http_address();
514        let endpoint = Endpoint::from_shared(endpoint)?;
515        let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
516        let response = client.process_exists_namespace(request).make_sync().await?;
517        let response = response.into_inner();
518        let ReplyExistsNamespace { exists } = response;
519        Ok(exists)
520    }
521
522    async fn create(config: &Self::Config, namespace: &str) -> Result<(), ServiceStoreError> {
523        if ServiceStoreClientInternal::exists(config, namespace).await? {
524            return Err(ServiceStoreError::StoreAlreadyExists);
525        }
526        let namespace = bcs::to_bytes(namespace)?;
527        let query = RequestCreateNamespace { namespace };
528        let request = tonic::Request::new(query);
529        let endpoint = config.http_address();
530        let endpoint = Endpoint::from_shared(endpoint)?;
531        let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
532        let _response = client.process_create_namespace(request).make_sync().await?;
533        Ok(())
534    }
535
536    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), ServiceStoreError> {
537        let namespace = bcs::to_bytes(namespace)?;
538        let query = RequestDeleteNamespace { namespace };
539        let request = tonic::Request::new(query);
540        let endpoint = config.http_address();
541        let endpoint = Endpoint::from_shared(endpoint)?;
542        let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
543        let _response = client.process_delete_namespace(request).make_sync().await?;
544        Ok(())
545    }
546}
547
#[cfg(with_testing)]
impl TestKeyValueStore for ServiceStoreClientInternal {
    /// Builds the configuration used by tests from the storage service test endpoint.
    async fn new_test_config() -> Result<ServiceStoreInternalConfig, ServiceStoreError> {
        let test_endpoint = storage_service_test_endpoint()?;
        service_config_from_endpoint(test_endpoint.as_ref())
    }
}
555
556/// Creates a `ServiceStoreConfig` from an endpoint.
557pub fn service_config_from_endpoint(
558    endpoint: &str,
559) -> Result<ServiceStoreInternalConfig, ServiceStoreError> {
560    let common_config = CommonStoreInternalConfig {
561        max_concurrent_queries: None,
562        max_stream_queries: 100,
563        replication_factor: 1,
564    };
565    let endpoint = endpoint.to_string();
566    Ok(ServiceStoreInternalConfig {
567        endpoint,
568        common_config,
569    })
570}
571
572/// Checks that endpoint is truly absent.
573pub async fn storage_service_check_absence(endpoint: &str) -> Result<bool, ServiceStoreError> {
574    let endpoint = Endpoint::from_shared(endpoint.to_string())?;
575    let result = StoreProcessorClient::connect(endpoint).await;
576    Ok(result.is_err())
577}
578
579/// Checks whether an endpoint is valid or not.
580pub async fn storage_service_check_validity(endpoint: &str) -> Result<(), ServiceStoreError> {
581    let config = service_config_from_endpoint(endpoint).unwrap();
582    let namespace = "namespace";
583    let store = ServiceStoreClientInternal::connect(&config, namespace).await?;
584    let _value = store.read_value_bytes(&[42]).await?;
585    Ok(())
586}
587
/// The service store client with metrics: `ServiceStoreClientInternal` wrapped in an
/// `LruCachingStore`, with a `MeteredStore` layer both inside and outside of it.
#[cfg(with_metrics)]
pub type ServiceStoreClient =
    MeteredStore<LruCachingStore<MeteredStore<ServiceStoreClientInternal>>>;

/// The service store client without metrics: `ServiceStoreClientInternal` wrapped in an
/// `LruCachingStore`.
#[cfg(not(with_metrics))]
pub type ServiceStoreClient = LruCachingStore<ServiceStoreClientInternal>;