Skip to content

Commit

Permalink
refactor: clean up parts of the codebase (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Dec 20, 2024
1 parent 82a09eb commit b277775
Show file tree
Hide file tree
Showing 24 changed files with 587 additions and 703 deletions.
4 changes: 2 additions & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub struct Report {
memory_total_bytes: u64,
platform: String,
storage_mode: String,
server_mode: String,
server_mode: Mode,
version: String,
commit_hash: String,
active_ingestors: u64,
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Report {
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
server_mode: CONFIG.parseable.mode.to_string(),
server_mode: CONFIG.parseable.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
Expand Down
20 changes: 6 additions & 14 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,20 +527,12 @@ impl FromArgMatches for Cli {
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
.expect("default for row_group size");
self.parquet_compression = match m
.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
.as_str()
{
"uncompressed" => Compression::UNCOMPRESSED,
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP,
"lzo" => Compression::LZO,
"brotli" => Compression::BROTLI,
"lz4" => Compression::LZ4,
"zstd" => Compression::ZSTD,
_ => unreachable!(),
};
self.parquet_compression = serde_json::from_str(&format!(
"{:?}",
m.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
))
.expect("unexpected compression algo");

let openid_client_id = m.get_one::<String>(Self::OPENID_CLIENT_ID).cloned();
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
Expand Down
12 changes: 6 additions & 6 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ impl EventFormat for Event {
// also extract the arrow schema, tags and metadata from the incoming json
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, None, None, false)?;
let data = flatten_json_body(&self.data, None, None, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand All @@ -66,13 +66,13 @@ impl EventFormat for Event {
collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");

let mut is_first = false;
let schema = match derive_arrow_schema(&stream_schema, fields) {
let schema = match derive_arrow_schema(stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
Some(&stream_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
);
Expand Down
31 changes: 15 additions & 16 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,21 @@ pub trait EventFormat: Sized {

fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

fn into_recordbatch(
self,
storage_schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
let (data, mut schema, is_first, tags, metadata) =
self.to_data(storage_schema, static_schema_flag, time_partition)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
Expand Down Expand Up @@ -120,8 +119,8 @@ pub trait EventFormat: Sized {

fn is_schema_matching(
new_schema: Arc<Schema>,
storage_schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
) -> bool {
if static_schema_flag.is_none() {
return true;
Expand Down Expand Up @@ -207,7 +206,7 @@ pub fn override_timestamp_fields(
pub fn update_field_type_in_schema(
inferred_schema: Arc<Schema>,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
time_partition: Option<String>,
time_partition: Option<&String>,
log_records: Option<&Vec<Value>>,
) -> Arc<Schema> {
let mut updated_schema = inferred_schema.clone();
Expand Down Expand Up @@ -236,12 +235,12 @@ pub fn update_field_type_in_schema(
if time_partition.is_none() {
return updated_schema;
}
let time_partition_field_name = time_partition.unwrap();

let new_schema: Vec<Field> = updated_schema
.fields()
.iter()
.map(|field| {
if *field.name() == time_partition_field_name {
if field.name() == time_partition.unwrap() {
if field.data_type() == &DataType::Utf8 {
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
Field::new(field.name().clone(), new_data_type, true)
Expand Down
34 changes: 17 additions & 17 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
}
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

flatten_and_push_logs(req, body, stream_name).await?;
flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
Expand All @@ -84,7 +84,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
tags: String::default(),
metadata: String::default(),
};
event.into_recordbatch(schema, None, None)?
event.into_recordbatch(&schema, None, None)?
};
event::Event {
rb,
Expand Down Expand Up @@ -114,9 +114,9 @@ pub async fn handle_otel_ingestion(
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
push_logs(stream_name.to_string(), req.clone(), body).await?;
let stream_name = stream_name.to_str().unwrap();
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
push_logs(stream_name, &req, &body).await?;
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
Expand Down Expand Up @@ -149,7 +149,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
}
}

flatten_and_push_logs(req, body, stream_name).await?;
flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
}

Expand Down Expand Up @@ -319,7 +319,7 @@ mod tests {
.append_header((PREFIX_META.to_string() + "C", "meta1"))
.to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 6);
Expand Down Expand Up @@ -359,7 +359,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
Expand Down Expand Up @@ -391,7 +391,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
Expand Down Expand Up @@ -423,7 +423,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

assert!(into_event_batch(req, json, schema, None, None).is_err());
assert!(into_event_batch(&req, &json, schema, None, None).is_err());
}

#[test]
Expand All @@ -441,7 +441,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
Expand All @@ -453,7 +453,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

assert!(into_event_batch(req, json, HashMap::default(), None, None).is_err())
assert!(into_event_batch(&req, &json, HashMap::default(), None, None).is_err())
}

#[test]
Expand All @@ -476,7 +476,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
Expand Down Expand Up @@ -524,7 +524,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
Expand Down Expand Up @@ -572,7 +572,7 @@ mod tests {
);
let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
Expand Down Expand Up @@ -621,7 +621,7 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(req, json, schema, None, None).is_err());
assert!(into_event_batch(&req, &json, schema, None, None).is_err());
}

#[test]
Expand Down Expand Up @@ -649,7 +649,7 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 7);
Expand Down
4 changes: 1 addition & 3 deletions src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ use std::collections::BTreeMap;
use std::str;

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
#[serde(rename = "records")]
records: Vec<Data>,
#[serde(rename = "requestId")]
request_id: String,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
#[serde(rename = "data")]
data: String,
}

Expand Down
8 changes: 4 additions & 4 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ pub async fn put_stream_hot_tier(
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some(existing_hot_tier_used_size.to_string());
hottier.available_size = Some(hottier.size.clone());
hottier.used_size = existing_hot_tier_used_size.to_string();
hottier.available_size = hottier.size.to_string();
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
Expand Down Expand Up @@ -658,8 +658,8 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use crate::{
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
#[serde(rename = "commonAttributes")]
common_attributes: CommonAttributes,
}

Expand Down
9 changes: 2 additions & 7 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use crate::{handlers::http::base_path, option::CONFIG};
use actix_web::body::MessageBody;
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
Expand Down Expand Up @@ -309,9 +308,7 @@ impl IngestServer {
.clone_from(&INGESTOR_META.domain_name);
store_data.port.clone_from(&INGESTOR_META.port);

let resource = serde_json::to_string(&store_data)?
.try_into_bytes()
.map_err(|err| anyhow!(err))?;
let resource = Bytes::from(serde_json::to_vec(&store_data)?);

// if pushing to object store fails propagate the error
return store
Expand All @@ -320,9 +317,7 @@ impl IngestServer {
.map_err(|err| anyhow!(err));
}
} else {
let resource = serde_json::to_string(&resource)?
.try_into_bytes()
.map_err(|err| anyhow!(err))?;
let resource = Bytes::from(serde_json::to_vec(&resource)?);

store.put_object(&path, resource).await?;
}
Expand Down
6 changes: 2 additions & 4 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ impl IngestorMetadata {
#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
use bytes::Bytes;
use rstest::rstest;

use super::{IngestorMetadata, DEFAULT_VERSION};
Expand Down Expand Up @@ -256,10 +257,7 @@ mod test {
"8002".to_string(),
);

let lhs = serde_json::to_string(&im)
.unwrap()
.try_into_bytes()
.unwrap();
let lhs = Bytes::from(serde_json::to_vec(&im).unwrap());
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
.try_into_bytes()
.unwrap();
Expand Down
Loading

0 comments on commit b277775

Please sign in to comment.