Skip to content

Commit

Permalink
feat: make gen timestamp deterministic (risingwavelabs#8619)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
tabVersion authored Mar 17, 2023
1 parent 18863e0 commit bba43ba
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
17 changes: 16 additions & 1 deletion src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod varchar;
use std::time::Duration;

use anyhow::Result;
use chrono::{DateTime, FixedOffset};
pub use numeric::*;
use serde_json::Value;
pub use timestamp::*;
Expand Down Expand Up @@ -155,11 +156,13 @@ impl FieldGeneratorImpl {
}

pub fn with_timestamp(
base: Option<DateTime<FixedOffset>>,
max_past: Option<String>,
max_past_mode: Option<String>,
seed: u64,
) -> Result<Self> {
Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
base,
max_past,
max_past_mode,
seed,
Expand Down Expand Up @@ -293,7 +296,7 @@ mod tests {
let mut generator = match data_type {
DataType::Varchar => FieldGeneratorImpl::with_varchar(None, seed).unwrap(),
DataType::Timestamp => {
FieldGeneratorImpl::with_timestamp(None, None, seed).unwrap()
FieldGeneratorImpl::with_timestamp(None, None, None, seed).unwrap()
}
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
};
Expand Down Expand Up @@ -321,4 +324,16 @@ mod tests {
assert_eq!(datum2_new, datum2);
}
}

#[test]
fn test_deterministic_timestamp() {
let seed = 1234;
let base_time: DateTime<FixedOffset> =
DateTime::parse_from_rfc3339("2020-01-01T00:00:00+00:00").unwrap();
let mut generator =
FieldGeneratorImpl::with_timestamp(Some(base_time), None, None, seed).unwrap();
let val1 = generator.generate_json(1);
let val2 = generator.generate_json(1);
assert_eq!(val1, val2);
}
}
18 changes: 12 additions & 6 deletions src/common/src/field_generator/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ enum LocalNow {
}

pub struct TimestampField {
base: Option<DateTime<FixedOffset>>,
max_past: Duration,
local_now: LocalNow,
seed: u64,
}

impl TimestampField {
pub fn new(
base: Option<DateTime<FixedOffset>>,
max_past_option: Option<String>,
max_past_mode: Option<String>,
seed: u64,
Expand All @@ -61,6 +63,7 @@ impl TimestampField {
};
debug!(?local_now, ?max_past, "parse timestamp field option");
Ok(Self {
base,
// convert to chrono::Duration
max_past: chrono::Duration::from_std(max_past)?,
local_now,
Expand All @@ -72,12 +75,15 @@ impl TimestampField {
let milliseconds = self.max_past.num_milliseconds();
let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
let max_milliseconds = rng.gen_range(0..=milliseconds);
let now = match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
let now = match self.base {
Some(base) => base.naive_local(),
None => match self.local_now {
LocalNow::Relative => Local::now()
.naive_local()
.duration_round(Duration::microseconds(1))
.unwrap(),
LocalNow::Absolute(now) => now,
},
};
now - Duration::milliseconds(max_milliseconds)
}
Expand Down
18 changes: 16 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use anyhow::Result;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
Expand Down Expand Up @@ -204,8 +204,22 @@ fn generator_from_data_type(
let max_past_mode_value = fields_option_map
.get(&max_past_mode_key)
.map(|s| s.to_lowercase());
let basetime = match fields_option_map.get(format!("fields.{}.basetime", name).as_str())
{
Some(base) => {
Some(chrono::DateTime::parse_from_rfc3339(base).map_err(|e| {
anyhow!("cannot parse {:?} to rfc3339 due to {:?}", base, e)
})?)
}
None => None,
};

FieldGeneratorImpl::with_timestamp(max_past_value, max_past_mode_value, random_seed)
FieldGeneratorImpl::with_timestamp(
basetime,
max_past_value,
max_past_mode_value,
random_seed,
)
}
DataType::Varchar => {
let length_key = format!("fields.{}.length", name);
Expand Down

0 comments on commit bba43ba

Please sign in to comment.