feat: multi-line json streaming #2729
Merged
+737 −436
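Context for the title: the previous reader buffered an object's bytes and parsed them with a single serde_json::from_slice call, while this change parses the byte stream incrementally, so one source may contain several JSON documents, including pretty-printed, multi-line ones. A small illustration of multi-document input using serde_json's StreamDeserializer; the PR itself uses the json-stream crate over an async byte stream, so this synchronous sketch is only for intuition:

use serde_json::{Deserializer, Value};

fn main() {
    // Two documents in one source; the second spans multiple lines.
    let input = r#"
        {"a": 1}
        {
          "a": 2,
          "b": "multi-line"
        }
    "#;

    // StreamDeserializer yields one Value per concatenated document.
    let docs: Vec<Value> = Deserializer::from_str(input)
        .into_iter::<Value>()
        .collect::<Result<_, _>>()
        .unwrap();

    assert_eq!(docs.len(), 2);
}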
Changes from all commits
Commits (32)
7bf04bb feat: multi-line multi-document json streaming (tychoish)
4360067 refactoring and bug finding (tychoish)
e6fd5c7 fix lint (tychoish)
76541f6 skip experimentation (tychoish)
8e29154 fix stream deserializer (tychoish)
2bfe353 incremental (tychoish)
aa76632 unwind unclear blocking (tychoish)
31b3208 stream fixes (tychoish)
df7914a sanity slt (tychoish)
1b610d7 slts (tychoish)
e1bf40a chore(deps): bump mio from 0.8.9 to 0.8.11 (#2733) (dependabot[bot])
9dda83f fix: Iceberg for format v1 (#2718) (vrongmeal)
6dbdf9e chore: pytest path cleanup and port management (#2734) (tychoish)
5cf3b10 avoid extra copy (tychoish)
9a9fe89 Merge remote-tracking branch 'origin/main' into tycho/full-json-strea… (tychoish)
81daf3a avoid looping after error (tychoish)
523c90d Merge remote-tracking branch 'origin/main' into tycho/full-json-strea… (tychoish)
133bef4 restore json globbing test (tychoish)
dad1f72 cleanup glob handling (tychoish)
0ebc336 fix: Allow a process to acquire a lease its already acquired (#2735) (scsmithr)
facc9d5 release: v0.9.1 (#2736) (greyscaled)
4866cdf fix: omit version specification (#2741) (tychoish)
6247276 chore(release): napi --skip-gh-release (#2740) (greyscaled)
cd0faf9 chore: use a lance fork without the duckdb submodule (#2742) (universalmind303)
9dcda74 chore: add dbt read_csv test (#2732) (talagluck)
698cc29 feat: insert into sqlite tables (#2745) (tychoish)
74bb2d6 feedback (tychoish)
59abfb6 Merge remote-tracking branch 'origin/main' into tycho/full-json-strea… (tychoish)
f70d18b more cleanup (tychoish)
3e92692 Merge remote-tracking branch 'origin/main' into tycho/full-json-strea… (tychoish)
b23e32c Merge remote-tracking branch 'origin/main' into tycho/full-json-strea… (tychoish)
2807bb5 Merge branch 'main' into tycho/full-json-streaming (tychoish)
@@ -1,145 +1,181 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::json::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use json_stream::JsonStream;
use object_store::{ObjectMeta, ObjectStore};
use serde_json::{Map, Value};

use crate::json::errors::JsonError;
use crate::json::table::push_unwind_json_values;
use crate::json::errors::{JsonError, Result};

pub type SendableCheckedRecordBatchStream =
    Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;
pub type CheckedRecordBatchStream = Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>;

pub struct JsonStream {
/// VectorPartition wraps a vector of serde_json documents as a
/// Partition as one of DataFusion's streaming table. Well all of
pub struct VectorPartition {
    schema: Arc<Schema>,
    stream: SendableCheckedRecordBatchStream,
    objs: Vec<Map<String, Value>>,
}

impl Stream for JsonStream {
    type Item = Result<RecordBatch, DataFusionError>;
impl PartitionStream for VectorPartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        Box::pin(JsonHandler::new(
            self.schema.clone(),
            futures::stream::iter(self.objs.clone().into_iter().map(Ok)).boxed(),
        ))
    }
}

impl RecordBatchStream for JsonStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
impl VectorPartition {
    pub fn new(schema: Arc<Schema>, objs: Vec<Map<String, Value>>) -> Self {
        Self { schema, objs }
    }
}

pub struct JsonPartitionStream {
/// ObjectStorePartition holds a reference to the object store and the
/// object metadata and represents a partition that is read only when
/// the partition is executed.
pub(crate) struct ObjectStorePartition {
    schema: Arc<Schema>,
    stream: Mutex<Option<SendableCheckedRecordBatchStream>>,
    store: Arc<dyn ObjectStore>,
    obj: ObjectMeta,
}

impl ObjectStorePartition {
    pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
        Self { schema, store, obj }
    }
}

impl PartitionStream for JsonPartitionStream {
impl PartitionStream for ObjectStorePartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let partition = self
            .stream
            .lock()
            .unwrap()
            .take()
            .expect("stream can only be used once")
            .boxed();

        Box::pin(JsonStream {
            schema: self.schema.clone(),
            stream: partition,
        })
        Box::pin(JsonHandler::new_from_object_store(
            self.schema.to_owned(),
            self.store.clone(),
            self.obj.clone(),
        ))
    }
}

impl JsonPartitionStream {
    pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
        let stream_schema = schema.clone();
        let stream = futures::stream::iter(chunk)
            .chunks(1000)
            .map(move |objs| {
                let mut decoder = ReaderBuilder::new(stream_schema.to_owned()).build_decoder()?;
                decoder
                    .serialize(&objs)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                Ok(decoder.flush()?.unwrap())
            })
            .boxed();

        Self {
            schema: schema.clone(),
            stream: Mutex::new(Some(stream)),
        }
    }
}

pub(crate) struct LazyJsonPartitionStream {
/// JsonObjectStream represents a sequence of "json documents" in an
/// intermediate format produced by serde_json.
type JsonObjectStream = Pin<Box<dyn Stream<Item = Result<Map<String, Value>>> + Send>>;

/// JsonHandler is the basis of all stream handling, converting
/// streams of serde_json objects to RecordBatches, including from
/// object store and from iterators of values.
///
/// These are used by the PartitionStream implementations (above)
/// which interface into the table function and providers.
struct JsonHandler {
    schema: Arc<Schema>,
    store: Arc<dyn ObjectStore>,
    obj: ObjectMeta,
    stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
}

impl PartitionStream for LazyJsonPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
impl RecordBatchStream for JsonHandler {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let stream_schema = self.schema.to_owned();
        let store = self.store.clone();
        let obj = self.obj.clone();

        Box::pin(JsonStream {
            schema: self.schema.clone(),
            stream: futures::stream::once(async move {
                futures::stream::iter(match Self::build(stream_schema, store, obj).await {
                    Ok(batches) => batches,
                    Err(e) => vec![Err(DataFusionError::External(Box::new(e)))],
                })
            })
            .flatten()
            .boxed(),
        })
impl Stream for JsonHandler {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    }
}

impl LazyJsonPartitionStream {
    pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
        Self { schema, store, obj }
impl JsonHandler {
    fn new(schema: Arc<Schema>, stream: JsonObjectStream) -> Self {
        let stream = Self::convert_stream(schema.clone(), stream);
        Self { schema, stream }
    }

    async fn build(
    fn new_from_object_store(
        schema: Arc<Schema>,
        store: Arc<dyn ObjectStore>,
        obj: ObjectMeta,
    ) -> Result<Vec<Result<RecordBatch, DataFusionError>>, JsonError> {
        let mut data = Vec::new();
        push_unwind_json_values(
            &mut data,
            serde_json::from_slice::<Value>(&store.get(&obj.location).await?.bytes().await?),
        )?;

        Ok(data
            .chunks(1000)
            .map(|chunk| {
                let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
    ) -> Self {
        let stream_schema = schema.clone();
        let stream = futures::stream::once(async move {
            Self::convert_stream(
                stream_schema,
                JsonStream::<Value, _>::new(match store.get(&obj.location).await {
                    Ok(stream) => stream.into_stream().map_err(JsonError::from),
                    Err(e) => return futures::stream::once(async move { Err(e.into()) }).boxed(),
                })
                .flat_map(Self::unwind_json_value)
                .boxed(),
            )
        })
        .flatten()
        .boxed();

        Self { schema, stream }
    }

    fn convert_stream(schema: Arc<Schema>, input: JsonObjectStream) -> CheckedRecordBatchStream {
        input
            .map(|rb| rb.map_err(JsonError::from))
            .chunks(1024)
            .map(move |chunk| {
                let chunk = chunk.into_iter().collect::<Result<Vec<_>>>()?;
                let mut decoder = ReaderBuilder::new(schema.to_owned()).build_decoder()?;
                decoder
                    .serialize(chunk)
                    .serialize(&chunk)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                Ok(decoder.flush()?.unwrap())
            })
            .collect())
            .boxed()
    }

    fn unwind_json_value(input: Result<Value>) -> JsonObjectStream {
        futures::stream::iter(match input {
            Ok(value) => match value {
                Value::Array(vals) => {
                    let mut out = Vec::with_capacity(vals.len());
                    for v in vals {
                        match v {
                            Value::Object(doc) => out.push(Ok(doc)),
                            Value::Null => out.push(Ok(Map::new())),
                            _ => {
                                out.push(Err(JsonError::UnspportedType(
                                    "only objects and arrays of objects are supported",
                                )));
                                break;
                            }
                        }
                    }
                    out
                }
                Value::Object(doc) => vec![Ok(doc)],
                Value::Null => vec![Ok(Map::new())],
                _ => {
                    vec![Err(JsonError::UnspportedType(
                        "only objects and arrays of objects are supported",
                    ))]
                }
            },
            Err(e) => vec![Err(e)],
        })
        .boxed()
    }
}
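For readers skimming the diff, the unwinding rules above are the heart of the change: a top-level object becomes one document, a top-level array of objects becomes one document per element, a top-level null becomes an empty document, and any other top-level value is rejected. A rough standalone sketch of the same rule, with the function name and error type invented for illustration (the PR's own version streams results and keeps already-unwound documents when it hits an error):

use serde_json::{json, Map, Value};

// Illustrative only: a synchronous, simplified version of the PR's
// `unwind_json_value`, short-circuiting on the first unsupported element.
fn unwind(value: Value) -> Result<Vec<Map<String, Value>>, &'static str> {
    match value {
        // An array is flattened into one document per element.
        Value::Array(vals) => vals
            .into_iter()
            .map(|v| match v {
                Value::Object(doc) => Ok(doc),
                Value::Null => Ok(Map::new()),
                _ => Err("only objects and arrays of objects are supported"),
            })
            .collect(),
        // A single object is a single document.
        Value::Object(doc) => Ok(vec![doc]),
        // A null becomes an empty document.
        Value::Null => Ok(vec![Map::new()]),
        _ => Err("only objects and arrays of objects are supported"),
    }
}

fn main() {
    assert_eq!(unwind(json!([{"a": 1}, null, {"a": 2}])).unwrap().len(), 3);
    assert!(unwind(json!(42)).is_err());
}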
What's the rationale? Are there changes to this? Can those be upstreamed?
Yeah, this is just the content of a PR that's open: resyncgg/json-stream#5
The maintainer is responsive and said he'd look at it by the end of the week.
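For reference, the core conversion in the new `JsonHandler::convert_stream` is arrow-json's serialize/flush decoder: chunks of serde_json maps are serialized into a Decoder and flushed as one RecordBatch per chunk. A minimal standalone sketch of that pattern, with a made-up schema and documents (exact signatures may differ across arrow versions):

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::json::ReaderBuilder;
use serde_json::{json, Map, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical single-column schema, for illustration only;
    // the PR derives the real schema elsewhere.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

    // A chunk of serde_json documents, as produced by the unwinding step.
    let chunk: Vec<Map<String, Value>> = vec![
        json!({"a": 1}).as_object().unwrap().clone(),
        json!({"a": 2}).as_object().unwrap().clone(),
    ];

    // Serialize the chunk into the decoder and flush one RecordBatch.
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    decoder.serialize(&chunk)?;
    let batch = decoder.flush()?.expect("decoder produced a batch");
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}

The PR applies this per 1024-document chunk of the unwound stream, mapping arrow errors into DataFusionError::External.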