linera_client/
util.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashSet, num::ParseIntError, str::FromStr};
5
6use futures::future;
7use linera_base::{
8    crypto::CryptoError,
9    data_types::{TimeDelta, Timestamp},
10    identifiers::{ApplicationId, ChainId, GenericApplicationId},
11    time::Duration,
12};
13use linera_core::{data_types::RoundTimeout, node::NotificationStream, worker::Reason};
14use tokio_stream::StreamExt as _;
15
16pub fn parse_json<T: serde::de::DeserializeOwned>(s: &str) -> anyhow::Result<T> {
17    Ok(serde_json::from_str(s.trim())?)
18}
19
/// Parses a string containing a non-negative integer number of milliseconds
/// into a `Duration`.
///
/// # Errors
///
/// Returns a `ParseIntError` if `s` is not a valid `u64` (the input is not
/// trimmed, so surrounding whitespace is rejected).
pub fn parse_millis(s: &str) -> Result<Duration, ParseIntError> {
    s.parse::<u64>().map(Duration::from_millis)
}
23
/// Maps a zero `Duration` to `None` and any other duration to `Some(d)`.
pub fn non_zero_duration(d: Duration) -> Option<Duration> {
    Some(d).filter(|duration| !duration.is_zero())
}
32
/// Parses a string containing a non-negative integer number of seconds into a
/// `Duration`.
///
/// # Errors
///
/// Returns a `ParseIntError` if `s` is not a valid `u64` (the input is not
/// trimmed, so surrounding whitespace is rejected).
pub fn parse_secs(s: &str) -> Result<Duration, ParseIntError> {
    s.parse::<u64>().map(Duration::from_secs)
}
36
37pub fn parse_millis_delta(s: &str) -> Result<TimeDelta, ParseIntError> {
38    Ok(TimeDelta::from_millis(s.parse()?))
39}
40
41pub fn parse_json_optional_millis_delta(s: &str) -> anyhow::Result<Option<TimeDelta>> {
42    Ok(parse_json::<Option<u64>>(s)?.map(TimeDelta::from_millis))
43}
44
45pub fn parse_chain_set(s: &str) -> Result<HashSet<ChainId>, CryptoError> {
46    match s.trim() {
47        "" => Ok(HashSet::new()),
48        s => s.split(",").map(ChainId::from_str).collect(),
49    }
50}
51
52pub fn parse_app_set(s: &str) -> anyhow::Result<HashSet<GenericApplicationId>> {
53    s.trim()
54        .split(",")
55        .map(|app_str| {
56            GenericApplicationId::from_str(app_str)
57                .or_else(|_| Ok(ApplicationId::from_str(app_str)?.into()))
58        })
59        .collect()
60}
61
62/// Returns after the specified time or if we receive a notification that a new round has started.
63pub async fn wait_for_next_round(stream: &mut NotificationStream, timeout: RoundTimeout) {
64    let mut stream = stream.filter(|notification| match &notification.reason {
65        Reason::NewBlock { height, .. } | Reason::NewEvents { height, .. } => {
66            *height >= timeout.next_block_height
67        }
68        Reason::NewRound { round, .. } => *round > timeout.current_round,
69        Reason::NewIncomingBundle { .. } | Reason::BlockExecuted { .. } => false,
70    });
71    future::select(
72        Box::pin(stream.next()),
73        Box::pin(linera_base::time::timer::sleep(
74            timeout.timestamp.duration_since(Timestamp::now()),
75        )),
76    )
77    .await;
78}
79
/// Implements `From<Infallible>` for the type named by the given path.
///
/// `Infallible` has no values, so the generated conversion can never be
/// executed; its body is an empty `match`, which type-checks as `Self`.
macro_rules! impl_from_infallible {
    ($target:path) => {
        impl ::std::convert::From<::std::convert::Infallible> for $target {
            fn from(never: ::std::convert::Infallible) -> Self {
                match never {}
            }
        }
    };
}
89
90pub(crate) use impl_from_infallible;