
Commit

feat: multi-line json streaming (#2729)
tychoish authored Mar 6, 2024
1 parent 03b8ffb commit 167403e
Showing 11 changed files with 737 additions and 436 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions crates/datasources/Cargo.toml
@@ -76,6 +76,7 @@ scylla = { version = "0.12.0" }
glob = "0.3.1"
indexmap = "2.2.5"
async-sqlite = "0.2.2"
json-stream = { git = "https://github.com/tychoish/json-stream", rev = "bd4990fab95f789740a75a8eea98d5dac1f0160a" }

# SSH tunnels
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
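
The new json-stream dependency is what drives the incremental parsing of concatenated, possibly multi-line JSON documents in stream.rs below. As a rough synchronous analogue of the same idea (this uses serde_json's StreamDeserializer, not the json-stream crate's API, and the input is made up), each iteration pulls one complete document out of the buffer regardless of how it is split across lines:

use serde_json::{Deserializer, Value};

fn main() {
    // Two documents, the second spanning several lines.
    let data = br#"
        {"a": 1}
        {"b": [1,
               2,
               3]}
    "#;

    for doc in Deserializer::from_slice(&data[..]).into_iter::<Value>() {
        match doc {
            Ok(value) => println!("parsed document: {value}"),
            Err(err) => {
                eprintln!("stopping on parse error: {err}");
                break;
            }
        }
    }
}
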
21 changes: 21 additions & 0 deletions crates/datasources/src/json/errors.rs
@@ -1,3 +1,4 @@
use datafusion::error::DataFusionError;
use datafusion_ext::errors::ExtensionError;

use crate::object_store::errors::ObjectStoreSourceError;
@@ -13,21 +14,41 @@ pub enum JsonError {
#[error("no objects found {0}")]
NotFound(String),

#[error("sending data already in progress")]
SendAlreadyInProgress,

#[error(transparent)]
ObjectStoreSource(#[from] ObjectStoreSourceError),

#[error(transparent)]
ObjectStore(#[from] object_store::Error),

#[error(transparent)]
ObjectStorePath(#[from] object_store::path::Error),

#[error(transparent)]
Arrow(#[from] datafusion::arrow::error::ArrowError),

#[error(transparent)]
Datafusion(#[from] datafusion::error::DataFusionError),

#[error(transparent)]
ChannelSend(#[from] futures::channel::mpsc::SendError),

#[error(transparent)]
ChannelRecv(#[from] futures::channel::mpsc::TryRecvError),
}

impl From<JsonError> for ExtensionError {
fn from(e: JsonError) -> Self {
ExtensionError::String(e.to_string())
}
}

impl From<JsonError> for DataFusionError {
fn from(e: JsonError) -> Self {
DataFusionError::External(Box::new(e))
}
}

pub type Result<T, E = JsonError> = std::result::Result<T, E>;
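
The new From<JsonError> for DataFusionError impl is what lets these errors propagate with the ? operator from code that has to return DataFusion results, such as the stream conversion in stream.rs below. A minimal self-contained sketch of that pattern, with EngineError standing in for DataFusionError and the function names invented for illustration:

use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct JsonError(String);

impl fmt::Display for JsonError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "json error: {}", self.0)
    }
}

impl Error for JsonError {}

// Stand-in for DataFusionError::External(Box<dyn Error + Send + Sync>).
#[derive(Debug)]
enum EngineError {
    External(Box<dyn Error + Send + Sync>),
}

impl From<JsonError> for EngineError {
    fn from(e: JsonError) -> Self {
        EngineError::External(Box::new(e))
    }
}

fn read_object() -> Result<(), JsonError> {
    Err(JsonError("only objects and arrays of objects are supported".to_string()))
}

fn execute() -> Result<(), EngineError> {
    // `?` converts JsonError into EngineError through the From impl above.
    read_object()?;
    Ok(())
}

fn main() {
    match execute() {
        Ok(()) => println!("ok"),
        Err(err) => println!("converted error: {err:?}"),
    }
}
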
218 changes: 127 additions & 91 deletions crates/datasources/src/json/stream.rs
@@ -1,145 +1,181 @@
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::json::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use json_stream::JsonStream;
use object_store::{ObjectMeta, ObjectStore};
use serde_json::{Map, Value};

use crate::json::errors::JsonError;
use crate::json::table::push_unwind_json_values;
use crate::json::errors::{JsonError, Result};

pub type SendableCheckedRecordBatchStream =
Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;
pub type CheckedRecordBatchStream = Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>;

pub struct JsonStream {
/// VectorPartition wraps a vector of serde_json documents as a
/// partition for one of DataFusion's streaming tables.
pub struct VectorPartition {
schema: Arc<Schema>,
stream: SendableCheckedRecordBatchStream,
objs: Vec<Map<String, Value>>,
}

impl Stream for JsonStream {
type Item = Result<RecordBatch, DataFusionError>;
impl PartitionStream for VectorPartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
Box::pin(JsonHandler::new(
self.schema.clone(),
futures::stream::iter(self.objs.clone().into_iter().map(Ok)).boxed(),
))
}
}

impl RecordBatchStream for JsonStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
impl VectorPartition {
pub fn new(schema: Arc<Schema>, objs: Vec<Map<String, Value>>) -> Self {
Self { schema, objs }
}
}

pub struct JsonPartitionStream {
/// ObjectStorePartition holds a reference to the object store and the
/// object metadata, and represents a partition whose data is only read
/// when the partition is executed.
pub(crate) struct ObjectStorePartition {
schema: Arc<Schema>,
stream: Mutex<Option<SendableCheckedRecordBatchStream>>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
}

impl ObjectStorePartition {
pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
Self { schema, store, obj }
}
}

impl PartitionStream for JsonPartitionStream {
impl PartitionStream for ObjectStorePartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let partition = self
.stream
.lock()
.unwrap()
.take()
.expect("stream can only be used once")
.boxed();

Box::pin(JsonStream {
schema: self.schema.clone(),
stream: partition,
})
Box::pin(JsonHandler::new_from_object_store(
self.schema.to_owned(),
self.store.clone(),
self.obj.clone(),
))
}
}

impl JsonPartitionStream {
pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
let stream_schema = schema.clone();
let stream = futures::stream::iter(chunk)
.chunks(1000)
.map(move |objs| {
let mut decoder = ReaderBuilder::new(stream_schema.to_owned()).build_decoder()?;
decoder
.serialize(&objs)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(decoder.flush()?.unwrap())
})
.boxed();

Self {
schema: schema.clone(),
stream: Mutex::new(Some(stream)),
}
}
}

pub(crate) struct LazyJsonPartitionStream {
/// JsonObjectStream represents a sequence of "json documents" in an
/// intermediate format produced by serde_json.
type JsonObjectStream = Pin<Box<dyn Stream<Item = Result<Map<String, Value>>> + Send>>;

/// JsonHandler is the basis of all stream handling, converting
/// streams of serde_json objects to RecordBatches, including from
/// object store and from iterators of values.
///
/// These are used by the PartitionStream implementations (above),
/// which interface with the table function and providers.
struct JsonHandler {
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
}

impl PartitionStream for LazyJsonPartitionStream {
fn schema(&self) -> &SchemaRef {
&self.schema
impl RecordBatchStream for JsonHandler {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let stream_schema = self.schema.to_owned();
let store = self.store.clone();
let obj = self.obj.clone();

Box::pin(JsonStream {
schema: self.schema.clone(),
stream: futures::stream::once(async move {
futures::stream::iter(match Self::build(stream_schema, store, obj).await {
Ok(batches) => batches,
Err(e) => vec![Err(DataFusionError::External(Box::new(e)))],
})
})
.flatten()
.boxed(),
})
impl Stream for JsonHandler {
type Item = DFResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}

impl LazyJsonPartitionStream {
pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
Self { schema, store, obj }
impl JsonHandler {
fn new(schema: Arc<Schema>, stream: JsonObjectStream) -> Self {
let stream = Self::convert_stream(schema.clone(), stream);
Self { schema, stream }
}

async fn build(
fn new_from_object_store(
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
) -> Result<Vec<Result<RecordBatch, DataFusionError>>, JsonError> {
let mut data = Vec::new();
push_unwind_json_values(
&mut data,
serde_json::from_slice::<Value>(&store.get(&obj.location).await?.bytes().await?),
)?;

Ok(data
.chunks(1000)
.map(|chunk| {
let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?;
) -> Self {
let stream_schema = schema.clone();
let stream = futures::stream::once(async move {
Self::convert_stream(
stream_schema,
JsonStream::<Value, _>::new(match store.get(&obj.location).await {
Ok(stream) => stream.into_stream().map_err(JsonError::from),
Err(e) => return futures::stream::once(async move { Err(e.into()) }).boxed(),
})
.flat_map(Self::unwind_json_value)
.boxed(),
)
})
.flatten()
.boxed();

Self { schema, stream }
}

fn convert_stream(schema: Arc<Schema>, input: JsonObjectStream) -> CheckedRecordBatchStream {
input
.map(|rb| rb.map_err(JsonError::from))
.chunks(1024)
.map(move |chunk| {
let chunk = chunk.into_iter().collect::<Result<Vec<_>>>()?;
let mut decoder = ReaderBuilder::new(schema.to_owned()).build_decoder()?;
decoder
.serialize(chunk)
.serialize(&chunk)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(decoder.flush()?.unwrap())
})
.collect())
.boxed()
}


fn unwind_json_value(input: Result<Value>) -> JsonObjectStream {
futures::stream::iter(match input {
Ok(value) => match value {
Value::Array(vals) => {
let mut out = Vec::with_capacity(vals.len());
for v in vals {
match v {
Value::Object(doc) => out.push(Ok(doc)),
Value::Null => out.push(Ok(Map::new())),
_ => {
out.push(Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
)));
break;
}
}
}
out
}
Value::Object(doc) => vec![Ok(doc)],
Value::Null => vec![Ok(Map::new())],
_ => {
vec![Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
))]
}
},
Err(e) => vec![Err(e)],
})
.boxed()
}
}
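
For orientation, PartitionStream implementations like VectorPartition and ObjectStorePartition above are consumed by DataFusion's streaming table machinery. A minimal sketch of that wiring, assuming DataFusion's StreamingTable and RecordBatchStreamAdapter APIs and using a made-up partition type with made-up data:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::streaming::StreamingTable;
use datafusion::error::Result as DFResult;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream;

/// A partition that replays one in-memory batch, in the spirit of
/// VectorPartition.
struct OneBatchPartition {
    schema: SchemaRef,
    batch: RecordBatch,
}

impl PartitionStream for OneBatchPartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let batches: Vec<DFResult<RecordBatch>> = vec![Ok(self.batch.clone())];
        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::iter(batches),
        ))
    }
}

fn main() -> DFResult<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    let partition: Arc<dyn PartitionStream> = Arc::new(OneBatchPartition {
        schema: schema.clone(),
        batch,
    });

    // StreamingTable implements TableProvider, so it can be registered with a
    // SessionContext and queried like any other table.
    let _table = StreamingTable::try_new(schema, vec![partition])?;
    Ok(())
}
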
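The core of convert_stream above is feeding chunks of serde_json maps to arrow's JSON Decoder and flushing one RecordBatch per chunk. A stripped-down synchronous sketch of that step, with a made-up schema and row:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::json::ReaderBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use serde_json::{json, Map, Value};

/// Decode a chunk of JSON objects into a single RecordBatch, as
/// convert_stream does for each chunk of up to 1024 objects.
fn objects_to_batch(
    schema: Arc<Schema>,
    objs: &[Map<String, Value>],
) -> Result<Option<RecordBatch>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    decoder.serialize(objs)?;
    decoder.flush()
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let row = match json!({"a": 1}) {
        Value::Object(doc) => doc,
        _ => unreachable!(),
    };
    if let Some(batch) = objects_to_batch(schema, &[row])? {
        println!("decoded {} row(s)", batch.num_rows());
    }
    Ok(())
}
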

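And unwind_json_value above is essentially this flattening rule applied to each parsed document: an array of objects becomes one document per element, null becomes an empty object, and anything else is rejected. A synchronous sketch with a plain string error standing in for JsonError::UnspportedType:

use serde_json::{json, Map, Value};

fn unwind(value: Value) -> Result<Vec<Map<String, Value>>, &'static str> {
    match value {
        Value::Array(vals) => vals
            .into_iter()
            .map(|v| match v {
                Value::Object(doc) => Ok(doc),
                Value::Null => Ok(Map::new()),
                _ => Err("only objects and arrays of objects are supported"),
            })
            .collect(),
        Value::Object(doc) => Ok(vec![doc]),
        Value::Null => Ok(vec![Map::new()]),
        _ => Err("only objects and arrays of objects are supported"),
    }
}

fn main() {
    let docs = unwind(json!([{"a": 1}, null, {"b": 2}])).unwrap();
    println!("unwound {} documents", docs.len());
}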