Skip to content

Commit

Permalink
reuse stats for average
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Jan 13, 2023
1 parent 2650111 commit 78273bf
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 80 deletions.
18 changes: 10 additions & 8 deletions src/aggregation/intermediate_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,21 +204,23 @@ pub enum IntermediateAggregationResult {
/// Holds the intermediate data for metric results
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum IntermediateMetricResult {
/// Average containing intermediate average data result
/// Intermediate average result
Average(IntermediateAverage),
/// AverageData variant
/// Intermediate stats result
Stats(IntermediateStats),
}

impl From<SegmentMetricResultCollector> for IntermediateMetricResult {
fn from(tree: SegmentMetricResultCollector) -> Self {
match tree {
SegmentMetricResultCollector::Average(collector) => {
IntermediateMetricResult::Average(IntermediateAverage::from_collector(collector))
}
SegmentMetricResultCollector::Stats(collector) => {
IntermediateMetricResult::Stats(collector.stats)
}
SegmentMetricResultCollector::Stats(collector) => match collector.collecting_for {
super::metric::SegmentStatsType::Stats => {
IntermediateMetricResult::Stats(collector.stats)
}
super::metric::SegmentStatsType::Avg => IntermediateMetricResult::Average(
IntermediateAverage::from_collector(collector),
),
},
}
}
}
Expand Down
64 changes: 7 additions & 57 deletions src/aggregation/metric/average.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::fmt::Debug;

use fastfield_codecs::Column;
use serde::{Deserialize, Serialize};

use crate::aggregation::f64_from_fastfield_u64;
use crate::schema::Type;
use crate::DocId;
use super::SegmentStatsCollector;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// A single-value metric aggregation that computes the average of numeric values that are
Expand Down Expand Up @@ -36,61 +33,19 @@ impl AverageAggregation {
}
}

#[derive(Clone, PartialEq)]
pub(crate) struct SegmentAverageCollector {
pub data: IntermediateAverage,
field_type: Type,
}

impl Debug for SegmentAverageCollector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AverageCollector")
.field("data", &self.data)
.finish()
}
}

impl SegmentAverageCollector {
pub fn from_req(field_type: Type) -> Self {
Self {
field_type,
data: Default::default(),
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val1 = field.get_val(docs[0]);
let val2 = field.get_val(docs[1]);
let val3 = field.get_val(docs[2]);
let val4 = field.get_val(docs[3]);
let val1 = f64_from_fastfield_u64(val1, &self.field_type);
let val2 = f64_from_fastfield_u64(val2, &self.field_type);
let val3 = f64_from_fastfield_u64(val3, &self.field_type);
let val4 = f64_from_fastfield_u64(val4, &self.field_type);
self.data.collect(val1);
self.data.collect(val2);
self.data.collect(val3);
self.data.collect(val4);
}
for &doc in iter.remainder() {
let val = field.get_val(doc);
let val = f64_from_fastfield_u64(val, &self.field_type);
self.data.collect(val);
}
}
}

/// Contains mergeable version of average data.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateAverage {
pub(crate) sum: f64,
pub(crate) doc_count: u64,
pub(crate) doc_count: u32,
}

impl IntermediateAverage {
pub(crate) fn from_collector(collector: SegmentAverageCollector) -> Self {
collector.data
pub(crate) fn from_collector(collector: SegmentStatsCollector) -> Self {
Self {
sum: collector.stats.sum,
doc_count: collector.stats.count,
}
}

/// Merge average data into this instance.
Expand All @@ -106,9 +61,4 @@ impl IntermediateAverage {
Some(self.sum / self.doc_count as f64)
}
}
#[inline]
fn collect(&mut self, val: f64) {
self.doc_count += 1;
self.sum += val;
}
}
27 changes: 20 additions & 7 deletions src/aggregation/metric/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl StatsAggregation {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Stats {
/// The number of documents.
pub count: usize,
pub count: u32,
/// The sum of the fast field values.
pub sum: f64,
/// The standard deviation of the fast field values. `None` for count == 0.
Expand Down Expand Up @@ -73,11 +73,16 @@ impl Stats {
/// `IntermediateStats` contains the mergeable version for stats.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateStats {
count: usize,
sum: f64,
squared_sum: f64,
min: f64,
max: f64,
/// the number of values
pub count: u32,
/// the sum of the values
pub sum: f64,
/// the squared sum of the values
pub squared_sum: f64,
/// the min value of the values
pub min: f64,
/// the max value of the values
pub max: f64,
}
impl Default for IntermediateStats {
fn default() -> Self {
Expand Down Expand Up @@ -150,17 +155,25 @@ impl IntermediateStats {
}
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) enum SegmentStatsType {
Stats,
Avg,
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SegmentStatsCollector {
pub(crate) stats: IntermediateStats,
field_type: Type,
pub(crate) collecting_for: SegmentStatsType,
}

impl SegmentStatsCollector {
pub fn from_req(field_type: Type) -> Self {
pub fn from_req(field_type: Type, collecting_for: SegmentStatsType) -> Self {
Self {
field_type,
stats: IntermediateStats::default(),
collecting_for,
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
Expand Down
12 changes: 4 additions & 8 deletions src/aggregation/segment_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTer
use super::collector::MAX_BUCKET_COUNT;
use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult};
use super::metric::{
AverageAggregation, SegmentAverageCollector, SegmentStatsCollector, StatsAggregation,
AverageAggregation, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
};
use super::VecWithNames;
use crate::aggregation::agg_req::BucketAggregationType;
Expand Down Expand Up @@ -163,30 +163,26 @@ impl SegmentAggregationResultsCollector {

#[derive(Clone, Debug, PartialEq)]
pub(crate) enum SegmentMetricResultCollector {
Average(SegmentAverageCollector),
Stats(SegmentStatsCollector),
}

impl SegmentMetricResultCollector {
pub fn from_req_and_validate(req: &MetricAggregationWithAccessor) -> crate::Result<Self> {
match &req.metric {
MetricAggregation::Average(AverageAggregation { field: _ }) => {
Ok(SegmentMetricResultCollector::Average(
SegmentAverageCollector::from_req(req.field_type),
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Avg),
))
}
MetricAggregation::Stats(StatsAggregation { field: _ }) => {
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type),
SegmentStatsCollector::from_req(req.field_type, SegmentStatsType::Stats),
))
}
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
match self {
SegmentMetricResultCollector::Average(avg_collector) => {
avg_collector.collect_block(doc, &*metric.accessor);
}
SegmentMetricResultCollector::Stats(stats_collector) => {
stats_collector.collect_block(doc, &*metric.accessor);
}
Expand Down

0 comments on commit 78273bf

Please sign in to comment.