Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VER: Release 0.10.0 #24

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# Changelog

## 0.10.0 - 2024-05-22

#### Enhancements
- Added `use_snapshot` attribute to `Subscription`, defaults to false
- Upgraded reqwest version to 0.12

#### Breaking changes
- Upgraded DBN version to 0.18.0
- Changed type of `flags` in `MboMsg`, `TradeMsg`, `Mbp1Msg`, `Mbp10Msg`, and `CbboMsg`
from `u8` to a new `FlagSet` type with predicate methods for the various bit flags
as well as setters. The `u8` value can still be obtained by calling the `raw()` method.
- Improved `Debug` formatting
- Switched `DecodeStream` from `streaming_iterator` crate to `fallible_streaming_iterator`
to allow better notification of errors
- Changed default value for `stype_in` and `stype_out` in `SymbolMappingMsg` to
`u8::MAX` to match C++ client and to reflect an unknown value. This also changes the
value of these fields when upgrading a `SymbolMappingMsgV1` to DBNv2

## 0.9.1 - 2024-05-15

#### Bug fixes
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "databento"
authors = ["Databento <support@databento.com>"]
version = "0.9.1"
version = "0.10.0"
edition = "2021"
repository = "https://github.com/databento/databento-rs"
description = "Official Databento client library"
Expand All @@ -24,14 +24,14 @@ live = ["dep:hex", "dep:sha2", "tokio/net"]

[dependencies]
# binary encoding
dbn = { version = "0.17.1", features = ["async", "serde"] }
dbn = { version = "0.18.0", features = ["async", "serde"] }
# Async stream trait
futures = { version = "0.3", optional = true }
# Used for Live authentication
hex = { version = "0.4", optional = true }
log = "0.4"
# HTTP client for historical API
reqwest = { version = "0.11", optional = true, features = ["json", "stream"] }
reqwest = { version = "0.12", optional = true, features = ["json", "stream"] }
# JSON deserialization for historical API
serde = { version = "1.0", optional = true, features = ["derive"] }
serde_json = { version = "1.0", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions src/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub struct Subscription {
/// [`LiveClient::start`](crate::LiveClient::start).
#[builder(default, setter(strip_option))]
pub start: Option<OffsetDateTime>,
#[doc(hidden)]
/// Reserved for future use.
#[builder(setter(strip_bool))]
pub use_snapshot: bool,
}

#[doc(hidden)]
Expand Down
67 changes: 63 additions & 4 deletions src/live/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,26 @@ impl Client {
/// subscription, sending an error, and closing the connection.
pub async fn subscribe(&mut self, sub: &Subscription) -> crate::Result<()> {
let Subscription {
schema, stype_in, ..
schema,
stype_in,
start,
use_snapshot,
..
} = &sub;

if *use_snapshot && start.is_some() {
return Err(Error::BadArgument {
param_name: "use_snapshot".to_string(),
desc: "cannot request snapshot with start time".to_string(),
});
}

for sym_str in sub.symbols.to_chunked_api_string() {
let args = format!("schema={schema}|stype_in={stype_in}|symbols={sym_str}");
let snapshot = *use_snapshot as u8;

let args = format!(
"schema={schema}|stype_in={stype_in}|symbols={sym_str}|snapshot={snapshot}"
);

let sub_str = if let Some(start) = sub.start.as_ref() {
format!("{args}|start={}\n", start.unix_timestamp_nanos())
Expand Down Expand Up @@ -330,7 +346,7 @@ mod tests {
enums::rtype,
publishers::Dataset,
record::{HasRType, OhlcvMsg, RecordHeader, TradeMsg, WithTsOut},
Mbp10Msg, MetadataBuilder, Record, SType, Schema,
FlagSet, Mbp10Msg, MetadataBuilder, Record, SType, Schema,
};
use tokio::{
io::BufReader,
Expand Down Expand Up @@ -396,6 +412,12 @@ mod tests {
if let Some(start) = subscription.start {
assert!(sub_line.contains(&format!("start={}", start.unix_timestamp_nanos())))
}

if subscription.use_snapshot {
assert!(sub_line.contains("snapshot=1"));
} else {
assert!(sub_line.contains("snapshot=0"));
}
}

async fn start(&mut self) {
Expand Down Expand Up @@ -561,6 +583,43 @@ mod tests {
fixture.stop().await;
}

#[tokio::test]
async fn test_subscribe_snapshot() {
let (mut fixture, mut client) = setup(Dataset::XnasItch, false).await;
let subscription = Subscription::builder()
.symbols(vec!["MSFT", "TSLA", "QQQ"])
.schema(Schema::Ohlcv1M)
.stype_in(SType::RawSymbol)
.use_snapshot()
.build();
fixture.expect_subscribe(subscription.clone());
client.subscribe(&subscription).await.unwrap();
fixture.stop().await;
}

#[tokio::test]
async fn test_subscribe_snapshot_failed() {
let (fixture, mut client) = setup(Dataset::XnasItch, false).await;

let err = client
.subscribe(
&Subscription::builder()
.symbols(vec!["MSFT", "TSLA", "QQQ"])
.schema(Schema::Ohlcv1M)
.stype_in(SType::RawSymbol)
.start(time::OffsetDateTime::now_utc())
.use_snapshot()
.build(),
)
.await
.unwrap_err();
assert!(err
.to_string()
.contains("cannot request snapshot with start time"));

fixture.stop().await;
}

#[tokio::test]
async fn test_subscription_chunking() {
const SYMBOL: &str = "TEST";
Expand Down Expand Up @@ -611,7 +670,7 @@ mod tests {
size: 2,
action: b'A' as c_char,
side: b'A' as c_char,
flags: 0,
flags: FlagSet::default(),
depth: 1,
ts_recv: 0,
ts_in_delta: 0,
Expand Down