refactor: add scan_to_stream() to Table trait to postpone the stream generation #1639

Merged (18 commits, May 29, 2023)
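The heart of the change: `Table` gains an async `scan_to_stream(ScanRequest)` that returns a `SendableRecordBatchStream`, so stream construction is postponed until the scan is actually executed instead of happening when the table object is built. A caller-side sketch of the new entry point (a minimal sketch, assuming `ScanRequest` implements `Default` and `RecordBatch` exposes `num_rows()`; only the `projection` field of the request appears in this diff):

use futures_util::StreamExt;
use store_api::storage::ScanRequest;
use table::{Result as TableResult, TableRef};

// Hypothetical caller: project the first two columns and drain the stream.
// The stream is generated inside scan_to_stream, at execution time.
async fn dump_first_two_columns(table: TableRef) -> TableResult<()> {
    let request = ScanRequest {
        projection: Some(vec![0, 1]),
        ..Default::default()
    };
    let mut stream = table.scan_to_stream(request).await?;
    while let Some(batch) = stream.next().await {
        let batch = batch.expect("failed to poll record batch");
        println!("{} rows", batch.num_rows());
    }
    Ok(())
}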
20 changes: 11 additions & 9 deletions Cargo.lock


14 changes: 7 additions & 7 deletions Cargo.toml
@@ -62,13 +62,13 @@ async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
# TODO(ruihang): use arrow-datafusion when it contains https://github.com/apache/arrow-datafusion/pull/6032
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" }
futures = "0.3"
futures-util = "0.3"
parquet = "37.0"
1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
@@ -36,6 +36,7 @@ serde_json = "1.0"
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio.workspace = true

136 changes: 102 additions & 34 deletions src/catalog/src/information_schema.rs
@@ -16,16 +16,21 @@ mod columns;
mod tables;

use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
use table::table::adapter::TableAdapter;
use table::TableRef;
use store_api::storage::ScanRequest;
use table::error::SchemaConversionSnafu;
use table::{Result as TableResult, Table, TableRef};

use self::columns::InformationSchemaColumns;
use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu};
use crate::error::Result;
use crate::information_schema::tables::InformationSchemaTables;
use crate::{CatalogProviderRef, SchemaProvider};

@@ -59,44 +64,107 @@ impl SchemaProvider for InformationSchemaProvider {
}

async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let table = match name.to_ascii_lowercase().as_ref() {
TABLES => {
let inner = Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
));
Arc::new(
StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(
|_| DatafusionSnafu {
msg: format!("Failed to get InformationSchema table '{name}'"),
},
)?,
)
}
COLUMNS => {
let inner = Arc::new(InformationSchemaColumns::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
));
Arc::new(
StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context(
|_| DatafusionSnafu {
msg: format!("Failed to get InformationSchema table '{name}'"),
},
)?,
)
}
let stream = match name.to_ascii_lowercase().as_ref() {
TABLES => InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
)
.to_stream()?,
COLUMNS => InformationSchemaColumns::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
)
.to_stream()?,
_ => {
return Ok(None);
}
};

let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?;
Ok(Some(Arc::new(table)))
Ok(Some(Arc::new(InformationTable::new(stream))))
}

async fn table_exist(&self, name: &str) -> Result<bool> {
let normalized_name = name.to_ascii_lowercase();
Ok(self.tables.contains(&normalized_name))
}
}

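/// A table backed by a single pre-built record batch stream. The stream sits
/// behind `Mutex<Option<...>>` because `scan_to_stream` takes `&self` yet
/// consumes the stream, so each `InformationTable` can be scanned only once.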
pub struct InformationTable {
schema: SchemaRef,
stream: Arc<Mutex<Option<SendableRecordBatchStream>>>,
}

impl InformationTable {
pub fn new(stream: SendableRecordBatchStream) -> Self {
let schema = stream.schema();
Self {
schema,
stream: Arc::new(Mutex::new(Some(stream))),
}
}
}

#[async_trait]
impl Table for InformationTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn table_info(&self) -> table::metadata::TableInfoRef {
unreachable!("Should not call table_info() of InformationTable directly")
}

    /// Scans the table into a physical plan. `InformationTable` does not
    /// support this entry point; use `scan_to_stream` instead.
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
        // limit can be used to reduce the amount scanned
        // from the datasource as a performance optimization.
        // If set, it contains the number of rows needed by the `LogicalPlan`.
        // The datasource should return *at least* this number of rows if available.
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
unimplemented!()
}

async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
let projection = request.projection;
let projected_schema = if let Some(projection) = &projection {
Arc::new(
self.schema()
.try_project(projection)
.context(SchemaConversionSnafu)?,
)
} else {
self.schema().clone()
};
let stream = self
.stream
.lock()
.unwrap()
.take()
.unwrap()
            .map(move |batch| {
                batch
                    .map(|batch| {
                        if let Some(projection) = &projection {
                            batch.try_project(projection)
                        } else {
                            Ok(batch)
                        }
                    })
                    .flatten()
            });
let stream = RecordBatchStreamAdaptor {
schema: projected_schema,
stream: Box::pin(stream),
};
Ok(Box::pin(stream))
}
}
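Because `scan_to_stream` moves the stream out of the mutex with `take().unwrap()`, a second scan of the same table panics. A minimal sketch of that contract, with a hypothetical `mock_stream()` helper standing in for any `SendableRecordBatchStream` source and `ScanRequest::default()` assumed:

#[tokio::test]
async fn information_table_is_single_use() {
    // mock_stream() is hypothetical; any SendableRecordBatchStream works here.
    let table = InformationTable::new(mock_stream());

    // The first scan takes ownership of the inner stream.
    assert!(table.scan_to_stream(ScanRequest::default()).await.is_ok());

    // A second scan would panic on the inner unwrap(): the Option is now None.
}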
28 changes: 26 additions & 2 deletions src/catalog/src/information_schema/columns.rs
@@ -18,8 +18,10 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::prelude::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::RecordBatch;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
@@ -29,7 +31,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::ResultExt;

use crate::error::{CreateRecordBatchSnafu, Result};
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::CatalogProviderRef;

pub(super) struct InformationSchemaColumns {
@@ -69,6 +71,28 @@ impl InformationSchemaColumns {
self.catalog_provider.clone(),
)
}

pub fn to_stream(&self) -> Result<SendableRecordBatchStream> {
let schema = self.schema().clone();
let mut builder = self.builder();
        let stream = futures::stream::once(async move {
            builder
                .make_tables()
                .await
                .map(|x| x.into_df_record_batch())
                .map_err(Into::into)
        });
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(Box::pin(DfRecordBatchStreamAdapter::new(
schema, stream,
)))
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

struct InformationSchemaColumnsBuilder {
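Both `to_stream()` bodies (this one and its twin in `tables.rs` below) rely on the same deferral trick: the async batch construction is wrapped in `futures::stream::once`, so no rows are built until the stream is first polled. A stripped-down sketch of just that trick using DataFusion types directly (the function name and signature are illustrative, not part of the PR):

use std::future::Future;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DfResult;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

// The future runs on the first poll of the returned stream, not when
// lazy_stream() is called: this is the "postpone the stream generation"
// part of the PR title.
fn lazy_stream(
    schema: SchemaRef,
    make_batch: impl Future<Output = DfResult<RecordBatch>> + Send + 'static,
) -> SendableRecordBatchStream {
    Box::pin(RecordBatchStreamAdapter::new(
        schema,
        futures::stream::once(make_batch),
    ))
}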
28 changes: 26 additions & 2 deletions src/catalog/src/information_schema/tables.rs
@@ -16,8 +16,10 @@ use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_error::prelude::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::RecordBatch;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
@@ -27,7 +29,7 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::ResultExt;
use table::metadata::TableType;

use crate::error::{CreateRecordBatchSnafu, Result};
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::CatalogProviderRef;

pub(super) struct InformationSchemaTables {
@@ -60,6 +62,28 @@ impl InformationSchemaTables {
self.catalog_provider.clone(),
)
}

pub fn to_stream(&self) -> Result<SendableRecordBatchStream> {
let schema = self.schema().clone();
let mut builder = self.builder();
        let stream = futures::stream::once(async move {
            builder
                .make_tables()
                .await
                .map(|x| x.into_df_record_batch())
                .map_err(Into::into)
        });
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(Box::pin(DfRecordBatchStreamAdapter::new(
schema, stream,
)))
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Builds the `information_schema.TABLES` table row by row
1 change: 1 addition & 0 deletions src/catalog/src/lib.rs
@@ -13,6 +13,7 @@
// limitations under the License.

#![feature(assert_matches)]
#![feature(result_flattening)]

use std::any::Any;
use std::collections::HashMap;
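The `result_flattening` gate enables the nightly `Result::flatten`, which `InformationTable::scan_to_stream` uses to collapse the nested `Result<Result<RecordBatch, _>, _>` produced by projecting inside an already-fallible stream item. In isolation:

#![feature(result_flattening)]

fn main() {
    let nested: Result<Result<u32, String>, String> = Ok(Ok(7));
    assert_eq!(nested.flatten(), Ok(7));

    let failed: Result<Result<u32, String>, String> = Ok(Err("boom".to_string()));
    assert_eq!(failed.flatten(), Err("boom".to_string()));
}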