chore: add explicit schema support for json streaming (#2766)
tychoish committed Mar 13, 2024
1 parent 78621b8 commit 708cfd4
Showing 2 changed files with 47 additions and 42 deletions.
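The substance of the change: `json_streaming_table` now takes an optional list of `FieldRef`s (arrow's `Arc<Field>` alias), so callers that already know the shape of their data can supply a schema up front instead of paying for inference over the first object. A minimal caller-side sketch, not part of this commit — the crate-internal import paths are assumptions, since the diff below only shows the function's own file:

// Sketch only: the crate-internal import paths below are assumed.
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
use datafusion::datasource::TableProvider;

use crate::common::url::DatasourceUrl;
use crate::json::errors::JsonError;
use crate::json::table::json_streaming_table;
use crate::object_store::ObjStoreAccess;

async fn open_with_known_schema(
    store_access: Arc<dyn ObjStoreAccess>,
    source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, JsonError> {
    // With Some(fields), the table skips sampling the first object.
    let fields: Vec<FieldRef> = vec![
        Arc::new(Field::new("id", DataType::Int64, true)),
        Arc::new(Field::new("name", DataType::Utf8, true)),
    ];
    json_streaming_table(store_access, source_url, Some(fields)).await
}

Passing `None`, as the updated `JsonScan` call site in the second file does, preserves the existing inference behavior.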
crates/datasources/src/json/table.rs (46 additions, 41 deletions)
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 use std::vec::Vec;
 
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::datatypes::{DataType, Field, FieldRef, Schema};
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::TableProvider;
 use datafusion::physical_plan::streaming::PartitionStream;
@@ -16,6 +16,7 @@ use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};
 pub async fn json_streaming_table(
     store_access: Arc<dyn ObjStoreAccess>,
     source_url: DatasourceUrl,
+    fields: Option<Vec<FieldRef>>,
 ) -> Result<Arc<dyn TableProvider>, JsonError> {
     let path = source_url.path().into_owned();
 
@@ -32,49 +33,53 @@
 
     let store = accessor.into_object_store();
 
-    // TODO: we should be able to avoid schema inference entirely
-    // (currently we read the entire first stream into memory), get
-    // the total schema of those documents and then add any subsequent
-    // objects as lazily streamed values.
-    let mut data = Vec::new();
-    {
-        let first_obj = list.pop().ok_or_else(|| JsonError::NotFound(path))?;
-        let blob = store
-            .get(&first_obj.location)
-            .await?
-            .bytes()
-            .await?
-            .to_vec();
-
-        push_unwind_json_values(
-            &mut data,
-            serde_json::Deserializer::from_slice(&blob).into_iter(),
-        )?;
-    }
-    let mut streams = Vec::<Arc<dyn PartitionStream>>::with_capacity(list.len());
-
-    let mut field_set = indexmap::IndexMap::<String, DataType>::new();
-    for obj in &data {
-        for (key, value) in obj.into_iter() {
-            let typ = type_for_value(value);
-            match field_set.get(key) {
-                Some(v) => match widen_type(v, typ) {
-                    Some(wider) => field_set.insert(key.to_string(), wider),
-                    None => None,
-                },
-                None => field_set.insert(key.to_string(), typ),
-            };
-        }
-    }
-    let schema = Arc::new(Schema::new(
-        field_set
-            .into_iter()
-            .map(|(k, v)| Field::new(k, v, true))
-            .collect::<Vec<_>>(),
-    ));
+    let mut streams = Vec::<Arc<dyn PartitionStream>>::with_capacity(list.len());
+
+    let schema = match fields {
+        Some(fields) => Arc::new(Schema::new(fields)),
+        None => {
+            let mut data = Vec::new();
+            {
+                let first_obj = list.pop().ok_or_else(|| JsonError::NotFound(path))?;
+                let blob = store
+                    .get(&first_obj.location)
+                    .await?
+                    .bytes()
+                    .await?
+                    .to_vec();
+
+                push_unwind_json_values(
+                    &mut data,
+                    serde_json::Deserializer::from_slice(&blob).into_iter(),
+                )?;
+            }
+
+            let mut field_set = indexmap::IndexMap::<String, DataType>::new();
+            for obj in &data {
+                for (key, value) in obj.into_iter() {
+                    let typ = type_for_value(value);
+                    match field_set.get(key) {
+                        Some(v) => match widen_type(v, typ) {
+                            Some(wider) => field_set.insert(key.to_string(), wider),
+                            None => None,
+                        },
+                        None => field_set.insert(key.to_string(), typ),
+                    };
+                }
+            }
+
+            let schema = Arc::new(Schema::new(
+                field_set
+                    .into_iter()
+                    .map(|(k, v)| Field::new(k, v, true))
+                    .collect::<Vec<_>>(),
+            ));
+
+            streams.push(Arc::new(VectorPartition::new(schema.clone(), data)));
+            schema
+        }
+    };
 
-    streams.push(Arc::new(VectorPartition::new(schema.clone(), data)));
-
     for obj in list {
         streams.push(Arc::new(ObjectStorePartition::new(
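When `fields` is `None`, the branch above keeps the old behavior: read the first object, build a per-key type map, and widen a key's type whenever a later document disagrees. A toy, self-contained illustration of that widening idea — the real `type_for_value` and `widen_type` helpers live elsewhere in this module, and the versions here are simplified stand-ins:

// Toy illustration of the widening inference; type_for_value and
// widen_type are simplified stand-ins for the module's real helpers.
use datafusion::arrow::datatypes::DataType;
use serde_json::{json, Value};

fn type_for_value(value: &Value) -> DataType {
    match value {
        Value::Bool(_) => DataType::Boolean,
        Value::Number(n) if n.is_i64() => DataType::Int64,
        Value::Number(_) => DataType::Float64,
        _ => DataType::Utf8,
    }
}

// Returns Some(wider) only when the existing type must grow; None means
// "keep what we have", mirroring the match in the diff above.
fn widen_type(current: &DataType, new: DataType) -> Option<DataType> {
    match (current, &new) {
        (DataType::Int64, DataType::Float64) => Some(DataType::Float64),
        _ => None,
    }
}

fn main() {
    let docs = [json!({"a": 1, "b": true}), json!({"a": 1.5, "c": "x"})];

    let mut field_set = indexmap::IndexMap::<String, DataType>::new();
    for obj in docs.iter().filter_map(Value::as_object) {
        for (key, value) in obj {
            let typ = type_for_value(value);
            match field_set.get(key) {
                Some(v) => match widen_type(v, typ) {
                    Some(wider) => field_set.insert(key.to_string(), wider),
                    None => None,
                },
                None => field_set.insert(key.to_string(), typ),
            };
        }
    }

    // "a" widens Int64 -> Float64; "b" and "c" appear in one document each,
    // which is one reason every inferred field is declared nullable.
    for (key, typ) in &field_set {
        println!("{key}: {typ:?}");
    }
}

Providing an explicit schema sidesteps this sampling entirely, which matters because only the first object's stream is ever inspected: keys that first appear in later files would otherwise be invisible to the inferred schema.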
crates/sqlbuiltins/src/functions/table/json.rs (1 addition, 1 deletion)
@@ -61,6 +61,6 @@ impl TableFunc for JsonScan {
         let store_access = storage_options_into_store_access(&source_url, &storage_options)
             .map_err(ExtensionError::access)?;
 
-        Ok(json_streaming_table(store_access, source_url).await?)
+        Ok(json_streaming_table(store_access, source_url, None).await?)
     }
 }
