Skip to content

Commit

Permalink
feat: support dimensions in conclusion feed (#535)
Browse files Browse the repository at this point in the history
* feat: support dimensions in conclusion feed

* fix: use map type for dimensions

* feat: use dictionary for dimension values

* fix: allow for ~4B unique dimensions values
  • Loading branch information
nathanielc authored Sep 19, 2024
1 parent 65d90d8 commit 8a44b03
Show file tree
Hide file tree
Showing 9 changed files with 649 additions and 426 deletions.
395 changes: 189 additions & 206 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 15 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ members = [

ahash = "0.8"
anyhow = { version = "1" }
arrow = { version = "52.0.0", features = ["prettyprint"] }
arrow-array = "52.2.0"
arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] }
arrow-ipc = "52.2.0"
arrow-schema = "52.2.0"
arrow = { version = "53", features = ["prettyprint"] }
arrow-array = "53"
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
arrow-ipc = "53"
arrow-schema = "53"
async-broadcast = "0.4.1"
async-channel = "1.7.1"
async-recursion = "1"
Expand Down Expand Up @@ -86,9 +86,12 @@ criterion2 = "0.7.0"
crossterm = "0.25"
ctrlc = "3.2.2"
dag-jose = "0.2"
datafusion = "41.0.0"
datafusion-federation = "0.2"
datafusion-flight-sql-server = "0.2"
datafusion = "42"
#datafusion-federation = "0.2"
#datafusion-flight-sql-server = "0.2"
#datafusion-flight-sql-table-provider = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
datafusion-federation = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
datafusion-flight-sql-table-provider = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
deadqueue = "0.2.3"
derivative = "2.2"
Expand All @@ -106,7 +109,7 @@ handlebars = "4"
headers = "0.3.7"
hex = "0.4.3"
hex-literal = "0.3.4"
http = "0.2"
http = "1"
http-body = "0.4.5"
http-serde = "1.1"
humansize = "2"
Expand Down Expand Up @@ -153,9 +156,8 @@ paste = "1.0.9"
phf = "0.11"
prometheus-client = "0.22"
proptest = "1"
#TODO upgrade to 0.12
prost = "0.11"
prost-build = "0.11.1"
prost = "0.13"
prost-build = "0.13"
quic-rpc = { version = "0.3.2", default-features = false }
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand Down Expand Up @@ -213,7 +215,7 @@ tokio-stream = "0.1.11"
tokio-test = "0.4.2"
tokio-util = { version = "0.7.10", features = ["compat", "rt"] }
toml = "0.5.9"
tonic = { version = "0.11", features = ["tls"] }
tonic = { version = "0.12", features = ["tls"] }
tower = "0.4"
tower-http = "0.3"
tower-layer = "0.3"
Expand Down
100 changes: 80 additions & 20 deletions arrow-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@

use std::{any::Any, sync::Arc};

use arrow::{array::StringBuilder, datatypes::DataType, record_batch::RecordBatch};
use arrow::{
array::{ArrayIter, ListBuilder, StringBuilder},
datatypes::DataType,
record_batch::RecordBatch,
};
use cid::Cid;
use datafusion::{
common::{cast::as_binary_array, exec_datafusion_err},
common::{
cast::{as_binary_array, as_list_array},
exec_datafusion_err,
},
dataframe::DataFrame,
execution::context::SessionContext,
functions_aggregate::expr_fn::array_agg,
logical_expr::{
col, expr::ScalarFunction, Cast, ColumnarValue, Expr, ScalarUDF, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
Expand Down Expand Up @@ -72,6 +78,70 @@ impl ScalarUDFImpl for CidString {
}
}

/// ScalarUDF to convert a list of binary CIDs into a list of strings for easier inspection.
#[derive(Debug)]
pub struct CidStringList {
    // Declares the single accepted argument type: a nullable list of binary values.
    signature: Signature,
}

impl Default for CidStringList {
fn default() -> Self {
Self::new()
}
}

impl CidStringList {
    /// Construct an instance whose signature accepts exactly one argument:
    /// a nullable list of binary-encoded CIDs.
    pub fn new() -> Self {
        // Exact match on List<Binary> (nullable elements); the UDF is a pure
        // function of its input, hence immutable volatility.
        let arg_type = DataType::new_list(DataType::Binary, true);
        let signature = Signature::new(
            TypeSignature::Exact(vec![arg_type]),
            Volatility::Immutable,
        );
        Self { signature }
    }
}

impl ScalarUDFImpl for CidStringList {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"array_cid_string"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _args: &[DataType]) -> datafusion::common::Result<DataType> {
Ok(DataType::new_list(DataType::Utf8, true))
}
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let all_cids = as_list_array(&args[0])?;
let mut strs = ListBuilder::new(StringBuilder::new());
for cids in ArrayIter::new(all_cids) {
if let Some(cids) = cids {
let cids = as_binary_array(&cids)?;
for cid in cids {
if let Some(cid) = cid {
strs.values().append_value(
Cid::read_bytes(cid)
.map_err(|err| exec_datafusion_err!("Error {err}"))?
.to_string(),
);
} else {
strs.values().append_null()
}
}
strs.append(true)
} else {
strs.append_null()
}
}
Ok(ColumnarValue::Array(Arc::new(strs.finish())))
}
}

/// Applies various transformations on a record batch of conclusion_feed data to make it easier to
/// read.
/// Useful in conjunction with expect_test.
Expand All @@ -87,9 +157,8 @@ pub async fn pretty_feed_from_batch(batch: RecordBatch) -> Vec<RecordBatch> {
/// Useful in conjunction with expect_test.
pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec<RecordBatch> {
let cid_string = Arc::new(ScalarUDF::from(CidString::new()));
let cid_string_list = Arc::new(ScalarUDF::from(CidStringList::new()));
conclusion_feed
.unnest_columns(&["previous"])
.unwrap()
.select(vec![
col("index"),
col("event_type"),
Expand All @@ -100,29 +169,20 @@ pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec<RecordBatch> {
.alias("stream_cid"),
col("stream_type"),
col("controller"),
col("dimensions"),
Expr::ScalarFunction(ScalarFunction::new_udf(
cid_string.clone(),
vec![col("event_cid")],
))
.alias("event_cid"),
Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
Expr::ScalarFunction(ScalarFunction::new_udf(cid_string, vec![col("previous")]))
.alias("previous"),
Expr::ScalarFunction(ScalarFunction::new_udf(
cid_string_list,
vec![col("previous")],
))
.alias("previous"),
])
.unwrap()
.aggregate(
vec![
col("index"),
col("event_type"),
col("stream_cid"),
col("stream_type"),
col("controller"),
col("event_cid"),
col("data"),
],
vec![array_agg(col("previous")).alias("previous")],
)
.unwrap()
.sort(vec![col("index").sort(true, true)])
.unwrap()
.collect()
Expand Down
Loading

0 comments on commit 8a44b03

Please sign in to comment.