Skip to content

Commit

Permalink
Integrated fastfield codecs into columnar.
Browse files Browse the repository at this point in the history
Introduced asymetric OptionalCodec / SerializableOptionalCodec
Removed cardinality from the columnar sstable.
Added DynamicColumn
Reorganized all files
Change DenseCodec serialization logic.
  • Loading branch information
fulmicoton committed Jan 12, 2023
1 parent 7a8fce0 commit 44d010c
Show file tree
Hide file tree
Showing 43 changed files with 6,024 additions and 291 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ unstable = [] # useful for benches.
quickwit = ["sstable"]

[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "columnar", "tokenizer-api"]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api"]

# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points
Expand Down
13 changes: 12 additions & 1 deletion columnar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@ thiserror = "1"
fnv = "1"
sstable = { path = "../sstable", package = "tantivy-sstable" }
common = { path = "../common", package = "tantivy-common" }
fastfield_codecs = { path = "../fastfield_codecs"}
itertools = "0.10"
log = "0.4"
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
prettytable-rs = {version="0.10.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
measure_time = { version="0.8.2", optional=true}

[dev-dependencies]
proptest = "1"
more-asserts = "0.3.0"
rand = "0.8.3"

# temporary
[workspace]
members = []
43 changes: 43 additions & 0 deletions columnar/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,46 @@ be done by listing all keys prefixed by

The associated range of bytes refer to a range of bytes

This crate exposes a columnar format for tantivy.
This format is described in README.md


The crate introduces the following concepts.

`Columnar` is an equivalent of a dataframe.
It maps `column_key` to `Column`.

A `Column<T>` asssociated a `RowId` (u32) to any
number of values.

This is made possible by wrapping a `ColumnIndex` and a `ColumnValue` object.
The `ColumnValue<T>` represents a mapping that associates each `RowId` to
exactly one single value.

The `ColumnIndex` then maps each RowId to a set of `RowId` in the
`ColumnValue`.

For optimization, and compression purposes, the `ColumnIndex` has three
possible representation, each for different cardinalities.

- Full

All RowId have exactly one value. The ColumnIndex is the trivial mapping.

- Optional

All RowIds can have at most one value. The ColumnIndex is the trivial mapping `ColumnRowId -> Option<ColumnValueRowId>`.

- Multivalued

All RowIds can have any number of values.
The column index is mapping values to a range.


All these objects are implemented an unit tested independently
in their own module:

- columnar
- column_index
- column_values
- column
30 changes: 30 additions & 0 deletions columnar/src/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Review the public API via cargo doc
Add versioning.

add dictionary encoded stuff
better errors? io::Error is overused at the moment.
fix multivalued
replug i128
replug all unit tests
find a way to make columnar work with strict types
go through TODOs
re-add ZSTD compression for dictionaries
fix enhance column-cli
remvoe all doc_id occurences -> row_id
add metrics helper for aggregate. sum(row_id)
review inline
merges
adhoc solution for bool?
f32?

Investigate column length, padding, etc.

plug to tantivy fast field
plug to aggregation

rationalize codecs? Do we need sparse + dense for the optional codec


---
not yet
autodetect datetime ipaddr, plug customizable tokenizer.
75 changes: 75 additions & 0 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
mod serialize;

use std::sync::Arc;

use common::BinarySerializable;
pub use serialize::{open_column_u64, serialize_column_u64};

use crate::column_index::ColumnIndex;
use crate::column_values::ColumnValues;
use crate::{InvalidData, RowId};

#[derive(Clone)]
pub struct Column<T> {
pub idx: ColumnIndex<'static>,
pub values: Arc<dyn ColumnValues<T>>,
}

impl<T: PartialOrd> Column<T> {
pub fn first(&self, row_id: RowId) -> Option<T> {
match &self.idx {
ColumnIndex::Full => Some(self.values.get_val(row_id)),
ColumnIndex::Optional(opt_idx) => {
let value_row_idx = opt_idx.translate_to_codec_idx(row_id)?;
Some(self.values.get_val(value_row_idx))
}
ColumnIndex::Multivalued(multivalued_index) => {
todo!();
}
}
}
}

/// Enum describing the number of values that can exist per document
/// (or per row if you will).
///
/// The cardinality must fit on 2 bits.
#[derive(Clone, Copy, Hash, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum Cardinality {
/// All documents contain exactly one value.
/// `Full` is the default for auto-detecting the Cardinality, since it is the most strict.
#[default]
Full = 0,
/// All documents contain at most one value.
Optional = 1,
/// All documents may contain any number of values.
Multivalued = 2,
}

impl Cardinality {
pub(crate) fn to_code(self) -> u8 {
self as u8
}

pub(crate) fn try_from_code(code: u8) -> Result<Cardinality, InvalidData> {
match code {
0 => Ok(Cardinality::Full),
1 => Ok(Cardinality::Optional),
2 => Ok(Cardinality::Multivalued),
_ => Err(InvalidData),
}
}
}

impl BinarySerializable for Cardinality {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
self.to_code().serialize(writer)
}

fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let cardinality_code = u8::deserialize(reader)?;
let cardinality = Cardinality::try_from_code(cardinality_code)?;
Ok(cardinality)
}
}
41 changes: 41 additions & 0 deletions columnar/src/column/serialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::io;
use std::io::Write;

use common::{CountingWriter, OwnedBytes};

use crate::column::Column;
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::{
serialize_column_values, ColumnValues, MonotonicallyMappableToU64, ALL_CODEC_TYPES,
};

pub fn serialize_column_u64<T: MonotonicallyMappableToU64>(
column_index: SerializableColumnIndex<'_>,
column_values: &impl ColumnValues<T>,
output: &mut impl Write,
) -> io::Result<()> {
let mut counting_writer = CountingWriter::wrap(output);
serialize_column_index(column_index, &mut counting_writer)?;
let column_index_num_bytes = counting_writer.written_bytes() as u32;
let output = counting_writer.finish();
serialize_column_values(column_values, &ALL_CODEC_TYPES[..], output)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
}

pub fn open_column_u64<T: MonotonicallyMappableToU64>(bytes: OwnedBytes) -> io::Result<Column<T>> {
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
let column_index_num_bytes = u32::from_le_bytes(
column_index_num_bytes_payload
.as_slice()
.try_into()
.unwrap(),
);
let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize);
let column_index = crate::column_index::open_column_index(column_index_data)?;
let column_values = crate::column_values::open_u64_mapped(column_values_data)?;
Ok(Column {
idx: column_index,
values: column_values,
})
}
39 changes: 39 additions & 0 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
mod multivalued_index;
mod optional_index;
mod serialize;

use std::sync::Arc;

pub use optional_index::SerializableOptionalIndex;
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};

use crate::column::Cardinality;
use crate::column_index::optional_index::OptionalIndex;

#[derive(Clone)]
pub enum ColumnIndex<'a> {
Full,
Optional(OptionalIndex),
// TODO remove the Arc<dyn> apart from serialization this is not
// dynamic at all.
Multivalued(Arc<dyn MultivaluedIndex + 'a>),
}

impl<'a> ColumnIndex<'a> {
pub fn get_cardinality(&self) -> Cardinality {
match self {
ColumnIndex::Full => Cardinality::Full,
ColumnIndex::Optional(_) => Cardinality::Optional,
ColumnIndex::Multivalued(_) => Cardinality::Multivalued,
}
}
}

pub trait MultivaluedIndex {
/// The number of docs in the column.
fn num_docs(&self) -> u32;
/// The number of values in the column.
fn num_vals(&self) -> u32;
/// Return the start index of the values for each doc
fn iter(&self) -> Box<dyn Iterator<Item = u32> + '_>;
}
17 changes: 17 additions & 0 deletions columnar/src/column_index/multivalued_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::io;
use std::sync::Arc;

use common::OwnedBytes;

use super::MultivaluedIndex;

pub fn serialize_multivalued_index(
multivalued_index: &dyn MultivaluedIndex,
output: &mut impl io::Write,
) -> io::Result<()> {
todo!();
}

pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result<Arc<dyn MultivaluedIndex>> {
todo!();
}
Loading

0 comments on commit 44d010c

Please sign in to comment.