1use std::{
5 mem,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::ensure;
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredStore;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueStore;
19use linera_views::{
20 batch::{Batch, WriteOperation},
21 lru_caching::LruCachingStore,
22 store::{
23 AdminKeyValueStore, CommonStoreInternalConfig, ReadableKeyValueStore, WithError,
24 WritableKeyValueStore,
25 },
26 FutureSyncExt,
27};
28use serde::de::DeserializeOwned;
29use tonic::transport::{Channel, Endpoint};
30
31#[cfg(with_testing)]
32use crate::common::storage_service_test_endpoint;
33use crate::{
34 common::{KeyPrefix, ServiceStoreError, ServiceStoreInternalConfig, MAX_PAYLOAD_SIZE},
35 key_value_store::{
36 statement::Operation, store_processor_client::StoreProcessorClient, KeyValue,
37 KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
38 ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
39 ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
40 RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
41 RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
42 RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
43 RequestWriteBatchExtended, Statement,
44 },
45};
46
/// Upper bound, in bytes, on the length of a caller-supplied key or key prefix.
/// Operations given a longer key fail with `ServiceStoreError::KeyTooLong`.
const MAX_KEY_SIZE: usize = 1_000_000;
49
50#[derive(Clone)]
70pub struct ServiceStoreClientInternal {
71 channel: Channel,
72 semaphore: Option<Arc<Semaphore>>,
73 max_stream_queries: usize,
74 prefix_len: usize,
75 start_key: Vec<u8>,
76 root_key_written: Arc<AtomicBool>,
77}
78
79impl WithError for ServiceStoreClientInternal {
80 type Error = ServiceStoreError;
81}
82
83impl ReadableKeyValueStore for ServiceStoreClientInternal {
84 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
85 type Keys = Vec<Vec<u8>>;
86 type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
87
88 fn max_stream_queries(&self) -> usize {
89 self.max_stream_queries
90 }
91
92 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ServiceStoreError> {
93 ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
94 let mut full_key = self.start_key.clone();
95 full_key.extend(key);
96 let query = RequestReadValue { key: full_key };
97 let request = tonic::Request::new(query);
98 let channel = self.channel.clone();
99 let mut client = StoreProcessorClient::new(channel);
100 let _guard = self.acquire().await;
101 let response = client.process_read_value(request).make_sync().await?;
102 let response = response.into_inner();
103 let ReplyReadValue {
104 value,
105 message_index,
106 num_chunks,
107 } = response;
108 if num_chunks == 0 {
109 Ok(value)
110 } else {
111 self.read_entries(message_index, num_chunks).await
112 }
113 }
114
115 async fn contains_key(&self, key: &[u8]) -> Result<bool, ServiceStoreError> {
116 ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
117 let mut full_key = self.start_key.clone();
118 full_key.extend(key);
119 let query = RequestContainsKey { key: full_key };
120 let request = tonic::Request::new(query);
121 let channel = self.channel.clone();
122 let mut client = StoreProcessorClient::new(channel);
123 let _guard = self.acquire().await;
124 let response = client.process_contains_key(request).make_sync().await?;
125 let response = response.into_inner();
126 let ReplyContainsKey { test } = response;
127 Ok(test)
128 }
129
130 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, ServiceStoreError> {
131 let mut full_keys = Vec::new();
132 for key in keys {
133 ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
134 let mut full_key = self.start_key.clone();
135 full_key.extend(&key);
136 full_keys.push(full_key);
137 }
138 let query = RequestContainsKeys { keys: full_keys };
139 let request = tonic::Request::new(query);
140 let channel = self.channel.clone();
141 let mut client = StoreProcessorClient::new(channel);
142 let _guard = self.acquire().await;
143 let response = client.process_contains_keys(request).make_sync().await?;
144 let response = response.into_inner();
145 let ReplyContainsKeys { tests } = response;
146 Ok(tests)
147 }
148
149 async fn read_multi_values_bytes(
150 &self,
151 keys: Vec<Vec<u8>>,
152 ) -> Result<Vec<Option<Vec<u8>>>, ServiceStoreError> {
153 let mut full_keys = Vec::new();
154 for key in keys {
155 ensure!(key.len() <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
156 let mut full_key = self.start_key.clone();
157 full_key.extend(&key);
158 full_keys.push(full_key);
159 }
160 let query = RequestReadMultiValues { keys: full_keys };
161 let request = tonic::Request::new(query);
162 let channel = self.channel.clone();
163 let mut client = StoreProcessorClient::new(channel);
164 let _guard = self.acquire().await;
165 let response = client
166 .process_read_multi_values(request)
167 .make_sync()
168 .await?;
169 let response = response.into_inner();
170 let ReplyReadMultiValues {
171 values,
172 message_index,
173 num_chunks,
174 } = response;
175 if num_chunks == 0 {
176 let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
177 Ok(values)
178 } else {
179 self.read_entries(message_index, num_chunks).await
180 }
181 }
182
183 async fn find_keys_by_prefix(
184 &self,
185 key_prefix: &[u8],
186 ) -> Result<Vec<Vec<u8>>, ServiceStoreError> {
187 ensure!(
188 key_prefix.len() <= MAX_KEY_SIZE,
189 ServiceStoreError::KeyTooLong
190 );
191 let mut full_key_prefix = self.start_key.clone();
192 full_key_prefix.extend(key_prefix);
193 let query = RequestFindKeysByPrefix {
194 key_prefix: full_key_prefix,
195 };
196 let request = tonic::Request::new(query);
197 let channel = self.channel.clone();
198 let mut client = StoreProcessorClient::new(channel);
199 let _guard = self.acquire().await;
200 let response = client
201 .process_find_keys_by_prefix(request)
202 .make_sync()
203 .await?;
204 let response = response.into_inner();
205 let ReplyFindKeysByPrefix {
206 keys,
207 message_index,
208 num_chunks,
209 } = response;
210 if num_chunks == 0 {
211 Ok(keys)
212 } else {
213 self.read_entries(message_index, num_chunks).await
214 }
215 }
216
217 async fn find_key_values_by_prefix(
218 &self,
219 key_prefix: &[u8],
220 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServiceStoreError> {
221 ensure!(
222 key_prefix.len() <= MAX_KEY_SIZE,
223 ServiceStoreError::KeyTooLong
224 );
225 let mut full_key_prefix = self.start_key.clone();
226 full_key_prefix.extend(key_prefix);
227 let query = RequestFindKeyValuesByPrefix {
228 key_prefix: full_key_prefix,
229 };
230 let request = tonic::Request::new(query);
231 let channel = self.channel.clone();
232 let mut client = StoreProcessorClient::new(channel);
233 let _guard = self.acquire().await;
234 let response = client
235 .process_find_key_values_by_prefix(request)
236 .make_sync()
237 .await?;
238 let response = response.into_inner();
239 let ReplyFindKeyValuesByPrefix {
240 key_values,
241 message_index,
242 num_chunks,
243 } = response;
244 if num_chunks == 0 {
245 let key_values = key_values
246 .into_iter()
247 .map(|x| (x.key, x.value))
248 .collect::<Vec<_>>();
249 Ok(key_values)
250 } else {
251 self.read_entries(message_index, num_chunks).await
252 }
253 }
254}
255
256impl WritableKeyValueStore for ServiceStoreClientInternal {
257 const MAX_VALUE_SIZE: usize = usize::MAX;
258
259 async fn write_batch(&self, batch: Batch) -> Result<(), ServiceStoreError> {
260 if batch.operations.is_empty() {
261 return Ok(());
262 }
263 let mut statements = Vec::new();
264 let mut chunk_size = 0;
265
266 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
267 let mut full_key = self.start_key.clone();
268 full_key[0] = KeyPrefix::RootKey as u8;
269 let operation = Operation::Put(KeyValue {
270 key: full_key,
271 value: vec![],
272 });
273 let statement = Statement {
274 operation: Some(operation),
275 };
276 statements.push(statement);
277 chunk_size += self.start_key.len();
278 }
279
280 let root_key_len = self.start_key.len() - self.prefix_len;
281 for operation in batch.operations {
282 let (key_len, value_len) = match &operation {
283 WriteOperation::Delete { key } => (key.len(), 0),
284 WriteOperation::Put { key, value } => (key.len(), value.len()),
285 WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
286 };
287 let operation_size = key_len + value_len + root_key_len;
288 ensure!(key_len <= MAX_KEY_SIZE, ServiceStoreError::KeyTooLong);
289 if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
290 let statement = self.get_statement(operation);
291 statements.push(statement);
292 chunk_size += operation_size;
293 } else {
294 self.submit_statements(mem::take(&mut statements)).await?;
295 chunk_size = 0;
296 if operation_size > MAX_PAYLOAD_SIZE {
297 let WriteOperation::Put { key, value } = operation else {
299 unreachable!();
301 };
302 let mut full_key = self.start_key.clone();
303 full_key.extend(key);
304 let value_chunks = value
305 .chunks(MAX_PAYLOAD_SIZE)
306 .map(|x| x.to_vec())
307 .collect::<Vec<_>>();
308 let num_chunks = value_chunks.len();
309 for (i_chunk, value) in value_chunks.into_iter().enumerate() {
310 let last = i_chunk + 1 == num_chunks;
311 let operation = Operation::Append(KeyValueAppend {
312 key: full_key.clone(),
313 value,
314 last,
315 });
316 statements = vec![Statement {
317 operation: Some(operation),
318 }];
319 self.submit_statements(mem::take(&mut statements)).await?;
320 }
321 } else {
322 let statement = self.get_statement(operation);
324 statements.push(statement);
325 chunk_size = operation_size;
326 }
327 }
328 }
329 self.submit_statements(mem::take(&mut statements)).await
330 }
331
332 async fn clear_journal(&self) -> Result<(), ServiceStoreError> {
333 Ok(())
334 }
335}
336
337impl ServiceStoreClientInternal {
338 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
340 match &self.semaphore {
341 None => None,
342 Some(count) => Some(count.acquire().await),
343 }
344 }
345
346 async fn submit_statements(&self, statements: Vec<Statement>) -> Result<(), ServiceStoreError> {
347 if !statements.is_empty() {
348 let query = RequestWriteBatchExtended { statements };
349 let request = tonic::Request::new(query);
350 let channel = self.channel.clone();
351 let mut client = StoreProcessorClient::new(channel);
352 let _guard = self.acquire().await;
353 let _response = client
354 .process_write_batch_extended(request)
355 .make_sync()
356 .await?;
357 }
358 Ok(())
359 }
360
361 fn get_statement(&self, operation: WriteOperation) -> Statement {
362 let operation = match operation {
363 WriteOperation::Delete { key } => {
364 let mut full_key = self.start_key.clone();
365 full_key.extend(key);
366 Operation::Delete(full_key)
367 }
368 WriteOperation::Put { key, value } => {
369 let mut full_key = self.start_key.clone();
370 full_key.extend(key);
371 Operation::Put(KeyValue {
372 key: full_key,
373 value,
374 })
375 }
376 WriteOperation::DeletePrefix { key_prefix } => {
377 let mut full_key_prefix = self.start_key.clone();
378 full_key_prefix.extend(key_prefix);
379 Operation::DeletePrefix(full_key_prefix)
380 }
381 };
382 Statement {
383 operation: Some(operation),
384 }
385 }
386
387 async fn read_single_entry(
388 &self,
389 message_index: i64,
390 index: i32,
391 ) -> Result<Vec<u8>, ServiceStoreError> {
392 let channel = self.channel.clone();
393 let query = RequestSpecificChunk {
394 message_index,
395 index,
396 };
397 let request = tonic::Request::new(query);
398 let mut client = StoreProcessorClient::new(channel);
399 let response = client.process_specific_chunk(request).make_sync().await?;
400 let response = response.into_inner();
401 let ReplySpecificChunk { chunk } = response;
402 Ok(chunk)
403 }
404
405 async fn read_entries<S: DeserializeOwned>(
406 &self,
407 message_index: i64,
408 num_chunks: i32,
409 ) -> Result<S, ServiceStoreError> {
410 let mut handles = Vec::new();
411 for index in 0..num_chunks {
412 let handle = self.read_single_entry(message_index, index);
413 handles.push(handle);
414 }
415 let mut value = Vec::new();
416 for chunk in join_all(handles).await {
417 let chunk = chunk?;
418 value.extend(chunk);
419 }
420 Ok(bcs::from_bytes(&value)?)
421 }
422}
423
424impl AdminKeyValueStore for ServiceStoreClientInternal {
425 type Config = ServiceStoreInternalConfig;
426
427 fn get_name() -> String {
428 "service store".to_string()
429 }
430
431 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, ServiceStoreError> {
432 let semaphore = config
433 .common_config
434 .max_concurrent_queries
435 .map(|n| Arc::new(Semaphore::new(n)));
436 let namespace = bcs::to_bytes(namespace)?;
437 let max_stream_queries = config.common_config.max_stream_queries;
438 let mut start_key = vec![KeyPrefix::Key as u8];
439 start_key.extend(&namespace);
440 let prefix_len = namespace.len() + 1;
441 let endpoint = config.http_address();
442 let endpoint = Endpoint::from_shared(endpoint)?;
443 let channel = endpoint.connect_lazy();
444 Ok(Self {
445 channel,
446 semaphore,
447 max_stream_queries,
448 prefix_len,
449 start_key,
450 root_key_written: Arc::new(AtomicBool::new(false)),
451 })
452 }
453
454 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, ServiceStoreError> {
455 let channel = self.channel.clone();
456 let prefix_len = self.prefix_len;
457 let semaphore = self.semaphore.clone();
458 let max_stream_queries = self.max_stream_queries;
459 let mut start_key = self.start_key[..prefix_len].to_vec();
460 start_key.extend(root_key);
461 Ok(Self {
462 channel,
463 semaphore,
464 max_stream_queries,
465 prefix_len,
466 start_key,
467 root_key_written: Arc::new(AtomicBool::new(false)),
468 })
469 }
470
471 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ServiceStoreError> {
472 let endpoint = config.http_address();
473 let endpoint = Endpoint::from_shared(endpoint)?;
474 let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
475 let response = client.process_list_all(()).make_sync().await?;
476 let response = response.into_inner();
477 let ReplyListAll { namespaces } = response;
478 let namespaces = namespaces
479 .into_iter()
480 .map(|x| bcs::from_bytes(&x))
481 .collect::<Result<_, _>>()?;
482 Ok(namespaces)
483 }
484
485 async fn list_root_keys(
486 config: &Self::Config,
487 namespace: &str,
488 ) -> Result<Vec<Vec<u8>>, ServiceStoreError> {
489 let namespace = bcs::to_bytes(namespace)?;
490 let query = RequestListRootKeys { namespace };
491 let request = tonic::Request::new(query);
492 let endpoint = config.http_address();
493 let endpoint = Endpoint::from_shared(endpoint)?;
494 let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
495 let response = client.process_list_root_keys(request).make_sync().await?;
496 let response = response.into_inner();
497 let ReplyListRootKeys { root_keys } = response;
498 Ok(root_keys)
499 }
500
501 async fn delete_all(config: &Self::Config) -> Result<(), ServiceStoreError> {
502 let endpoint = config.http_address();
503 let endpoint = Endpoint::from_shared(endpoint)?;
504 let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
505 let _response = client.process_delete_all(()).make_sync().await?;
506 Ok(())
507 }
508
509 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, ServiceStoreError> {
510 let namespace = bcs::to_bytes(namespace)?;
511 let query = RequestExistsNamespace { namespace };
512 let request = tonic::Request::new(query);
513 let endpoint = config.http_address();
514 let endpoint = Endpoint::from_shared(endpoint)?;
515 let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
516 let response = client.process_exists_namespace(request).make_sync().await?;
517 let response = response.into_inner();
518 let ReplyExistsNamespace { exists } = response;
519 Ok(exists)
520 }
521
522 async fn create(config: &Self::Config, namespace: &str) -> Result<(), ServiceStoreError> {
523 if ServiceStoreClientInternal::exists(config, namespace).await? {
524 return Err(ServiceStoreError::StoreAlreadyExists);
525 }
526 let namespace = bcs::to_bytes(namespace)?;
527 let query = RequestCreateNamespace { namespace };
528 let request = tonic::Request::new(query);
529 let endpoint = config.http_address();
530 let endpoint = Endpoint::from_shared(endpoint)?;
531 let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
532 let _response = client.process_create_namespace(request).make_sync().await?;
533 Ok(())
534 }
535
536 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), ServiceStoreError> {
537 let namespace = bcs::to_bytes(namespace)?;
538 let query = RequestDeleteNamespace { namespace };
539 let request = tonic::Request::new(query);
540 let endpoint = config.http_address();
541 let endpoint = Endpoint::from_shared(endpoint)?;
542 let mut client = StoreProcessorClient::connect(endpoint).make_sync().await?;
543 let _response = client.process_delete_namespace(request).make_sync().await?;
544 Ok(())
545 }
546}
547
#[cfg(with_testing)]
impl TestKeyValueStore for ServiceStoreClientInternal {
    /// Builds a test configuration from the endpoint returned by
    /// `storage_service_test_endpoint`.
    async fn new_test_config() -> Result<ServiceStoreInternalConfig, ServiceStoreError> {
        service_config_from_endpoint(&storage_service_test_endpoint()?)
    }
}
555
556pub fn service_config_from_endpoint(
558 endpoint: &str,
559) -> Result<ServiceStoreInternalConfig, ServiceStoreError> {
560 let common_config = CommonStoreInternalConfig {
561 max_concurrent_queries: None,
562 max_stream_queries: 100,
563 replication_factor: 1,
564 };
565 let endpoint = endpoint.to_string();
566 Ok(ServiceStoreInternalConfig {
567 endpoint,
568 common_config,
569 })
570}
571
572pub async fn storage_service_check_absence(endpoint: &str) -> Result<bool, ServiceStoreError> {
574 let endpoint = Endpoint::from_shared(endpoint.to_string())?;
575 let result = StoreProcessorClient::connect(endpoint).await;
576 Ok(result.is_err())
577}
578
579pub async fn storage_service_check_validity(endpoint: &str) -> Result<(), ServiceStoreError> {
581 let config = service_config_from_endpoint(endpoint).unwrap();
582 let namespace = "namespace";
583 let store = ServiceStoreClientInternal::connect(&config, namespace).await?;
584 let _value = store.read_value_bytes(&[42]).await?;
585 Ok(())
586}
587
/// Public client type when metrics are enabled: the internal client wrapped in a
/// `MeteredStore`, then an `LruCachingStore`, then an outer `MeteredStore`.
#[cfg(with_metrics)]
pub type ServiceStoreClient =
    MeteredStore<LruCachingStore<MeteredStore<ServiceStoreClientInternal>>>;
592
593#[cfg(not(with_metrics))]
595pub type ServiceStoreClient = LruCachingStore<ServiceStoreClientInternal>;