Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetch blocks of vals in aggregation for all cardinalities #1950

Merged
merged 2 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions columnar/src/block_accessor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::{Column, DocId, RowId};

/// Caches the values (and the matching doc/row ids) of a block of documents
/// fetched from a [`Column`], so aggregations can iterate them cheaply.
#[derive(Debug, Default, Clone)]
pub struct ColumnBlockAccessor<T> {
    // Values loaded for the row ids of the current block.
    val_cache: Vec<T>,
    // Doc ids aligned index-for-index with `row_id_cache` (a doc id may
    // repeat when it maps to several row ids).
    docid_cache: Vec<DocId>,
    // Row ids resolved from the doc ids of the current block.
    row_id_cache: Vec<RowId>,
}

impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
    ColumnBlockAccessor<T>
{
    /// Fetches the values for a block of `docs` from `accessor`.
    ///
    /// First resolves the doc ids to row ids (a doc id may map to zero or
    /// several row ids, depending on the column's cardinality), then loads
    /// the values for those row ids into the internal caches. Previous
    /// contents of the caches are discarded.
    #[inline]
    pub fn fetch_block(&mut self, docs: &[u32], accessor: &Column<T>) {
        self.docid_cache.clear();
        self.row_id_cache.clear();
        accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache);
        // `resize` (not `clear` + `push`) keeps the allocation and gives
        // `get_vals` a correctly sized output slice to fill.
        self.val_cache.resize(self.row_id_cache.len(), T::default());
        accessor
            .values
            .get_vals(&self.row_id_cache, &mut self.val_cache);
    }

    /// Iterates over the values fetched by the last `fetch_block` call.
    #[inline]
    pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
        // `T: Copy`, so `copied` is the idiomatic (clippy-clean)
        // equivalent of `cloned`.
        self.val_cache.iter().copied()
    }

    /// Iterates over `(doc_id, value)` pairs of the last fetched block.
    ///
    /// Doc ids and values are matched index-for-index; a doc id appears
    /// once per value it has.
    #[inline]
    pub fn iter_docid_vals(&self) -> impl Iterator<Item = (DocId, T)> + '_ {
        self.docid_cache
            .iter()
            .copied()
            .zip(self.val_cache.iter().copied())
    }
}
23 changes: 20 additions & 3 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use serialize::{
use crate::column_index::ColumnIndex;
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, MonotonicallyMappableToU64, RowId};
use crate::{Cardinality, DocId, MonotonicallyMappableToU64, RowId};

#[derive(Clone)]
pub struct Column<T = u64> {
Expand Down Expand Up @@ -68,8 +68,25 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
self.values_for_doc(row_id).next()
}

pub fn values_for_doc(&self, row_id: RowId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(row_id)
/// Translates a block of doc ids to row ids.
///
/// Appends the row ids and the matching doc ids to the output buffers,
/// aligned index-for-index. A doc id may appear zero or several times in
/// the output, depending on how many values it has, e.g.:
/// DocId In:  [0, 5, 6]
/// DocId Out: [0, 0, 6, 6]
/// RowId Out: [0, 1, 2, 3]
#[inline]
pub fn row_ids_for_docs(
    &self,
    doc_ids: &[DocId],
    doc_ids_out: &mut Vec<DocId>,
    row_ids: &mut Vec<RowId>,
) {
    self.idx.docids_to_rowids(doc_ids, doc_ids_out, row_ids)
}

/// Returns an iterator over all values stored for `doc_id`.
pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ {
    let row_ids = self.value_row_ids(doc_id);
    row_ids.map(|row_id: RowId| self.values.get_val(row_id))
}

Expand Down
39 changes: 39 additions & 0 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,45 @@ impl ColumnIndex {
}
}

/// Translates a block of doc ids to row ids.
///
/// Appends the row ids and the matching doc ids to the output buffers,
/// aligned index-for-index, e.g.:
/// DocId In:  [0, 5, 6]
/// DocId Out: [0, 0, 6, 6]
/// RowId Out: [0, 1, 2, 3]
#[inline]
pub fn docids_to_rowids(
    &self,
    doc_ids: &[DocId],
    doc_ids_out: &mut Vec<DocId>,
    row_ids: &mut Vec<RowId>,
) {
    match self {
        // No values in the column: nothing to translate.
        ColumnIndex::Empty { .. } => {}
        // One row per doc: the row id equals the doc id.
        ColumnIndex::Full => {
            doc_ids_out.extend_from_slice(doc_ids);
            row_ids.extend_from_slice(doc_ids);
        }
        // Docs without a value are skipped; for present docs the row id is
        // the rank of the doc id among the docs that have a value.
        ColumnIndex::Optional(optional_index) => {
            for doc_id in doc_ids {
                if let Some(row_id) = optional_index.rank_if_exists(*doc_id) {
                    doc_ids_out.push(*doc_id);
                    row_ids.push(row_id);
                }
            }
        }
        // A doc maps to a (possibly empty) range of row ids; the doc id is
        // emitted once per row id in its range.
        ColumnIndex::Multivalued(multivalued_index) => {
            for doc_id in doc_ids {
                for row_id in multivalued_index.range(*doc_id) {
                    doc_ids_out.push(*doc_id);
                    row_ids.push(row_id);
                }
            }
        }
    }
}

pub fn docid_range_to_rowids(&self, doc_id: Range<DocId>) -> Range<RowId> {
match self {
ColumnIndex::Empty { .. } => 0..0,
Expand Down
2 changes: 2 additions & 0 deletions columnar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern crate test;

use std::io;

mod block_accessor;
mod column;
mod column_index;
pub mod column_values;
Expand All @@ -19,6 +20,7 @@ mod iterable;
pub(crate) mod utils;
mod value;

pub use block_accessor::ColumnBlockAccessor;
pub use column::{BytesColumn, Column, StrColumn};
pub use column_index::ColumnIndex;
pub use column_values::{ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64};
Expand Down
6 changes: 5 additions & 1 deletion src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;

use columnar::{Column, ColumnType, ColumnValues, StrColumn};
use columnar::{Column, ColumnBlockAccessor, ColumnType, ColumnValues, StrColumn};

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::{
Expand Down Expand Up @@ -45,6 +45,7 @@ pub struct BucketAggregationWithAccessor {
pub(crate) bucket_agg: BucketAggregationType,
pub(crate) sub_aggregation: AggregationsWithAccessor,
pub(crate) limits: AggregationLimits,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
}

impl BucketAggregationWithAccessor {
Expand Down Expand Up @@ -85,6 +86,7 @@ impl BucketAggregationWithAccessor {
bucket_agg: bucket.clone(),
str_dict_column,
limits,
column_block_accessor: Default::default(),
})
}
}
Expand All @@ -95,6 +97,7 @@ pub struct MetricAggregationWithAccessor {
pub metric: MetricAggregation,
pub field_type: ColumnType,
pub accessor: Column<u64>,
pub column_block_accessor: ColumnBlockAccessor<u64>,
}

impl MetricAggregationWithAccessor {
Expand All @@ -115,6 +118,7 @@ impl MetricAggregationWithAccessor {
accessor,
field_type,
metric: metric.clone(),
column_block_accessor: Default::default(),
})
}
}
Expand Down
71 changes: 26 additions & 45 deletions src/aggregation/bucket/histogram/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::aggregation::segment_agg_result::{
build_segment_agg_collector, AggregationLimits, SegmentAggregationCollector,
};
use crate::aggregation::{f64_from_fastfield_u64, format_date, VecWithNames};
use crate::{DocId, TantivyError};
use crate::TantivyError;

/// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`.
/// Each document value is rounded down to its bucket.
Expand Down Expand Up @@ -235,7 +235,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
Expand All @@ -244,11 +244,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];

let mem_pre = self.get_memory_consumption();

Expand All @@ -257,20 +255,26 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
let offset = self.offset;
let get_bucket_pos = |val| (get_bucket_pos_f64(val, interval, offset) as i64);

for doc in docs {
for val in accessor.values_for_doc(*doc) {
let val = self.f64_from_fastfield_u64(val);

let bucket_pos = get_bucket_pos(val);

if bounds.contains(val) {
self.increment_bucket(
bucket_pos,
*doc,
sub_aggregation_accessor,
interval,
offset,
)?;
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);

for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let val = self.f64_from_fastfield_u64(val);

let bucket_pos = get_bucket_pos(val);

if bounds.contains(val) {
let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
SegmentHistogramBucketEntry { key, doc_count: 0 }
});
bucket.doc_count += 1;
if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
self.sub_aggregations
.entry(bucket_pos)
.or_insert_with(|| sub_aggregation_blueprint.clone())
.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
}
}
}
Expand All @@ -283,9 +287,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
Ok(())
}

fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;

for sub_aggregation in self.sub_aggregations.values_mut() {
sub_aggregation.flush(sub_aggregation_accessor)?;
Expand Down Expand Up @@ -360,29 +364,6 @@ impl SegmentHistogramCollector {
})
}

#[inline]
fn increment_bucket(
&mut self,
bucket_pos: i64,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
interval: f64,
offset: f64,
) -> crate::Result<()> {
let bucket = self.buckets.entry(bucket_pos).or_insert_with(|| {
let key = get_bucket_key_from_pos(bucket_pos as f64, interval, offset);
SegmentHistogramBucketEntry { key, doc_count: 0 }
});
bucket.doc_count += 1;
if let Some(sub_aggregation_blueprint) = self.sub_aggregation_blueprint.as_mut() {
self.sub_aggregations
.entry(bucket_pos)
.or_insert_with(|| sub_aggregation_blueprint.clone())
.collect(doc, bucket_with_accessor)?;
}
Ok(())
}

#[inline]
fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
f64_from_fastfield_u64(val, &self.column_type)
Expand Down
31 changes: 16 additions & 15 deletions src/aggregation/bucket/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.collect_block(&[doc], agg_with_accessor)
}
Expand All @@ -221,30 +221,31 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &AggregationsWithAccessor,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
let accessor = &agg_with_accessor.buckets.values[self.accessor_idx].accessor;
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
for doc in docs {
for val in accessor.values_for_doc(*doc) {
let bucket_pos = self.get_bucket_pos(val);
let bucket_agg_accessor = &mut agg_with_accessor.buckets.values[self.accessor_idx];

let bucket = &mut self.buckets[bucket_pos];
bucket_agg_accessor
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);

bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(*doc, sub_aggregation_accessor)?;
}
for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
let bucket_pos = self.get_bucket_pos(val);

let bucket = &mut self.buckets[bucket_pos];

bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation.collect(doc, &mut bucket_agg_accessor.sub_aggregation)?;
}
}

Ok(())
}

fn flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
let sub_aggregation_accessor =
&agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;
&mut agg_with_accessor.buckets.values[self.accessor_idx].sub_aggregation;

for bucket in self.buckets.iter_mut() {
if let Some(sub_agg) = bucket.bucket.sub_aggregation.as_mut() {
Expand Down
Loading