Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition results helper #2299

Merged
merged 34 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
727563b
feat: add non-cryptographic hash function
tychoish Dec 23, 2023
e901342
feat: partition results helper
tychoish Dec 23, 2023
1586c02
fix tests
tychoish Dec 23, 2023
5c498d1
add multi-value
tychoish Dec 23, 2023
b90a281
fix lint
tychoish Dec 23, 2023
7ee4254
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 23, 2023
8619d28
Merge remote-tracking branch 'origin/main' into tycho/hashing-fnv-and…
tychoish Dec 23, 2023
dba516b
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 23, 2023
0f37091
fix types
tychoish Dec 23, 2023
133f60c
fix sig
tychoish Dec 24, 2023
bd3da32
fix: scalar udf function parsing
tychoish Dec 24, 2023
94895c0
interger upcast
tychoish Dec 24, 2023
9afe476
mod updates
tychoish Dec 24, 2023
50d4bd8
cleanup complete
tychoish Dec 24, 2023
1d05030
fix merge
tychoish Dec 24, 2023
1d7f073
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 25, 2023
3ce73d8
backport
tychoish Dec 25, 2023
b888e88
Merge branch 'tycho/scalar-value-handling' into tycho/hashing-fnv-and…
tychoish Dec 25, 2023
31a45e3
Merge branch 'tycho/scalar-value-handling' into tycho/workload-sharding
tychoish Dec 25, 2023
1f59ad0
fix lint
tychoish Dec 25, 2023
38db426
Merge branch 'tycho/scalar-value-handling' into tycho/hashing-fnv-and…
tychoish Dec 25, 2023
50eb8b1
Merge branch 'tycho/scalar-value-handling' into tycho/workload-sharding
tychoish Dec 25, 2023
61834a4
cleanup
tychoish Dec 25, 2023
732715e
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 25, 2023
6a0eea3
reject missing expected value
tychoish Dec 25, 2023
e373eb6
fix
tychoish Dec 25, 2023
29c1f5b
Merge branch 'main' into tycho/scalar-value-handling
tychoish Dec 27, 2023
6d13faa
Merge remote-tracking branch 'origin/main' into tycho/hashing-fnv-and…
tychoish Dec 27, 2023
6835f65
add test case
tychoish Dec 27, 2023
dcfd47f
Merge branch 'tycho/scalar-value-handling' into tycho/hashing-fnv-and…
tychoish Dec 27, 2023
6e70939
Merge remote-tracking branch 'origin/tycho/scalar-value-handling' int…
tychoish Dec 27, 2023
69a26be
fixup merge
tychoish Dec 28, 2023
842ed85
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 28, 2023
8350038
Merge remote-tracking branch 'origin/main' into tycho/workload-sharding
tychoish Dec 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sqlbuiltins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ bson = "2.7.0"
tokio-util = "0.7.10"
bytes = "1.5.0"
kdl = "5.0.0-alpha.1"
siphasher = "1.0.0"
fnv = "1.0.7"
memoize = { version = "0.4.2", features = ["full"] }
53 changes: 51 additions & 2 deletions crates/sqlbuiltins/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,56 @@
#[derive(Debug, thiserror::Error)]
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;

/// Error type used by the builtin function implementations; it is the default
/// error of this module's `Result` alias.
///
/// The enum derives `Clone`. DataFusion and Arrow errors are stored as their
/// rendered messages (`String`) rather than as the original error values; see
/// the `From` impls in this file.
#[derive(Clone, Debug, thiserror::Error)]
pub enum BuiltinError {
/// A parse failure; the payload is interpolated into the message.
#[error("parse error: {0}")]
ParseError(String),

/// A parse failure that carries no further detail.
#[error("fundamental parsing error")]
FundamentalError,

/// No value was present at the given (argument) index.
#[error("missing value at index {0}")]
MissingValueAtIndex(usize),

/// A value that was expected is absent.
#[error("expected value missing")]
MissingValue,

/// A value was present but not acceptable; the payload describes it.
#[error("invalid value: {0}")]
InvalidValue(String),

/// The value at the given index was columnar where that is not supported.
#[error("columnar values not support at index {0}")]
InvalidColumnarValue(usize),

/// Type mismatch: the first field is the type that was found, the second
/// is the type that was expected.
#[error("value was type {0}, expected {1}")]
IncorrectType(DataType, DataType),

/// `transparent` forwards `Display` and `source` to the wrapped error.
#[error(transparent)]
DatafusionExtError(#[from] datafusion_ext::errors::ExtensionError),
// NOTE(review): no `#[error(...)]` attribute is visible directly above this
// variant in this view — confirm against the real file which variant the
// `transparent` attribute belongs to.
KdlError(#[from] kdl::KdlError),

/// A DataFusion error, kept as its rendered message.
#[error("DataFusionError: {0}")]
DataFusionError(String),

/// An Arrow error, kept as its rendered message.
#[error("ArrowError: {0}")]
ArrowError(String),
}

/// Alias for `std::result::Result` whose error type defaults to [`BuiltinError`].
pub type Result<T, E = BuiltinError> = std::result::Result<T, E>;

impl From<BuiltinError> for DataFusionError {
fn from(e: BuiltinError) -> Self {
DataFusionError::Execution(e.to_string())
}
}

impl From<DataFusionError> for BuiltinError {
fn from(e: DataFusionError) -> Self {
BuiltinError::DataFusionError(e.to_string())
}
}

impl From<ArrowError> for BuiltinError {
fn from(e: ArrowError) -> Self {
BuiltinError::ArrowError(e.to_string())
}
}
27 changes: 17 additions & 10 deletions crates/sqlbuiltins/src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@ mod aggregates;
mod scalars;
mod table;

use self::scalars::df_scalars::ArrowCastFunction;
use self::scalars::kdl::{KDLMatches, KDLSelect};
use self::scalars::{postgres::*, ConnectionId, Version};
use self::table::{BuiltinTableFuncs, TableFunc};
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::logical_expr::{AggregateFunction, BuiltinScalarFunction, Expr, Signature};
use once_cell::sync::Lazy;

use scalars::df_scalars::ArrowCastFunction;
use scalars::hashing::{FnvHash, PartitionResults, SipHash};
use scalars::kdl::{KDLMatches, KDLSelect};
use scalars::postgres::*;
use scalars::{ConnectionId, Version};
use table::{BuiltinTableFuncs, TableFunc};

use protogen::metastore::types::catalog::{
EntryMeta, EntryType, FunctionEntry, FunctionType, RuntimePreference,
};

use std::collections::HashMap;
use std::sync::Arc;

/// Builtin table-returning functions available for all sessions.
///
/// Built on first access via `BuiltinTableFuncs::new`.
static BUILTIN_TABLE_FUNCS: Lazy<BuiltinTableFuncs> = Lazy::new(BuiltinTableFuncs::new);
/// Lazily constructed [`ArrowCastFunction`] instance, exported from this module.
pub static ARROW_CAST_FUNC: Lazy<ArrowCastFunction> = Lazy::new(|| ArrowCastFunction {});
Expand Down Expand Up @@ -187,12 +190,16 @@ impl FunctionRegistry {
Arc::new(PgTableIsVisible),
Arc::new(PgEncodingToChar),
Arc::new(PgArrayToString),
// System functions
Arc::new(ConnectionId),
Arc::new(Version),
// KDL functions
Arc::new(KDLMatches),
Arc::new(KDLSelect),
// Other functions
Arc::new(ConnectionId),
Arc::new(Version),
// Hashing/Partitioning
Arc::new(SipHash),
Arc::new(FnvHash),
Arc::new(PartitionResults),
];
let udfs = udfs
.into_iter()
Expand Down
155 changes: 155 additions & 0 deletions crates/sqlbuiltins/src/functions/scalars/hashing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;
use siphasher::sip::SipHasher24;

use super::*;

/// Scalar function `siphash(<value>)`: hashes a single value of any type into
/// a `UInt64`.
pub struct SipHash;

impl ConstBuiltinFunction for SipHash {
    const NAME: &'static str = "siphash";
    const DESCRIPTION: &'static str =
        "Calculates a 64bit non-cryptographic hash (SipHash24) of the value.";
    const EXAMPLE: &'static str = "siphash(<value>)";
    const FUNCTION_TYPE: FunctionType = FunctionType::Scalar;

    /// Exactly one argument of any type; the function is immutable.
    fn signature(&self) -> Option<Signature> {
        // args: <FIELD>
        let one_arg_of_any_type = Signature::any(1, Volatility::Immutable);
        Some(one_arg_of_any_type)
    }
}
impl BuiltinScalarUDF for SipHash {
    /// Wraps `args` in a call to the `siphash` UDF.
    ///
    /// When evaluated, the UDF feeds the first argument's `Hash` implementation
    /// into a freshly created `SipHasher24` and returns the 64-bit digest as
    /// `ScalarValue::UInt64`.
    fn as_expr(&self, args: Vec<Expr>) -> Expr {
        let name = Self::NAME.to_string();
        let signature = ConstBuiltinFunction::signature(self).unwrap();

        let udf = ScalarUDF {
            name,
            signature,
            return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))),
            fun: Arc::new(move |input| {
                get_nth_scalar_value(
                    input,
                    0,
                    &|value| -> Result<ScalarValue, BuiltinError> {
                        let mut state = SipHasher24::new();
                        value.hash(&mut state);
                        let digest = state.finish();
                        Ok(ScalarValue::UInt64(Some(digest)))
                    },
                )
                // Convert the helper's error into the error type the UDF returns.
                .map_err(Into::into)
            }),
        };

        let udf = Arc::new(udf);
        Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF::new(udf, args))
    }
}

/// Scalar function `fnv(<value>)`: hashes a single value of any type into a
/// `UInt64`.
pub struct FnvHash;

impl ConstBuiltinFunction for FnvHash {
    const NAME: &'static str = "fnv";
    const DESCRIPTION: &'static str =
        "Calculates a 64bit non-cryptographic hash (fnv1a) of the value.";
    const EXAMPLE: &'static str = "fnv(<value>)";
    const FUNCTION_TYPE: FunctionType = FunctionType::Scalar;

    /// Exactly one argument of any type; the function is immutable.
    fn signature(&self) -> Option<Signature> {
        // args: <FIELD>
        let one_arg_of_any_type = Signature::any(1, Volatility::Immutable);
        Some(one_arg_of_any_type)
    }
}

impl BuiltinScalarUDF for FnvHash {
    /// Wraps `args` in a call to the `fnv` UDF.
    ///
    /// When evaluated, the UDF feeds the first argument's `Hash` implementation
    /// into a default-constructed `FnvHasher` and returns the 64-bit digest as
    /// `ScalarValue::UInt64`.
    fn as_expr(&self, args: Vec<Expr>) -> Expr {
        let name = Self::NAME.to_string();
        let signature = ConstBuiltinFunction::signature(self).unwrap();

        let udf = ScalarUDF {
            name,
            signature,
            return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))),
            fun: Arc::new(move |input| {
                get_nth_scalar_value(
                    input,
                    0,
                    &|value| -> Result<ScalarValue, BuiltinError> {
                        let mut state = FnvHasher::default();
                        value.hash(&mut state);
                        let digest = state.finish();
                        Ok(ScalarValue::UInt64(Some(digest)))
                    },
                )
                // Convert the helper's error into the error type the UDF returns.
                .map_err(Into::into)
            }),
        };

        let udf = Arc::new(udf);
        Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF::new(udf, args))
    }
}

/// Scalar function `partition_results(<value>, <num_partitions>, <partition_id>)`:
/// reports whether a value's hash lands in the requested partition.
pub struct PartitionResults;

impl ConstBuiltinFunction for PartitionResults {
    const NAME: &'static str = "partition_results";
    const DESCRIPTION: &'static str =
        "Returns true if the value is in the partition ID given the number of partitions.";
    const EXAMPLE: &'static str = "partition_results(<value>, <num_partitions>, <partition_id>)";
    const FUNCTION_TYPE: FunctionType = FunctionType::Scalar;

    /// Exactly three arguments of any type; the function is immutable.
    fn signature(&self) -> Option<Signature> {
        // args: <FIELD>, <num_partitions>, <partition_id>
        let three_args_of_any_type = Signature::any(3, Volatility::Immutable);
        Some(three_args_of_any_type)
    }
}

impl BuiltinScalarUDF for PartitionResults {
    /// Wraps `args` in a call to the `partition_results` UDF.
    ///
    /// When evaluated, the UDF:
    /// 1. rejects calls that do not have exactly three arguments;
    /// 2. reads `num_partitions` (argument 1) and `partition_id` (argument 2)
    ///    and rejects ids that are not strictly less than the partition count;
    /// 3. hashes argument 0 with `FnvHasher` and returns
    ///    `hash % num_partitions == partition_id` as `ScalarValue::Boolean`.
    ///
    /// The declared return type is `Boolean` so that it agrees with the value
    /// the implementation actually produces.
    fn as_expr(&self, args: Vec<Expr>) -> Expr {
        let udf = ScalarUDF {
            name: Self::NAME.to_string(),
            signature: ConstBuiltinFunction::signature(self).unwrap(),
            // Must match the `ScalarValue::Boolean` produced by `fun` below.
            return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))),
            fun: Arc::new(move |input| {
                if input.len() != 3 {
                    return Err(DataFusionError::Execution(
                        "must specify exactly three arguments".to_string(),
                    ));
                }

                let num_partitions = get_nth_u64_fn_arg(input, 1)?;
                let partition_id = get_nth_u64_fn_arg(input, 2)?;

                // This check also protects the modulo below: when
                // `num_partitions` is zero every id fails it, so the hash is
                // never reduced modulo zero.
                if partition_id >= num_partitions {
                    return Err(DataFusionError::Execution(format!(
                        "id {} must be less than number of partitions {}",
                        partition_id, num_partitions,
                    )));
                }

                // hash at the end once the other arguments are
                // validated because the hashing is potentially the
                // expensive part
                Ok(get_nth_scalar_value(input, 0, &|value| -> Result<
                    ScalarValue,
                    BuiltinError,
                > {
                    let mut hasher = FnvHasher::default();
                    value.hash(&mut hasher);
                    Ok(ScalarValue::Boolean(Some(
                        hasher.finish() % num_partitions == partition_id,
                    )))
                })?)
            }),
        };
        Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF::new(
            Arc::new(udf),
            args,
        ))
    }
}
Loading
Loading