diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 52e201ca15a2..c53ef18ba58f 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -356,16 +356,29 @@ impl Buffer { } } -/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly -/// allocated memory region. -impl> From for Buffer { - fn from(p: T) -> Self { - // allocate aligned memory buffer - let slice = p.as_ref(); - let len = slice.len(); - let mut buffer = MutableBuffer::new(len); - buffer.extend_from_slice(slice); - buffer.into() +/// Note that here we deliberately do not implement +/// `impl> From for Buffer` +/// As it would accept `Buffer::from(vec![...])` that would cause an unexpected copy. +/// Instead, we ask user to be explicit when copying is occurring, e.g., `Buffer::from(vec![...].to_byte_slice())`. +/// For zero-copy conversion, user should use `Buffer::from_vec(vec![...])`. +/// +/// Since we removed impl for `AsRef`, we added the following three specific implementations to reduce API breakage. +/// See for more discussion on this. +impl From<&[u8]> for Buffer { + fn from(p: &[u8]) -> Self { + Self::from_slice_ref(p) + } +} + +impl From<[u8; N]> for Buffer { + fn from(p: [u8; N]) -> Self { + Self::from_slice_ref(p) + } +} + +impl From<&[u8; N]> for Buffer { + fn from(p: &[u8; N]) -> Self { + Self::from_slice_ref(p) } } diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs index 4dd69596209a..7df9420f94f0 100644 --- a/arrow-cast/src/cast/mod.rs +++ b/arrow-cast/src/cast/mod.rs @@ -4409,8 +4409,8 @@ mod tests { IntervalUnit::YearMonth, IntervalYearMonthArray, vec![ - Some("1 years 1 mons 0 days 0 hours 0 mins 0.00 secs"), - Some("2 years 7 mons 0 days 0 hours 0 mins 0.00 secs"), + Some("1 years 1 mons"), + Some("2 years 7 mons"), None, None, None, @@ -4433,9 +4433,9 @@ mod tests { IntervalUnit::DayTime, IntervalDayTimeArray, vec![ - Some("0 years 0 mons 390 days 0 hours 0 mins 0.000 secs"), - Some("0 years 0 mons 930 days 0 hours 0 mins 0.000 secs"), - Some("0 years 0 mons 30 days 0 hours 0 mins 0.000 secs"), + Some("390 days"), + Some("930 days"), + Some("30 days"), None, None, ] @@ -4461,16 +4461,16 @@ mod tests { IntervalUnit::MonthDayNano, IntervalMonthDayNanoArray, vec![ - Some("0 years 13 mons 1 days 0 hours 0 mins 0.000000000 secs"), + Some("13 mons 1 days"), None, - Some("0 years 31 mons 35 days 0 hours 0 mins 0.001400000 secs"), - Some("0 years 0 mons 3 days 0 hours 0 mins 0.000000000 secs"), - Some("0 years 0 mons 0 days 0 hours 0 mins 8.000000000 secs"), + Some("31 mons 35 days 0.001400000 secs"), + Some("3 days"), + Some("8.000000000 secs"), None, - Some("0 years 0 mons 1 days 0 hours 0 mins 29.800000000 secs"), - Some("0 years 3 mons 0 days 0 hours 0 mins 1.000000000 secs"), - Some("0 years 0 mons 0 days 0 hours 8 mins 0.000000000 secs"), - Some("0 years 63 mons 9 days 19 hours 9 mins 2.222000000 secs"), + Some("1 days 29.800000000 secs"), + Some("3 mons 1.000000000 secs"), + Some("8 mins"), + Some("63 mons 9 days 19 hours 9 mins 2.222000000 secs"), None, ] ); diff --git a/arrow-cast/src/display.rs b/arrow-cast/src/display.rs index 6a40d036350a..312e7973963e 100644 --- a/arrow-cast/src/display.rs +++ b/arrow-cast/src/display.rs @@ -654,10 +654,7 @@ impl<'a> DisplayIndex for &'a PrimitiveArray { let years = (interval / 12_f64).floor(); let month = interval - (years * 12_f64); - write!( - f, - "{years} years {month} mons 0 days 0 hours 0 mins 0.00 secs", - )?; + write!(f, "{years} 
years {month} mons",)?; Ok(()) } } @@ -665,62 +662,140 @@ impl<'a> DisplayIndex for &'a PrimitiveArray { impl<'a> DisplayIndex for &'a PrimitiveArray { fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { let value = self.value(idx); + let mut prefix = ""; - let secs = value.milliseconds / 1_000; + if value.days != 0 { + write!(f, "{prefix}{} days", value.days)?; + prefix = " "; + } + + if value.milliseconds != 0 { + let millis_fmt = MillisecondsFormatter { + milliseconds: value.milliseconds, + prefix, + }; + + f.write_fmt(format_args!("{millis_fmt}"))?; + } + + Ok(()) + } +} + +impl<'a> DisplayIndex for &'a PrimitiveArray { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + let value = self.value(idx); + let mut prefix = ""; + + if value.months != 0 { + write!(f, "{prefix}{} mons", value.months)?; + prefix = " "; + } + + if value.days != 0 { + write!(f, "{prefix}{} days", value.days)?; + prefix = " "; + } + + if value.nanoseconds != 0 { + let nano_fmt = NanosecondsFormatter { + nanoseconds: value.nanoseconds, + prefix, + }; + f.write_fmt(format_args!("{nano_fmt}"))?; + } + + Ok(()) + } +} + +struct NanosecondsFormatter<'a> { + nanoseconds: i64, + prefix: &'a str, +} + +impl<'a> Display for NanosecondsFormatter<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut prefix = self.prefix; + + let secs = self.nanoseconds / 1_000_000_000; let mins = secs / 60; let hours = mins / 60; let secs = secs - (mins * 60); let mins = mins - (hours * 60); - let milliseconds = value.milliseconds % 1_000; + let nanoseconds = self.nanoseconds % 1_000_000_000; - let secs_sign = if secs < 0 || milliseconds < 0 { - "-" - } else { - "" - }; + if hours != 0 { + write!(f, "{prefix}{} hours", hours)?; + prefix = " "; + } + + if mins != 0 { + write!(f, "{prefix}{} mins", mins)?; + prefix = " "; + } + + if secs != 0 || nanoseconds != 0 { + let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" }; + write!( + f, + "{prefix}{}{}.{:09} secs", + secs_sign, + secs.abs(), + nanoseconds.abs() + )?; + } - write!( - f, - "0 years 0 mons {} days {} hours {} mins {}{}.{:03} secs", - value.days, - hours, - mins, - secs_sign, - secs.abs(), - milliseconds.abs(), - )?; Ok(()) } } -impl<'a> DisplayIndex for &'a PrimitiveArray { - fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { - let value = self.value(idx); +struct MillisecondsFormatter<'a> { + milliseconds: i32, + prefix: &'a str, +} + +impl<'a> Display for MillisecondsFormatter<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut prefix = self.prefix; - let secs = value.nanoseconds / 1_000_000_000; + let secs = self.milliseconds / 1_000; let mins = secs / 60; let hours = mins / 60; let secs = secs - (mins * 60); let mins = mins - (hours * 60); - let nanoseconds = value.nanoseconds % 1_000_000_000; - - let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" }; - - write!( - f, - "0 years {} mons {} days {} hours {} mins {}{}.{:09} secs", - value.months, - value.days, - hours, - mins, - secs_sign, - secs.abs(), - nanoseconds.abs(), - )?; + let milliseconds = self.milliseconds % 1_000; + + if hours != 0 { + write!(f, "{prefix}{} hours", hours,)?; + prefix = " "; + } + + if mins != 0 { + write!(f, "{prefix}{} mins", mins,)?; + prefix = " "; + } + + if secs != 0 || milliseconds != 0 { + let secs_sign = if secs < 0 || milliseconds < 0 { + "-" + } else { + "" + }; + + write!( + f, + "{prefix}{}{}.{:03} secs", + secs_sign, + secs.abs(), + milliseconds.abs() + )?; + } + Ok(()) } 
} diff --git a/arrow-cast/src/pretty.rs b/arrow-cast/src/pretty.rs index 9383b9f73f61..f41471e38d5e 100644 --- a/arrow-cast/src/pretty.rs +++ b/arrow-cast/src/pretty.rs @@ -986,16 +986,16 @@ mod tests { let table = pretty_format_batches(&[batch]).unwrap().to_string(); let expected = vec![ - "+----------------------------------------------------+", - "| IntervalDayTime |", - "+----------------------------------------------------+", - "| 0 years 0 mons -1 days 0 hours -10 mins 0.000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins -1.001 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins -0.001 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.001 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.010 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.100 secs |", - "+----------------------------------------------------+", + "+------------------+", + "| IntervalDayTime |", + "+------------------+", + "| -1 days -10 mins |", + "| -1.001 secs |", + "| -0.001 secs |", + "| 0.001 secs |", + "| 0.010 secs |", + "| 0.100 secs |", + "+------------------+", ]; let actual: Vec<&str> = table.lines().collect(); @@ -1032,23 +1032,23 @@ mod tests { let table = pretty_format_batches(&[batch]).unwrap().to_string(); let expected = vec![ - "+-----------------------------------------------------------+", - "| IntervalMonthDayNano |", - "+-----------------------------------------------------------+", - "| 0 years -1 mons -1 days 0 hours -10 mins 0.000000000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins -1.000000001 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins -0.000000001 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.000000001 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.000000010 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.000000100 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.000001000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.000010000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.000100000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.001000000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.010000000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 0.100000000 secs |", - "| 0 years 0 mons 0 days 0 hours 0 mins 1.000000000 secs |", - "+-----------------------------------------------------------+", + "+--------------------------+", + "| IntervalMonthDayNano |", + "+--------------------------+", + "| -1 mons -1 days -10 mins |", + "| -1.000000001 secs |", + "| -0.000000001 secs |", + "| 0.000000001 secs |", + "| 0.000000010 secs |", + "| 0.000000100 secs |", + "| 0.000001000 secs |", + "| 0.000010000 secs |", + "| 0.000100000 secs |", + "| 0.001000000 secs |", + "| 0.010000000 secs |", + "| 0.100000000 secs |", + "| 1.000000000 secs |", + "+--------------------------+", ]; let actual: Vec<&str> = table.lines().collect(); diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 539b1ea35d6c..f66891ef09a1 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } once_cell = { version = "1", optional = true } paste = { version = "1.0" } -prost = { version = "0.12.3", default-features = false, features = ["prost-derive"] } +prost = { version = "0.13.1", default-features = false, features = ["prost-derive"] } # For Timestamp type -prost-types = { version = "0.12.3", default-features = false } +prost-types = { version = "0.13.1", default-features = false } tokio = 
{ version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } -tonic = { version = "0.11.0", default-features = false, features = ["transport", "codegen", "prost"] } +tonic = { version = "0.12.0", default-features = false, features = ["transport", "codegen", "prost"] } # CLI-related dependencies anyhow = { version = "1.0", optional = true } @@ -70,8 +70,9 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subsc [dev-dependencies] arrow-cast = { workspace = true, features = ["prettyprint"] } assert_cmd = "2.0.8" -http = "0.2.9" -http-body = "0.4.5" +http = "1.1.0" +http-body = "1.0.0" +hyper-util = "0.1" pin-project-lite = "0.2" tempfile = "3.3" tokio-stream = { version = "0.1", features = ["net"] } diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index 031628eaa833..d5168debc433 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -783,7 +783,8 @@ impl ProstMessageExt for FetchResults { #[cfg(test)] mod tests { use super::*; - use futures::TryStreamExt; + use futures::{TryFutureExt, TryStreamExt}; + use hyper_util::rt::TokioIo; use std::fs; use std::future::Future; use std::net::SocketAddr; @@ -843,7 +844,8 @@ mod tests { .serve_with_incoming(stream); let request_future = async { - let connector = service_fn(move |_| UnixStream::connect(path.clone())); + let connector = + service_fn(move |_| UnixStream::connect(path.clone()).map_ok(TokioIo::new)); let channel = Endpoint::try_from("http://example.com") .unwrap() .connect_with_connector(connector) diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml index 7264a527ca8d..a12c683776b4 100644 --- a/arrow-flight/gen/Cargo.toml +++ b/arrow-flight/gen/Cargo.toml @@ -33,5 +33,5 @@ publish = false # Pin specific version of the tonic-build dependencies to avoid auto-generated # (and checked in) arrow.flight.protocol.rs from changing proc-macro2 = { version = "=1.0.86", default-features = false } -prost-build = { version = "=0.12.6", default-features = false } -tonic-build = { version = "=0.11.0", default-features = false, features = ["transport", "prost"] } +prost-build = { version = "=0.13.1", default-features = false } +tonic-build = { version = "=0.12.0", default-features = false, features = ["transport", "prost"] } diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index bc314de9d19f..8c7292894eab 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -38,7 +38,7 @@ pub struct BasicAuth { pub password: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Empty {} /// /// Describes an available action, including both the name used for execution @@ -103,7 +103,7 @@ pub struct Result { /// /// The result should be stored in Result.body. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CancelFlightInfoResult { #[prost(enumeration = "CancelStatus", tag = "1")] pub status: i32, @@ -1053,19 +1053,17 @@ pub mod flight_service_server { /// can expose a set of actions that are available. 
#[derive(Debug)] pub struct FlightServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl FlightServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -1128,7 +1126,6 @@ pub mod flight_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/arrow.flight.protocol.FlightService/Handshake" => { #[allow(non_camel_case_types)] @@ -1162,7 +1159,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = HandshakeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1209,7 +1205,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ListFlightsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1255,7 +1250,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1302,7 +1296,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PollFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1348,7 +1341,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetSchemaSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1395,7 +1387,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoGetSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1442,7 +1433,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoPutSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1489,7 +1479,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoExchangeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1536,7 +1525,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); 
let fut = async move { - let inner = inner.0; let method = DoActionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1583,7 +1571,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ListActionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1605,8 +1592,11 @@ pub mod flight_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -1627,16 +1617,6 @@ pub mod flight_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for FlightServiceServer { const NAME: &'static str = "arrow.flight.protocol.FlightService"; } diff --git a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs index c1f0fac0f6ba..5e6f198df75c 100644 --- a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs +++ b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs @@ -101,7 +101,7 @@ pub struct CommandGetSqlInfo { /// > /// The returned data should be ordered by data_type and then by type_name. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CommandGetXdbcTypeInfo { /// /// Specifies the data type to search for the info. @@ -121,7 +121,7 @@ pub struct CommandGetXdbcTypeInfo { /// > /// The returned data should be ordered by catalog_name. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CommandGetCatalogs {} /// /// Represents a request to retrieve the list of database schemas on a Flight SQL enabled backend. @@ -232,7 +232,7 @@ pub struct CommandGetTables { /// > /// The returned data should be ordered by table_type. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CommandGetTableTypes {} /// /// Represents a request to retrieve the primary keys of a table on a Flight SQL enabled backend. @@ -511,7 +511,7 @@ pub struct ActionClosePreparedStatementRequest { /// Request message for the "BeginTransaction" action. /// Begins a transaction. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ActionBeginTransactionRequest {} /// /// Request message for the "BeginSavepoint" action. @@ -802,7 +802,7 @@ pub struct CommandPreparedStatementUpdate { /// CommandPreparedStatementUpdate was in the request, containing /// results from the update. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct DoPutUpdateResult { /// The number of records updated. A return value of -1 represents /// an unknown updated record count. 
@@ -862,7 +862,7 @@ pub struct ActionCancelQueryRequest { /// This command is deprecated since 13.0.0. Use the "CancelFlightInfo" /// action with DoAction instead. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ActionCancelQueryResult { #[prost(enumeration = "action_cancel_query_result::CancelResult", tag = "1")] pub result: i32, diff --git a/arrow-flight/tests/common/trailers_layer.rs b/arrow-flight/tests/common/trailers_layer.rs index b2ab74f7d925..0ccb7df86c74 100644 --- a/arrow-flight/tests/common/trailers_layer.rs +++ b/arrow-flight/tests/common/trailers_layer.rs @@ -21,7 +21,7 @@ use std::task::{Context, Poll}; use futures::ready; use http::{HeaderValue, Request, Response}; -use http_body::SizeHint; +use http_body::{Frame, SizeHint}; use pin_project_lite::pin_project; use tower::{Layer, Service}; @@ -99,31 +99,19 @@ impl http_body::Body for WrappedBody { type Data = B::Data; type Error = B::Error; - fn poll_data( - mut self: Pin<&mut Self>, + fn poll_frame( + self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - self.as_mut().project().inner.poll_data(cx) - } - - fn poll_trailers( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let result: Result, Self::Error> = - ready!(self.as_mut().project().inner.poll_trailers(cx)); - - let mut trailers = http::header::HeaderMap::new(); - trailers.insert("test-trailer", HeaderValue::from_static("trailer_val")); + ) -> Poll, Self::Error>>> { + let mut result = ready!(self.project().inner.poll_frame(cx)); - match result { - Ok(Some(mut existing)) => { - existing.extend(trailers.iter().map(|(k, v)| (k.clone(), v.clone()))); - Poll::Ready(Ok(Some(existing))) + if let Some(Ok(frame)) = &mut result { + if let Some(trailers) = frame.trailers_mut() { + trailers.insert("test-trailer", HeaderValue::from_static("trailer_val")); } - Ok(None) => Poll::Ready(Ok(Some(trailers))), - Err(e) => Poll::Ready(Err(e)), } + + Poll::Ready(result) } fn is_end_stream(&self) -> bool { diff --git a/arrow-integration-testing/Cargo.toml b/arrow-integration-testing/Cargo.toml index 032b99f4fbbb..7be56d919852 100644 --- a/arrow-integration-testing/Cargo.toml +++ b/arrow-integration-testing/Cargo.toml @@ -42,11 +42,11 @@ async-trait = { version = "0.1.41", default-features = false } clap = { version = "4", default-features = false, features = ["std", "derive", "help", "error-context", "usage"] } futures = { version = "0.3", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } -prost = { version = "0.12", default-features = false } +prost = { version = "0.13", default-features = false } serde = { version = "1.0", default-features = false, features = ["rc", "derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.0", default-features = false } -tonic = { version = "0.11", default-features = false } +tonic = { version = "0.12", default-features = false } tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt"], optional = true } num = { version = "0.4", default-features = false, features = ["std"] } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } diff --git a/arrow-pyarrow-integration-testing/Cargo.toml b/arrow-pyarrow-integration-testing/Cargo.toml index 6f07d42d88c1..0834f2d13384 100644 --- a/arrow-pyarrow-integration-testing/Cargo.toml +++ 
b/arrow-pyarrow-integration-testing/Cargo.toml @@ -34,4 +34,4 @@ crate-type = ["cdylib"] [dependencies] arrow = { path = "../arrow", features = ["pyarrow"] } -pyo3 = { version = "0.21.1", features = ["extension-module"] } +pyo3 = { version = "0.22", features = ["extension-module"] } diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 745ca03214e6..12b6ddd6a830 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -54,7 +54,7 @@ arrow-select = { workspace = true } arrow-string = { workspace = true } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } -pyo3 = { version = "0.21.1", default-features = false, optional = true } +pyo3 = { version = "0.22.2", default-features = false, optional = true } [package.metadata.docs.rs] features = ["prettyprint", "ipc_compression", "ffi", "pyarrow"] diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs index 1733067c738a..43cdb4fe0919 100644 --- a/arrow/src/pyarrow.rs +++ b/arrow/src/pyarrow.rs @@ -83,11 +83,6 @@ fn to_py_err(err: ArrowError) -> PyErr { } pub trait FromPyArrow: Sized { - #[deprecated(since = "52.0.0", note = "Use from_pyarrow_bound")] - fn from_pyarrow(value: &PyAny) -> PyResult { - Self::from_pyarrow_bound(&value.as_borrowed()) - } - fn from_pyarrow_bound(value: &Bound) -> PyResult; } diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 2cc12a81dea5..7391d0964646 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } +sysinfo = { version = "0.30.12", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -114,12 +115,19 @@ async = ["futures", "tokio"] object_store = ["dep:object_store", "async"] # Group Zstd dependencies zstd = ["dep:zstd", "zstd-sys"] +# Display memory in example/write_parquet.rs +sysinfo = ["dep:sysinfo"] [[example]] name = "read_parquet" required-features = ["arrow"] path = "./examples/read_parquet.rs" +[[example]] +name = "write_parquet" +required-features = ["cli", "sysinfo"] +path = "./examples/write_parquet.rs" + [[example]] name = "async_read_parquet" required-features = ["arrow", "async"] diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs new file mode 100644 index 000000000000..d2ef550df840 --- /dev/null +++ b/parquet/examples/write_parquet.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use arrow::array::{StructArray, UInt64Builder}; +use arrow::datatypes::DataType::UInt64; +use arrow::datatypes::{Field, Schema}; +use clap::{Parser, ValueEnum}; +use parquet::arrow::ArrowWriter as ParquetWriter; +use parquet::basic::Encoding; +use parquet::errors::Result; +use parquet::file::properties::{BloomFilterPosition, WriterProperties}; +use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System}; + +#[derive(ValueEnum, Clone)] +enum BloomFilterPositionArg { + End, + AfterRowGroup, +} + +#[derive(Parser)] +#[command(version)] +/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage. +struct Args { + #[arg(long, default_value_t = 1000)] + /// Number of batches to write + iterations: u64, + + #[arg(long, default_value_t = 1000000)] + /// Number of rows in each batch + batch: u64, + + #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)] + /// Where to write Bloom Filters + bloom_filter_position: BloomFilterPositionArg, + + /// Path to the file to write + path: PathBuf, +} + +fn now() -> String { + chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string() +} + +fn mem(system: &mut System) -> String { + let pid = Pid::from(std::process::id() as usize); + system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory()); + system + .process(pid) + .map(|proc| format!("{}MB", proc.memory() / 1_000_000)) + .unwrap_or("N/A".to_string()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + + let bloom_filter_position = match args.bloom_filter_position { + BloomFilterPositionArg::End => BloomFilterPosition::End, + BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup, + }; + + let properties = WriterProperties::builder() + .set_column_bloom_filter_enabled("id".into(), true) + .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) + .set_bloom_filter_position(bloom_filter_position) + .build(); + let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); + // Create parquet file that will be read. + let file = File::create(args.path).unwrap(); + let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; + + let mut system = + System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); + eprintln!( + "{} Writing {} batches of {} rows. RSS = {}", + now(), + args.iterations, + args.batch, + mem(&mut system) + ); + + let mut array_builder = UInt64Builder::new(); + let mut last_log = Instant::now(); + for i in 0..args.iterations { + if Instant::now() - last_log > Duration::new(10, 0) { + last_log = Instant::now(); + eprintln!( + "{} Iteration {}/{}. RSS = {}", + now(), + i + 1, + args.iterations, + mem(&mut system) + ); + } + for j in 0..args.batch { + array_builder.append_value(i + j); + } + writer.write( + &StructArray::new( + schema.fields().clone(), + vec![Arc::new(array_builder.finish())], + None, + ) + .into(), + )?; + } + writer.flush()?; + writer.close()?; + + eprintln!("{} Done. RSS = {}", now(), mem(&mut system)); + + Ok(()) +} diff --git a/parquet/regen.sh b/parquet/regen.sh index d1b82108a018..39999c7872cd 100755 --- a/parquet/regen.sh +++ b/parquet/regen.sh @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. 
-REVISION=46cc3a0647d301bb9579ca8dd2cc356caf2a72d2 +REVISION=5b564f3c47679526cf72e54f207013f28f53acc4 SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)" diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 15c1a880cc75..c696763d63d2 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -394,7 +394,7 @@ impl ArrowReaderMetadata { let offset_index = metadata .row_groups() .iter() - .map(|rg| index_reader::read_pages_locations(reader, rg.columns())) + .map(|rg| index_reader::read_offset_indexes(reader, rg.columns())) .collect::>>()?; metadata.set_offset_index(Some(offset_index)) @@ -689,7 +689,7 @@ impl Iterator for ReaderPageIterator { // To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out empty `i[rg_idx]`. let page_locations = offset_index .filter(|i| !i[rg_idx].is_empty()) - .map(|i| i[rg_idx][self.column_idx].clone()); + .map(|i| i[rg_idx][self.column_idx].page_locations.clone()); let total_rows = rg.num_rows() as usize; let reader = self.reader.clone(); diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 00a3ad90d006..6a1434bce906 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices.into_iter().map(|rg_index| { let column_page_index_per_row_group_per_column = &column_page_index[*rg_index][parquet_index]; - let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); (*num_data_pages, column_page_index_per_row_group_per_column) }); @@ -1378,7 +1380,9 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices.into_iter().map(|rg_index| { let column_page_index_per_row_group_per_column = &column_page_index[*rg_index][parquet_index]; - let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); (*num_data_pages, column_page_index_per_row_group_per_column) }); @@ -1408,7 +1412,9 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices.into_iter().map(|rg_index| { let column_page_index_per_row_group_per_column = &column_page_index[*rg_index][parquet_index]; - let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); (*num_data_pages, column_page_index_per_row_group_per_column) }); @@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> { let mut row_count_total = Vec::new(); for rg_idx in row_group_indices { - let page_locations = &column_offset_index[*rg_idx][parquet_index]; + let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations(); let row_count_per_page = page_locations .windows(2) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index fc37ebfb4510..2d23ad8510f9 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -96,6 +96,7 @@ macro_rules! 
downcast_op { struct FallbackEncoder { encoder: FallbackEncoderImpl, num_values: usize, + variable_length_bytes: i64, } /// The fallback encoder in use @@ -152,6 +153,7 @@ impl FallbackEncoder { Ok(Self { encoder, num_values: 0, + variable_length_bytes: 0, }) } @@ -168,7 +170,8 @@ impl FallbackEncoder { let value = values.value(*idx); let value = value.as_ref(); buffer.extend_from_slice((value.len() as u32).as_bytes()); - buffer.extend_from_slice(value) + buffer.extend_from_slice(value); + self.variable_length_bytes += value.len() as i64; } } FallbackEncoderImpl::DeltaLength { buffer, lengths } => { @@ -177,6 +180,7 @@ impl FallbackEncoder { let value = value.as_ref(); lengths.put(&[value.len() as i32]).unwrap(); buffer.extend_from_slice(value); + self.variable_length_bytes += value.len() as i64; } } FallbackEncoderImpl::Delta { @@ -205,6 +209,7 @@ impl FallbackEncoder { buffer.extend_from_slice(&value[prefix_length..]); prefix_lengths.put(&[prefix_length as i32]).unwrap(); suffix_lengths.put(&[suffix_length as i32]).unwrap(); + self.variable_length_bytes += value.len() as i64; } } } @@ -269,12 +274,17 @@ impl FallbackEncoder { } }; + // Capture value of variable_length_bytes and reset for next page + let variable_length_bytes = Some(self.variable_length_bytes); + self.variable_length_bytes = 0; + Ok(DataPageValues { buf: buf.into(), num_values: std::mem::take(&mut self.num_values), encoding, min_value, max_value, + variable_length_bytes, }) } } @@ -321,6 +331,7 @@ impl Storage for ByteArrayStorage { struct DictEncoder { interner: Interner, indices: Vec, + variable_length_bytes: i64, } impl DictEncoder { @@ -336,6 +347,7 @@ impl DictEncoder { let value = values.value(*idx); let interned = self.interner.intern(value.as_ref()); self.indices.push(interned); + self.variable_length_bytes += value.as_ref().len() as i64; } } @@ -384,12 +396,17 @@ impl DictEncoder { self.indices.clear(); + // Capture value of variable_length_bytes and reset for next page + let variable_length_bytes = Some(self.variable_length_bytes); + self.variable_length_bytes = 0; + DataPageValues { buf: encoder.consume().into(), num_values, encoding: Encoding::RLE_DICTIONARY, min_value, max_value, + variable_length_bytes, } } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index cf46f3b64a57..8f7b514ccf71 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -43,7 +43,7 @@ use crate::column::writer::{ }; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; +use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; @@ -204,7 +204,7 @@ impl ArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.writer.flushed_row_groups() } @@ -1096,8 +1096,10 @@ mod tests { use crate::data_type::AsBytes; use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; - use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; + use 
crate::file::page_index::index_reader::read_offset_indexes; + use crate::file::properties::{ + BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, + }; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -1667,16 +1669,16 @@ mod tests { "Expected a dictionary page" ); - let page_locations = read_pages_locations(&file, column).unwrap(); + let offset_indexes = read_offset_indexes(&file, column).unwrap(); - let offset_index = page_locations[0].clone(); + let page_locations = offset_indexes[0].page_locations.clone(); // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes // so we expect one dictionary encoded page and then a page per row thereafter. assert_eq!( - offset_index.len(), + page_locations.len(), 10, - "Expected 9 pages but got {offset_index:#?}" + "Expected 9 pages but got {page_locations:#?}" ); } @@ -1745,6 +1747,7 @@ mod tests { values: ArrayRef, schema: SchemaRef, bloom_filter: bool, + bloom_filter_position: BloomFilterPosition, } impl RoundTripOptions { @@ -1755,6 +1758,7 @@ mod tests { values, schema: Arc::new(schema), bloom_filter: false, + bloom_filter_position: BloomFilterPosition::AfterRowGroup, } } } @@ -1774,6 +1778,7 @@ mod tests { values, schema, bloom_filter, + bloom_filter_position, } = options; let encodings = match values.data_type() { @@ -1814,6 +1819,7 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) + .set_bloom_filter_position(bloom_filter_position) .build(); files.push(roundtrip_opts(&expected_batch, props)) @@ -2171,6 +2177,22 @@ mod tests { values_required::(many_vecs_iter); } + #[test] + fn i32_column_bloom_filter_at_end() { + let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + options.bloom_filter_position = BloomFilterPosition::End; + + let files = one_column_roundtrip_with_options(options); + check_bloom_filter( + files, + "col".to_string(), + (0..SMALL_SIZE as i32).collect(), + (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), + ); + } + #[test] fn i32_column_bloom_filter() { let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); @@ -2998,8 +3020,8 @@ mod tests { assert_eq!(index.len(), 1); assert_eq!(index[0].len(), 2); // 2 columns - assert_eq!(index[0][0].len(), 1); // 1 page - assert_eq!(index[0][1].len(), 1); // 1 page + assert_eq!(index[0][0].page_locations().len(), 1); // 1 page + assert_eq!(index[0][1].page_locations().len(), 1); // 1 page } #[test] diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index e4205b7ef2ce..5695dbc10fe1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::FOOTER_SIZE; -use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation}; +use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; mod metadata; pub use metadata::*; @@ -489,7 +490,7 @@ where // TODO: 
calling build_array multiple times is wasteful let meta = self.metadata.row_group(row_group_idx); - let page_locations = self + let offset_index = self .metadata .offset_index() .map(|x| x[row_group_idx].as_slice()); @@ -499,7 +500,7 @@ where // schema: meta.schema_descr_ptr(), row_count: meta.num_rows() as usize, column_chunks: vec![None; meta.columns().len()], - page_locations, + offset_index, }; if let Some(filter) = self.filter.as_mut() { @@ -703,7 +704,7 @@ where /// An in-memory collection of column chunks struct InMemoryRowGroup<'a> { metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, + offset_index: Option<&'a [OffsetIndexMetaData]>, column_chunks: Vec>>, row_count: usize, } @@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> { projection: &ProjectionMask, selection: Option<&RowSelection>, ) -> Result<()> { - if let Some((selection, page_locations)) = selection.zip(self.page_locations) { + if let Some((selection, offset_index)) = selection.zip(self.offset_index) { // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` let mut page_start_offsets: Vec> = vec![]; @@ -734,14 +735,14 @@ impl<'a> InMemoryRowGroup<'a> { // then we need to also fetch a dictionary page. let mut ranges = vec![]; let (start, _len) = chunk_meta.byte_range(); - match page_locations[idx].first() { + match offset_index[idx].page_locations.first() { Some(first) if first.offset as u64 != start => { ranges.push(start as usize..first.offset as usize); } _ => (), } - ranges.extend(selection.scan_ranges(&page_locations[idx])); + ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); ranges @@ -812,7 +813,9 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> { "Invalid column index {i}, column was not fetched" ))), Some(data) => { - let page_locations = self.page_locations.map(|index| index[i].clone()); + let page_locations = self + .offset_index + .map(|index| index[i].page_locations.clone()); let page_reader: Box = Box::new(SerializedPageReader::new( data.clone(), self.metadata.column(i), @@ -1529,7 +1532,7 @@ mod tests { let metadata = parse_metadata(&data).unwrap(); let offset_index = - index_reader::read_pages_locations(&data, metadata.row_group(0).columns()) + index_reader::read_offset_indexes(&data, metadata.row_group(0).columns()) .expect("reading offset index"); let row_group_meta = metadata.row_group(0).clone(); @@ -1574,7 +1577,7 @@ mod tests { }; let mut skip = true; - let mut pages = offset_index[0].iter().peekable(); + let mut pages = offset_index[0].page_locations.iter().peekable(); // Setup `RowSelection` so that we can skip every other page, selecting the last page let mut selectors = vec![]; diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index edeb0fec00b7..274d8fef8976 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -54,7 +54,7 @@ use crate::{ arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, errors::{ParquetError, Result}, - file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties}, + file::{metadata::RowGroupMetaData, properties::WriterProperties}, format::{FileMetaData, KeyValue}, }; use arrow_array::RecordBatch; @@ -172,7 +172,7 @@ impl AsyncArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { 
self.sync_writer.flushed_row_groups() } diff --git a/parquet/src/bin/parquet-index.rs b/parquet/src/bin/parquet-index.rs index 86e08b6dafa3..1a9b74dd78fb 100644 --- a/parquet/src/bin/parquet-index.rs +++ b/parquet/src/bin/parquet-index.rs @@ -37,6 +37,7 @@ use clap::Parser; use parquet::errors::{ParquetError, Result}; use parquet::file::page_index::index::{Index, PageIndex}; +use parquet::file::page_index::offset_index::OffsetIndexMetaData; use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::file::serialized_reader::ReadOptionsBuilder; use parquet::format::PageLocation; @@ -93,7 +94,8 @@ impl Args { )) })?; - let row_counts = compute_row_counts(offset_index, row_group.num_rows()); + let row_counts = + compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows()); match &column_indices[column_idx] { Index::NONE => println!("NO INDEX"), Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?, @@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec { /// Prints index information for a single column chunk fn print_index( column_index: &[PageIndex], - offset_index: &[PageLocation], + offset_index: &OffsetIndexMetaData, row_counts: &[i64], ) -> Result<()> { - if column_index.len() != offset_index.len() { + if column_index.len() != offset_index.page_locations.len() { return Err(ParquetError::General(format!( "Index length mismatch, got {} and {}", column_index.len(), - offset_index.len() + offset_index.page_locations.len() ))); } for (idx, ((c, o), row_count)) in column_index .iter() - .zip(offset_index) + .zip(offset_index.page_locations()) .zip(row_counts) .enumerate() { diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index b6c8212608b8..9d01c09040de 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -63,6 +63,7 @@ pub struct DataPageValues { pub encoding: Encoding, pub min_value: Option, pub max_value: Option, + pub variable_length_bytes: Option, } /// A generic encoder of [`ColumnValues`] to data and dictionary pages used by @@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl { min_value: Option, max_value: Option, bloom_filter: Option, + variable_length_bytes: Option, } impl ColumnValueEncoderImpl { @@ -150,6 +152,10 @@ impl ColumnValueEncoderImpl { update_min(&self.descr, &min, &mut self.min_value); update_max(&self.descr, &max, &mut self.max_value); } + + if let Some(var_bytes) = T::T::variable_length_bytes(slice) { + *self.variable_length_bytes.get_or_insert(0) += var_bytes; + } } // encode the values into bloom filter if enabled @@ -203,6 +209,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { bloom_filter, min_value: None, max_value: None, + variable_length_bytes: None, }) } @@ -296,6 +303,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { num_values: std::mem::take(&mut self.num_values), min_value: self.min_value.take(), max_value: self.max_value.take(), + variable_length_bytes: self.variable_length_bytes.take(), }) } } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index fdc24890e1fa..2c0c957d87d3 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -192,7 +192,8 @@ struct PageMetrics { } // Metrics per column writer -struct ColumnMetrics { +#[derive(Default)] +struct ColumnMetrics { total_bytes_written: u64, total_rows_written: u64, total_uncompressed_size: u64, @@ -204,6 +205,20 @@ struct ColumnMetrics { 
max_column_value: Option, num_column_nulls: u64, column_distinct_count: Option, + variable_length_bytes: Option, +} + +impl ColumnMetrics { + fn new() -> Self { + Default::default() + } + + /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes + fn update_variable_length_bytes(&mut self, variable_length_bytes: Option) { + if let Some(var_bytes) = variable_length_bytes { + *self.variable_length_bytes.get_or_insert(0) += var_bytes; + } + } } /// Typed column writer for a primitive column. @@ -282,19 +297,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { num_buffered_rows: 0, num_page_nulls: 0, }, - column_metrics: ColumnMetrics { - total_bytes_written: 0, - total_rows_written: 0, - total_uncompressed_size: 0, - total_compressed_size: 0, - total_num_values: 0, - dictionary_page_offset: None, - data_page_offset: None, - min_column_value: None, - max_column_value: None, - num_column_nulls: 0, - column_distinct_count: None, - }, + column_metrics: ColumnMetrics::::new(), column_index_builder, offset_index_builder: OffsetIndexBuilder::new(), encodings, @@ -640,7 +643,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } /// Update the column index and offset index when adding the data page - fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics>) { + fn update_column_offset_index( + &mut self, + page_statistics: Option<&ValueStatistics>, + page_variable_length_bytes: Option, + ) { // update the column index let null_page = (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls; @@ -714,6 +721,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // update the offset index self.offset_index_builder .append_row_count(self.page_metrics.num_buffered_rows as i64); + + self.offset_index_builder + .append_unencoded_byte_array_data_bytes(page_variable_length_bytes); } /// Determine if we should allow truncating min/max values for this column's statistics @@ -789,7 +799,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; // update column and offset index - self.update_column_offset_index(page_statistics.as_ref()); + self.update_column_offset_index( + page_statistics.as_ref(), + values_data.variable_length_bytes, + ); + + // Update variable_length_bytes in column_metrics + self.column_metrics + .update_variable_length_bytes(values_data.variable_length_bytes); + let page_statistics = page_statistics.map(Statistics::from); let compressed_page = match self.props.writer_version() { @@ -999,7 +1017,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { stats => stats, }; - builder = builder.set_statistics(statistics); + builder = builder + .set_statistics(statistics) + .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes); } let metadata = builder.build()?; diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index b85a75cfd410..01e92115c45b 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -644,6 +644,13 @@ pub(crate) mod private { (std::mem::size_of::(), 1) } + /// Return the number of variable length bytes in a given slice of data + /// + /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types + fn variable_length_bytes(_: &[Self]) -> Option { + None + } + /// Return the value as i64 if possible /// /// This is essentially the same as `std::convert::TryInto` but can't be @@ -956,6 +963,10 @@ pub(crate) mod private { Ok(num_values) } + fn variable_length_bytes(values: 
&[Self]) -> Option { + Some(values.iter().map(|x| x.len() as i64).sum()) + } + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { let data = decoder .data diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index 57d5aaa2dd2f..0b6d1f0d1a24 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType; use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData}; use crate::file::page_encoding_stats::PageEncodingStats; use crate::file::page_index::index::{Index, NativeIndex, PageIndex}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::statistics::{Statistics, ValueStatistics}; use crate::format::{BoundaryOrder, PageLocation, SortingColumn}; use std::sync::Arc; @@ -97,6 +98,7 @@ impl HeapSize for ColumnChunkMetaData { + self.compression.heap_size() + self.statistics.heap_size() + self.encoding_stats.heap_size() + + self.unencoded_byte_array_data_bytes.heap_size() } } @@ -143,6 +145,12 @@ impl HeapSize for Statistics { } } +impl HeapSize for OffsetIndexMetaData { + fn heap_size(&self) -> usize { + self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size() + } +} + impl HeapSize for Index { fn heap_size(&self) -> usize { match self { diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 16c51c8115b9..52206e66a590 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -40,7 +40,7 @@ use std::sync::Arc; use crate::format::{ BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup, - SortingColumn, + SizeStatistics, SortingColumn, }; use crate::basic::{ColumnOrder, Compression, Encoding, Type}; @@ -48,6 +48,7 @@ use crate::errors::{ParquetError, Result}; pub(crate) use crate::file::metadata::memory::HeapSize; use crate::file::page_encoding_stats::{self, PageEncodingStats}; use crate::file::page_index::index::Index; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::statistics::{self, Statistics}; use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor, @@ -65,27 +66,23 @@ use crate::schema::types::{ /// [`Index`] corresponding to column `column_number` of row group /// `row_group_number`. /// -/// For example `column_index[2][3]` holds the [`Index`] for the forth +/// For example `column_index[2][3]` holds the [`Index`] for the fourth /// column in the third row group of the parquet file. /// /// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md pub type ParquetColumnIndex = Vec>; -/// [`PageLocation`] for each data page of each row group of each column +/// [`OffsetIndexMetaData`] for each data page of each row group of each column /// /// This structure is the parsed representation of the [`OffsetIndex`] from the /// Parquet file footer, as described in the Parquet [PageIndex documentation]. /// -/// `offset_index[row_group_number][column_number][page_number]` holds -/// the [`PageLocation`] corresponding to page `page_number` of column +/// `offset_index[row_group_number][column_number]` holds +/// the [`OffsetIndexMetaData`] corresponding to column /// `column_number`of row group `row_group_number`. 
/// -/// For example `offset_index[2][3][4]` holds the [`PageLocation`] for -/// the fifth page of the forth column in the third row group of the -/// parquet file. -/// /// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md -pub type ParquetOffsetIndex = Vec>>; +pub type ParquetOffsetIndex = Vec>; /// Parsed metadata for a single Parquet file /// @@ -110,7 +107,7 @@ pub struct ParquetMetaData { row_groups: Vec, /// Page level index for each page in each column chunk column_index: Option, - /// Offset index for all each page in each column chunk + /// Offset index for each page in each column chunk offset_index: Option, } @@ -374,6 +371,11 @@ impl RowGroupMetaData { &self.columns } + /// Returns mutable slice of column chunk metadata. + pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] { + &mut self.columns + } + /// Number of rows in this row group. pub fn num_rows(&self) -> i64 { self.num_rows @@ -554,6 +556,7 @@ pub struct ColumnChunkMetaData { offset_index_length: Option, column_index_offset: Option, column_index_length: Option, + unencoded_byte_array_data_bytes: Option, } /// Represents common operations for a column chunk. @@ -706,6 +709,14 @@ impl ColumnChunkMetaData { Some(offset..(offset + length)) } + /// Returns the number of bytes of variable length data after decoding. + /// + /// Only set for BYTE_ARRAY columns. This field may not be set by older + /// writers. + pub fn unencoded_byte_array_data_bytes(&self) -> Option { + self.unencoded_byte_array_data_bytes + } + /// Method to convert from Thrift. pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result { if cc.meta_data.is_none() { @@ -743,6 +754,12 @@ impl ColumnChunkMetaData { let offset_index_length = cc.offset_index_length; let column_index_offset = cc.column_index_offset; let column_index_length = cc.column_index_length; + let unencoded_byte_array_data_bytes = if let Some(size_stats) = col_metadata.size_statistics + { + size_stats.unencoded_byte_array_data_bytes + } else { + None + }; let result = ColumnChunkMetaData { column_descr, @@ -764,6 +781,7 @@ impl ColumnChunkMetaData { offset_index_length, column_index_offset, column_index_length, + unencoded_byte_array_data_bytes, }; Ok(result) } @@ -787,6 +805,16 @@ impl ColumnChunkMetaData { /// Method to convert to Thrift `ColumnMetaData` pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { + let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() { + Some(SizeStatistics { + unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes, + repetition_level_histogram: None, + definition_level_histogram: None, + }) + } else { + None + }; + ColumnMetaData { type_: self.column_type().into(), encodings: self.encodings().iter().map(|&v| v.into()).collect(), @@ -806,6 +834,7 @@ impl ColumnChunkMetaData { .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()), bloom_filter_offset: self.bloom_filter_offset, bloom_filter_length: self.bloom_filter_length, + size_statistics, } } @@ -841,6 +870,7 @@ impl ColumnChunkMetaDataBuilder { offset_index_length: None, column_index_offset: None, column_index_length: None, + unencoded_byte_array_data_bytes: None, }) } @@ -952,6 +982,12 @@ impl ColumnChunkMetaDataBuilder { self } + /// Sets optional length of variable length data in bytes. + pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option) -> Self { + self.0.unencoded_byte_array_data_bytes = value; + self + } + /// Builds column chunk metadata. 
pub fn build(self) -> Result { Ok(self.0) @@ -1033,6 +1069,8 @@ impl ColumnIndexBuilder { self.max_values, self.boundary_order, self.null_counts, + None, + None, ) } } @@ -1044,6 +1082,7 @@ pub struct OffsetIndexBuilder { offset_array: Vec, compressed_page_size_array: Vec, first_row_index_array: Vec, + unencoded_byte_array_data_bytes_array: Option>, current_first_row_index: i64, } @@ -1059,6 +1098,7 @@ impl OffsetIndexBuilder { offset_array: Vec::new(), compressed_page_size_array: Vec::new(), first_row_index_array: Vec::new(), + unencoded_byte_array_data_bytes_array: None, current_first_row_index: 0, } } @@ -1074,6 +1114,17 @@ impl OffsetIndexBuilder { self.compressed_page_size_array.push(compressed_page_size); } + pub fn append_unencoded_byte_array_data_bytes( + &mut self, + unencoded_byte_array_data_bytes: Option, + ) { + if let Some(val) = unencoded_byte_array_data_bytes { + self.unencoded_byte_array_data_bytes_array + .get_or_insert(Vec::new()) + .push(val); + } + } + /// Build and get the thrift metadata of offset index pub fn build_to_thrift(self) -> OffsetIndex { let locations = self @@ -1083,7 +1134,7 @@ impl OffsetIndexBuilder { .zip(self.first_row_index_array.iter()) .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index)) .collect::>(); - OffsetIndex::new(locations) + OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array) } } @@ -1234,6 +1285,7 @@ mod tests { .set_offset_index_length(Some(25)) .set_column_index_offset(Some(8000)) .set_column_index_length(Some(25)) + .set_unencoded_byte_array_data_bytes(Some(2000)) .build() .unwrap(); @@ -1345,7 +1397,8 @@ mod tests { let row_group_meta_with_stats = vec![row_group_meta_with_stats]; let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats); - let base_expected_size = 2024; + let base_expected_size = 2088; + assert_eq!(parquet_meta.memory_size(), base_expected_size); let mut column_index = ColumnIndexBuilder::new(); @@ -1354,17 +1407,25 @@ mod tests { let native_index = NativeIndex::::try_new(column_index).unwrap(); // Now, add in OffsetIndex + let mut offset_index = OffsetIndexBuilder::new(); + offset_index.append_row_count(1); + offset_index.append_offset_and_size(2, 3); + offset_index.append_unencoded_byte_array_data_bytes(Some(10)); + offset_index.append_row_count(1); + offset_index.append_offset_and_size(2, 3); + offset_index.append_unencoded_byte_array_data_bytes(Some(10)); + let offset_index = offset_index.build_to_thrift(); + let parquet_meta = ParquetMetaData::new_with_page_index( file_metadata, row_group_meta, Some(vec![vec![Index::BOOLEAN(native_index)]]), Some(vec![vec![ - vec![PageLocation::new(1, 2, 3)], - vec![PageLocation::new(1, 2, 3)], + OffsetIndexMetaData::try_new(offset_index).unwrap() ]]), ); - let bigger_expected_size = 2304; + let bigger_expected_size = 2400; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); assert_eq!(parquet_meta.memory_size(), bigger_expected_size); diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index 2ddf826fb022..395e9afe122c 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -22,6 +22,7 @@ use crate::data_type::Int96; use crate::errors::ParquetError; use crate::file::metadata::ColumnChunkMetaData; use crate::file::page_index::index::{Index, NativeIndex}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::ChunkReader; 
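// A minimal sketch of the `OffsetIndexBuilder` flow exercised by the memory-size test
// above: one entry is appended per page, and the optional per-page unencoded sizes are
// carried into the resulting Thrift `OffsetIndex`.
let mut builder = OffsetIndexBuilder::new();
builder.append_row_count(1);
builder.append_offset_and_size(2, 3);
builder.append_unencoded_byte_array_data_bytes(Some(10));
let offset_index = builder.build_to_thrift();
assert_eq!(offset_index.unencoded_byte_array_data_bytes, Some(vec![10]));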
use crate::format::{ColumnIndex, OffsetIndex, PageLocation}; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; @@ -45,9 +46,9 @@ pub(crate) fn acc_range(a: Option>, b: Option>) -> Opt /// Returns an empty vector if this row group does not contain a /// [`ColumnIndex`]. /// -/// See [Column Index Documentation] for more details. +/// See [Page Index Documentation] for more details. /// -/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md pub fn read_columns_indexes( reader: &R, chunks: &[ColumnChunkMetaData], @@ -81,9 +82,10 @@ pub fn read_columns_indexes( /// Return an empty vector if this row group does not contain an /// [`OffsetIndex]`. /// -/// See [Column Index Documentation] for more details. +/// See [Page Index Documentation] for more details. /// -/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")] pub fn read_pages_locations( reader: &R, chunks: &[ColumnChunkMetaData], @@ -100,6 +102,42 @@ pub fn read_pages_locations( let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?; let get = |r: Range| &bytes[(r.start - fetch.start)..(r.end - fetch.start)]; + chunks + .iter() + .map(|c| match c.offset_index_range() { + Some(r) => decode_page_locations(get(r)), + None => Err(general_err!("missing offset index")), + }) + .collect() +} + +/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by +/// decoding [`OffsetIndex`] . +/// +/// Returns a vector of `offset_index[column_number]`. +/// +/// Returns an empty vector if this row group does not contain an +/// [`OffsetIndex`]. +/// +/// See [Page Index Documentation] for more details. 
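// A minimal migration sketch for callers of the deprecated `read_pages_locations`,
// assuming `reader` is an existing `ChunkReader` and `rg` a `RowGroupMetaData`.
let offset_indexes = read_offset_indexes(&reader, rg.columns())?;
let first_page = &offset_indexes[0].page_locations()[0];
println!("first data page starts at byte offset {}", first_page.offset);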
+/// +/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +pub fn read_offset_indexes( + reader: &R, + chunks: &[ColumnChunkMetaData], +) -> Result, ParquetError> { + let fetch = chunks + .iter() + .fold(None, |range, c| acc_range(range, c.offset_index_range())); + + let fetch = match fetch { + Some(r) => r, + None => return Ok(vec![]), + }; + + let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?; + let get = |r: Range| &bytes[(r.start - fetch.start)..(r.end - fetch.start)]; + chunks .iter() .map(|c| match c.offset_index_range() { @@ -109,7 +147,13 @@ pub fn read_pages_locations( .collect() } -pub(crate) fn decode_offset_index(data: &[u8]) -> Result, ParquetError> { +pub(crate) fn decode_offset_index(data: &[u8]) -> Result { + let mut prot = TCompactSliceInputProtocol::new(data); + let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; + OffsetIndexMetaData::try_new(offset) +} + +pub(crate) fn decode_page_locations(data: &[u8]) -> Result, ParquetError> { let mut prot = TCompactSliceInputProtocol::new(data); let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; Ok(offset.page_locations) diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs index 9372645d76ee..a8077896db34 100644 --- a/parquet/src/file/page_index/mod.rs +++ b/parquet/src/file/page_index/mod.rs @@ -21,3 +21,4 @@ pub mod index; pub mod index_reader; +pub mod offset_index; diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs new file mode 100644 index 000000000000..2ae3464141ca --- /dev/null +++ b/parquet/src/file/page_index/offset_index.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`OffsetIndexMetaData`] structure holding decoded [`OffsetIndex`] information + +use crate::errors::ParquetError; +use crate::format::{OffsetIndex, PageLocation}; + +/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page +/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns. +#[derive(Debug, Clone, PartialEq)] +pub struct OffsetIndexMetaData { + pub page_locations: Vec, + pub unencoded_byte_array_data_bytes: Option>, +} + +impl OffsetIndexMetaData { + /// Creates a new [`OffsetIndexMetaData`] from an [`OffsetIndex`]. + pub(crate) fn try_new(index: OffsetIndex) -> Result { + Ok(Self { + page_locations: index.page_locations, + unencoded_byte_array_data_bytes: index.unencoded_byte_array_data_bytes, + }) + } + + /// Vector of [`PageLocation`] objects, one per page in the chunk. + pub fn page_locations(&self) -> &Vec { + &self.page_locations + } + + /// Optional vector of unencoded page sizes, one per page in the chunk. 
Only defined + /// for BYTE_ARRAY columns. + pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec> { + self.unencoded_byte_array_data_bytes.as_ref() + } + + // TODO: remove annotation after merge + #[allow(dead_code)] + pub(crate) fn to_thrift(&self) -> OffsetIndex { + OffsetIndex::new( + self.page_locations.clone(), + self.unencoded_byte_array_data_bytes.clone(), + ) + } +} diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index cd5969fda0be..61f6390c97d4 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -45,6 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; +/// Default value for [`WriterProperties::bloom_filter_position`] +pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")); /// Default value for [`WriterProperties::column_index_truncate_length`] @@ -88,6 +90,24 @@ impl FromStr for WriterVersion { } } +/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should +/// write Bloom filters +/// +/// Basic constant, which is not part of the Thrift definition. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BloomFilterPosition { + /// Write Bloom Filters of each row group right after the row group + /// + /// This saves memory by writing it as soon as it is computed, at the cost + /// of data locality for readers + AfterRowGroup, + /// Write Bloom Filters at the end of the file + /// + /// This allows better data locality for readers, at the cost of memory usage + /// for writers. + End, +} + /// Reference counted writer properties. pub type WriterPropertiesPtr = Arc; @@ -132,6 +152,7 @@ pub struct WriterProperties { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, pub(crate) key_value_metadata: Option>, @@ -219,6 +240,11 @@ impl WriterProperties { self.max_row_group_size } + /// Returns maximum number of rows in a row group. + pub fn bloom_filter_position(&self) -> BloomFilterPosition { + self.bloom_filter_position + } + /// Returns configured writer version. 
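// A minimal sketch of opting into the new placement so Bloom filters are buffered and
// written once at the end of the file instead of after each row group; it assumes the
// pre-existing `set_bloom_filter_enabled` switch is used to turn Bloom filters on.
let props = WriterProperties::builder()
    .set_bloom_filter_enabled(true)
    .set_bloom_filter_position(BloomFilterPosition::End)
    .build();
assert_eq!(props.bloom_filter_position(), BloomFilterPosition::End);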
pub fn writer_version(&self) -> WriterVersion { self.writer_version @@ -340,6 +366,7 @@ pub struct WriterPropertiesBuilder { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, key_value_metadata: Option>, @@ -359,6 +386,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, + bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, created_by: DEFAULT_CREATED_BY.to_string(), key_value_metadata: None, @@ -378,6 +406,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, max_row_group_size: self.max_row_group_size, + bloom_filter_position: self.bloom_filter_position, writer_version: self.writer_version, created_by: self.created_by, key_value_metadata: self.key_value_metadata, @@ -489,6 +518,12 @@ impl WriterPropertiesBuilder { self } + /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`) + pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self { + self.bloom_filter_position = value; + self + } + /// Sets "created by" property (defaults to `parquet-rs version `). pub fn set_created_by(mut self, value: String) -> Self { self.created_by = value; @@ -1054,6 +1089,7 @@ mod tests { ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); + assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); assert_eq!(props.created_by(), DEFAULT_CREATED_BY); assert_eq!(props.key_value_metadata(), None); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index ac7d2d287488..70aea6fd5ad3 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -28,6 +28,7 @@ use crate::column::page::{Page, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::page_index::index_reader; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::{ footer, metadata::*, @@ -214,7 +215,7 @@ impl SerializedFileReader { for rg in &mut filtered_row_groups { let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; - let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?; + let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?; columns_indexes.push(column_index); offset_indexes.push(offset_index); } @@ -285,7 +286,7 @@ impl FileReader for SerializedFileReader { pub struct SerializedRowGroupReader<'a, R: ChunkReader> { chunk_reader: Arc, metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, + offset_index: Option<&'a [OffsetIndexMetaData]>, props: ReaderPropertiesPtr, bloom_filters: Vec>, } @@ -295,7 +296,7 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { pub fn new( chunk_reader: Arc, metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, + offset_index: Option<&'a [OffsetIndexMetaData]>, props: ReaderPropertiesPtr, ) -> Result { let bloom_filters = if props.read_bloom_filter() { @@ -310,7 +311,7 @@ impl<'a, R: ChunkReader> 
SerializedRowGroupReader<'a, R> { Ok(Self { chunk_reader, metadata, - page_locations, + offset_index, props, bloom_filters, }) @@ -330,7 +331,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' fn get_column_page_reader(&self, i: usize) -> Result> { let col = self.metadata.column(i); - let page_locations = self.page_locations.map(|x| x[i].clone()); + let page_locations = self.offset_index.map(|x| x[i].page_locations.clone()); let props = Arc::clone(&self.props); Ok(Box::new(SerializedPageReader::new_with_properties( @@ -776,7 +777,7 @@ mod tests { use crate::data_type::private::ParquetValueType; use crate::data_type::{AsBytes, FixedLenByteArrayType}; use crate::file::page_index::index::{Index, NativeIndex}; - use crate::file::page_index::index_reader::{read_columns_indexes, read_pages_locations}; + use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes}; use crate::file::writer::SerializedFileWriter; use crate::record::RowAccessor; use crate::schema::parser::parse_message_type; @@ -1314,7 +1315,7 @@ mod tests { // only one row group assert_eq!(offset_indexes.len(), 1); let offset_index = &offset_indexes[0]; - let page_offset = &offset_index[0][0]; + let page_offset = &offset_index[0].page_locations()[0]; assert_eq!(4, page_offset.offset); assert_eq!(152, page_offset.compressed_page_size); @@ -1337,8 +1338,8 @@ mod tests { b.reverse(); assert_eq!(a, b); - let a = read_pages_locations(&test_file, columns).unwrap(); - let mut b = read_pages_locations(&test_file, &reversed).unwrap(); + let a = read_offset_indexes(&test_file, columns).unwrap(); + let mut b = read_offset_indexes(&test_file, &reversed).unwrap(); b.reverse(); assert_eq!(a, b); } @@ -1375,7 +1376,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 0), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[0].len(), 325); + assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325); } else { unreachable!() }; @@ -1383,7 +1384,7 @@ mod tests { assert!(&column_index[0][1].is_sorted()); if let Index::BOOLEAN(index) = &column_index[0][1] { assert_eq!(index.indexes.len(), 82); - assert_eq!(row_group_offset_indexes[1].len(), 82); + assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82); } else { unreachable!() }; @@ -1396,7 +1397,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 2), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[2].len(), 325); + assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325); } else { unreachable!() }; @@ -1409,7 +1410,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 3), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[3].len(), 325); + assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325); } else { unreachable!() }; @@ -1422,7 +1423,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 4), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[4].len(), 325); + assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325); } else { unreachable!() }; @@ -1435,7 +1436,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 5), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[5].len(), 528); + assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528); } else { unreachable!() }; @@ -1448,7 +1449,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 6), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[6].len(), 325); + 
assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325); } else { unreachable!() }; @@ -1461,7 +1462,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 7), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[7].len(), 528); + assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528); } else { unreachable!() }; @@ -1474,7 +1475,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 8), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[8].len(), 974); + assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974); } else { unreachable!() }; @@ -1487,7 +1488,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 9), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[9].len(), 352); + assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352); } else { unreachable!() }; @@ -1495,7 +1496,7 @@ mod tests { //Notice: min_max values for each page for this col not exits. assert!(!&column_index[0][10].is_sorted()); if let Index::NONE = &column_index[0][10] { - assert_eq!(row_group_offset_indexes[10].len(), 974); + assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974); } else { unreachable!() }; @@ -1508,7 +1509,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 11), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[11].len(), 325); + assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325); } else { unreachable!() }; @@ -1521,7 +1522,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 12), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[12].len(), 325); + assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325); } else { unreachable!() }; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 90e985a95028..c44a7e6697f0 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -34,8 +34,9 @@ use crate::column::{ }; use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; +use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr}; use crate::file::reader::ChunkReader; -use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC}; +use crate::file::{metadata::*, PARQUET_MAGIC}; use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr}; /// A wrapper around a [`Write`] that keeps track of the number @@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the row group metadata /// - the column index for each column chunk /// - the offset index for each column chunk -pub type OnCloseRowGroup<'a> = Box< +pub type OnCloseRowGroup<'a, W> = Box< dyn FnOnce( - RowGroupMetaDataPtr, + &'a mut TrackedWrite, + RowGroupMetaData, Vec>, Vec>, Vec>, @@ -143,7 +145,7 @@ pub struct SerializedFileWriter { schema: TypePtr, descr: SchemaDescPtr, props: WriterPropertiesPtr, - row_groups: Vec, + row_groups: Vec, bloom_filters: Vec>>, column_indexes: Vec>>, offset_indexes: Vec>>, @@ -197,18 +199,29 @@ impl SerializedFileWriter { self.row_group_index += 1; + let bloom_filter_position = self.properties().bloom_filter_position(); let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; let row_column_indexes = &mut self.column_indexes; let row_offset_indexes = &mut self.offset_indexes; - let on_close = - |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| { - row_groups.push(metadata); - 
row_bloom_filters.push(row_group_bloom_filter); - row_column_indexes.push(row_group_column_index); - row_offset_indexes.push(row_group_offset_index); - Ok(()) + let on_close = move |buf, + mut metadata, + row_group_bloom_filter, + row_group_column_index, + row_group_offset_index| { + row_bloom_filters.push(row_group_bloom_filter); + row_column_indexes.push(row_group_column_index); + row_offset_indexes.push(row_group_offset_index); + // write bloom filters out immediately after the row group if requested + match bloom_filter_position { + BloomFilterPosition::AfterRowGroup => { + write_bloom_filters(buf, row_bloom_filters, &mut metadata)? + } + BloomFilterPosition::End => (), }; + row_groups.push(metadata); + Ok(()) + }; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), @@ -221,7 +234,7 @@ impl SerializedFileWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { &self.row_groups } @@ -273,34 +286,6 @@ impl SerializedFileWriter { Ok(()) } - /// Serialize all the bloom filter to the file - fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { - // iter row group - // iter each column - // write bloom filter to the file - for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { - for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() { - match &self.bloom_filters[row_group_idx][column_idx] { - Some(bloom_filter) => { - let start_offset = self.buf.bytes_written(); - bloom_filter.write(&mut self.buf)?; - let end_offset = self.buf.bytes_written(); - // set offset and index for bloom filter - let column_chunk_meta = column_chunk - .meta_data - .as_mut() - .expect("can't have bloom filter without column metadata"); - column_chunk_meta.bloom_filter_offset = Some(start_offset as i64); - column_chunk_meta.bloom_filter_length = - Some((end_offset - start_offset) as i32); - } - None => {} - } - } - } - Ok(()) - } - /// Serialize all the column index to the file fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group @@ -331,6 +316,11 @@ impl SerializedFileWriter { self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); + // write out any remaining bloom filters after all row groups + for row_group in &mut self.row_groups { + write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; + } + let mut row_groups = self .row_groups .as_slice() @@ -338,7 +328,6 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); - self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; @@ -443,6 +432,40 @@ impl SerializedFileWriter { } } +/// Serialize all the bloom filters of the given row group to the given buffer, +/// and returns the updated row group metadata. 
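// A minimal sketch of the changed accessor, assuming `writer` is a
// `SerializedFileWriter`: `flushed_row_groups` now yields `RowGroupMetaData` values
// directly rather than `Arc`-wrapped pointers.
for rg in writer.flushed_row_groups() {
    println!("flushed row group with {} rows", rg.num_rows());
}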
+fn write_bloom_filters( + buf: &mut TrackedWrite, + bloom_filters: &mut [Vec>], + row_group: &mut RowGroupMetaData, +) -> Result<()> { + // iter row group + // iter each column + // write bloom filter to the file + + let row_group_idx: u16 = row_group + .ordinal() + .expect("Missing row group ordinal") + .try_into() + .expect("Negative row group ordinal"); + let row_group_idx = row_group_idx as usize; + for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() { + if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() { + let start_offset = buf.bytes_written(); + bloom_filter.write(&mut *buf)?; + let end_offset = buf.bytes_written(); + // set offset and index for bloom filter + *column_chunk = column_chunk + .clone() + .into_builder() + .set_bloom_filter_offset(Some(start_offset as i64)) + .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) + .build()?; + } + } + Ok(()) +} + /// Parquet row group writer API. /// Provides methods to access column writers in an iterator-like fashion, order is /// guaranteed to match the order of schema leaves (column descriptors). @@ -469,7 +492,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { offset_indexes: Vec>, row_group_index: i16, file_offset: i64, - on_close: Option>, + on_close: Option>, } impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { @@ -486,7 +509,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, row_group_index: i16, - on_close: Option>, + on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); let file_offset = buf.bytes_written() as i64; @@ -637,7 +660,8 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_total_uncompressed_size(metadata.uncompressed_size()) .set_num_values(metadata.num_values()) .set_data_page_offset(map_offset(src_data_offset)) - .set_dictionary_page_offset(src_dictionary_offset.map(map_offset)); + .set_dictionary_page_offset(src_dictionary_offset.map(map_offset)) + .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes()); if let Some(statistics) = metadata.statistics() { builder = builder.set_statistics(statistics.clone()) @@ -670,12 +694,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_file_offset(self.file_offset) .build()?; - let metadata = Arc::new(row_group_metadata); - self.row_group_metadata = Some(metadata.clone()); + self.row_group_metadata = Some(Arc::new(row_group_metadata.clone())); if let Some(on_close) = self.on_close.take() { on_close( - metadata, + self.buf, + row_group_metadata, self.bloom_filters, self.column_indexes, self.offset_indexes, @@ -805,7 +829,7 @@ mod tests { use crate::column::page::{Page, PageReader}; use crate::column::reader::get_typed_column_reader; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; - use crate::data_type::{BoolType, Int32Type}; + use crate::data_type::{BoolType, ByteArrayType, Int32Type}; use crate::file::page_index::index::Index; use crate::file::properties::EnabledStatistics; use crate::file::serialized_reader::ReadOptionsBuilder; @@ -818,6 +842,7 @@ mod tests { use crate::record::{Row, RowAccessor}; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescriptor, ColumnPath}; + use crate::util::test_common::rand_gen::RandGen; #[test] fn test_row_group_writer_error_not_all_columns_written() { @@ -1447,7 +1472,7 @@ mod tests { assert_eq!(flushed.len(), idx + 1); assert_eq!(Some(idx as i16), 
last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); - assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); + assert_eq!(&flushed[idx], last_group.as_ref()); } let file_metadata = file_writer.close().unwrap(); @@ -1829,4 +1854,83 @@ mod tests { let b_idx = &column_index[0][1]; assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); } + + #[test] + fn test_byte_array_size_statistics() { + let message_type = " + message test_schema { + OPTIONAL BYTE_ARRAY a (UTF8); + } + "; + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let data = ByteArrayType::gen_vec(32, 7); + let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1]; + let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum(); + let file: File = tempfile::tempfile().unwrap(); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(), + ); + + let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); + let mut row_group_writer = writer.next_row_group().unwrap(); + + let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); + col_writer + .typed::() + .write_batch(&data, Some(&def_levels), None) + .unwrap(); + col_writer.close().unwrap(); + row_group_writer.close().unwrap(); + let file_metadata = writer.close().unwrap(); + + assert_eq!(file_metadata.row_groups.len(), 1); + assert_eq!(file_metadata.row_groups[0].columns.len(), 1); + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + let meta_data = file_metadata.row_groups[0].columns[0] + .meta_data + .as_ref() + .unwrap(); + assert!(meta_data.size_statistics.is_some()); + let size_stats = meta_data.size_statistics.as_ref().unwrap(); + + assert!(size_stats.repetition_level_histogram.is_none()); + assert!(size_stats.definition_level_histogram.is_none()); + assert!(size_stats.unencoded_byte_array_data_bytes.is_some()); + assert_eq!( + unenc_size, + size_stats.unencoded_byte_array_data_bytes.unwrap() + ); + + // check that the read metadata is also correct + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(file, options).unwrap(); + + let rfile_metadata = reader.metadata().file_metadata(); + assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows); + assert_eq!(reader.num_row_groups(), 1); + let rowgroup = reader.get_row_group(0).unwrap(); + assert_eq!(rowgroup.num_columns(), 1); + let column = rowgroup.metadata().column(0); + assert!(column.unencoded_byte_array_data_bytes().is_some()); + assert_eq!( + unenc_size, + column.unencoded_byte_array_data_bytes().unwrap() + ); + + assert!(reader.metadata().offset_index().is_some()); + let offset_index = reader.metadata().offset_index().unwrap(); + assert_eq!(offset_index.len(), 1); + assert_eq!(offset_index[0].len(), 1); + assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some()); + let page_sizes = offset_index[0][0] + .unencoded_byte_array_data_bytes + .as_ref() + .unwrap(); + assert_eq!(page_sizes.len(), 1); + assert_eq!(page_sizes[0], unenc_size); + } } diff --git a/parquet/src/format.rs b/parquet/src/format.rs index ae68865be694..287d08b7a95c 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -117,12 +117,12 @@ impl ConvertedType { /// a list is converted into an optional field containing a repeated field for its /// values pub const LIST: ConvertedType = ConvertedType(3); - /// an enum is converted into 
a binary field + /// an enum is converted into a BYTE_ARRAY field pub const ENUM: ConvertedType = ConvertedType(4); /// A decimal value. /// - /// This may be used to annotate binary or fixed primitive types. The - /// underlying byte array stores the unscaled value encoded as two's + /// This may be used to annotate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY primitive + /// types. The underlying byte array stores the unscaled value encoded as two's /// complement using big-endian byte order (the most significant byte is the /// zeroth element). The value of the decimal is the value * 10^{-scale}. /// @@ -185,7 +185,7 @@ impl ConvertedType { pub const JSON: ConvertedType = ConvertedType(19); /// An embedded BSON document /// - /// A BSON document embedded within a single BINARY column. + /// A BSON document embedded within a single BYTE_ARRAY column. pub const BSON: ConvertedType = ConvertedType(20); /// An interval of time /// @@ -288,9 +288,9 @@ impl From<&ConvertedType> for i32 { pub struct FieldRepetitionType(pub i32); impl FieldRepetitionType { - /// This field is required (can not be null) and each record has exactly 1 value. + /// This field is required (can not be null) and each row has exactly 1 value. pub const REQUIRED: FieldRepetitionType = FieldRepetitionType(0); - /// The field is optional (can be null) and each record has 0 or 1 values. + /// The field is optional (can be null) and each row has 0 or 1 values. pub const OPTIONAL: FieldRepetitionType = FieldRepetitionType(1); /// The field is repeated and can contain 0 or more values pub const REPEATED: FieldRepetitionType = FieldRepetitionType(2); @@ -379,12 +379,15 @@ impl Encoding { pub const DELTA_BYTE_ARRAY: Encoding = Encoding(7); /// Dictionary encoding: the ids are encoded using the RLE encoding pub const RLE_DICTIONARY: Encoding = Encoding(8); - /// Encoding for floating-point data. + /// Encoding for fixed-width data (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY). /// K byte-streams are created where K is the size in bytes of the data type. - /// The individual bytes of an FP value are scattered to the corresponding stream and + /// The individual bytes of a value are scattered to the corresponding stream and /// the streams are concatenated. /// This itself does not reduce the size of the data but can lead to better compression /// afterwards. + /// + /// Added in 2.8 for FLOAT and DOUBLE. + /// Support for INT32, INT64 and FIXED_LEN_BYTE_ARRAY added in 2.11. pub const BYTE_STREAM_SPLIT: Encoding = Encoding(9); pub const ENUM_VALUES: &'static [Self] = &[ Self::PLAIN, @@ -634,6 +637,143 @@ impl From<&BoundaryOrder> for i32 { } } +// +// SizeStatistics +// + +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine grained filter pushdown on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct SizeStatistics { + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. 
To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. + /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + pub unencoded_byte_array_data_bytes: Option, + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted if max_repetition_level is 0 without loss + /// of information. + /// + pub repetition_level_histogram: Option>, + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted if max_definition_level is 0 or 1 without + /// loss of information. + /// + pub definition_level_histogram: Option>, +} + +impl SizeStatistics { + pub fn new(unencoded_byte_array_data_bytes: F1, repetition_level_histogram: F2, definition_level_histogram: F3) -> SizeStatistics where F1: Into>, F2: Into>>, F3: Into>> { + SizeStatistics { + unencoded_byte_array_data_bytes: unencoded_byte_array_data_bytes.into(), + repetition_level_histogram: repetition_level_histogram.into(), + definition_level_histogram: definition_level_histogram.into(), + } + } +} + +impl crate::thrift::TSerializable for SizeStatistics { + fn read_from_in_protocol(i_prot: &mut T) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option> = None; + let mut f_3: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_0 = i_prot.read_i64()?; + val.push(list_elem_0); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, + 3 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_1 = i_prot.read_i64()?; + val.push(list_elem_1); + } + i_prot.read_list_end()?; + f_3 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = SizeStatistics { + unencoded_byte_array_data_bytes: f_1, + repetition_level_histogram: f_2, + definition_level_histogram: f_3, + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut T) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("SizeStatistics"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.unencoded_byte_array_data_bytes { + o_prot.write_field_begin(&TFieldIdentifier::new("unencoded_byte_array_data_bytes", TType::I64, 1))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()? 
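// A worked example of the value written here: for the byte-array values
// ["a", "a", "bc", "cde"] (dictionary ["a", "bc", "cde"], indices [0, 0, 1, 2]) the
// unencoded size is 1 + 1 + 2 + 3 = 7 bytes, i.e. the PLAIN-encoded size of 23 bytes
// minus the 4-byte length prefix of each of the 4 values.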
+ } + if let Some(ref fld_var) = self.repetition_level_histogram { + o_prot.write_field_begin(&TFieldIdentifier::new("repetition_level_histogram", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } + if let Some(ref fld_var) = self.definition_level_histogram { + o_prot.write_field_begin(&TFieldIdentifier::new("definition_level_histogram", TType::List, 3))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + // // Statistics // @@ -1123,7 +1263,7 @@ impl crate::thrift::TSerializable for NullType { /// To maintain forward-compatibility in v1, implementations using this logical /// type must also set scale and precision on the annotated SchemaElement. /// -/// Allowed for physical types: INT32, INT64, FIXED, and BINARY +/// Allowed for physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY, and BYTE_ARRAY. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DecimalType { pub scale: i32, @@ -1620,7 +1760,7 @@ impl crate::thrift::TSerializable for IntType { /// Embedded JSON logical type annotation /// -/// Allowed for physical types: BINARY +/// Allowed for physical types: BYTE_ARRAY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct JsonType { } @@ -1660,7 +1800,7 @@ impl crate::thrift::TSerializable for JsonType { /// Embedded BSON logical type annotation /// -/// Allowed for physical types: BINARY +/// Allowed for physical types: BYTE_ARRAY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct BsonType { } @@ -2150,7 +2290,12 @@ impl crate::thrift::TSerializable for SchemaElement { /// Data page header #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DataPageHeader { - /// Number of values, including NULLs, in this data page. * + /// Number of values, including NULLs, in this data page. + /// + /// If a OffsetIndex is present, a page must begin at a row + /// boundary (repetition_level = 0). Otherwise, pages may begin + /// within a row (repetition_level > 0). + /// pub num_values: i32, /// Encoding used for this data page * pub encoding: Encoding, @@ -2158,7 +2303,7 @@ pub struct DataPageHeader { pub definition_level_encoding: Encoding, /// Encoding used for repetition levels * pub repetition_level_encoding: Encoding, - /// Optional statistics for the data in this page* + /// Optional statistics for the data in this page * pub statistics: Option, } @@ -2394,21 +2539,24 @@ pub struct DataPageHeaderV2 { /// Number of NULL values, in this data page. /// Number of non-null = num_values - num_nulls which is also the number of values in the data section * pub num_nulls: i32, - /// Number of rows in this data page. which means pages change on record boundaries (r = 0) * + /// Number of rows in this data page. Every page must begin at a + /// row boundary (repetition_level = 0): rows must **not** be + /// split across page boundaries when using V2 data pages. 
+ /// pub num_rows: i32, /// Encoding used for data in this page * pub encoding: Encoding, - /// length of the definition levels + /// Length of the definition levels pub definition_levels_byte_length: i32, - /// length of the repetition levels + /// Length of the repetition levels pub repetition_levels_byte_length: i32, - /// whether the values are compressed. + /// Whether the values are compressed. /// Which means the section of the page between /// definition_levels_byte_length + repetition_levels_byte_length + 1 and compressed_page_size (included) /// is compressed with the compression_codec. /// If missing it is considered compressed pub is_compressed: Option, - /// optional statistics for the data in this page * + /// Optional statistics for the data in this page * pub statistics: Option, } @@ -3211,10 +3359,10 @@ impl crate::thrift::TSerializable for KeyValue { // SortingColumn // -/// Wrapper struct to specify sort order +/// Sort order within a RowGroup of a leaf column #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SortingColumn { - /// The column index (in this row group) * + /// The ordinal position of the column (in this row group) * pub column_idx: i32, /// If true, indicates this column is sorted in descending order. * pub descending: bool, @@ -3421,10 +3569,15 @@ pub struct ColumnMetaData { /// Writers should write this field so readers can read the bloom filter /// in a single I/O. pub bloom_filter_length: Option, + /// Optional statistics to help estimate total memory when converted to in-memory + /// representations. The histograms contained in these statistics can + /// also be useful in some cases for more fine-grained nullability/list length + /// filter pushdown. + pub size_statistics: Option, } impl ColumnMetaData { - pub fn new(type_: Type, encodings: Vec, path_in_schema: Vec, codec: CompressionCodec, num_values: i64, total_uncompressed_size: i64, total_compressed_size: i64, key_value_metadata: F8, data_page_offset: i64, index_page_offset: F10, dictionary_page_offset: F11, statistics: F12, encoding_stats: F13, bloom_filter_offset: F14, bloom_filter_length: F15) -> ColumnMetaData where F8: Into>>, F10: Into>, F11: Into>, F12: Into>, F13: Into>>, F14: Into>, F15: Into> { + pub fn new(type_: Type, encodings: Vec, path_in_schema: Vec, codec: CompressionCodec, num_values: i64, total_uncompressed_size: i64, total_compressed_size: i64, key_value_metadata: F8, data_page_offset: i64, index_page_offset: F10, dictionary_page_offset: F11, statistics: F12, encoding_stats: F13, bloom_filter_offset: F14, bloom_filter_length: F15, size_statistics: F16) -> ColumnMetaData where F8: Into>>, F10: Into>, F11: Into>, F12: Into>, F13: Into>>, F14: Into>, F15: Into>, F16: Into> { ColumnMetaData { type_, encodings, @@ -3441,6 +3594,7 @@ impl ColumnMetaData { encoding_stats: encoding_stats.into(), bloom_filter_offset: bloom_filter_offset.into(), bloom_filter_length: bloom_filter_length.into(), + size_statistics: size_statistics.into(), } } } @@ -3463,6 +3617,7 @@ impl crate::thrift::TSerializable for ColumnMetaData { let mut f_13: Option> = None; let mut f_14: Option = None; let mut f_15: Option = None; + let mut f_16: Option = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -3478,8 +3633,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_0 = 
Encoding::read_from_in_protocol(i_prot)?; - val.push(list_elem_0); + let list_elem_2 = Encoding::read_from_in_protocol(i_prot)?; + val.push(list_elem_2); } i_prot.read_list_end()?; f_2 = Some(val); @@ -3488,8 +3643,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_1 = i_prot.read_string()?; - val.push(list_elem_1); + let list_elem_3 = i_prot.read_string()?; + val.push(list_elem_3); } i_prot.read_list_end()?; f_3 = Some(val); @@ -3514,8 +3669,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_2 = KeyValue::read_from_in_protocol(i_prot)?; - val.push(list_elem_2); + let list_elem_4 = KeyValue::read_from_in_protocol(i_prot)?; + val.push(list_elem_4); } i_prot.read_list_end()?; f_8 = Some(val); @@ -3540,8 +3695,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_3 = PageEncodingStats::read_from_in_protocol(i_prot)?; - val.push(list_elem_3); + let list_elem_5 = PageEncodingStats::read_from_in_protocol(i_prot)?; + val.push(list_elem_5); } i_prot.read_list_end()?; f_13 = Some(val); @@ -3554,6 +3709,10 @@ impl crate::thrift::TSerializable for ColumnMetaData { let val = i_prot.read_i32()?; f_15 = Some(val); }, + 16 => { + let val = SizeStatistics::read_from_in_protocol(i_prot)?; + f_16 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -3585,6 +3744,7 @@ impl crate::thrift::TSerializable for ColumnMetaData { encoding_stats: f_13, bloom_filter_offset: f_14, bloom_filter_length: f_15, + size_statistics: f_16, }; Ok(ret) } @@ -3666,6 +3826,11 @@ impl crate::thrift::TSerializable for ColumnMetaData { o_prot.write_i32(fld_var)?; o_prot.write_field_end()? } + if let Some(ref fld_var) = self.size_statistics { + o_prot.write_field_begin(&TFieldIdentifier::new("size_statistics", TType::Struct, 16))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -3745,8 +3910,8 @@ impl crate::thrift::TSerializable for EncryptionWithColumnKey { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_4 = i_prot.read_string()?; - val.push(list_elem_4); + let list_elem_6 = i_prot.read_string()?; + val.push(list_elem_6); } i_prot.read_list_end()?; f_1 = Some(val); @@ -3885,11 +4050,19 @@ pub struct ColumnChunk { /// metadata. This path is relative to the current file. /// pub file_path: Option, - /// Byte offset in file_path to the ColumnMetaData * + /// Deprecated: Byte offset in file_path to the ColumnMetaData + /// + /// Past use of this field has been inconsistent, with some implementations + /// using it to point to the ColumnMetaData and some using it to point to + /// the first page in the column chunk. In many cases, the ColumnMetaData at this + /// location is wrong. This field is now deprecated and should not be used. + /// Writers should set this field to 0 if no ColumnMetaData has been written outside + /// the footer. pub file_offset: i64, - /// Column metadata for this chunk. 
This is the same content as what is at - /// file_path/file_offset. Having it here has it replicated in the file - /// metadata. + /// Column metadata for this chunk. Some writers may also replicate this at the + /// location pointed to by file_path/file_offset. + /// Note: while marked as optional, this field is in fact required by most major + /// Parquet implementations. As such, writers MUST populate this field. /// pub meta_data: Option, /// File offset of ColumnChunk's OffsetIndex * @@ -4111,8 +4284,8 @@ impl crate::thrift::TSerializable for RowGroup { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_5 = ColumnChunk::read_from_in_protocol(i_prot)?; - val.push(list_elem_5); + let list_elem_7 = ColumnChunk::read_from_in_protocol(i_prot)?; + val.push(list_elem_7); } i_prot.read_list_end()?; f_1 = Some(val); @@ -4129,8 +4302,8 @@ impl crate::thrift::TSerializable for RowGroup { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_6 = SortingColumn::read_from_in_protocol(i_prot)?; - val.push(list_elem_6); + let list_elem_8 = SortingColumn::read_from_in_protocol(i_prot)?; + val.push(list_elem_8); } i_prot.read_list_end()?; f_4 = Some(val); @@ -4335,8 +4508,9 @@ pub struct PageLocation { /// Size of the page, including header. Sum of compressed_page_size and header /// length pub compressed_page_size: i32, - /// Index within the RowGroup of the first row of the page; this means pages - /// change on record boundaries (r = 0). + /// Index within the RowGroup of the first row of the page. When an + /// OffsetIndex is present, pages must begin on row boundaries + /// (repetition_level = 0). pub first_row_index: i64, } @@ -4413,17 +4587,28 @@ impl crate::thrift::TSerializable for PageLocation { // OffsetIndex // +/// Optional offsets for each data page in a ColumnChunk. +/// +/// Forms part of the page index, along with ColumnIndex. +/// +/// OffsetIndex may be present even if ColumnIndex is not. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct OffsetIndex { /// PageLocations, ordered by increasing PageLocation.offset. It is required /// that page_locations\[i\].first_row_index < page_locations\[i+1\].first_row_index. pub page_locations: Vec, + /// Unencoded/uncompressed size for BYTE_ARRAY types. + /// + /// See documention for unencoded_byte_array_data_bytes in SizeStatistics for + /// more details on this field. 
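// A small illustration, assuming a chunk with two data pages whose byte-array values
// total 7 and 12 unencoded bytes respectively: a writer would populate
// `Some(vec![7, 12])` here, aligned element-for-element with `page_locations` above.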
+ pub unencoded_byte_array_data_bytes: Option>, } impl OffsetIndex { - pub fn new(page_locations: Vec) -> OffsetIndex { + pub fn new(page_locations: Vec, unencoded_byte_array_data_bytes: F2) -> OffsetIndex where F2: Into>> { OffsetIndex { page_locations, + unencoded_byte_array_data_bytes: unencoded_byte_array_data_bytes.into(), } } } @@ -4432,6 +4617,7 @@ impl crate::thrift::TSerializable for OffsetIndex { fn read_from_in_protocol(i_prot: &mut T) -> thrift::Result { i_prot.read_struct_begin()?; let mut f_1: Option> = None; + let mut f_2: Option> = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -4443,12 +4629,22 @@ impl crate::thrift::TSerializable for OffsetIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_7 = PageLocation::read_from_in_protocol(i_prot)?; - val.push(list_elem_7); + let list_elem_9 = PageLocation::read_from_in_protocol(i_prot)?; + val.push(list_elem_9); } i_prot.read_list_end()?; f_1 = Some(val); }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_10 = i_prot.read_i64()?; + val.push(list_elem_10); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -4459,6 +4655,7 @@ impl crate::thrift::TSerializable for OffsetIndex { verify_required_field_exists("OffsetIndex.page_locations", &f_1)?; let ret = OffsetIndex { page_locations: f_1.expect("auto-generated code should have checked for presence of required fields"), + unencoded_byte_array_data_bytes: f_2, }; Ok(ret) } @@ -4472,6 +4669,15 @@ impl crate::thrift::TSerializable for OffsetIndex { } o_prot.write_list_end()?; o_prot.write_field_end()?; + if let Some(ref fld_var) = self.unencoded_byte_array_data_bytes { + o_prot.write_field_begin(&TFieldIdentifier::new("unencoded_byte_array_data_bytes", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -4481,8 +4687,14 @@ impl crate::thrift::TSerializable for OffsetIndex { // ColumnIndex // -/// Description for ColumnIndex. -/// Each ``\[i\] refers to the page at OffsetIndex.page_locations\[i\] +/// Optional statistics for each data page in a ColumnChunk. +/// +/// Forms part the page index, along with OffsetIndex. +/// +/// If this structure is present, OffsetIndex must also be present. +/// +/// For each field in this structure, ``\[i\] refers to the page at +/// OffsetIndex.page_locations\[i\] #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ColumnIndex { /// A list of Boolean values to determine the validity of the corresponding @@ -4508,16 +4720,33 @@ pub struct ColumnIndex { pub boundary_order: BoundaryOrder, /// A list containing the number of null values for each page * pub null_counts: Option>, + /// Contains repetition level histograms for each page + /// concatenated together. The repetition_level_histogram field on + /// SizeStatistics contains more details. + /// + /// When present the length should always be (number of pages * + /// (max_repetition_level + 1)) elements. + /// + /// Element 0 is the first element of the histogram for the first page. 
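// Equivalently (a sketch, assuming `hist` is the concatenated vector described here),
// the histogram for page `i` is the sub-slice
// `hist[i * (max_repetition_level + 1)..(i + 1) * (max_repetition_level + 1)]`.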
+ /// Element (max_repetition_level + 1) is the first element of the histogram + /// for the second page. + /// + pub repetition_level_histograms: Option>, + /// Same as repetition_level_histograms except for definitions levels. + /// + pub definition_level_histograms: Option>, } impl ColumnIndex { - pub fn new(null_pages: Vec, min_values: Vec>, max_values: Vec>, boundary_order: BoundaryOrder, null_counts: F5) -> ColumnIndex where F5: Into>> { + pub fn new(null_pages: Vec, min_values: Vec>, max_values: Vec>, boundary_order: BoundaryOrder, null_counts: F5, repetition_level_histograms: F6, definition_level_histograms: F7) -> ColumnIndex where F5: Into>>, F6: Into>>, F7: Into>> { ColumnIndex { null_pages, min_values, max_values, boundary_order, null_counts: null_counts.into(), + repetition_level_histograms: repetition_level_histograms.into(), + definition_level_histograms: definition_level_histograms.into(), } } } @@ -4530,6 +4759,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let mut f_3: Option>> = None; let mut f_4: Option = None; let mut f_5: Option> = None; + let mut f_6: Option> = None; + let mut f_7: Option> = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -4541,8 +4772,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_8 = i_prot.read_bool()?; - val.push(list_elem_8); + let list_elem_11 = i_prot.read_bool()?; + val.push(list_elem_11); } i_prot.read_list_end()?; f_1 = Some(val); @@ -4551,8 +4782,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec> = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_9 = i_prot.read_bytes()?; - val.push(list_elem_9); + let list_elem_12 = i_prot.read_bytes()?; + val.push(list_elem_12); } i_prot.read_list_end()?; f_2 = Some(val); @@ -4561,8 +4792,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec> = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_10 = i_prot.read_bytes()?; - val.push(list_elem_10); + let list_elem_13 = i_prot.read_bytes()?; + val.push(list_elem_13); } i_prot.read_list_end()?; f_3 = Some(val); @@ -4575,12 +4806,32 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_11 = i_prot.read_i64()?; - val.push(list_elem_11); + let list_elem_14 = i_prot.read_i64()?; + val.push(list_elem_14); } i_prot.read_list_end()?; f_5 = Some(val); }, + 6 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_15 = i_prot.read_i64()?; + val.push(list_elem_15); + } + i_prot.read_list_end()?; + f_6 = Some(val); + }, + 7 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_16 = i_prot.read_i64()?; + val.push(list_elem_16); + } + i_prot.read_list_end()?; + f_7 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -4598,6 +4849,8 @@ impl crate::thrift::TSerializable for ColumnIndex { max_values: f_3.expect("auto-generated code should have 
checked for presence of required fields"), boundary_order: f_4.expect("auto-generated code should have checked for presence of required fields"), null_counts: f_5, + repetition_level_histograms: f_6, + definition_level_histograms: f_7, }; Ok(ret) } @@ -4637,6 +4890,24 @@ impl crate::thrift::TSerializable for ColumnIndex { o_prot.write_list_end()?; o_prot.write_field_end()? } + if let Some(ref fld_var) = self.repetition_level_histograms { + o_prot.write_field_begin(&TFieldIdentifier::new("repetition_level_histograms", TType::List, 6))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } + if let Some(ref fld_var) = self.definition_level_histograms { + o_prot.write_field_begin(&TFieldIdentifier::new("definition_level_histograms", TType::List, 7))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -4996,8 +5267,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_12 = SchemaElement::read_from_in_protocol(i_prot)?; - val.push(list_elem_12); + let list_elem_17 = SchemaElement::read_from_in_protocol(i_prot)?; + val.push(list_elem_17); } i_prot.read_list_end()?; f_2 = Some(val); @@ -5010,8 +5281,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_13 = RowGroup::read_from_in_protocol(i_prot)?; - val.push(list_elem_13); + let list_elem_18 = RowGroup::read_from_in_protocol(i_prot)?; + val.push(list_elem_18); } i_prot.read_list_end()?; f_4 = Some(val); @@ -5020,8 +5291,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_14 = KeyValue::read_from_in_protocol(i_prot)?; - val.push(list_elem_14); + let list_elem_19 = KeyValue::read_from_in_protocol(i_prot)?; + val.push(list_elem_19); } i_prot.read_list_end()?; f_5 = Some(val); @@ -5034,8 +5305,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_15 = ColumnOrder::read_from_in_protocol(i_prot)?; - val.push(list_elem_15); + let list_elem_20 = ColumnOrder::read_from_in_protocol(i_prot)?; + val.push(list_elem_20); } i_prot.read_list_end()?; f_7 = Some(val); diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index cd124031cfdc..3e0f6ce3a8b3 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -89,12 +89,15 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) { for (column_index, column_layout) in offset_index.iter().zip(&row_group_layout.columns) { assert_eq!( - column_index.len(), + column_index.page_locations.len(), column_layout.pages.len(), "index page count mismatch" ); - for (idx, (page, page_layout)) in - column_index.iter().zip(&column_layout.pages).enumerate() + for (idx, 
(page, page_layout)) in column_index + .page_locations + .iter() + .zip(&column_layout.pages) + .enumerate() { assert_eq!( page.compressed_page_size as usize, @@ -102,6 +105,7 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) { "index page {idx} size mismatch" ); let next_first_row_index = column_index + .page_locations .get(idx + 1) .map(|x| x.first_row_index) .unwrap_or_else(|| row_group.num_rows());