Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement timestamp generators #1128

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 49 additions & 48 deletions docs/source/SUMMARY.md
Copy link
Collaborator

@Lorak-mmk Lorak-mmk Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit: "docs/source/SUMMARY.md"

❓ Why did you change indentation in docs/source/SUMMARY.md file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, my bad, automatic markdown linter changes must have gotten into the commit here and in the queries.md and I missed them, should the changes in both of these files be reverted (except the necessary ones of course)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in queries.md are fine. The formatting change in SUMMARY could be fine too, if there is some reason apart from personal preference to make it - that's why I asked the question. Afaik Markdown doesn't specify how many spaces should the indentation have, and for MdBook the example SUMMARY.md uses 4.

Original file line number Diff line number Diff line change
Expand Up @@ -3,75 +3,76 @@
[Scylla Rust Driver](index.md)

- [Quick start](quickstart/quickstart.md)
- [Creating a project](quickstart/create-project.md)
- [Running Scylla using Docker](quickstart/scylla-docker.md)
- [Connecting and running a simple query](quickstart/example.md)
- [Creating a project](quickstart/create-project.md)
- [Running Scylla using Docker](quickstart/scylla-docker.md)
- [Connecting and running a simple query](quickstart/example.md)

- [Migration guides](migration-guides/migration-guides.md)
- [Adjusting code to changes in serialization API introduced in 0.11](migration-guides/0.11-serialization.md)
- [Adjusting code to changes in deserialization API introduced in 0.15](migration-guides/0.15-deserialization.md)
- [Adjusting code to changes in serialization API introduced in 0.11](migration-guides/0.11-serialization.md)
- [Adjusting code to changes in deserialization API introduced in 0.15](migration-guides/0.15-deserialization.md)

- [Connecting to the cluster](connecting/connecting.md)
- [Compression](connecting/compression.md)
- [Authentication](connecting/authentication.md)
- [TLS](connecting/tls.md)
- [Compression](connecting/compression.md)
- [Authentication](connecting/authentication.md)
- [TLS](connecting/tls.md)

- [Making queries](queries/queries.md)
- [Simple query](queries/simple.md)
- [Query values](queries/values.md)
- [Query result](queries/result.md)
- [Prepared query](queries/prepared.md)
- [Batch statement](queries/batch.md)
- [Paged query](queries/paged.md)
- [Lightweight transaction query (LWT)](queries/lwt.md)
- [USE keyspace](queries/usekeyspace.md)
- [Schema agreement](queries/schema-agreement.md)
- [Query timeouts](queries/timeouts.md)
- [Simple query](queries/simple.md)
- [Query values](queries/values.md)
- [Query result](queries/result.md)
- [Prepared query](queries/prepared.md)
- [Batch statement](queries/batch.md)
- [Paged query](queries/paged.md)
- [Lightweight transaction query (LWT)](queries/lwt.md)
- [USE keyspace](queries/usekeyspace.md)
- [Schema agreement](queries/schema-agreement.md)
- [Query timeouts](queries/timeouts.md)
- [Timestamp generators](queries/timestamp-generators.md)

- [Execution profiles](execution-profiles/execution-profiles.md)
- [Creating a profile and setting it](execution-profiles/create-and-use.md)
- [All options supported by a profile](execution-profiles/maximal-example.md)
- [Options priority](execution-profiles/priority.md)
- [Remapping a profile handle](execution-profiles/remap.md)
- [Creating a profile and setting it](execution-profiles/create-and-use.md)
- [All options supported by a profile](execution-profiles/maximal-example.md)
- [Options priority](execution-profiles/priority.md)
- [Remapping a profile handle](execution-profiles/remap.md)

- [Data Types](data-types/data-types.md)
- [Bool, Tinyint, Smallint, Int, Bigint, Float, Double](data-types/primitive.md)
- [Ascii, Text, Varchar](data-types/text.md)
- [Counter](data-types/counter.md)
- [Blob](data-types/blob.md)
- [Inet](data-types/inet.md)
- [Uuid](data-types/uuid.md)
- [Timeuuid](data-types/timeuuid.md)
- [Date](data-types/date.md)
- [Time](data-types/time.md)
- [Timestamp](data-types/timestamp.md)
- [Duration](data-types/duration.md)
- [Decimal](data-types/decimal.md)
- [Varint](data-types/varint.md)
- [List, Set, Map](data-types/collections.md)
- [Tuple](data-types/tuple.md)
- [UDT (User defined type)](data-types/udt.md)
- [Bool, Tinyint, Smallint, Int, Bigint, Float, Double](data-types/primitive.md)
- [Ascii, Text, Varchar](data-types/text.md)
- [Counter](data-types/counter.md)
- [Blob](data-types/blob.md)
- [Inet](data-types/inet.md)
- [Uuid](data-types/uuid.md)
- [Timeuuid](data-types/timeuuid.md)
- [Date](data-types/date.md)
- [Time](data-types/time.md)
- [Timestamp](data-types/timestamp.md)
- [Duration](data-types/duration.md)
- [Decimal](data-types/decimal.md)
- [Varint](data-types/varint.md)
- [List, Set, Map](data-types/collections.md)
- [Tuple](data-types/tuple.md)
- [UDT (User defined type)](data-types/udt.md)

- [Load balancing](load-balancing/load-balancing.md)
- [Default policy](load-balancing/default-policy.md)
- [Default policy](load-balancing/default-policy.md)

- [Retry policy configuration](retry-policy/retry-policy.md)
- [Fallthrough retry policy](retry-policy/fallthrough.md)
- [Default retry policy](retry-policy/default.md)
- [Downgrading consistency policy](retry-policy/downgrading-consistency.md)
- [Fallthrough retry policy](retry-policy/fallthrough.md)
- [Default retry policy](retry-policy/default.md)
- [Downgrading consistency policy](retry-policy/downgrading-consistency.md)

- [Speculative execution](speculative-execution/speculative.md)
- [Simple](speculative-execution/simple.md)
- [Latency Percentile](speculative-execution/percentile.md)
- [Simple](speculative-execution/simple.md)
- [Latency Percentile](speculative-execution/percentile.md)

- [Driver metrics](metrics/metrics.md)

- [Logging](logging/logging.md)

- [Query tracing](tracing/tracing.md)
- [Tracing a simple/prepared query](tracing/basic.md)
- [Tracing a paged query](tracing/paged.md)
- [Tracing `Session::prepare`](tracing/prepare.md)
- [Query Execution History](tracing/query-history.md)
- [Tracing a simple/prepared query](tracing/basic.md)
- [Tracing a paged query](tracing/paged.md)
- [Tracing `Session::prepare`](tracing/prepare.md)
- [Query Execution History](tracing/query-history.md)

- [Database schema](schema/schema.md)
10 changes: 6 additions & 4 deletions docs/source/queries/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Driver supports all kinds of statements supported by ScyllaDB. The following tables aim to bridge between DB concepts and driver's API.
They include recommendations on which API to use in what cases.

## Kinds of CQL statements (from the CQL protocol point of view):
## Kinds of CQL statements (from the CQL protocol point of view)

| Kind of CQL statement | Single | Batch |
|-----------------------|---------------------|------------------------------------------|
Comment on lines 3 to 9
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Thanks for the changes in this file!

Expand Down Expand Up @@ -59,7 +59,7 @@ This is **NOT** strictly related to content of the CQL query string.
| Load balancing | advanced if prepared, else primitive | advanced if prepared **and ALL** statements in the batch target the same partition, else primitive |
| Suitable operations | most of operations | - a list of operations that needs to be executed atomically (batch LightWeight Transaction)</br> - a batch of operations targetting the same partition (as an advanced optimisation) |

## CQL statements - operations (based on what the CQL string contains):
## CQL statements - operations (based on what the CQL string contains)

| CQL data manipulation statement | Recommended statement kind | Recommended Session operation |
|------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
Expand All @@ -86,9 +86,10 @@ This is **NOT** strictly related to content of the CQL query string.

For more detailed comparison and more best practices, see [doc page about paging](paged.md).

### Queries are fully asynchronous - you can run as many of them in parallel as you wish.
### Queries are fully asynchronous - you can run as many of them in parallel as you wish

## `USE KEYSPACE`

## `USE KEYSPACE`:
There is a special functionality to enable [USE keyspace](usekeyspace.md).

```{eval-rst}
Expand All @@ -106,4 +107,5 @@ There is a special functionality to enable [USE keyspace](usekeyspace.md).
schema-agreement
lwt
timeouts
timestamp-generators
```
41 changes: 41 additions & 0 deletions docs/source/queries/timestamp-generators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Timestamp generators

If you want to generate timestamps on the client side you can provide
a TimestampGenerator to a SessionBuilder when creating a Session. Then
every executed statement will have attached a new timestamp generated
by the provided TimestampGenerator, as longas the statement did not
already have a timestamp provided (e.g. by using the `TIMESTAMP` clause).

Comment on lines +1 to +8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 I don't think this is precise. The timestamp will be attached to the query, but the server will ignore it.
It would be good to describe the precedence here:

  1. USING TIMESTAMP in the query itself
  2. Timestamp manually set on the query
  3. Timestamp from the generator

## Monotonic Timestamp Generator

Most basic client-side timestamp generator. Guarantees monotonic timestamps
based on the system clock, with automatic timestamp incrementation
if the system clock timestamp would not be monotonic. If the clock skew
exceeds warning_threshold of the generator (provided in the constructor, 1s by default)
user will be warned with timestamp generation with warning_interval cooldown period
(provided in the constructor, 1s by default) to not spam the user.
Comment on lines +9 to +16
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 I wouldn't call it "most basic" - it is actually quite complicated. The most basic would be the one I proposed in another comment, which just naively uses SystemTime::now.


``` rust
# extern crate scylla;
# use std::error::Error;
# async fn check_only_compiles() -> Result<(), Box<dyn std::error::Error>> {
use scylla::{Session, SessionBuilder, query::Query};
use scylla::transport::timestamp_generator::MonotonicTimestampGenerator;
use std::sync::Arc;
use std::time::Duration;

let session: Session = SessionBuilder::new()
.known_node("127.0.0.1:9042")
.timestamp_generator(Arc::new(MonotonicTimestampGenerator::new()))
.build()
.await?;

// This query will have a timestamp generated
// by the monotonic timestamp generator
let my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
let to_insert: i32 = 12345;
session.query_unpaged(my_query, (to_insert,)).await?;
# Ok(())
# }


41 changes: 38 additions & 3 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use super::iterator::QueryPager;
use super::locator::tablets::{RawTablet, TabletParsingError};
use super::query_result::QueryResult;
use super::session::AddressTranslator;
use super::timestamp_generator::TimestampGenerator;
use super::topology::{PeerEndpoint, UntranslatedEndpoint, UntranslatedPeer};
use super::NodeAddr;
#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -565,6 +566,7 @@ pub(crate) struct ConnectionConfig {
pub(crate) compression: Option<Compression>,
pub(crate) tcp_nodelay: bool,
pub(crate) tcp_keepalive_interval: Option<Duration>,
pub(crate) timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
#[cfg(feature = "ssl")]
pub(crate) ssl_config: Option<SslConfig>,
pub(crate) connect_timeout: std::time::Duration,
Expand All @@ -590,6 +592,7 @@ impl Default for ConnectionConfig {
compression: None,
tcp_nodelay: true,
tcp_keepalive_interval: None,
timestamp_generator: None,
event_sender: None,
#[cfg(feature = "ssl")]
ssl_config: None,
Expand Down Expand Up @@ -1047,6 +1050,17 @@ impl Connection {
page_size: Option<PageSize>,
paging_state: PagingState,
) -> Result<QueryResponse, UserRequestError> {
let mut timestamp = None;
if query.get_timestamp().is_none() {
if let Some(x) = self.config.timestamp_generator.as_ref() {
timestamp = Some(x.next_timestamp().await);
}
}

if timestamp.is_none() {
timestamp = query.get_timestamp()
}

let query_frame = query::Query {
contents: Cow::Borrowed(&query.contents),
parameters: query::QueryParameters {
Expand All @@ -1056,7 +1070,7 @@ impl Connection {
page_size: page_size.map(Into::into),
paging_state,
skip_metadata: false,
timestamp: query.get_timestamp(),
timestamp,
},
};

Expand Down Expand Up @@ -1109,14 +1123,25 @@ impl Connection {
page_size: Option<PageSize>,
paging_state: PagingState,
) -> Result<QueryResponse, UserRequestError> {
let mut timestamp = None;
if prepared_statement.get_timestamp().is_none() {
if let Some(x) = self.config.timestamp_generator.as_ref() {
timestamp = Some(x.next_timestamp().await);
}
}

if timestamp.is_none() {
timestamp = prepared_statement.get_timestamp()
}

let execute_frame = execute::Execute {
id: prepared_statement.get_id().to_owned(),
parameters: query::QueryParameters {
consistency,
serial_consistency,
values: Cow::Borrowed(values),
page_size: page_size.map(Into::into),
timestamp: prepared_statement.get_timestamp(),
timestamp,
skip_metadata: prepared_statement.get_use_cached_result_metadata(),
paging_state,
},
Expand Down Expand Up @@ -1248,14 +1273,24 @@ impl Connection {
});

let values = RawBatchValuesAdapter::new(values, contexts);
let mut timestamp = None;
if batch.get_timestamp().is_none() {
if let Some(x) = self.config.timestamp_generator.as_ref() {
timestamp = Some(x.next_timestamp().await);
}
}

if timestamp.is_none() {
timestamp = batch.get_timestamp()
}

let batch_frame = batch::Batch {
statements: Cow::Borrowed(&batch.statements),
values,
batch_type: batch.get_type(),
consistency,
serial_consistency,
timestamp: batch.get_timestamp(),
timestamp,
};

loop {
Expand Down
1 change: 1 addition & 0 deletions scylla/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod retry_policy;
pub mod session;
pub mod session_builder;
pub mod speculative_execution;
pub mod timestamp_generator;
pub mod topology;

pub use crate::frame::{Authenticator, Compression};
Expand Down
7 changes: 7 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use super::node::{InternalKnownNode, KnownNode};
use super::partitioner::PartitionerName;
use super::query_result::MaybeFirstRowError;
use super::query_result::RowsError;
use super::timestamp_generator::TimestampGenerator;
use super::topology::UntranslatedPeer;
use super::{NodeRef, SelfIdentity};
use crate::frame::response::result;
Expand Down Expand Up @@ -270,6 +271,10 @@ pub struct SessionConfig {
/// Generally, this options is best left as default (false).
pub disallow_shard_aware_port: bool,

// Timestamp generator used for generating timestamps on the client-side
// If None, server-side timestamps are used.
pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,

/// If empty, fetch all keyspaces
pub keyspaces_to_fetch: Vec<String>,

Expand Down Expand Up @@ -382,6 +387,7 @@ impl SessionConfig {
connect_timeout: Duration::from_secs(5),
connection_pool_size: Default::default(),
disallow_shard_aware_port: false,
timestamp_generator: None,
keyspaces_to_fetch: Vec::new(),
fetch_schema_metadata: true,
keepalive_interval: Some(Duration::from_secs(30)),
Expand Down Expand Up @@ -1074,6 +1080,7 @@ where
compression: config.compression,
tcp_nodelay: config.tcp_nodelay,
tcp_keepalive_interval: config.tcp_keepalive_interval,
timestamp_generator: config.timestamp_generator,
#[cfg(feature = "ssl")]
ssl_config: config.ssl_context.map(SslConfig::new_with_global_context),
authenticator: config.authenticator.clone(),
Expand Down
22 changes: 22 additions & 0 deletions scylla/src/transport/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::session::{
AddressTranslator, CurrentDeserializationApi, GenericSession, LegacyDeserializationApi,
SessionConfig,
};
use super::timestamp_generator::TimestampGenerator;
use super::Compression;

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -663,6 +664,27 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
self
}

/// Set the timestamp generator that will generate timestamps on the client-side.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # use scylla::transport::timestamp_generator::MonotonicTimestampGenerator;
/// # use std::sync::Arc;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .timestamp_generator(Arc::new(MonotonicTimestampGenerator::new()))
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn timestamp_generator(mut self, timestamp_generator: Arc<dyn TimestampGenerator>) -> Self {
self.config.timestamp_generator = Some(timestamp_generator);
self
}

/// Set the keyspaces to be fetched, to retrieve their strategy, and schema metadata if enabled
/// No keyspaces, the default value, means all the keyspaces will be fetched.
///
Expand Down
Loading
Loading