Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition results helper #2299

Merged
merged 34 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
727563b
feat: add non-cryptographic hash function
tychoish Dec 23, 2023
e901342
feat: partition results helper
tychoish Dec 23, 2023
1586c02
fix tests
tychoish Dec 23, 2023
5c498d1
add multi-value
tychoish Dec 23, 2023
b90a281
fix lint
tychoish Dec 23, 2023
7ee4254
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 23, 2023
8619d28
Merge remote-tracking branch 'origin/main' into tycho/hashing-fnv-and…
tychoish Dec 23, 2023
dba516b
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 23, 2023
0f37091
fix types
tychoish Dec 23, 2023
133f60c
fix sig
tychoish Dec 24, 2023
bd3da32
fix: scalar udf function parsing
tychoish Dec 24, 2023
94895c0
interger upcast
tychoish Dec 24, 2023
9afe476
mod updates
tychoish Dec 24, 2023
50d4bd8
cleanup complete
tychoish Dec 24, 2023
1d05030
fix merge
tychoish Dec 24, 2023
1d7f073
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 25, 2023
3ce73d8
backport
tychoish Dec 25, 2023
b888e88
Merge branch 'tycho/scalar-value-handling' into tycho/hashing-fnv-and…
tychoish Dec 25, 2023
31a45e3
Merge branch 'tycho/scalar-value-handling' into tycho/workload-sharding
tychoish Dec 25, 2023
1f59ad0
fix lint
tychoish Dec 25, 2023
38db426
Merge branch 'tycho/scalar-value-handling' into tycho/hashing-fnv-and…
tychoish Dec 25, 2023
50eb8b1
Merge branch 'tycho/scalar-value-handling' into tycho/workload-sharding
tychoish Dec 25, 2023
61834a4
cleanup
tychoish Dec 25, 2023
732715e
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 25, 2023
6a0eea3
reject missing expected value
tychoish Dec 25, 2023
e373eb6
fix
tychoish Dec 25, 2023
29c1f5b
Merge branch 'main' into tycho/scalar-value-handling
tychoish Dec 27, 2023
6d13faa
Merge remote-tracking branch 'origin/main' into tycho/hashing-fnv-and…
tychoish Dec 27, 2023
6835f65
add test case
tychoish Dec 27, 2023
dcfd47f
Merge branch 'tycho/scalar-value-handling' into tycho/hashing-fnv-and…
tychoish Dec 27, 2023
6e70939
Merge remote-tracking branch 'origin/tycho/scalar-value-handling' int…
tychoish Dec 27, 2023
69a26be
fixup merge
tychoish Dec 28, 2023
842ed85
Merge branch 'tycho/hashing-fnv-and-siphash' into tycho/workload-shar…
tychoish Dec 28, 2023
8350038
Merge remote-tracking branch 'origin/main' into tycho/workload-sharding
tychoish Dec 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/sqlbuiltins/src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use once_cell::sync::Lazy;

use protogen::metastore::types::catalog::{EntryMeta, EntryType, FunctionEntry, FunctionType};
use scalars::df_scalars::ArrowCastFunction;
use scalars::hashing::{FnvHash, SipHash};
use scalars::hashing::{FnvHash, PartitionResults, SipHash};
use scalars::kdl::{KDLMatches, KDLSelect};
use scalars::postgres::*;
use scalars::{ConnectionId, Version};
Expand Down Expand Up @@ -192,9 +192,10 @@ impl FunctionRegistry {
// KDL functions
Arc::new(KDLMatches),
Arc::new(KDLSelect),
// Hashing/Sharding
// Hashing/Partitioning
Arc::new(SipHash),
Arc::new(FnvHash),
Arc::new(PartitionResults),
];
let udfs = udfs
.into_iter()
Expand Down
67 changes: 67 additions & 0 deletions crates/sqlbuiltins/src/functions/scalars/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl ConstBuiltinFunction for FnvHash {
))
}
}

impl BuiltinScalarUDF for FnvHash {
fn as_expr(&self, args: Vec<Expr>) -> Expr {
let udf = ScalarUDF {
Expand All @@ -86,3 +87,69 @@ impl BuiltinScalarUDF for FnvHash {
))
}
}

/// Built-in scalar SQL function `partition_results(<value>, <num_partitions>, <partition_id>)`.
///
/// Evaluates to a boolean that is true when the FNV hash of `<value>`, taken
/// modulo `<num_partitions>`, equals `<partition_id>`.
pub struct PartitionResults;

/// Static metadata (name, description, usage example, signature) for the
/// `partition_results` scalar function.
impl ConstBuiltinFunction for PartitionResults {
    const NAME: &'static str = "partition_results";
    const DESCRIPTION: &'static str =
        "Returns true if the value is in the partition ID given the number of partitions.";
    const EXAMPLE: &'static str = "partition_results(<value>, <num_partitions>, <partition_id>)";
    const FUNCTION_TYPE: FunctionType = FunctionType::Scalar;

    /// Declares an immutable function taking exactly three arguments of any
    /// type, in the order `<value>, <num_partitions>, <partition_id>`.
    fn signature(&self) -> Option<Signature> {
        let three_of_any_type = TypeSignature::Any(3);
        let sig = Signature::new(three_of_any_type, Volatility::Immutable);
        Some(sig)
    }
}

impl BuiltinScalarUDF for PartitionResults {
    /// Builds the logical expression that invokes `partition_results` on `args`.
    ///
    /// At execution time the function expects exactly three inputs:
    /// the value to hash, the number of partitions, and the partition id.
    /// It returns a boolean that is true when
    /// `fnv_hash(value) % num_partitions == partition_id`.
    ///
    /// # Errors (at execution time)
    ///
    /// * the input does not contain exactly three arguments;
    /// * the second or third argument cannot be read as a `u64`;
    /// * `partition_id >= num_partitions` (this also rejects zero partitions,
    ///   so the modulo below can never divide by zero).
    fn as_expr(&self, args: Vec<Expr>) -> Expr {
        let udf = ScalarUDF {
            name: Self::NAME.to_string(),
            signature: ConstBuiltinFunction::signature(self).unwrap(),
            // The implementation below always produces `ScalarValue::Boolean`,
            // so the declared return type must be Boolean as well; otherwise
            // the planned schema disagrees with the values actually returned.
            return_type: Arc::new(|_| Ok(Arc::new(DataType::Boolean))),
            fun: Arc::new(move |input| {
                if input.len() != 3 {
                    return Err(DataFusionError::Execution(
                        "must specify exactly three arguments".to_string(),
                    ));
                }

                let num_partitions = get_nth_u64_fn_arg(input, 1)?;
                let partition_id = get_nth_u64_fn_arg(input, 2)?;

                if partition_id >= num_partitions {
                    return Err(DataFusionError::Execution(format!(
                        "id {} must be less than number of partitions {}",
                        partition_id, num_partitions,
                    )));
                }

                // hash at the end once the other arguments are
                // validated because the hashing is potentially the
                // expensive part
                Ok(get_nth_scalar_value(input, 0, &|value| -> Result<
                    ScalarValue,
                    BuiltinError,
                > {
                    let mut hasher = FnvHasher::default();
                    value.hash(&mut hasher);
                    Ok(ScalarValue::Boolean(Some(
                        hasher.finish() % num_partitions == partition_id,
                    )))
                })?)
            }),
        };
        Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF::new(
            Arc::new(udf),
            args,
        ))
    }
}
59 changes: 59 additions & 0 deletions testdata/sqllogictests/functions/hashing.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,62 @@ query I
select siphash('42');
----
8771948186893062792

########################################################################
#
# partition_results(<value>, <num_partitions>, <partition_id>)
#
########################################################################

statement error
select partition_results();

statement error
select partition_results('buddy', 100, 2, 3);

statement error
select partition_results('buddy', -100, -2);

statement error
select partition_results('buddy', 100, 200);

statement error
select partition_results('9001', '100', '10');

statement error
select partition_results(9001, 100, '10');

statement error
select partition_results(9001, '100', 10);

statement ok
select partition_results(100, 10, 0);

statement ok
select partition_results(100, 10.0, 1.0);

statement error
select partition_results(100, 10.5, 1.5);

statement error
select partition_results(16, 4, 4);

query B
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically there is no 'B'. Only 'I', 'R', and 'T'. For bools, I just use 'T'. I also don't think the types are actually checked, just that there's the correct number of them.

https://github.com/risinglightdb/sqllogictest-rs/blob/main/sqllogictest/src/column_type.rs#L11-L43

select partition_results(16, 4, 0);
----
t

query B
select partition_results(16, 4, 1);
----
f

query B
select partition_results(16, 4, 2);
----
f

query B
select partition_results(16, 4, 3);
----
f
Loading