Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multi-line json streaming #2729

Merged
merged 32 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7bf04bb
feat: multi-line multi-document json streaming
tychoish Mar 3, 2024
4360067
refactoring and bug finding
tychoish Mar 3, 2024
e6fd5c7
fix lint
tychoish Mar 3, 2024
76541f6
skip experimentation
tychoish Mar 4, 2024
8e29154
fix stream deserializer
tychoish Mar 4, 2024
2bfe353
incremental
tychoish Mar 4, 2024
aa76632
unwind unclear blocking
tychoish Mar 4, 2024
31b3208
stream fixes
tychoish Mar 5, 2024
df7914a
sanity slt
tychoish Mar 5, 2024
1b610d7
slts
tychoish Mar 5, 2024
e1bf40a
chore(deps): bump mio from 0.8.9 to 0.8.11 (#2733)
dependabot[bot] Mar 5, 2024
9dda83f
fix: Iceberg for format v1 (#2718)
vrongmeal Mar 5, 2024
6dbdf9e
chore: pytest path cleanup and port management (#2734)
tychoish Mar 5, 2024
5cf3b10
avoid extra copy
tychoish Mar 5, 2024
9a9fe89
Merge remote-tracking branch 'origin/main' into tycho/full-json-strea…
tychoish Mar 5, 2024
81daf3a
avoid looping after error
tychoish Mar 5, 2024
523c90d
Merge remote-tracking branch 'origin/main' into tycho/full-json-strea…
tychoish Mar 5, 2024
133bef4
restore json globbing test
tychoish Mar 5, 2024
dad1f72
cleanup glob handling
tychoish Mar 5, 2024
0ebc336
fix: Allow a process to acquire a lease its already acquired (#2735)
scsmithr Mar 5, 2024
facc9d5
release: v0.9.1 (#2736)
greyscaled Mar 5, 2024
4866cdf
fix: omit version specification (#2741)
tychoish Mar 5, 2024
6247276
chore(release): napi --skip-gh-release (#2740)
greyscaled Mar 5, 2024
cd0faf9
chore: use a lance fork without the duckdb submodule (#2742)
universalmind303 Mar 5, 2024
9dcda74
chore: add dbt read_csv test (#2732)
talagluck Mar 6, 2024
698cc29
feat: insert into sqlite tables (#2745)
tychoish Mar 6, 2024
74bb2d6
feedback
tychoish Mar 6, 2024
59abfb6
Merge remote-tracking branch 'origin/main' into tycho/full-json-strea…
tychoish Mar 6, 2024
f70d18b
more cleanup
tychoish Mar 6, 2024
3e92692
Merge remote-tracking branch 'origin/main' into tycho/full-json-strea…
tychoish Mar 6, 2024
b23e32c
Merge remote-tracking branch 'origin/main' into tycho/full-json-strea…
tychoish Mar 6, 2024
2807bb5
Merge branch 'main' into tycho/full-json-streaming
tychoish Mar 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/datasources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ scylla = { version = "0.12.0" }
glob = "0.3.1"
indexmap = "2.2.5"
async-sqlite = "0.2.2"
json-stream = { git = "https://github.com/tychoish/json-stream", rev = "bd4990fab95f789740a75a8eea98d5dac1f0160a" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the rationale? Are there changes to this? Can those be upstreamed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is just the content of a PR that's open: resyncgg/json-stream#5

The maintainer is responsive and said he'd look at it by the end of the week.


# SSH tunnels
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
Expand Down
21 changes: 21 additions & 0 deletions crates/datasources/src/json/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use datafusion::error::DataFusionError;
use datafusion_ext::errors::ExtensionError;

use crate::object_store::errors::ObjectStoreSourceError;
Expand All @@ -13,21 +14,41 @@ pub enum JsonError {
#[error("no objects found {0}")]
NotFound(String),

#[error("sending data already in progress")]
SendAlreadyInProgress,

#[error(transparent)]
ObjectStoreSource(#[from] ObjectStoreSourceError),

#[error(transparent)]
ObjectStore(#[from] object_store::Error),

#[error(transparent)]
ObjectStorePath(#[from] object_store::path::Error),

#[error(transparent)]
Arrow(#[from] datafusion::arrow::error::ArrowError),

#[error(transparent)]
Datafusion(#[from] datafusion::error::DataFusionError),

#[error(transparent)]
ChannelSend(#[from] futures::channel::mpsc::SendError),

#[error(transparent)]
ChannelRecv(#[from] futures::channel::mpsc::TryRecvError),
}

impl From<JsonError> for ExtensionError {
fn from(e: JsonError) -> Self {
ExtensionError::String(e.to_string())
}
}

impl From<JsonError> for DataFusionError {
fn from(e: JsonError) -> Self {
DataFusionError::External(Box::new(e))
}
}

pub type Result<T, E = JsonError> = std::result::Result<T, E>;
218 changes: 127 additions & 91 deletions crates/datasources/src/json/stream.rs
Original file line number Diff line number Diff line change
@@ -1,145 +1,181 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::json::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use json_stream::JsonStream;
use object_store::{ObjectMeta, ObjectStore};
use serde_json::{Map, Value};

use crate::json::errors::JsonError;
use crate::json::table::push_unwind_json_values;
use crate::json::errors::{JsonError, Result};

pub type SendableCheckedRecordBatchStream =
Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;
pub type CheckedRecordBatchStream = Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>;

pub struct JsonStream {
/// VectorPartition wraps a vector of serde_json documents as a
/// Partition as one of DataFusion's streaming table. Well all of
pub struct VectorPartition {
schema: Arc<Schema>,
stream: SendableCheckedRecordBatchStream,
objs: Vec<Map<String, Value>>,
}

impl Stream for JsonStream {
type Item = Result<RecordBatch, DataFusionError>;
impl PartitionStream for VectorPartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
Box::pin(JsonHandler::new(
self.schema.clone(),
futures::stream::iter(self.objs.clone().into_iter().map(Ok)).boxed(),
))
}
}

impl RecordBatchStream for JsonStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
impl VectorPartition {
pub fn new(schema: Arc<Schema>, objs: Vec<Map<String, Value>>) -> Self {
Self { schema, objs }
}
}

pub struct JsonPartitionStream {
/// ObjectStorePartition holds a reference to the object store and the
/// object metadata and represents a partition that is read only when
/// the partition is executed.
pub(crate) struct ObjectStorePartition {
schema: Arc<Schema>,
stream: Mutex<Option<SendableCheckedRecordBatchStream>>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
}

impl ObjectStorePartition {
pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
Self { schema, store, obj }
}
}

impl PartitionStream for JsonPartitionStream {
impl PartitionStream for ObjectStorePartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let partition = self
.stream
.lock()
.unwrap()
.take()
.expect("stream can only be used once")
.boxed();

Box::pin(JsonStream {
schema: self.schema.clone(),
stream: partition,
})
Box::pin(JsonHandler::new_from_object_store(
self.schema.to_owned(),
self.store.clone(),
self.obj.clone(),
))
}
}

impl JsonPartitionStream {
pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
let stream_schema = schema.clone();
let stream = futures::stream::iter(chunk)
.chunks(1000)
.map(move |objs| {
let mut decoder = ReaderBuilder::new(stream_schema.to_owned()).build_decoder()?;
decoder
.serialize(&objs)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(decoder.flush()?.unwrap())
})
.boxed();

Self {
schema: schema.clone(),
stream: Mutex::new(Some(stream)),
}
}
}

pub(crate) struct LazyJsonPartitionStream {
/// JsonObjectStream represents a sequence of "json documents" in an
/// intermediate format produced by serde_json.
type JsonObjectStream = Pin<Box<dyn Stream<Item = Result<Map<String, Value>>> + Send>>;

/// JsonHandler is the basis of all stream handling, converting
/// streams of serde_json objects to RecordBatches, including from
/// object store and from iterators of values.
///
/// These are used by the PartitionStream implementations (above)
/// which interface into the table function and providers.
struct JsonHandler {
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
}

impl PartitionStream for LazyJsonPartitionStream {
fn schema(&self) -> &SchemaRef {
&self.schema
impl RecordBatchStream for JsonHandler {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let stream_schema = self.schema.to_owned();
let store = self.store.clone();
let obj = self.obj.clone();

Box::pin(JsonStream {
schema: self.schema.clone(),
stream: futures::stream::once(async move {
futures::stream::iter(match Self::build(stream_schema, store, obj).await {
Ok(batches) => batches,
Err(e) => vec![Err(DataFusionError::External(Box::new(e)))],
})
})
.flatten()
.boxed(),
})
impl Stream for JsonHandler {
type Item = DFResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}

impl LazyJsonPartitionStream {
pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
Self { schema, store, obj }
impl JsonHandler {
fn new(schema: Arc<Schema>, stream: JsonObjectStream) -> Self {
let stream = Self::convert_stream(schema.clone(), stream);
Self { schema, stream }
}

async fn build(
fn new_from_object_store(
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
) -> Result<Vec<Result<RecordBatch, DataFusionError>>, JsonError> {
let mut data = Vec::new();
push_unwind_json_values(
&mut data,
serde_json::from_slice::<Value>(&store.get(&obj.location).await?.bytes().await?),
)?;

Ok(data
.chunks(1000)
.map(|chunk| {
let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
) -> Self {
let stream_schema = schema.clone();
let stream = futures::stream::once(async move {
Self::convert_stream(
stream_schema,
JsonStream::<Value, _>::new(match store.get(&obj.location).await {
Ok(stream) => stream.into_stream().map_err(JsonError::from),
Err(e) => return futures::stream::once(async move { Err(e.into()) }).boxed(),
})
.flat_map(Self::unwind_json_value)
.boxed(),
)
})
.flatten()
.boxed();

Self { schema, stream }
}

fn convert_stream(schema: Arc<Schema>, input: JsonObjectStream) -> CheckedRecordBatchStream {
input
.map(|rb| rb.map_err(JsonError::from))
.chunks(1024)
.map(move |chunk| {
let chunk = chunk.into_iter().collect::<Result<Vec<_>>>()?;
let mut decoder = ReaderBuilder::new(schema.to_owned()).build_decoder()?;
decoder
.serialize(chunk)
.serialize(&chunk)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(decoder.flush()?.unwrap())
})
.collect())
.boxed()
}


fn unwind_json_value(input: Result<Value>) -> JsonObjectStream {
futures::stream::iter(match input {
Ok(value) => match value {
Value::Array(vals) => {
let mut out = Vec::with_capacity(vals.len());
for v in vals {
match v {
Value::Object(doc) => out.push(Ok(doc)),
Value::Null => out.push(Ok(Map::new())),
_ => {
out.push(Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
)));
break;
}
}
}
out
}
Value::Object(doc) => vec![Ok(doc)],
Value::Null => vec![Ok(Map::new())],
_ => {
vec![Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
))]
}
},
Err(e) => vec![Err(e)],
})
.boxed()
}
}
Loading