Commit 5d75f3e
refactor: corrected to aggregation function
sunng87 committed Oct 5, 2024
1 parent 8bdccaf commit 5d75f3e
Showing 24 changed files with 244 additions and 240 deletions.
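The commit turns the GeoJSON path encoder from a scalar function into an aggregate, so it can fold many rows into a single encoded path per group. As a rough usage sketch, not taken from the commit itself: the registration below uses the name `geojson_encode_path`, while the doc comment in the diff still says `geojson_encode`, and the table and column names here are hypothetical.

```sql
-- Hypothetical schema: one row per GPS sample.
-- The aggregate collects each device's points, sorts them by timestamp,
-- and returns one GeoJSON-like [[lng, lat], ...] string per group.
SELECT
    device_id,
    geojson_encode_path(lat, lon, ts) AS path
FROM positions
GROUP BY device_id;
```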
3 changes: 3 additions & 0 deletions src/common/function/src/scalars/aggregate.rs
@@ -31,6 +31,7 @@ pub use polyval::PolyvalAccumulatorCreator;
 pub use scipy_stats_norm_cdf::ScipyStatsNormCdfAccumulatorCreator;
 pub use scipy_stats_norm_pdf::ScipyStatsNormPdfAccumulatorCreator;
 
+use super::geo::encoding::GeojsonPathEncodeFunctionCreator;
 use crate::function_registry::FunctionRegistry;
 
 /// A function creates `AggregateFunctionCreator`.
@@ -91,5 +92,7 @@ impl AggregateFunctions {
         register_aggr_func!("argmin", 1, ArgminAccumulatorCreator);
         register_aggr_func!("scipystatsnormcdf", 2, ScipyStatsNormCdfAccumulatorCreator);
         register_aggr_func!("scipystatsnormpdf", 2, ScipyStatsNormPdfAccumulatorCreator);
+
+        register_aggr_func!("geojson_encode_path", 3, GeojsonPathEncodeFunctionCreator);
     }
 }
5 changes: 4 additions & 1 deletion src/common/function/src/scalars/aggregate/argmax.rs
@@ -16,7 +16,10 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
-use common_query::error::{BadAccumulatorImplSnafu, CreateAccumulatorSnafu, Result};
+use common_query::error::{
+    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, InvalidInputStateSnafu, Result,
+};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
5 changes: 4 additions & 1 deletion src/common/function/src/scalars/aggregate/argmin.rs
@@ -16,7 +16,10 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
-use common_query::error::{BadAccumulatorImplSnafu, CreateAccumulatorSnafu, Result};
+use common_query::error::{
+    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, InvalidInputStateSnafu, Result,
+};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
4 changes: 3 additions & 1 deletion src/common/function/src/scalars/aggregate/diff.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
-    CreateAccumulatorSnafu, DowncastVectorSnafu, FromScalarValueSnafu, Result,
+    CreateAccumulatorSnafu, DowncastVectorSnafu, FromScalarValueSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
4 changes: 3 additions & 1 deletion src/common/function/src/scalars/aggregate/mean.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
 
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
-    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu, Result,
+    BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
3 changes: 2 additions & 1 deletion src/common/function/src/scalars/aggregate/polyval.rs
@@ -18,8 +18,9 @@ use std::sync::Arc;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
     self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
-    FromScalarValueSnafu, InvalidInputColSnafu, Result,
+    FromScalarValueSnafu, InvalidInputColSnafu, InvalidInputStateSnafu, Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
(changed file, path not captured in this view)
@@ -17,8 +17,10 @@ use std::sync::Arc;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
     self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
-    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, Result,
+    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
(changed file, path not captured in this view)
@@ -17,8 +17,10 @@ use std::sync::Arc;
 use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
 use common_query::error::{
     self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
-    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, Result,
+    FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, InvalidInputStateSnafu,
+    Result,
 };
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
 use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
 use common_query::prelude::*;
 use datatypes::prelude::*;
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/geo.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
-mod encoding;
+pub(crate) mod encoding;
 mod geohash;
 mod h3;
 mod helpers;
@@ -42,8 +42,5 @@ impl GeoFunctions {
         registry.register(Arc::new(h3::H3CellToString));
         registry.register(Arc::new(h3::H3IsNeighbour));
         registry.register(Arc::new(h3::H3StringToCell));
-
-        // encodings
-        registry.register(Arc::new(encoding::GeojsonPathEncode));
     }
 }
230 changes: 154 additions & 76 deletions src/common/function/src/scalars/geo/encoding.rs
@@ -12,108 +12,147 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
 use std::sync::Arc;
 
 use common_error::ext::{BoxedError, PlainError};
 use common_error::status_code::StatusCode;
-use common_query::error::{self, InvalidFuncArgsSnafu, Result};
-use common_query::prelude::{Signature, TypeSignature};
+use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
+use common_query::error::{self, InvalidFuncArgsSnafu, InvalidInputStateSnafu, Result};
+use common_query::logical_plan::accumulator::AggrFuncTypeStore;
+use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
+use common_query::prelude::AccumulatorCreatorFunction;
 use common_time::Timestamp;
-use datafusion::logical_expr::Volatility;
 use datatypes::prelude::ConcreteDataType;
-use datatypes::scalars::ScalarVectorBuilder;
-use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
-use derive_more::Display;
-use once_cell::sync::Lazy;
+use datatypes::value::{ListValue, Value};
+use datatypes::vectors::VectorRef;
 use snafu::{ensure, ResultExt};
 
 use super::helpers::{ensure_columns_len, ensure_columns_n};
-use crate::function::{Function, FunctionContext};
-
-static COORDINATE_TYPES: Lazy<Vec<ConcreteDataType>> = Lazy::new(|| {
-    vec![
-        ConcreteDataType::float32_datatype(),
-        ConcreteDataType::float64_datatype(),
-    ]
-});
-
-fn build_sorted_path(
-    columns: &[VectorRef],
-) -> Result<Vec<(Option<f64>, Option<f64>, Option<Timestamp>)>> {
-    // this macro ensures column vectos has same size as well
-    ensure_columns_n!(columns, 3);
-
-    let lat = &columns[0];
-    let lng = &columns[1];
-    let ts = &columns[2];
-
-    let size = lat.len();
-
-    let mut work_vec = Vec::with_capacity(size);
-    for idx in 0..size {
-        work_vec.push((
-            lat.get(idx).as_f64_lossy(),
-            lng.get(idx).as_f64_lossy(),
-            ts.get(idx).as_timestamp(),
-        ));
-    }
-
-    // sort by timestamp, we treat null timestamp as 0
-    work_vec.sort_unstable_by_key(|tuple| tuple.2.unwrap_or(Timestamp::new_second(0)));
-    Ok(work_vec)
+/// Accumulator of lat, lng, timestmap tuples
[GitHub Actions annotation (Check typos and docs, line 32): "timestmap" should be "timestamp".]
+#[derive(Debug)]
+pub struct GeojsonPathAccumulator {
+    timestamp_type: ConcreteDataType,
+    lat: Vec<Option<f64>>,
+    lng: Vec<Option<f64>>,
+    timestamp: Vec<Option<Timestamp>>,
+}
 
-/// This function accept rows of lat, lng and timestamp, sort with timestamp and
-/// encoding them into a geojson-like path.
-///
-/// Example:
-///
-/// ```sql
-/// SELECT geojson_encode(lat, lon, timestamp) FROM table;
-/// ```
-///
-#[derive(Clone, Debug, Default, Display)]
-#[display("{}", self.name())]
-pub struct GeojsonPathEncode;
+impl GeojsonPathAccumulator {
+    fn new(timestamp_type: ConcreteDataType) -> Self {
+        Self {
+            lat: Vec::default(),
+            lng: Vec::default(),
+            timestamp: Vec::default(),
+            timestamp_type,
+        }
+    }
+}
 
-impl Function for GeojsonPathEncode {
-    fn name(&self) -> &str {
-        "geojson_encode"
+impl Accumulator for GeojsonPathAccumulator {
+    fn state(&self) -> Result<Vec<Value>> {
+        Ok(vec![
+            Value::List(ListValue::new(
+                self.lat.iter().map(|i| Value::from(i.clone())).collect(),
[GitHub Actions annotation (Clippy, line 56): using `clone` on type `Option<f64>` which implements the `Copy` trait.]
+                ConcreteDataType::float64_datatype(),
+            )),
+            Value::List(ListValue::new(
+                self.lng.iter().map(|i| Value::from(i.clone())).collect(),
[GitHub Actions annotation (Clippy, line 60): using `clone` on type `Option<f64>` which implements the `Copy` trait.]
+                ConcreteDataType::float64_datatype(),
+            )),
+            Value::List(ListValue::new(
+                self.timestamp
+                    .iter()
+                    .map(|i| Value::from(i.clone()))
[GitHub Actions annotation (Clippy, line 66): using `clone` on type `Option<Timestamp>` which implements the `Copy` trait.]
+                    .collect(),
+                self.timestamp_type.clone(),
+            )),
+        ])
+    }
 
-    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
-        Ok(ConcreteDataType::string_datatype())
+    fn update_batch(&mut self, columns: &[VectorRef]) -> Result<()> {
+        ensure_columns_n!(columns, 3);
+
+        let lat = &columns[0];
+        let lng = &columns[1];
+        let ts = &columns[2];
+
+        let size = lat.len();
+
+        for idx in 0..size {
+            self.lat.push(lat.get(idx).as_f64_lossy());
+            self.lng.push(lng.get(idx).as_f64_lossy());
+            self.timestamp.push(ts.get(idx).as_timestamp());
+        }
+
+        Ok(())
+    }
 
-    fn signature(&self) -> Signature {
-        let mut signatures = Vec::new();
-        let coord_types = COORDINATE_TYPES.as_slice();
-
-        let ts_types = ConcreteDataType::timestamps();
-        for lat_type in coord_types {
-            for lng_type in coord_types {
-                for ts_type in &ts_types {
-                    signatures.push(TypeSignature::Exact(vec![
-                        lat_type.clone(),
-                        lng_type.clone(),
-                        ts_type.clone(),
-                    ]));
+    fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
+        ensure_columns_n!(states, 3);
+
+        let lat_lists = &states[0];
+        let lng_lists = &states[1];
+        let ts_lists = &states[2];
+
+        let len = lat_lists.len();
+
+        for idx in 0..len {
+            if let Some(lat_list) = lat_lists
+                .get(idx)
+                .as_list()
+                .map_err(|e| BoxedError::new(e))
[GitHub Actions annotation (Clippy, line 104): redundant closure.]
+                .context(error::ExecuteSnafu)?
+            {
+                for v in lat_list.items() {
+                    self.lat.push(v.as_f64_lossy());
+                }
+            }
+
+            if let Some(lng_list) = lng_lists
+                .get(idx)
+                .as_list()
+                .map_err(|e| BoxedError::new(e))
[GitHub Actions annotation (Clippy, line 115): redundant closure.]
+                .context(error::ExecuteSnafu)?
+            {
+                for v in lng_list.items() {
+                    self.lng.push(v.as_f64_lossy());
+                }
+            }
+
+            if let Some(ts_list) = ts_lists
+                .get(idx)
+                .as_list()
+                .map_err(|e| BoxedError::new(e))
[GitHub Actions annotation (Clippy, line 126): redundant closure.]
+                .context(error::ExecuteSnafu)?
+            {
+                for v in ts_list.items() {
+                    self.timestamp.push(v.as_timestamp());
+                }
+            }
+        }
 
-        Signature::one_of(signatures, Volatility::Stable)
+        Ok(())
     }
 
-    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
-        let work_vec = build_sorted_path(columns)?;
+    fn evaluate(&self) -> Result<Value> {
+        let mut work_vec: Vec<(&Option<f64>, &Option<f64>, &Option<Timestamp>)> = self
+            .lat
+            .iter()
+            .zip(self.lng.iter())
+            .zip(self.timestamp.iter())
+            .map(|((a, b), c)| (a, b, c))
+            .collect();
 
-        let mut results = StringVectorBuilder::with_capacity(1);
+        // sort by timestamp, we treat null timestamp as 0
+        work_vec.sort_unstable_by_key(|tuple| tuple.2.unwrap_or_else(|| Timestamp::new_second(0)));
 
         let result = serde_json::to_string(
             &work_vec
                 .into_iter()
                 // note that we transform to lng,lat for geojson compatibility
                 .map(|(lat, lng, _)| vec![lng, lat])
-                .collect::<Vec<Vec<Option<f64>>>>(),
+                .collect::<Vec<Vec<&Option<f64>>>>(),
         )
         .map_err(|e| {
             BoxedError::new(PlainError::new(
@@ -123,8 +162,47 @@ impl Function for GeojsonPathEncode {
         })
         .context(error::ExecuteSnafu)?;
 
-        results.push(Some(&result));
+        Ok(Value::String(result.into()))
     }
 }
 
+/// This function accept rows of lat, lng and timestamp, sort with timestamp and
+/// encoding them into a geojson-like path.
+///
+/// Example:
+///
+/// ```sql
+/// SELECT geojson_encode(lat, lon, timestamp) FROM table;
+/// ```
+///
+#[as_aggr_func_creator]
+#[derive(Debug, Default, AggrFuncTypeStore)]
+pub struct GeojsonPathEncodeFunctionCreator {}
+
+impl AggregateFunctionCreator for GeojsonPathEncodeFunctionCreator {
+    fn creator(&self) -> AccumulatorCreatorFunction {
+        let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
+            let ts_type = types[2].clone();
+            Ok(Box::new(GeojsonPathAccumulator::new(ts_type)))
+        });
+
+        creator
+    }
+
+    fn output_type(&self) -> Result<ConcreteDataType> {
+        Ok(ConcreteDataType::string_datatype())
+    }
+
+    fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
+        let input_types = self.input_types()?;
+        ensure!(input_types.len() == 3, InvalidInputStateSnafu);
+
+        let timestamp_type = input_types[2].clone();
+
-        Ok(results.to_vector())
+        Ok(vec![
+            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
+            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
+            ConcreteDataType::list_datatype(timestamp_type),
+        ])
+    }
+}
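To summarize the new control flow in this file: per-partition accumulators gather (lat, lng, timestamp) triples in `update_batch`, partial states are combined in `merge_batch`, and `evaluate` sorts by timestamp and serializes `[lng, lat]` pairs. Below is a minimal, dependency-free sketch of that accumulate/merge/evaluate cycle; the struct, method signatures, timestamp type, and hand-rolled JSON encoding are simplified stand-ins for illustration, not the actual `common_query` `Accumulator` API.

```rust
// Simplified stand-in for the accumulator life cycle; illustration only,
// not the GreptimeDB `Accumulator` trait.
#[derive(Debug, Default)]
struct PathAccumulator {
    lat: Vec<Option<f64>>,
    lng: Vec<Option<f64>>,
    ts: Vec<Option<i64>>, // seconds; the real code uses `common_time::Timestamp`
}

impl PathAccumulator {
    // Counterpart of `update_batch`: append one batch of input rows.
    fn update_batch(&mut self, lat: &[Option<f64>], lng: &[Option<f64>], ts: &[Option<i64>]) {
        self.lat.extend_from_slice(lat);
        self.lng.extend_from_slice(lng);
        self.ts.extend_from_slice(ts);
    }

    // Counterpart of `merge_batch`: fold another accumulator's state into this one.
    fn merge(&mut self, other: PathAccumulator) {
        self.lat.extend(other.lat);
        self.lng.extend(other.lng);
        self.ts.extend(other.ts);
    }

    // Counterpart of `evaluate`: sort by timestamp (null treated as 0) and emit
    // `[lng, lat]` pairs, matching the GeoJSON axis order used by the commit.
    fn evaluate(&self) -> String {
        let mut rows: Vec<(Option<f64>, Option<f64>, Option<i64>)> = self
            .lat
            .iter()
            .copied()
            .zip(self.lng.iter().copied())
            .zip(self.ts.iter().copied())
            .map(|((lat, lng), ts)| (lat, lng, ts))
            .collect();
        rows.sort_unstable_by_key(|row| row.2.unwrap_or(0));

        // The real implementation serializes with serde_json; formatting by hand
        // keeps this sketch free of external crates.
        let coords: Vec<String> = rows
            .into_iter()
            .map(|(lat, lng, _)| {
                let fmt =
                    |v: Option<f64>| v.map(|f| f.to_string()).unwrap_or_else(|| "null".to_string());
                format!("[{},{}]", fmt(lng), fmt(lat))
            })
            .collect();
        format!("[{}]", coords.join(","))
    }
}

fn main() {
    // Two "partitions" worth of input, merged and evaluated once.
    let mut a = PathAccumulator::default();
    a.update_batch(&[Some(1.0)], &[Some(10.0)], &[Some(2)]);
    let mut b = PathAccumulator::default();
    b.update_batch(&[Some(2.0)], &[Some(20.0)], &[Some(1)]);
    a.merge(b);
    assert_eq!(a.evaluate(), "[[20,2],[10,1]]");
}
```

The sketch also shows why the creator's `state_types` in the diff returns two float64 lists plus a list of the input timestamp type: the intermediate state is simply the three column buffers, which `merge_batch` concatenates before the final sort in `evaluate`.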