Skip to content

Commit

Permalink
collect columns for merge (quickwit-oss#1812)
Browse files Browse the repository at this point in the history
* collect columns for merge

* return column_type from, fix visibility

* fix

Co-authored-by: Paul Masurel <paul@quickwit.io>
  • Loading branch information
2 people authored and Jamie Hodkinson committed Jan 30, 2023
1 parent af85713 commit df212a8
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 5 deletions.
7 changes: 7 additions & 0 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ impl<T: PartialOrd> Column<T> {
}
}
}

pub fn min_value(&self) -> T {
self.values.min_value()
}
pub fn max_value(&self) -> T {
self.values.max_value()
}
}

impl<T: PartialOrd + Copy + Send + Sync + 'static> Column<T> {
Expand Down
5 changes: 3 additions & 2 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ impl<'a> ColumnIndex<'a> {
}
}
ColumnIndex::Multivalued(multivalued_index) => {
let start = multivalued_index.get_val(row_id);
let end = multivalued_index.get_val(row_id + 1);
let multivalued_index_ref = &**multivalued_index;
let start: u32 = multivalued_index_ref.get_val(row_id);
let end: u32 = multivalued_index_ref.get_val(row_id + 1);
start..end
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fn test_sparse_block_set_u16_max() {
use proptest::prelude::*;

proptest! {
#![proptest_config(ProptestConfig::with_cases(1))]
#[test]
fn test_prop_test_dense(els in proptest::collection::btree_set(0..=u16::MAX, 0..=u16::MAX as usize)) {
let vals: Vec<u16> = els.into_iter().collect();
Expand Down
26 changes: 26 additions & 0 deletions columnar/src/column_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync {
}
}

impl<T: Copy + PartialOrd> ColumnValues<T> for std::sync::Arc<dyn ColumnValues<T>> {
fn get_val(&self, idx: u32) -> T {
self.as_ref().get_val(idx)
}

fn min_value(&self) -> T {
self.as_ref().min_value()
}

fn max_value(&self) -> T {
self.as_ref().max_value()
}

fn num_vals(&self) -> u32 {
self.as_ref().num_vals()
}

fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
self.as_ref().iter()
}

fn get_range(&self, start: u64, output: &mut [T]) {
self.as_ref().get_range(start, output)
}
}

impl<'a, C: ColumnValues<T> + ?Sized, T: Copy + PartialOrd> ColumnValues<T> for &'a C {
fn get_val(&self, idx: u32) -> T {
(*self).get_val(idx)
Expand Down
16 changes: 14 additions & 2 deletions columnar/src/columnar/column_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ impl From<NumericalType> for ColumnType {
}

impl ColumnType {
/// get column type category
pub(crate) fn column_type_category(self) -> ColumnTypeCategory {
match self {
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}

pub fn numerical_type(&self) -> Option<NumericalType> {
match self {
ColumnType::I64 => Some(NumericalType::I64),
Expand Down Expand Up @@ -149,9 +161,9 @@ impl HasAssociatedColumnType for Ipv6Addr {
/// at most one column exist per `ColumnTypeCategory`.
///
/// See also [README.md].
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
pub(crate) enum ColumnTypeCategory {
pub enum ColumnTypeCategory {
Bool,
Str,
Numerical,
Expand Down
143 changes: 143 additions & 0 deletions columnar/src/columnar/merge.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashMap;
use std::io;

use super::column_type::ColumnTypeCategory;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;

pub enum MergeDocOrder {
/// Columnar tables are simply stacked one above the other.
Expand Down Expand Up @@ -31,3 +34,143 @@ pub fn merge_columnar(
}
}
}

pub fn collect_columns(
columnar_readers: &[&ColumnarReader],
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
// Each column name may have multiple types of column associated.
// For merging we are interested in the same column type category since they can be merged.
let mut field_name_to_group: HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>> =
HashMap::new();

for columnar_reader in columnar_readers {
let column_name_and_handle = columnar_reader.list_columns()?;
for (column_name, handle) in column_name_and_handle {
let column_type_to_handles = field_name_to_group
.entry(column_name.to_string())
.or_default();

let columns = column_type_to_handles
.entry(handle.column_type().column_type_category())
.or_default();
columns.push(handle.open()?);
}
}

normalize_columns(&mut field_name_to_group);

Ok(field_name_to_group)
}

/// Cast numerical type columns to the same type
pub(crate) fn normalize_columns(
map: &mut HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>,
) {
for (_field_name, type_category_to_columns) in map.iter_mut() {
for (type_category, columns) in type_category_to_columns {
if type_category == &ColumnTypeCategory::Numerical {
let casted_columns = cast_to_common_numerical_column(&columns);
*columns = casted_columns;
}
}
}
}

/// Receives a list of columns of numerical types (u64, i64, f64)
///
/// Returns a list of `DynamicColumn` which are all of the same numerical type
fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColumn> {
assert!(columns
.iter()
.all(|column| column.column_type().numerical_type().is_some()));
let coerce_to_i64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_i64())
.collect();

if coerce_to_i64.iter().all(|column| column.is_some()) {
return coerce_to_i64
.into_iter()
.map(|column| column.unwrap())
.collect();
}

let coerce_to_u64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_u64())
.collect();

if coerce_to_u64.iter().all(|column| column.is_some()) {
return coerce_to_u64
.into_iter()
.map(|column| column.unwrap())
.collect();
}

columns
.iter()
.map(|column| {
column
.clone()
.coerce_to_f64()
.expect("couldn't cast column to f64")
})
.collect()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::ColumnarWriter;

#[test]
fn test_column_coercion() {
// i64 type
let columnar1 = {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(1u32, "numbers", 1i64);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(2, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
};
// u64 type
let columnar2 = {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(1u32, "numbers", u64::MAX - 100);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(2, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
};

// f64 type
let columnar3 = {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(1u32, "numbers", 30.5);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(2, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
};

let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);

let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_f64()));

let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_i64()));

let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_u64()));
}
}
Loading

0 comments on commit df212a8

Please sign in to comment.