Skip to content

Commit

Permalink
fix(udf): fix missing chunks (risingwavelabs#9025)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
Co-authored-by: lmatz <lmatz823@gmail.com>
  • Loading branch information
wangrunji0408 and lmatz authored Apr 6, 2023
1 parent ba2f6e1 commit 78505b1
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ macro_rules! converts {
array.iter().collect()
}
}
impl From<&[$ArrowType]> for $ArrayType {
fn from(arrays: &[$ArrowType]) -> Self {
arrays.iter().flat_map(|a| a.iter()).collect()
}
}
};
// convert values using FromIntoArrow
($ArrayType:ty, $ArrowType:ty, @map) => {
Expand All @@ -218,6 +223,19 @@ macro_rules! converts {
.collect()
}
}
impl From<&[$ArrowType]> for $ArrayType {
fn from(arrays: &[$ArrowType]) -> Self {
arrays
.iter()
.flat_map(|a| a.iter())
.map(|o| {
o.map(|v| {
<<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v)
})
})
.collect()
}
}
};
}
converts!(BoolArray, arrow_array::BooleanArray);
Expand Down
1 change: 1 addition & 0 deletions src/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ normal = ["workspace-hack"]
arrow-array = "36"
arrow-flight = "36"
arrow-schema = "36"
arrow-select = "36"
futures-util = "0.3.25"
thiserror = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "macros"] }
Expand Down
17 changes: 16 additions & 1 deletion src/udf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,20 @@ impl ArrowFlightUdfClient {
/// Call a function.
pub async fn call(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let mut output_stream = self.call_stream(id, stream::once(async { input })).await?;
output_stream.next().await.ok_or(Error::NoReturned)?
// TODO: support no output
let head = output_stream.next().await.ok_or(Error::NoReturned)??;
let mut remaining = vec![];
while let Some(batch) = output_stream.next().await {
remaining.push(batch?);
}
if remaining.is_empty() {
Ok(head)
} else {
Ok(arrow_select::concat::concat_batches(
&head.schema(),
std::iter::once(&head).chain(remaining.iter()),
)?)
}
}

/// Call a function with streaming input and output.
Expand Down Expand Up @@ -157,6 +170,8 @@ pub enum Error {
expected: String,
actual: String,
},
#[error("arrow error: {0}")]
Arrow(#[from] arrow_schema::ArrowError),
#[error("UDF service returned no data")]
NoReturned,
}
Expand Down

0 comments on commit 78505b1

Please sign in to comment.