Skip to content

Commit

Permalink
Make it possible to force a column type and intricate bugfix. (quickw…
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored and Jamie Hodkinson committed Jan 30, 2023
1 parent b64a137 commit af85713
Show file tree
Hide file tree
Showing 11 changed files with 513 additions and 142 deletions.
37 changes: 36 additions & 1 deletion columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl<T: PartialOrd> Column<T> {
}
}

impl<T: PartialOrd> Column<T> {
impl<T: PartialOrd + Copy + Send + Sync + 'static> Column<T> {
pub fn first(&self, row_id: RowId) -> Option<T> {
self.values(row_id).next()
}
Expand All @@ -44,6 +44,13 @@ impl<T: PartialOrd> Column<T> {
self.value_row_ids(row_id)
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
}

pub fn first_or_default_col(self, default_value: T) -> Arc<dyn ColumnValues<T>> {
Arc::new(FirstValueWithDefault {
column: self,
default_value,
})
}
}

impl<T> Deref for Column<T> {
Expand All @@ -65,3 +72,31 @@ impl BinarySerializable for Cardinality {
Ok(cardinality)
}
}

// TODO simplify or optimize
struct FirstValueWithDefault<T: Copy> {
column: Column<T>,
default_value: T,
}

impl<T: PartialOrd + Send + Sync + Copy + 'static> ColumnValues<T> for FirstValueWithDefault<T> {
fn get_val(&self, idx: u32) -> T {
self.column.first(idx).unwrap_or(self.default_value)
}

fn min_value(&self) -> T {
self.column.values.min_value()
}

fn max_value(&self) -> T {
self.column.values.max_value()
}

fn num_vals(&self) -> u32 {
match &self.column.idx {
ColumnIndex::Full => self.column.values.num_vals(),
ColumnIndex::Optional(optional_idx) => optional_idx.num_rows(),
ColumnIndex::Multivalued(_) => todo!(),
}
}
}
14 changes: 14 additions & 0 deletions columnar/src/column_values/monotonic_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ impl MonotonicallyMappableToU64 for i64 {
}
}

impl MonotonicallyMappableToU64 for crate::DateTime {
#[inline(always)]
fn to_u64(self) -> u64 {
common::i64_to_u64(self.timestamp_micros)
}

#[inline(always)]
fn from_u64(val: u64) -> Self {
crate::DateTime {
timestamp_micros: common::u64_to_i64(val),
}
}
}

impl MonotonicallyMappableToU64 for bool {
#[inline(always)]
fn to_u64(self) -> u64 {
Expand Down
173 changes: 153 additions & 20 deletions columnar/src/columnar/column_type.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::Ipv6Addr;

use crate::value::NumericalType;
use crate::InvalidData;

Expand All @@ -11,10 +13,44 @@ pub enum ColumnType {
I64 = 0u8,
U64 = 1u8,
F64 = 2u8,
Bytes = 3u8,
Str = 4u8,
Bool = 5u8,
IpAddr = 6u8,
Bytes = 10u8,
Str = 14u8,
Bool = 18u8,
IpAddr = 22u8,
DateTime = 26u8,
}

#[cfg(test)]
const COLUMN_TYPES: [ColumnType; 8] = [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::Bytes,
ColumnType::Str,
ColumnType::Bool,
ColumnType::IpAddr,
ColumnType::DateTime,
];

impl ColumnType {
pub fn to_code(self) -> u8 {
self as u8
}

pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
use ColumnType::*;
match code {
0u8 => Ok(I64),
1u8 => Ok(U64),
2u8 => Ok(F64),
10u8 => Ok(Bytes),
14u8 => Ok(Str),
18u8 => Ok(Bool),
22u8 => Ok(IpAddr),
26u8 => Ok(Self::DateTime),
_ => Err(InvalidData),
}
}
}

impl From<NumericalType> for ColumnType {
Expand All @@ -33,26 +69,108 @@ impl ColumnType {
ColumnType::I64 => Some(NumericalType::I64),
ColumnType::U64 => Some(NumericalType::U64),
ColumnType::F64 => Some(NumericalType::F64),
ColumnType::Bytes | ColumnType::Str | ColumnType::Bool | ColumnType::IpAddr => None,
ColumnType::Bytes
| ColumnType::Str
| ColumnType::Bool
| ColumnType::IpAddr
| ColumnType::DateTime => None,
}
}
}

/// Encoded over 6 bits.
pub(crate) fn to_code(self) -> u8 {
self as u8
// TODO remove if possible
pub trait HasAssociatedColumnType: 'static + Send + Sync + Copy + PartialOrd {
fn column_type() -> ColumnType;
fn default_value() -> Self;
}

impl HasAssociatedColumnType for u64 {
fn column_type() -> ColumnType {
ColumnType::U64
}

pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
use ColumnType::*;
match code {
0u8 => Ok(I64),
1u8 => Ok(U64),
2u8 => Ok(F64),
3u8 => Ok(Bytes),
4u8 => Ok(Str),
5u8 => Ok(Bool),
6u8 => Ok(IpAddr),
_ => Err(InvalidData),
fn default_value() -> Self {
0u64
}
}

impl HasAssociatedColumnType for i64 {
fn column_type() -> ColumnType {
ColumnType::I64
}

fn default_value() -> Self {
0i64
}
}

impl HasAssociatedColumnType for f64 {
fn column_type() -> ColumnType {
ColumnType::F64
}

fn default_value() -> Self {
Default::default()
}
}

impl HasAssociatedColumnType for bool {
fn column_type() -> ColumnType {
ColumnType::Bool
}
fn default_value() -> Self {
Default::default()
}
}

impl HasAssociatedColumnType for crate::DateTime {
fn column_type() -> ColumnType {
ColumnType::DateTime
}
fn default_value() -> Self {
Default::default()
}
}

impl HasAssociatedColumnType for Ipv6Addr {
fn column_type() -> ColumnType {
ColumnType::IpAddr
}

fn default_value() -> Self {
Ipv6Addr::from([0u8; 16])
}
}

/// Column types are grouped into different categories that
/// corresponds to the different types of `JsonValue` types.
///
/// The columnar writer will apply coercion rules to make sure that
/// at most one column exist per `ColumnTypeCategory`.
///
/// See also [README.md].
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug)]
#[repr(u8)]
pub(crate) enum ColumnTypeCategory {
Bool,
Str,
Numerical,
DateTime,
Bytes,
IpAddr,
}

impl From<ColumnType> for ColumnTypeCategory {
fn from(column_type: ColumnType) -> Self {
match column_type {
ColumnType::I64 => ColumnTypeCategory::Numerical,
ColumnType::U64 => ColumnTypeCategory::Numerical,
ColumnType::F64 => ColumnTypeCategory::Numerical,
ColumnType::Bytes => ColumnTypeCategory::Bytes,
ColumnType::Str => ColumnTypeCategory::Str,
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
}
}
}
Expand All @@ -73,7 +191,22 @@ mod tests {
assert!(column_type_set.insert(column_type));
}
}
assert_eq!(column_type_set.len(), 7);
assert_eq!(column_type_set.len(), super::COLUMN_TYPES.len());
}

#[test]
fn test_column_category_sort_consistent_with_column_type_sort() {
// This is a very important property because we
// we need to serialize colunmn in the right order.
let mut column_types: Vec<ColumnType> = super::COLUMN_TYPES.iter().copied().collect();
column_types.sort_by_key(|col| col.to_code());
let column_categories: Vec<ColumnTypeCategory> = column_types
.into_iter()
.map(ColumnTypeCategory::from)
.collect();
for (prev, next) in column_categories.iter().zip(column_categories.iter()) {
assert!(prev <= next);
}
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod merge;
mod reader;
mod writer;

pub use column_type::ColumnType;
pub use column_type::{ColumnType, HasAssociatedColumnType};
pub use merge::{merge_columnar, MergeDocOrder};
pub use reader::ColumnarReader;
pub use writer::ColumnarWriter;
5 changes: 3 additions & 2 deletions columnar/src/columnar/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ColumnarReader {
})
}

// TODO fix ugly API
// TODO Add unit tests
pub fn list_columns(&self) -> io::Result<Vec<(String, DynamicColumnHandle)>> {
let mut stream = self.column_dictionary.stream()?;
let mut results = Vec::new();
Expand All @@ -55,7 +55,8 @@ impl ColumnarReader {
.map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?;
let range = stream.value().clone();
let column_name =
String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 1]).to_string();
// The last two bytes are respectively the 0u8 separator and the column_type.
String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 2]).to_string();
let file_slice = self
.column_data
.slice(range.start as usize..range.end as usize);
Expand Down
Loading

0 comments on commit af85713

Please sign in to comment.