Skip to content

Commit

Permalink
fix: removed time partition logic from ingestion flow (#703)
Browse files Browse the repository at this point in the history
* removed the X-P-Time-Partition header in log stream creation API
* removed the logic that partitions ingested logs based on the
X-P-Time-Partition header value, which was stored in `stream.json`
* Query still uses this logic so that queries remain compatible with
external tools that ingest based on a time partition
  • Loading branch information
nikhilsinhaparseable authored Mar 14, 2024
1 parent a04dd4f commit 72502ad
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 354 deletions.
94 changes: 27 additions & 67 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use relative_path::RelativePathBuf;

use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
};
Expand Down Expand Up @@ -70,46 +69,25 @@ impl ManifestFile for manifest::File {
}
}

fn get_file_bounds(
file: &manifest::File,
partition_column: String,
) -> (DateTime<Utc>, DateTime<Utc>) {
if partition_column == DEFAULT_TIMESTAMP_KEY {
match file
.columns()
.iter()
.find(|col| col.name == partition_column)
.unwrap()
.stats
.as_ref()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.max)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
} else {
match file
.columns()
.iter()
.find(|col| col.name == partition_column)
.unwrap()
.stats
.as_ref()
.unwrap()
{
column::TypedStatistics::String(stats) => (
stats.min.parse::<DateTime<Utc>>().unwrap(),
stats.max.parse::<DateTime<Utc>>().unwrap(),
),
_ => unreachable!(),
}
/// Returns the earliest and latest `p_timestamp` values recorded in this
/// manifest file's column statistics, as UTC datetimes (millisecond precision).
///
/// # Panics
/// Panics if the file has no `p_timestamp` column, if that column carries no
/// statistics, or if the statistics are not integer millisecond timestamps —
/// all of which would indicate a corrupt manifest.
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
    // Borrow the stats instead of cloning them: only the two i64 bounds are
    // read, so a full clone of the statistics struct is wasted work.
    match file
        .columns()
        .iter()
        .find(|col| col.name == "p_timestamp")
        .unwrap()
        .stats
        .as_ref()
        .unwrap()
    {
        column::TypedStatistics::Int(stats) => (
            NaiveDateTime::from_timestamp_millis(stats.min)
                .unwrap()
                .and_utc(),
            NaiveDateTime::from_timestamp_millis(stats.max)
                .unwrap()
                .and_utc(),
        ),
        // p_timestamp is always stored with integer (epoch-millis) stats.
        _ => unreachable!(),
    }
}

Expand All @@ -121,17 +99,8 @@ pub async fn update_snapshot(
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
lower_bound
}
None => {
let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};

let (lower_bound, _) = get_file_bounds(&change);
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
Expand All @@ -140,18 +109,16 @@ pub async fn update_snapshot(
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let manifest_path =
partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);

let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else {
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&manifest_path, manifest).await?;
storage.put_manifest(&path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
Expand Down Expand Up @@ -210,7 +177,6 @@ pub async fn get_first_event(
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let manifests = &mut meta.snapshot.manifest_list;
let time_partition = meta.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
Expand All @@ -232,15 +198,9 @@ pub async fn get_first_event(
};

if let Some(first_event) = manifest.files.first() {
if let Some(time_partition) = time_partition {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
} else {
let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
let (lower_bound, _) = get_file_bounds(first_event);
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
return Ok(Some(first_event_at));
}
Ok(None)
}
Expand Down
12 changes: 2 additions & 10 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use std::sync::Arc;
use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::metadata;
use chrono::NaiveDateTime;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
Expand All @@ -42,7 +41,6 @@ pub struct Event {
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
}

// Events holds the schema related to a each event for a single log stream
Expand All @@ -55,12 +53,7 @@ impl Event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

Self::process_event(
&self.stream_name,
&key,
self.rb.clone(),
self.parsed_timestamp,
)?;
Self::process_event(&self.stream_name, &key, self.rb.clone())?;

metadata::STREAM_INFO.update_stats(
&self.stream_name,
Expand All @@ -87,9 +80,8 @@ impl Event {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
Ok(())
}
}
Expand Down
29 changes: 11 additions & 18 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::utils;
use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_schema::Schema;
use chrono::{NaiveDateTime, Utc};
use chrono::Utc;
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;

Expand All @@ -48,7 +48,6 @@ impl Writer {
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let rb = utils::arrow::replace_columns(
rb.schema(),
Expand All @@ -57,8 +56,7 @@ impl Writer {
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

self.disk
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
self.disk.push(stream_name, schema_key, &rb)?;
self.mem.push(schema_key, rb);
Ok(())
}
Expand All @@ -74,34 +72,29 @@ impl WriterTable {
stream_name: &str,
schema_key: &str,
record: RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let hashmap_guard = self.read().unwrap();

match hashmap_guard.get(stream_name) {
Some(stream_writer) => {
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
stream_writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
}
None => {
drop(hashmap_guard);
let mut map = self.write().unwrap();
// check for race condition
// if map contains entry then just
if let Some(writer) = map.get(stream_name) {
writer.lock().unwrap().push(
stream_name,
schema_key,
record,
parsed_timestamp,
)?;
writer
.lock()
.unwrap()
.push(stream_name, schema_key, record)?;
} else {
let mut writer = Writer::default();
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
writer.push(stream_name, schema_key, record)?;
map.insert(stream_name.to_owned(), Mutex::new(writer));
}
}
Expand Down
34 changes: 21 additions & 13 deletions server/src/event/writer/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use chrono::NaiveDateTime;
use derive_more::{Deref, DerefMut};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
Expand All @@ -44,17 +43,27 @@ impl FileWriter {
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(), StreamWriterError> {
let (path, writer) =
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);
match self.get_mut(schema_key) {
Some(writer) => {
writer
.writer
.write(record)
.map_err(StreamWriterError::Writer)?;
}
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);
}
};

Ok(())
}
Expand All @@ -70,10 +79,9 @@ fn init_new_stream_writer_file(
stream_name: &str,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
let dir = StorageDir::new(stream_name);
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
let path = dir.path_by_current_time(schema_key);
std::fs::create_dir_all(dir.data_path)?;

let file = OpenOptions::new().create(true).append(true).open(&path)?;
Expand Down
1 change: 0 additions & 1 deletion server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';

Expand Down
Loading

0 comments on commit 72502ad

Please sign in to comment.