Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make gen timestamp deterministic #8619

Merged
merged 2 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod varchar;
use std::time::Duration;

use anyhow::Result;
use chrono::{DateTime, FixedOffset};
pub use numeric::*;
use serde_json::Value;
pub use timestamp::*;
Expand Down Expand Up @@ -155,11 +156,13 @@ impl FieldGeneratorImpl {
}

pub fn with_timestamp(
base: Option<DateTime<FixedOffset>>,
max_past: Option<String>,
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
base,
max_past,
max_past_mode,
seed,
Expand Down Expand Up @@ -293,7 +296,7 @@ mod tests {
let mut generator = match data_type {
DataType::Varchar => FieldGeneratorImpl::with_varchar(None, seed).unwrap(),
DataType::Timestamp => {
FieldGeneratorImpl::with_timestamp(None, None, seed).unwrap()
FieldGeneratorImpl::with_timestamp(None, None, None, seed).unwrap()
}
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
};
Expand Down Expand Up @@ -321,4 +324,16 @@ mod tests {
assert_eq!(datum2_new, datum2);
}
}

#[test]
fn test_deterministic_timestamp() {
let seed = 1234;
let base_time: DateTime<FixedOffset> =
DateTime::parse_from_rfc3339("2020-01-01T00:00:00+00:00").unwrap();
let mut generator =
FieldGeneratorImpl::with_timestamp(Some(base_time), None, None, seed).unwrap();
let val1 = generator.generate_json(1);
let val2 = generator.generate_json(1);
assert_eq!(val1, val2);
}
}
18 changes: 12 additions & 6 deletions src/common/src/field_generator/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ enum LocalNow {
}

pub struct TimestampField {
base: Option<DateTime<FixedOffset>>,
max_past: Duration,
local_now: LocalNow,
seed: u64,
}

impl TimestampField {
pub fn new(
base: Option<DateTime<FixedOffset>>,
max_past_option: Option<String>,
max_past_mode: Option<String>,
seed: u64,
Expand All @@ -61,6 +63,7 @@ impl TimestampField {
};
debug!(?local_now, ?max_past, "parse timestamp field option");
Ok(Self {
base,
// convert to chrono::Duration
max_past: chrono::Duration::from_std(max_past)?,
local_now,
Expand All @@ -72,12 +75,15 @@ impl TimestampField {
let milliseconds = self.max_past.num_milliseconds();
let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
let max_milliseconds = rng.gen_range(0..=milliseconds);
let now = match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
let now = match self.base {
Some(base) => base.naive_local(),
None => match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
},
};
now - Duration::milliseconds(max_milliseconds)
}
Expand Down
18 changes: 16 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use anyhow::Result;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
Expand Down Expand Up @@ -204,8 +204,22 @@ fn generator_from_data_type(
let max_past_mode_value = fields_option_map
.get(&max_past_mode_key)
.map(|s| s.to_lowercase());
let basetime = match fields_option_map.get(format!("fields.{}.basetime", name).as_str())
{
Some(base) => {
Some(chrono::DateTime::parse_from_rfc3339(base).map_err(|e| {
anyhow!("cannot parse {:?} to rfc3339 due to {:?}", base, e)
})?)
}
None => None,
};

FieldGeneratorImpl::with_timestamp(max_past_value, max_past_mode_value, random_seed)
FieldGeneratorImpl::with_timestamp(
basetime,
max_past_value,
max_past_mode_value,
random_seed,
)
}
DataType::Varchar => {
let length_key = format!("fields.{}.length", name);
Expand Down