Skip to content

Commit

Permalink
ADD: Add configurable heartbeat support to clients
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen committed May 31, 2024
1 parent 5f2befb commit ea1575a
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 56 deletions.
20 changes: 17 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## 0.11.0 - TBD

#### Enhancements
- Added configurable `heartbeat_interval` parameter for live client that determines the
timeout before heartbeat `SystemMsg` records will be sent. It can be configured via
the `heartbeat_interval` and `heartbeat_interval_s` methods of the
`live::ClientBuilder`
- Added `addr` function to `live::ClientBuilder` for configuring a custom gateway
address without using `LiveClient::connect_with_addr` directly

#### Breaking changes
- Added `heartbeat_interval` parameter to `LiveClient::connect` and
`LiveClient::connect_with_addr`

## 0.10.0 - 2024-05-22

#### Enhancements
Expand Down Expand Up @@ -56,10 +70,10 @@
- Document `live::Subscription::start` is based on `ts_event`
- Allow constructing a `DateRange` and `DateTimeRange` with an `end` based on a
`time::Duration`
- Implemented `Debug` for `LiveClient`, `LiveClientBuilder`, `HistoricalClient`,
`HistoricalClientBuilder`, `BatchClient`, `MetadataClient`, `SymbologyClient`, and
- Implemented `Debug` for `LiveClient`, `live::ClientBuilder`, `HistoricalClient`,
`historical::ClientBuilder`, `BatchClient`, `MetadataClient`, `SymbologyClient`, and
`TimeseriesClient`
- Derived `Clone` for `LiveClientBuilder` and `HistoricalClientBuilder`
- Derived `Clone` for `live::ClientBuilder` and `historical::ClientBuilder`
- Added `ApiKey` type for safely deriving `Debug` for types containing an API key

#### Breaking changes
Expand Down
78 changes: 70 additions & 8 deletions src/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

mod client;

use std::{net::SocketAddr, sync::Arc};

use dbn::{SType, Schema, VersionUpgradePolicy};
use time::OffsetDateTime;
use log::warn;
use time::{Duration, OffsetDateTime};
use tokio::net::{lookup_host, ToSocketAddrs};
use typed_builder::TypedBuilder;

use crate::{ApiKey, Symbols};
Expand Down Expand Up @@ -43,19 +47,23 @@ pub struct Unset;
/// - `dataset`
#[derive(Debug, Clone)]
pub struct ClientBuilder<AK, D> {
addr: Option<Arc<SocketAddr>>,
key: AK,
dataset: D,
send_ts_out: bool,
upgrade_policy: VersionUpgradePolicy,
heartbeat_interval: Option<Duration>,
}

impl Default for ClientBuilder<Unset, Unset> {
fn default() -> Self {
Self {
addr: None,
key: Unset,
dataset: Unset,
send_ts_out: false,
upgrade_policy: VersionUpgradePolicy::Upgrade,
heartbeat_interval: None,
}
}
}
Expand All @@ -74,6 +82,43 @@ impl<AK, D> ClientBuilder<AK, D> {
self.upgrade_policy = upgrade_policy;
self
}

/// Sets `heartbeat_interval`, which controls the interval at which the gateway
/// will send heartbeat records if no other data records are sent. If no heartbeat
/// interval is configured, the gateway default will be used.
///
/// Note that granularity of less than a second is not supported and will be
/// ignored.
pub fn heartbeat_interval(mut self, heartbeat_interval: Duration) -> Self {
if heartbeat_interval.subsec_nanoseconds() > 0 {
warn!(
"heartbeat_interval subsecond precision ignored: {}ns",
heartbeat_interval.subsec_nanoseconds()
)
}
self.heartbeat_interval = Some(heartbeat_interval);
self
}

/// Overrides the address of the gateway the client will connect to. This is an
/// advanced method.
///
/// # Errors
/// This function returns an error when `addr` fails to resolve.
pub async fn addr(mut self, addr: impl ToSocketAddrs) -> crate::Result<Self> {
const PARAM_NAME: &str = "addr";
self.addr = Some(
lookup_host(addr)
.await
.map_err(|e| crate::Error::bad_arg(PARAM_NAME, format!("{e}")))?
.next()
.map(Arc::new)
.ok_or_else(|| {
crate::Error::bad_arg(PARAM_NAME, "did not resolve to any host".to_owned())
})?,
);
Ok(self)
}
}

impl ClientBuilder<Unset, Unset> {
Expand All @@ -90,10 +135,12 @@ impl<D> ClientBuilder<Unset, D> {
/// This function returns an error when the API key is invalid.
pub fn key(self, key: impl ToString) -> crate::Result<ClientBuilder<ApiKey, D>> {
Ok(ClientBuilder {
addr: self.addr,
key: crate::validate_key(key.to_string())?,
dataset: self.dataset,
send_ts_out: self.send_ts_out,
upgrade_policy: self.upgrade_policy,
heartbeat_interval: self.heartbeat_interval,
})
}

Expand All @@ -113,10 +160,12 @@ impl<AK> ClientBuilder<AK, Unset> {
/// Sets the dataset.
pub fn dataset(self, dataset: impl ToString) -> ClientBuilder<AK, String> {
ClientBuilder {
addr: self.addr,
key: self.key,
dataset: dataset.to_string(),
send_ts_out: self.send_ts_out,
upgrade_policy: self.upgrade_policy,
heartbeat_interval: self.heartbeat_interval,
}
}
}
Expand All @@ -128,12 +177,25 @@ impl ClientBuilder<ApiKey, String> {
/// This function returns an error when its unable
/// to connect and authenticate with the Live gateway.
pub async fn build(self) -> crate::Result<Client> {
Client::connect(
self.key.0,
self.dataset,
self.send_ts_out,
self.upgrade_policy,
)
.await
if let Some(addr) = self.addr {
Client::connect_with_addr(
*addr,
self.key.0,
self.dataset,
self.send_ts_out,
self.upgrade_policy,
self.heartbeat_interval,
)
.await
} else {
Client::connect(
self.key.0,
self.dataset,
self.send_ts_out,
self.upgrade_policy,
self.heartbeat_interval,
)
.await
}
}
}
Loading

0 comments on commit ea1575a

Please sign in to comment.