Ingestion partition change #683
Conversation
Add custom header X-P-Time-Partition (optional) at the stream creation API, to allow ingestion/query using a timestamp column from the log data instead of server time / p_timestamp.
Store the time_partition field name in stream.json and in the in-memory STREAM_INFO.
In the ingest API, check that the timestamp column exists in the log data; if not, throw an exception.
Also check that the timestamp value can be parsed into a datetime; if not, throw an exception.
The arrow file name gets the date, hour, and minute from the timestamp field (if defined for the stream); otherwise it gets them from the server time.
The parquet file name gets a random number attached to it, because many log events can share the same date, hour, and minute in the timestamp field; the random number keeps parquet files from being overwritten (see the sketch after this description).
In the console, the query's from and to dates are matched against the timestamp column of the log data (if defined for the stream); otherwise they are matched against the p_timestamp column.
Fixes #685
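For illustration, a minimal sketch of the file-naming scheme described above; the function name and the exact prefix layout are hypothetical, only the date/hour/minute prefix plus random suffix idea comes from this PR.

use chrono::NaiveDateTime;
use rand::Rng;

// Hypothetical helper, not the code in this PR: builds a parquet file name whose
// prefix carries the date, hour, and minute of the chosen timestamp (the
// time-partition column if configured, else server time), plus a random suffix
// so events that share the same minute do not overwrite each other's file.
fn parquet_file_name(stream: &str, ts: NaiveDateTime) -> String {
    // chrono zero-pads %H and %M, so the prefix sorts lexicographically.
    let prefix = ts.format("date=%Y-%m-%d.hour=%H.minute=%M");
    let random: u32 = rand::thread_rng().gen();
    format!("{stream}.{prefix}.{random}.parquet")
}

Per the description, an arrow file would carry the same date/hour/minute prefix but without the random suffix.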
server/src/handlers/http/ingest.rs
Outdated
let mut ingestion_prefix_timestamp = Utc::now().naive_utc();

if time_partition.is_some() {
    let body_timestamp = body.get(&time_partition.clone().unwrap().to_string());
    if body_timestamp.is_some() {
        if body_timestamp
            .unwrap()
            .to_owned()
            .as_str()
            .unwrap()
            .parse::<DateTime<Utc>>()
            .is_ok()
        {
            ingestion_prefix_timestamp = body_timestamp
                .unwrap()
                .to_owned()
                .as_str()
                .unwrap()
                .parse::<DateTime<Utc>>()
                .unwrap()
                .naive_utc();
        } else {
            return Err(PostError::Invalid(anyhow::Error::msg(format!(
                "field {} is not in the correct datetime format",
                body_timestamp.unwrap().to_owned().as_str().unwrap()
            ))));
        }
    } else {
        return Err(PostError::Invalid(anyhow::Error::msg(format!(
            "ingestion failed as field {} is not part of the log",
            time_partition.unwrap()
        ))));
    }
}
This could be simplified:
.
.
.
let ingestion_prefix_timestamp = if let Some(time_partition) = time_partition {
    let body_timestamp = body.get(&time_partition).ok_or(PostError::Invalid(anyhow::Error::msg(format!(
        "ingestion failed as field {} is not part of the log",
        time_partition
    ))))?;
    body_timestamp.as_str().unwrap().parse::<DateTime<Utc>>().map_err(|_| PostError::Invalid(anyhow::Error::msg(format!(
        "field {} is not in the correct datetime format",
        body_timestamp.to_owned().as_str().unwrap()
    ))))?.naive_utc()
} else {
    Utc::now().naive_utc()
};
.
.
.
.
the logic has changed now, please review again
review comments incorporated
Apart from that one function, everything else LGTM.
…seablehq#683) This PR adds an enhancement to use a user-provided timestamp for partitioning ingested logs instead of server time. The user adds the optional custom header X-P-Time-Partition at the stream creation API to allow ingestion/query using a timestamp column from the log data instead of the server time p_timestamp. The time_partition field name is stored in stream.json and in the in-memory STREAM_INFO. In the ingest API, the server checks that the timestamp column exists in the log event and throws an exception if it does not; it also checks that the timestamp value can be parsed into a datetime and throws an exception if it cannot. The arrow file name gets the date, hour, and minute from the timestamp field (if defined for the stream); otherwise it gets them from the server time. The parquet file name gets a random number attached to it, because many log events can share the same date, hour, and minute in the timestamp field, and the random number keeps parquet files from being overwritten. In the console, the query's from and to dates are matched against the timestamp column of the log data (if defined for the stream); otherwise they are matched against the p_timestamp column. Fixes parseablehq#671 Fixes parseablehq#685
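A minimal sketch of the header mechanism described above, assuming an actix-web handler at stream creation; the function name is illustrative, only the X-P-Time-Partition header name comes from this PR.

use actix_web::HttpRequest;

// Illustrative only: reads the optional X-P-Time-Partition header from the
// stream creation request. The returned field name would then be persisted in
// stream.json and kept in the in-memory STREAM_INFO, as described above.
fn time_partition_from_request(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get("X-P-Time-Partition")
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned)
}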