1use std::{
5 mem,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10};
11
12use async_lock::{Semaphore, SemaphoreGuard};
13use futures::future::join_all;
14use linera_base::{ensure, util::future::FutureSyncExt as _};
15#[cfg(with_metrics)]
16use linera_views::metering::MeteredDatabase;
17#[cfg(with_testing)]
18use linera_views::store::TestKeyValueDatabase;
19use linera_views::{
20 batch::{Batch, WriteOperation},
21 lru_caching::LruCachingDatabase,
22 store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
23};
24use serde::de::DeserializeOwned;
25use tonic::transport::{Channel, Endpoint};
26
27#[cfg(with_testing)]
28use crate::common::storage_service_test_endpoint;
29use crate::{
30 common::{
31 KeyPrefix, StorageServiceStoreError, StorageServiceStoreInternalConfig, MAX_PAYLOAD_SIZE,
32 },
33 key_value_store::{
34 statement::Operation, storage_service_client::StorageServiceClient, KeyValue,
35 KeyValueAppend, ReplyContainsKey, ReplyContainsKeys, ReplyExistsNamespace,
36 ReplyFindKeyValuesByPrefix, ReplyFindKeysByPrefix, ReplyListAll, ReplyListRootKeys,
37 ReplyReadMultiValues, ReplyReadValue, ReplySpecificChunk, RequestContainsKey,
38 RequestContainsKeys, RequestCreateNamespace, RequestDeleteNamespace,
39 RequestExistsNamespace, RequestFindKeyValuesByPrefix, RequestFindKeysByPrefix,
40 RequestListRootKeys, RequestReadMultiValues, RequestReadValue, RequestSpecificChunk,
41 RequestWriteBatchExtended, Statement,
42 },
43};
44
45const MAX_KEY_SIZE: usize = 1000000;
47
48#[derive(Clone)]
69pub struct StorageServiceDatabaseInternal {
70 channel: Channel,
71 semaphore: Option<Arc<Semaphore>>,
72 max_stream_queries: usize,
73 namespace: Vec<u8>,
74}
75
76#[derive(Clone)]
78pub struct StorageServiceStoreInternal {
79 channel: Channel,
80 semaphore: Option<Arc<Semaphore>>,
81 max_stream_queries: usize,
82 prefix_len: usize,
83 start_key: Vec<u8>,
84 root_key_written: Arc<AtomicBool>,
85}
86
87impl WithError for StorageServiceDatabaseInternal {
88 type Error = StorageServiceStoreError;
89}
90
91impl WithError for StorageServiceStoreInternal {
92 type Error = StorageServiceStoreError;
93}
94
95impl ReadableKeyValueStore for StorageServiceStoreInternal {
96 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
97
98 fn max_stream_queries(&self) -> usize {
99 self.max_stream_queries
100 }
101
102 fn root_key(&self) -> Result<Vec<u8>, StorageServiceStoreError> {
103 let root_key = bcs::from_bytes(&self.start_key[self.prefix_len..])?;
104 Ok(root_key)
105 }
106
107 async fn read_value_bytes(
108 &self,
109 key: &[u8],
110 ) -> Result<Option<Vec<u8>>, StorageServiceStoreError> {
111 ensure!(
112 key.len() <= MAX_KEY_SIZE,
113 StorageServiceStoreError::KeyTooLong
114 );
115 let mut full_key = self.start_key.clone();
116 full_key.extend(key);
117 let query = RequestReadValue { key: full_key };
118 let request = tonic::Request::new(query);
119 let channel = self.channel.clone();
120 let mut client = StorageServiceClient::new(channel);
121 let _guard = self.acquire().await;
122 let response = client.process_read_value(request).make_sync().await?;
123 let response = response.into_inner();
124 let ReplyReadValue {
125 value,
126 message_index,
127 num_chunks,
128 } = response;
129 if num_chunks == 0 {
130 Ok(value)
131 } else {
132 self.read_entries(message_index, num_chunks).await
133 }
134 }
135
136 async fn contains_key(&self, key: &[u8]) -> Result<bool, StorageServiceStoreError> {
137 ensure!(
138 key.len() <= MAX_KEY_SIZE,
139 StorageServiceStoreError::KeyTooLong
140 );
141 let mut full_key = self.start_key.clone();
142 full_key.extend(key);
143 let query = RequestContainsKey { key: full_key };
144 let request = tonic::Request::new(query);
145 let channel = self.channel.clone();
146 let mut client = StorageServiceClient::new(channel);
147 let _guard = self.acquire().await;
148 let response = client.process_contains_key(request).make_sync().await?;
149 let response = response.into_inner();
150 let ReplyContainsKey { test } = response;
151 Ok(test)
152 }
153
154 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, StorageServiceStoreError> {
155 let mut full_keys = Vec::new();
156 for key in keys {
157 ensure!(
158 key.len() <= MAX_KEY_SIZE,
159 StorageServiceStoreError::KeyTooLong
160 );
161 let mut full_key = self.start_key.clone();
162 full_key.extend(key);
163 full_keys.push(full_key);
164 }
165 let query = RequestContainsKeys { keys: full_keys };
166 let request = tonic::Request::new(query);
167 let channel = self.channel.clone();
168 let mut client = StorageServiceClient::new(channel);
169 let _guard = self.acquire().await;
170 let response = client.process_contains_keys(request).make_sync().await?;
171 let response = response.into_inner();
172 let ReplyContainsKeys { tests } = response;
173 Ok(tests)
174 }
175
176 async fn read_multi_values_bytes(
177 &self,
178 keys: &[Vec<u8>],
179 ) -> Result<Vec<Option<Vec<u8>>>, StorageServiceStoreError> {
180 let mut full_keys = Vec::new();
181 for key in keys {
182 ensure!(
183 key.len() <= MAX_KEY_SIZE,
184 StorageServiceStoreError::KeyTooLong
185 );
186 let mut full_key = self.start_key.clone();
187 full_key.extend(key);
188 full_keys.push(full_key);
189 }
190 let query = RequestReadMultiValues { keys: full_keys };
191 let request = tonic::Request::new(query);
192 let channel = self.channel.clone();
193 let mut client = StorageServiceClient::new(channel);
194 let _guard = self.acquire().await;
195 let response = client
196 .process_read_multi_values(request)
197 .make_sync()
198 .await?;
199 let response = response.into_inner();
200 let ReplyReadMultiValues {
201 values,
202 message_index,
203 num_chunks,
204 } = response;
205 if num_chunks == 0 {
206 let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
207 Ok(values)
208 } else {
209 self.read_entries(message_index, num_chunks).await
210 }
211 }
212
213 async fn find_keys_by_prefix(
214 &self,
215 key_prefix: &[u8],
216 ) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
217 ensure!(
218 key_prefix.len() <= MAX_KEY_SIZE,
219 StorageServiceStoreError::KeyTooLong
220 );
221 let mut full_key_prefix = self.start_key.clone();
222 full_key_prefix.extend(key_prefix);
223 let query = RequestFindKeysByPrefix {
224 key_prefix: full_key_prefix,
225 };
226 let request = tonic::Request::new(query);
227 let channel = self.channel.clone();
228 let mut client = StorageServiceClient::new(channel);
229 let _guard = self.acquire().await;
230 let response = client
231 .process_find_keys_by_prefix(request)
232 .make_sync()
233 .await?;
234 let response = response.into_inner();
235 let ReplyFindKeysByPrefix {
236 keys,
237 message_index,
238 num_chunks,
239 } = response;
240 if num_chunks == 0 {
241 Ok(keys)
242 } else {
243 self.read_entries(message_index, num_chunks).await
244 }
245 }
246
247 async fn find_key_values_by_prefix(
248 &self,
249 key_prefix: &[u8],
250 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageServiceStoreError> {
251 ensure!(
252 key_prefix.len() <= MAX_KEY_SIZE,
253 StorageServiceStoreError::KeyTooLong
254 );
255 let mut full_key_prefix = self.start_key.clone();
256 full_key_prefix.extend(key_prefix);
257 let query = RequestFindKeyValuesByPrefix {
258 key_prefix: full_key_prefix,
259 };
260 let request = tonic::Request::new(query);
261 let channel = self.channel.clone();
262 let mut client = StorageServiceClient::new(channel);
263 let _guard = self.acquire().await;
264 let response = client
265 .process_find_key_values_by_prefix(request)
266 .make_sync()
267 .await?;
268 let response = response.into_inner();
269 let ReplyFindKeyValuesByPrefix {
270 key_values,
271 message_index,
272 num_chunks,
273 } = response;
274 if num_chunks == 0 {
275 let key_values = key_values
276 .into_iter()
277 .map(|x| (x.key, x.value))
278 .collect::<Vec<_>>();
279 Ok(key_values)
280 } else {
281 self.read_entries(message_index, num_chunks).await
282 }
283 }
284}
285
286impl WritableKeyValueStore for StorageServiceStoreInternal {
287 const MAX_VALUE_SIZE: usize = usize::MAX;
288
289 async fn write_batch(&self, batch: Batch) -> Result<(), StorageServiceStoreError> {
290 if batch.operations.is_empty() {
291 return Ok(());
292 }
293 let mut statements = Vec::new();
294 let mut chunk_size = 0;
295
296 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
297 let mut full_key = self.start_key.clone();
298 full_key[0] = KeyPrefix::RootKey as u8;
299 let operation = Operation::Put(KeyValue {
300 key: full_key,
301 value: vec![],
302 });
303 let statement = Statement {
304 operation: Some(operation),
305 };
306 statements.push(statement);
307 chunk_size += self.start_key.len();
308 }
309
310 let bcs_root_key_len = self.start_key.len() - self.prefix_len;
311 for operation in batch.operations {
312 let (key_len, value_len) = match &operation {
313 WriteOperation::Delete { key } => (key.len(), 0),
314 WriteOperation::Put { key, value } => (key.len(), value.len()),
315 WriteOperation::DeletePrefix { key_prefix } => (key_prefix.len(), 0),
316 };
317 let operation_size = key_len + value_len + bcs_root_key_len;
318 ensure!(
319 key_len <= MAX_KEY_SIZE,
320 StorageServiceStoreError::KeyTooLong
321 );
322 if operation_size + chunk_size < MAX_PAYLOAD_SIZE {
323 let statement = self.get_statement(operation);
324 statements.push(statement);
325 chunk_size += operation_size;
326 } else {
327 self.submit_statements(mem::take(&mut statements)).await?;
328 chunk_size = 0;
329 if operation_size > MAX_PAYLOAD_SIZE {
330 let WriteOperation::Put { key, value } = operation else {
332 unreachable!();
334 };
335 let mut full_key = self.start_key.clone();
336 full_key.extend(key);
337 let value_chunks = value
338 .chunks(MAX_PAYLOAD_SIZE)
339 .map(|x| x.to_vec())
340 .collect::<Vec<_>>();
341 let num_chunks = value_chunks.len();
342 for (i_chunk, value) in value_chunks.into_iter().enumerate() {
343 let last = i_chunk + 1 == num_chunks;
344 let operation = Operation::Append(KeyValueAppend {
345 key: full_key.clone(),
346 value,
347 last,
348 });
349 statements = vec![Statement {
350 operation: Some(operation),
351 }];
352 self.submit_statements(mem::take(&mut statements)).await?;
353 }
354 } else {
355 let statement = self.get_statement(operation);
357 statements.push(statement);
358 chunk_size = operation_size;
359 }
360 }
361 }
362 self.submit_statements(mem::take(&mut statements)).await
363 }
364
365 async fn clear_journal(&self) -> Result<(), StorageServiceStoreError> {
366 Ok(())
367 }
368}
369
370impl StorageServiceStoreInternal {
371 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
373 match &self.semaphore {
374 None => None,
375 Some(count) => Some(count.acquire().await),
376 }
377 }
378
379 async fn submit_statements(
380 &self,
381 statements: Vec<Statement>,
382 ) -> Result<(), StorageServiceStoreError> {
383 if !statements.is_empty() {
384 let query = RequestWriteBatchExtended { statements };
385 let request = tonic::Request::new(query);
386 let channel = self.channel.clone();
387 let mut client = StorageServiceClient::new(channel);
388 let _guard = self.acquire().await;
389 let _response = client
390 .process_write_batch_extended(request)
391 .make_sync()
392 .await?;
393 }
394 Ok(())
395 }
396
397 fn get_statement(&self, operation: WriteOperation) -> Statement {
398 let operation = match operation {
399 WriteOperation::Delete { key } => {
400 let mut full_key = self.start_key.clone();
401 full_key.extend(key);
402 Operation::Delete(full_key)
403 }
404 WriteOperation::Put { key, value } => {
405 let mut full_key = self.start_key.clone();
406 full_key.extend(key);
407 Operation::Put(KeyValue {
408 key: full_key,
409 value,
410 })
411 }
412 WriteOperation::DeletePrefix { key_prefix } => {
413 let mut full_key_prefix = self.start_key.clone();
414 full_key_prefix.extend(key_prefix);
415 Operation::DeletePrefix(full_key_prefix)
416 }
417 };
418 Statement {
419 operation: Some(operation),
420 }
421 }
422
423 async fn read_single_entry(
424 &self,
425 message_index: i64,
426 index: i32,
427 ) -> Result<Vec<u8>, StorageServiceStoreError> {
428 let channel = self.channel.clone();
429 let query = RequestSpecificChunk {
430 message_index,
431 index,
432 };
433 let request = tonic::Request::new(query);
434 let mut client = StorageServiceClient::new(channel);
435 let response = client.process_specific_chunk(request).make_sync().await?;
436 let response = response.into_inner();
437 let ReplySpecificChunk { chunk } = response;
438 Ok(chunk)
439 }
440
441 async fn read_entries<S: DeserializeOwned>(
442 &self,
443 message_index: i64,
444 num_chunks: i32,
445 ) -> Result<S, StorageServiceStoreError> {
446 let mut handles = Vec::new();
447 for index in 0..num_chunks {
448 let handle = self.read_single_entry(message_index, index);
449 handles.push(handle);
450 }
451 let mut value = Vec::new();
452 for chunk in join_all(handles).await {
453 let chunk = chunk?;
454 value.extend(chunk);
455 }
456 Ok(bcs::from_bytes(&value)?)
457 }
458}
459
460impl KeyValueDatabase for StorageServiceDatabaseInternal {
461 type Config = StorageServiceStoreInternalConfig;
462 type Store = StorageServiceStoreInternal;
463
464 fn get_name() -> String {
465 "service store".to_string()
466 }
467
468 async fn connect(
469 config: &Self::Config,
470 namespace: &str,
471 ) -> Result<Self, StorageServiceStoreError> {
472 let semaphore = config
473 .max_concurrent_queries
474 .map(|n| Arc::new(Semaphore::new(n)));
475 let namespace = bcs::to_bytes(namespace)?;
476 let max_stream_queries = config.max_stream_queries;
477 let endpoint = config.http_address();
478 let endpoint = Endpoint::from_shared(endpoint)?;
479 let channel = endpoint.connect_lazy();
480 Ok(Self {
481 channel,
482 semaphore,
483 max_stream_queries,
484 namespace,
485 })
486 }
487
488 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, StorageServiceStoreError> {
489 let channel = self.channel.clone();
490 let semaphore = self.semaphore.clone();
491 let max_stream_queries = self.max_stream_queries;
492 let mut start_key = vec![KeyPrefix::Key as u8];
493 start_key.extend(&self.namespace);
494 start_key.extend(bcs::to_bytes(root_key)?);
495 let prefix_len = self.namespace.len() + 1;
496 Ok(StorageServiceStoreInternal {
497 channel,
498 semaphore,
499 max_stream_queries,
500 prefix_len,
501 start_key,
502 root_key_written: Arc::new(AtomicBool::new(false)),
503 })
504 }
505
506 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
507 self.open_shared(root_key)
508 }
509
510 async fn list_all(config: &Self::Config) -> Result<Vec<String>, StorageServiceStoreError> {
511 let endpoint = config.http_address();
512 let endpoint = Endpoint::from_shared(endpoint)?;
513 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
514 let response = client.process_list_all(()).make_sync().await?;
515 let response = response.into_inner();
516 let ReplyListAll { namespaces } = response;
517 let namespaces = namespaces
518 .into_iter()
519 .map(|x| bcs::from_bytes(&x))
520 .collect::<Result<_, _>>()?;
521 Ok(namespaces)
522 }
523
524 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, StorageServiceStoreError> {
525 let query = RequestListRootKeys {
526 namespace: self.namespace.clone(),
527 };
528 let request = tonic::Request::new(query);
529 let mut client = StorageServiceClient::new(self.channel.clone());
530 let response = client.process_list_root_keys(request).make_sync().await?;
531 let response = response.into_inner();
532 let ReplyListRootKeys { root_keys } = response;
533 Ok(root_keys)
534 }
535
536 async fn delete_all(config: &Self::Config) -> Result<(), StorageServiceStoreError> {
537 let endpoint = config.http_address();
538 let endpoint = Endpoint::from_shared(endpoint)?;
539 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
540 let _response = client.process_delete_all(()).make_sync().await?;
541 Ok(())
542 }
543
544 async fn exists(
545 config: &Self::Config,
546 namespace: &str,
547 ) -> Result<bool, StorageServiceStoreError> {
548 let namespace = bcs::to_bytes(namespace)?;
549 let query = RequestExistsNamespace { namespace };
550 let request = tonic::Request::new(query);
551 let endpoint = config.http_address();
552 let endpoint = Endpoint::from_shared(endpoint)?;
553 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
554 let response = client.process_exists_namespace(request).make_sync().await?;
555 let response = response.into_inner();
556 let ReplyExistsNamespace { exists } = response;
557 Ok(exists)
558 }
559
560 async fn create(
561 config: &Self::Config,
562 namespace: &str,
563 ) -> Result<(), StorageServiceStoreError> {
564 if StorageServiceDatabaseInternal::exists(config, namespace).await? {
565 return Err(StorageServiceStoreError::StoreAlreadyExists);
566 }
567 let namespace = bcs::to_bytes(namespace)?;
568 let query = RequestCreateNamespace { namespace };
569 let request = tonic::Request::new(query);
570 let endpoint = config.http_address();
571 let endpoint = Endpoint::from_shared(endpoint)?;
572 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
573 let _response = client.process_create_namespace(request).make_sync().await?;
574 Ok(())
575 }
576
577 async fn delete(
578 config: &Self::Config,
579 namespace: &str,
580 ) -> Result<(), StorageServiceStoreError> {
581 let namespace = bcs::to_bytes(namespace)?;
582 let query = RequestDeleteNamespace { namespace };
583 let request = tonic::Request::new(query);
584 let endpoint = config.http_address();
585 let endpoint = Endpoint::from_shared(endpoint)?;
586 let mut client = StorageServiceClient::connect(endpoint).make_sync().await?;
587 let _response = client.process_delete_namespace(request).make_sync().await?;
588 Ok(())
589 }
590}
591
592#[cfg(with_testing)]
593impl TestKeyValueDatabase for StorageServiceDatabaseInternal {
594 async fn new_test_config() -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError>
595 {
596 let endpoint = storage_service_test_endpoint()?;
597 service_config_from_endpoint(&endpoint)
598 }
599}
600
601pub fn service_config_from_endpoint(
603 endpoint: &str,
604) -> Result<StorageServiceStoreInternalConfig, StorageServiceStoreError> {
605 Ok(StorageServiceStoreInternalConfig {
606 endpoint: endpoint.to_string(),
607 max_concurrent_queries: None,
608 max_stream_queries: 100,
609 })
610}
611
612pub async fn storage_service_check_absence(
614 endpoint: &str,
615) -> Result<bool, StorageServiceStoreError> {
616 let endpoint = Endpoint::from_shared(endpoint.to_string())?;
617 let result = StorageServiceClient::connect(endpoint).await;
618 Ok(result.is_err())
619}
620
621pub async fn storage_service_check_validity(
623 endpoint: &str,
624) -> Result<(), StorageServiceStoreError> {
625 let config = service_config_from_endpoint(endpoint).unwrap();
626 let namespace = "namespace";
627 let database = StorageServiceDatabaseInternal::connect(&config, namespace).await?;
628 let store = database.open_shared(&[])?;
629 let _value = store.read_value_bytes(&[42]).await?;
630 Ok(())
631}
632
633#[cfg(with_metrics)]
635pub type StorageServiceDatabase =
636 MeteredDatabase<LruCachingDatabase<MeteredDatabase<StorageServiceDatabaseInternal>>>;
637
638#[cfg(not(with_metrics))]
640pub type StorageServiceDatabase = LruCachingDatabase<StorageServiceDatabaseInternal>;