linera_rpc/grpc/
pool.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use dashmap::DashMap;
5use linera_base::time::Duration;
6
7use super::{transport, GrpcError};
8
9/// A pool of transport channels to be used by gRPC.
10#[derive(Clone, Default)]
11pub struct GrpcConnectionPool {
12    options: transport::Options,
13    channels: DashMap<String, transport::Channel>,
14}
15
16impl GrpcConnectionPool {
17    pub fn new(options: transport::Options) -> Self {
18        Self {
19            options,
20            channels: DashMap::default(),
21        }
22    }
23
24    pub fn with_connect_timeout(mut self, connect_timeout: impl Into<Option<Duration>>) -> Self {
25        self.options.connect_timeout = connect_timeout.into();
26        self
27    }
28
29    pub fn with_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
30        self.options.timeout = timeout.into();
31        self
32    }
33
34    /// Obtains a channel for the current address. Either clones an existing one (thereby
35    /// reusing the connection), or creates one if needed. New channels do not create a
36    /// connection immediately.
37    pub fn channel(&self, address: String) -> Result<transport::Channel, GrpcError> {
38        Ok(self
39            .channels
40            .entry(address.clone())
41            .or_try_insert_with(|| transport::create_channel(address, &self.options))?
42            .clone())
43    }
44}