linera_client/
util.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashSet, num::ParseIntError, str::FromStr};
5
6use futures::future;
7use linera_base::{
8    crypto::CryptoError,
9    data_types::{TimeDelta, Timestamp},
10    identifiers::{ApplicationId, ChainId, GenericApplicationId},
11    time::Duration,
12};
13use linera_core::{data_types::RoundTimeout, node::NotificationStream, worker::Reason};
14use tokio_stream::StreamExt as _;
15
16pub fn parse_json<T: serde::de::DeserializeOwned>(s: &str) -> anyhow::Result<T> {
17    Ok(serde_json::from_str(s.trim())?)
18}
19
/// Parses `s` as a `u64` number of milliseconds and returns it as a [`Duration`].
///
/// The input is parsed as-is: surrounding whitespace is not stripped.
///
/// # Errors
///
/// Returns the [`ParseIntError`] if `s` cannot be parsed as a `u64`.
pub fn parse_millis(s: &str) -> Result<Duration, ParseIntError> {
    s.parse::<u64>().map(Duration::from_millis)
}
23
/// Parses `s` as a `u64` number of seconds and returns it as a [`Duration`].
///
/// The input is parsed as-is: surrounding whitespace is not stripped.
///
/// # Errors
///
/// Returns the [`ParseIntError`] if `s` cannot be parsed as a `u64`.
pub fn parse_secs(s: &str) -> Result<Duration, ParseIntError> {
    s.parse::<u64>().map(Duration::from_secs)
}
27
28pub fn parse_millis_delta(s: &str) -> Result<TimeDelta, ParseIntError> {
29    Ok(TimeDelta::from_millis(s.parse()?))
30}
31
32pub fn parse_json_optional_millis_delta(s: &str) -> anyhow::Result<Option<TimeDelta>> {
33    Ok(parse_json::<Option<u64>>(s)?.map(TimeDelta::from_millis))
34}
35
36pub fn parse_chain_set(s: &str) -> Result<HashSet<ChainId>, CryptoError> {
37    match s.trim() {
38        "" => Ok(HashSet::new()),
39        s => s.split(",").map(ChainId::from_str).collect(),
40    }
41}
42
43pub fn parse_app_set(s: &str) -> anyhow::Result<HashSet<GenericApplicationId>> {
44    s.trim()
45        .split(",")
46        .map(|app_str| {
47            GenericApplicationId::from_str(app_str)
48                .or_else(|_| Ok(ApplicationId::from_str(app_str)?.into()))
49        })
50        .collect()
51}
52
53/// Returns after the specified time or if we receive a notification that a new round has started.
54pub async fn wait_for_next_round(stream: &mut NotificationStream, timeout: RoundTimeout) {
55    let mut stream = stream.filter(|notification| match &notification.reason {
56        Reason::NewBlock { height, .. } | Reason::NewEvents { height, .. } => {
57            *height >= timeout.next_block_height
58        }
59        Reason::NewRound { round, .. } => *round > timeout.current_round,
60        Reason::NewIncomingBundle { .. } | Reason::BlockExecuted { .. } => false,
61    });
62    future::select(
63        Box::pin(stream.next()),
64        Box::pin(linera_base::time::timer::sleep(
65            timeout.timestamp.duration_since(Timestamp::now()),
66        )),
67    )
68    .await;
69}
70
/// Generates a `From<Infallible>` implementation for the type at the given path.
///
/// `Infallible` has no values, so the generated conversion can never be called with
/// one; the empty `match` expresses this without needing to construct a `Self`.
macro_rules! impl_from_infallible {
    ($target:path) => {
        impl ::std::convert::From<::std::convert::Infallible> for $target {
            fn from(never: ::std::convert::Infallible) -> Self {
                match never {}
            }
        }
    };
}
80
81pub(crate) use impl_from_infallible;