forward error in aggregation collect
PSeitz committed May 12, 2022
1 parent 9834b16 commit 351290e
Showing 6 changed files with 55 additions and 43 deletions.
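
Every file in this commit follows the same pattern: aggregation collector methods that previously returned `()` now return `crate::Result<()>`, and each call site forwards failures upward with the `?` operator instead of silently dropping them. Below is a minimal sketch of that before/after shape; the types (`MyCollector`, `SubAggregation`, the boxed error alias) are illustrative stand-ins, not tantivy's actual definitions.

```rust
// Hypothetical miniature of the change: `collect` used to return `()`,
// so a failing sub-aggregation had nowhere to report its error.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

struct SubAggregation;

impl SubAggregation {
    fn collect(&mut self, doc: u32) -> Result<()> {
        // A sub-aggregation can fail; return the error instead of panicking.
        if doc == u32::MAX {
            return Err("invalid doc".into());
        }
        Ok(())
    }
}

struct MyCollector {
    doc_count: u64,
    sub_aggregation: Option<SubAggregation>,
}

impl MyCollector {
    // Before: `fn collect(&mut self, doc: u32)` — sub-errors were dropped.
    // After: the signature returns Result and `?` forwards sub-errors.
    fn collect(&mut self, doc: u32) -> Result<()> {
        self.doc_count += 1;
        if let Some(sub) = self.sub_aggregation.as_mut() {
            sub.collect(doc)?; // forward instead of ignoring
        }
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut collector = MyCollector {
        doc_count: 0,
        sub_aggregation: Some(SubAggregation),
    };
    collector.collect(42)?;
    assert!(collector.collect(u32::MAX).is_err()); // the error now surfaces
    Ok(())
}
```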
2 changes: 2 additions & 0 deletions src/aggregation/agg_req_with_accessor.rs
@@ -1,5 +1,7 @@
//! This will enhance the request tree with access to the fastfield and metadata.

+use std::rc::Rc;
+use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
25 changes: 14 additions & 11 deletions src/aggregation/bucket/histogram/histogram.rs
@@ -311,7 +311,7 @@ impl SegmentHistogramCollector
doc: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
-    ) {
+    ) -> crate::Result<()> {
let bounds = self.bounds;
let interval = self.interval;
let offset = self.offset;
@@ -341,28 +341,28 @@ impl SegmentHistogramCollector
bucket_pos0,
docs[0],
&bucket_with_accessor.sub_aggregation,
-            );
+            )?;
self.increment_bucket_if_in_bounds(
val1,
&bounds,
bucket_pos1,
docs[1],
&bucket_with_accessor.sub_aggregation,
-            );
+            )?;
self.increment_bucket_if_in_bounds(
val2,
&bounds,
bucket_pos2,
docs[2],
&bucket_with_accessor.sub_aggregation,
-            );
+            )?;
self.increment_bucket_if_in_bounds(
val3,
&bounds,
bucket_pos3,
docs[3],
&bucket_with_accessor.sub_aggregation,
-            );
+            )?;
}
for doc in iter.remainder() {
let val = f64_from_fastfield_u64(accessor.get(*doc), &self.field_type);
@@ -376,16 +376,17 @@ impl SegmentHistogramCollector
self.buckets[bucket_pos].key,
get_bucket_val(val, self.interval, self.offset) as f64
);
-            self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation);
+            self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?;
}
if force_flush {
if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
for sub_aggregation in sub_aggregations {
sub_aggregation
-                        .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush);
+                        .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?;
}
}
}
+        Ok(())
}

#[inline]
@@ -396,15 +397,16 @@ impl SegmentHistogramCollector
bucket_pos: usize,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
-    ) {
+    ) -> crate::Result<()> {
if bounds.contains(val) {
debug_assert_eq!(
self.buckets[bucket_pos].key,
get_bucket_val(val, self.interval, self.offset) as f64
);

-            self.increment_bucket(bucket_pos, doc, bucket_with_accessor);
+            self.increment_bucket(bucket_pos, doc, bucket_with_accessor)?;
}
+        Ok(())
}

#[inline]
@@ -413,12 +415,13 @@ impl SegmentHistogramCollector
bucket_pos: usize,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
-    ) {
+    ) -> crate::Result<()> {
let bucket = &mut self.buckets[bucket_pos];
bucket.doc_count += 1;
if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
-            (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor);
+            (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?;
}
+        Ok(())
}

fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
20 changes: 11 additions & 9 deletions src/aggregation/bucket/range.rs
@@ -224,7 +224,7 @@ impl SegmentRangeCollector
doc: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
-    ) {
+    ) -> crate::Result<()> {
let mut iter = doc.chunks_exact(4);
let accessor = bucket_with_accessor
.accessor
@@ -240,24 +240,25 @@
let bucket_pos3 = self.get_bucket_pos(val3);
let bucket_pos4 = self.get_bucket_pos(val4);

-            self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation);
-            self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation);
-            self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation);
-            self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation);
+            self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation)?;
+            self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation)?;
+            self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation)?;
+            self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation)?;
}
for doc in iter.remainder() {
let val = accessor.get(*doc);
let bucket_pos = self.get_bucket_pos(val);
-            self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation);
+            self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?;
}
if force_flush {
for bucket in &mut self.buckets {
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
sub_aggregation
-                        .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush);
+                        .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?;
}
}
}
+        Ok(())
}

#[inline]
@@ -266,13 +267,14 @@
bucket_pos: usize,
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
-    ) {
+    ) -> crate::Result<()> {
let bucket = &mut self.buckets[bucket_pos];

bucket.bucket.doc_count += 1;
if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
-            sub_aggregation.collect(doc, bucket_with_accessor);
+            sub_aggregation.collect(doc, bucket_with_accessor)?;
}
+        Ok(())
}

#[inline]
26 changes: 14 additions & 12 deletions src/aggregation/bucket/term_agg.rs
@@ -246,26 +246,27 @@ impl TermBuckets
doc: DocId,
bucket_with_accessor: &AggregationsWithAccessor,
blueprint: &Option<SegmentAggregationResultsCollector>,
-    ) {
-        // self.ensure_vec_exists(term_ids);
+    ) -> crate::Result<()> {
for &term_id in term_ids {
let entry = self
.entries
.entry(term_id as u32)
.or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
entry.doc_count += 1;
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
-                sub_aggregations.collect(doc, bucket_with_accessor);
+                sub_aggregations.collect(doc, bucket_with_accessor)?;
}
}
+        Ok(())
}

-    fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) {
+    fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> {
for entry in &mut self.entries.values_mut() {
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
-                sub_aggregations.flush_staged_docs(agg_with_accessor, false);
+                sub_aggregations.flush_staged_docs(agg_with_accessor, false)?;
}
}
+        Ok(())
}
}

@@ -421,7 +422,7 @@ impl SegmentTermCollector
doc: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
-    ) {
+    ) -> crate::Result<()> {
let accessor = bucket_with_accessor
.accessor
.as_multi()
@@ -442,25 +443,25 @@
docs[0],
&bucket_with_accessor.sub_aggregation,
&self.blueprint,
-            );
+            )?;
self.term_buckets.increment_bucket(
&vals2,
docs[1],
&bucket_with_accessor.sub_aggregation,
&self.blueprint,
-            );
+            )?;
self.term_buckets.increment_bucket(
&vals3,
docs[2],
&bucket_with_accessor.sub_aggregation,
&self.blueprint,
-            );
+            )?;
self.term_buckets.increment_bucket(
&vals4,
docs[3],
&bucket_with_accessor.sub_aggregation,
&self.blueprint,
-            );
+            )?;
}
for &doc in iter.remainder() {
accessor.get_vals(doc, &mut vals1);
Expand All @@ -470,12 +471,13 @@ impl SegmentTermCollector {
doc,
&bucket_with_accessor.sub_aggregation,
&self.blueprint,
-            );
+            )?;
}
if force_flush {
self.term_buckets
-                .force_flush(&bucket_with_accessor.sub_aggregation);
+                .force_flush(&bucket_with_accessor.sub_aggregation)?;
}
+        Ok(())
}
}

4 changes: 2 additions & 2 deletions src/aggregation/collector.rs
@@ -133,13 +133,13 @@ impl SegmentCollector for AggregationSegmentCollector

#[inline]
fn collect(&mut self, doc: crate::DocId, _score: crate::Score) -> crate::Result<()> {
-        self.result.collect(doc, &self.aggs_with_accessor);
+        self.result.collect(doc, &self.aggs_with_accessor)?;
Ok(())
}

fn harvest(mut self) -> Self::Fruit {
self.result
-            .flush_staged_docs(&self.aggs_with_accessor, true);
+            .flush_staged_docs(&self.aggs_with_accessor, true)?;
self.result
.into_intermediate_aggregations_result(&self.aggs_with_accessor)
}
21 changes: 12 additions & 9 deletions src/aggregation/segment_agg_result.rs
@@ -115,21 +115,22 @@ impl SegmentAggregationResultsCollector
&mut self,
doc: crate::DocId,
agg_with_accessor: &AggregationsWithAccessor,
-    ) {
+    ) -> crate::Result<()> {
self.staged_docs[self.num_staged_docs] = doc;
self.num_staged_docs += 1;
if self.num_staged_docs == self.staged_docs.len() {
-            self.flush_staged_docs(agg_with_accessor, false);
+            self.flush_staged_docs(agg_with_accessor, false)?;
}
+        Ok(())
}

pub(crate) fn flush_staged_docs(
&mut self,
agg_with_accessor: &AggregationsWithAccessor,
force_flush: bool,
-    ) {
+    ) -> crate::Result<()> {
if self.num_staged_docs == 0 {
-            return;
+            return Ok(());
}
if let Some(metrics) = &mut self.metrics {
for (collector, agg_with_accessor) in
@@ -148,11 +149,12 @@
&self.staged_docs[..self.num_staged_docs],
agg_with_accessor,
force_flush,
-                );
+                )?;
}
}

self.num_staged_docs = 0;
+        Ok(())
}
}

@@ -256,17 +258,18 @@ impl SegmentBucketResultCollector
doc: &[DocId],
bucket_with_accessor: &BucketAggregationWithAccessor,
force_flush: bool,
-    ) {
+    ) -> crate::Result<()> {
match self {
SegmentBucketResultCollector::Range(range) => {
-                range.collect_block(doc, bucket_with_accessor, force_flush);
+                range.collect_block(doc, bucket_with_accessor, force_flush)?;
}
SegmentBucketResultCollector::Histogram(histogram) => {
-                histogram.collect_block(doc, bucket_with_accessor, force_flush)
+                histogram.collect_block(doc, bucket_with_accessor, force_flush)?;
}
SegmentBucketResultCollector::Terms(terms) => {
-                terms.collect_block(doc, bucket_with_accessor, force_flush)
+                terms.collect_block(doc, bucket_with_accessor, force_flush)?;
}
}
+        Ok(())
}
}

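The segment_agg_result.rs change above is the root of the call chain: `collect` stages documents in a fixed-size buffer and flushes a full block at a time, so once flushing can fail, `collect` itself must return `Result`, and the empty-buffer early return becomes `return Ok(());`. Below is a self-contained sketch of that staged-flush pattern; the types and the `BLOCK` size are illustrative, not tantivy's own.

```rust
// Illustrative miniature of the staged-docs pattern: docs are buffered
// and flushed in blocks, so a flush error must propagate up through collect.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

const BLOCK: usize = 4;

struct StagedCollector {
    staged_docs: [u32; BLOCK],
    num_staged_docs: usize,
}

impl StagedCollector {
    fn collect(&mut self, doc: u32) -> Result<()> {
        self.staged_docs[self.num_staged_docs] = doc;
        self.num_staged_docs += 1;
        if self.num_staged_docs == self.staged_docs.len() {
            // Flushing runs the bucket collectors, which can now fail.
            self.flush_staged_docs(false)?;
        }
        Ok(())
    }

    fn flush_staged_docs(&mut self, _force_flush: bool) -> Result<()> {
        if self.num_staged_docs == 0 {
            return Ok(()); // was a bare `return;` before the signature change
        }
        // ... run bucket/metric collectors over &self.staged_docs[..self.num_staged_docs]
        self.num_staged_docs = 0;
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut c = StagedCollector { staged_docs: [0; BLOCK], num_staged_docs: 0 };
    for doc in 0..10u32 {
        c.collect(doc)?; // auto-flushes every BLOCK docs
    }
    c.flush_staged_docs(true)?; // final forced flush, mirroring harvest()
    Ok(())
}
```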