Skip to main content

linera_rpc/grpc/
pool.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use linera_base::time::Duration;
7
8use super::{transport, GrpcError};
9
10/// A pool of transport channels to be used by gRPC.
11#[derive(Clone, Default)]
12pub struct GrpcConnectionPool {
13    options: transport::Options,
14    channels: Arc<papaya::HashMap<String, transport::Channel>>,
15}
16
17impl GrpcConnectionPool {
18    pub fn new(options: transport::Options) -> Self {
19        Self {
20            options,
21            channels: Arc::new(papaya::HashMap::default()),
22        }
23    }
24
25    pub fn with_connect_timeout(mut self, connect_timeout: impl Into<Option<Duration>>) -> Self {
26        self.options.connect_timeout = connect_timeout.into();
27        self
28    }
29
30    pub fn with_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
31        self.options.timeout = timeout.into();
32        self
33    }
34
35    /// Obtains a channel for the current address. Either clones an existing one (thereby
36    /// reusing the connection), or creates one if needed. New channels do not create a
37    /// connection immediately.
38    pub fn channel(&self, address: String) -> Result<transport::Channel, GrpcError> {
39        let pinned = self.channels.pin();
40        if let Some(channel) = pinned.get(&address) {
41            return Ok(channel.clone());
42        }
43        let channel = transport::create_channel(address.clone(), &self.options)?;
44        Ok(pinned.get_or_insert(address, channel).clone())
45    }
46}