Skip to content

Commit

Permalink
fix for time partition limit
Browse files Browse the repository at this point in the history
additional header to be provided
X-P-Time-Partition-Limit with a value of unsigned integer with ending 'd'
eg. 90d for 90 days
if not provided, default constraint of 30 days will be applied
using this, user can ingest logs older than 30 days as well

fixes #752
  • Loading branch information
nikhilsinhaparseable committed May 5, 2024
1 parent ebf2c16 commit 51d3a2b
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 28 deletions.
2 changes: 1 addition & 1 deletion server/src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl EventFormat for Event {
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, false)?;
let data = flatten_json_body(self.data, None, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand Down
1 change: 1 addition & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';
Expand Down
8 changes: 7 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
.map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;

let time_partition = object_store_format.time_partition;
let time_partition_limit = object_store_format.time_partition_limit;
let static_schema_flag = object_store_format.static_schema_flag;
let body_val: Value = serde_json::from_slice(&body)?;
let size: usize = body.len();
Expand All @@ -129,7 +130,11 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
.process()
.await?;
} else {
let data = convert_array_to_object(body_val.clone(), time_partition.clone())?;
let data = convert_array_to_object(
body_val.clone(),
time_partition.clone(),
time_partition_limit,
)?;
for value in data {
let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
parsed_timestamp = body_timestamp
Expand Down Expand Up @@ -210,6 +215,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
stream_name.to_string(),
"",
"",
"",
Arc::new(Schema::empty()),
)
.await?;
Expand Down
39 changes: 37 additions & 2 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use crate::alerts::Alerts;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY};
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
Expand All @@ -40,6 +40,7 @@ use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::num::NonZeroU32;
use std::sync::Arc;

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Expand Down Expand Up @@ -191,6 +192,29 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
} else {
""
};
let mut time_partition_in_days: &str = "";
if let Some((_, time_partition_limit_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY)
{
let time_partition_limit = time_partition_limit_name.to_str().unwrap();
if !time_partition_limit.ends_with('d') {
return Err(StreamError::Custom {
msg: "missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
if days.parse::<NonZeroU32>().is_err() {
return Err(StreamError::Custom {
msg: "could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
} else {
time_partition_in_days = days;
}
}
let static_schema_flag = if let Some((_, static_schema_flag)) = req
.headers()
.iter()
Expand Down Expand Up @@ -235,7 +259,14 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
});
}

create_stream(stream_name, time_partition, static_schema_flag, schema).await?;
create_stream(
stream_name,
time_partition,
time_partition_in_days,
static_schema_flag,
schema,
)
.await?;

Ok(("log stream created", StatusCode::OK))
}
Expand Down Expand Up @@ -516,6 +547,7 @@ fn remove_id_from_alerts(value: &mut Value) {
pub async fn create_stream(
stream_name: String,
time_partition: &str,
time_partition_limit: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
) -> Result<(), CreateStreamError> {
Expand All @@ -528,6 +560,7 @@ pub async fn create_stream(
.create_stream(
&stream_name,
time_partition,
time_partition_limit,
static_schema_flag,
schema.clone(),
)
Expand Down Expand Up @@ -557,6 +590,7 @@ pub async fn create_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
time_partition_limit.to_string(),
static_schema_flag.to_string(),
static_schema,
);
Expand Down Expand Up @@ -595,6 +629,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta.time_partition_limit.clone(),
cache_enabled: stream_meta.cache_enabled,
static_schema_flag: stream_meta.static_schema_flag.clone(),
};
Expand Down
11 changes: 9 additions & 2 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
use crate::alerts::Alerts;
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
use crate::storage::{LogStream, ObjectStorage, StorageDir};
use crate::utils::arrow::MergedRecordReader;

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
use derive_more::{Deref, DerefMut};

// TODO: make return type be of 'static lifetime instead of cloning
Expand All @@ -47,6 +46,7 @@ pub struct LogStreamMetadata {
pub created_at: String,
pub first_event_at: Option<String>,
pub time_partition: Option<String>,
pub time_partition_limit: Option<String>,
pub static_schema_flag: Option<String>,
}

Expand Down Expand Up @@ -166,6 +166,7 @@ impl StreamInfo {
stream_name: String,
created_at: String,
time_partition: String,
time_partition_limit: String,
static_schema_flag: String,
static_schema: HashMap<String, Arc<Field>>,
) {
Expand All @@ -181,6 +182,11 @@ impl StreamInfo {
} else {
Some(time_partition)
},
time_partition_limit: if time_partition_limit.is_empty() {
None
} else {
Some(time_partition_limit)
},
static_schema_flag: if static_schema_flag != "true" {
None
} else {
Expand Down Expand Up @@ -237,6 +243,7 @@ impl StreamInfo {
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
time_partition_limit: meta.time_partition_limit,
static_schema_flag: meta.static_schema_flag,
};

Expand Down
10 changes: 7 additions & 3 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ mod s3;
pub mod staging;
mod store_metadata;

use self::retention::Retention;
pub use self::staging::StorageDir;
pub use localfs::FSConfig;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::S3Config;
pub use store_metadata::{
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
};

use self::retention::Retention;
pub use self::staging::StorageDir;

// metadata file names in a Stream prefix
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
Expand Down Expand Up @@ -94,6 +93,8 @@ pub struct ObjectStoreFormat {
#[serde(skip_serializing_if = "Option::is_none")]
pub time_partition: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub time_partition_limit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub static_schema_flag: Option<String>,
}

Expand All @@ -109,6 +110,8 @@ pub struct StreamInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub time_partition: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub time_partition_limit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub static_schema_flag: Option<String>,
}

Expand Down Expand Up @@ -155,6 +158,7 @@ impl Default for ObjectStoreFormat {
cache_enabled: false,
retention: None,
time_partition: None,
time_partition_limit: None,
static_schema_flag: None,
}
}
Expand Down
6 changes: 6 additions & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub trait ObjectStorage: Sync + 'static {
&self,
stream_name: &str,
time_partition: &str,
time_partition_limit: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
) -> Result<(), ObjectStorageError> {
Expand All @@ -139,6 +140,11 @@ pub trait ObjectStorage: Sync + 'static {
} else {
format.time_partition = Some(time_partition.to_string());
}
if time_partition_limit.is_empty() {
format.time_partition_limit = None;
} else {
format.time_partition_limit = Some(time_partition_limit.to_string());
}
if static_schema_flag != "true" {
format.static_schema_flag = None;
} else {
Expand Down
12 changes: 10 additions & 2 deletions server/src/utils/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@ pub mod flatten;
pub fn flatten_json_body(
body: serde_json::Value,
time_partition: Option<String>,
time_partition_limit: Option<String>,
validation_required: bool,
) -> Result<Value, anyhow::Error> {
flatten::flatten(body, "_", time_partition, validation_required)
flatten::flatten(
body,
"_",
time_partition,
time_partition_limit,
validation_required,
)
}

pub fn convert_array_to_object(
body: Value,
time_partition: Option<String>,
time_partition_limit: Option<String>,
) -> Result<Vec<Value>, anyhow::Error> {
let data = flatten_json_body(body, time_partition, true)?;
let data = flatten_json_body(body, time_partition, time_partition_limit, true)?;
let value_arr = match data {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
Expand Down
Loading

0 comments on commit 51d3a2b

Please sign in to comment.