From 855015288e1da5b7602cb204491ec9e09bf5fe9f Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Tue, 10 Dec 2024 10:59:05 -0500
Subject: [PATCH] fix: update data type of all numeric fields in storage
 schema to Float64

---
 src/catalog/column.rs         |  8 ++++++++
 src/event/format/mod.rs       | 38 +++++++++++++++++------------------
 src/metadata.rs               | 14 +++++++++++++
 src/storage/object_storage.rs | 18 +++++++++++++++--
 4 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/src/catalog/column.rs b/src/catalog/column.rs
index d5db2950d..ef4b5858b 100644
--- a/src/catalog/column.rs
+++ b/src/catalog/column.rs
@@ -66,6 +66,14 @@ impl TypedStatistics {
                 max: max(this.max, other.max),
             })
         }
+
+        // Ints are cast to Float if self is Float and other is Int
+        (TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
+            TypedStatistics::Float(Float64Type {
+                min: this.min.min(other.min as f64),
+                max: this.max.max(other.max as f64),
+            })
+        }
         (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
             TypedStatistics::Float(Float64Type {
                 min: this.min.min(other.min),
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 2453c75ff..1c3f6a8e4 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -103,6 +103,9 @@ pub trait EventFormat: Sized {
             return Err(anyhow!("Schema mismatch"));
         }
         new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
+        new_schema = Arc::new(Schema::new(override_num_fields_from_schema(
+            new_schema.fields().to_vec(),
+        )));
         let rb = Self::decode(data, new_schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
@@ -205,24 +208,21 @@ pub fn override_timestamp_fields(
 }
 
 /// All number fields from inferred schema are forced into Float64
-pub fn override_num_fields_from_schema(schema: Arc<Schema>) -> Arc<Schema> {
-    Arc::new(Schema::new(
-        schema
-            .fields()
-            .iter()
-            .map(|field| {
-                if field.data_type().is_numeric() {
-                    Arc::new(Field::new(
-                        field.name(),
-                        DataType::Float64,
-                        field.is_nullable(),
-                    ))
-                } else {
-                    field.clone()
-                }
-            })
-            .collect::<Vec<Arc<Field>>>(),
-    ))
+pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
+    schema
+        .iter()
+        .map(|field| {
+            if field.data_type().is_numeric() && field.data_type() != &DataType::Float64 {
+                Arc::new(Field::new(
+                    field.name(),
+                    DataType::Float64,
+                    field.is_nullable(),
+                ))
+            } else {
+                field.clone()
+            }
+        })
+        .collect::<Vec<Arc<Field>>>()
 }
 
 pub fn update_field_type_in_schema(
@@ -232,8 +232,6 @@
     log_records: Option<&Vec<Value>>,
 ) -> Arc<Schema> {
     let mut updated_schema = inferred_schema.clone();
-    updated_schema = override_num_fields_from_schema(updated_schema);
-
     if let Some(existing_schema) = existing_schema {
         let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
         let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
diff --git a/src/metadata.rs b/src/metadata.rs
index f768a4e88..5447ea796 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -164,6 +164,20 @@ impl StreamInfo {
         Ok(Arc::new(schema))
     }
 
+    /// update the schema in the metadata
+    pub fn set_schema(
+        &self,
+        stream_name: &str,
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        map.get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.schema = schema;
+            })
+    }
+
     pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index e70c326bd..e9ee32f18 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -25,6 +25,7 @@ use super::{
     SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
+use crate::event::format::override_num_fields_from_schema;
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
@@ -40,7 +41,7 @@ use crate::{
 };
 
 use actix_web_prometheus::PrometheusMetrics;
-use arrow_schema::Schema;
+use arrow_schema::{Field, Schema};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Local;
@@ -667,8 +668,21 @@ pub async fn commit_schema_to_storage(
     schema: Schema,
 ) -> Result<(), ObjectStorageError> {
     let storage = CONFIG.storage().get_object_store();
-    let stream_schema = storage.get_schema(stream_name).await?;
+    let mut stream_schema = storage.get_schema(stream_name).await?;
+
+    // override the data type of all numeric fields to Float64
+    // if data type is not Float64 already
+    stream_schema = Schema::new(override_num_fields_from_schema(
+        stream_schema.fields().iter().cloned().collect(),
+    ));
     let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
+
+    // update the merged schema in the metadata and storage
+    let schema_map: HashMap<String, Arc<Field>> = new_schema
+        .fields()
+        .iter()
+        .map(|field| (field.name().clone(), Arc::clone(field)))
+        .collect();
+    let _ = STREAM_INFO.set_schema(stream_name, schema_map);
     storage.put_schema(stream_name, &new_schema).await
 }