From 5824fbb938277ef51bcc4649404659dee36e2915 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 22 May 2023 19:40:26 +0800 Subject: [PATCH 01/17] add scan_to_stream to Table Signed-off-by: Ruihang Xia --- Cargo.lock | 2 + src/catalog/Cargo.toml | 1 + src/catalog/src/information_schema.rs | 103 +++++++++---- src/catalog/src/information_schema/columns.rs | 28 +++- src/catalog/src/information_schema/tables.rs | 28 +++- src/catalog/src/system.rs | 11 +- src/common/grpc-expr/src/insert.rs | 53 +------ src/file-table-engine/src/table/format.rs | 136 ++++++++++++++++++ src/file-table-engine/src/table/immutable.rs | 7 +- src/frontend/src/table.rs | 10 +- src/mito/src/table.rs | 6 +- src/query/Cargo.toml | 1 + src/query/src/tests/time_range_filter_test.rs | 10 +- src/store-api/src/storage/requests.rs | 7 + src/table/src/table.rs | 5 +- src/table/src/table/adapter.rs | 131 +++++++++-------- src/table/src/table/numbers.rs | 12 +- src/table/src/test_util/empty_table.rs | 7 +- src/table/src/test_util/memtable.rs | 35 ++++- 19 files changed, 428 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce65ac5592c5..10b5471caba0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "session", "snafu", "storage", + "store-api", "table", "tokio", ] @@ -6627,6 +6628,7 @@ dependencies = [ "sql", "statrs", "stats-cli", + "store-api", "streaming-stats", "table", "tokio", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 5ceb7b2954a0..42f68998601c 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -36,6 +36,7 @@ serde_json = "1.0" session = { path = "../session" } snafu = { version = "0.7", features = ["backtraces"] } storage = { path = "../storage" } +store-api = { path = "../store-api" } table = { path = "../table" } tokio.workspace = true diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 9e5fa135faed..ca8181ae912b 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -16,13 +16,18 @@ mod columns; mod tables; use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; +use common_query::physical_plan::PhysicalPlanRef; +use common_query::prelude::Expr; +use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream}; use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; +use datatypes::schema::SchemaRef; use snafu::ResultExt; +use store_api::storage::ScanRequest; use table::table::adapter::TableAdapter; -use table::TableRef; +use table::{Result as TableResult, Table, TableRef}; use self::columns::InformationSchemaColumns; use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu}; @@ -59,40 +64,23 @@ impl SchemaProvider for InformationSchemaProvider { } async fn table(&self, name: &str) -> Result> { - let table = match name.to_ascii_lowercase().as_ref() { - TABLES => { - let inner = Arc::new(InformationSchemaTables::new( - self.catalog_name.clone(), - self.catalog_provider.clone(), - )); - Arc::new( - StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context( - |_| DatafusionSnafu { - msg: format!("Failed to get InformationSchema table '{name}'"), - }, - )?, - ) - } - COLUMNS => { - let inner = Arc::new(InformationSchemaColumns::new( - self.catalog_name.clone(), - self.catalog_provider.clone(), - )); - Arc::new( - StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context( - |_| DatafusionSnafu { - msg: format!("Failed to 
get InformationSchema table '{name}'"), - }, - )?, - ) - } + let stream = match name.to_ascii_lowercase().as_ref() { + TABLES => InformationSchemaTables::new( + self.catalog_name.clone(), + self.catalog_provider.clone(), + ) + .to_stream()?, + COLUMNS => InformationSchemaColumns::new( + self.catalog_name.clone(), + self.catalog_provider.clone(), + ) + .to_stream()?, _ => { return Ok(None); } }; - let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?; - Ok(Some(Arc::new(table))) + Ok(Some(Arc::new(InformationTable::new(stream)))) } async fn table_exist(&self, name: &str) -> Result { @@ -100,3 +88,56 @@ impl SchemaProvider for InformationSchemaProvider { Ok(self.tables.contains(&normalized_name)) } } + +pub struct InformationTable { + schema: SchemaRef, + stream: Arc>>, +} + +impl InformationTable { + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema(); + Self { + schema, + stream: Arc::new(Mutex::new(Some(stream))), + } + } +} + +#[async_trait] +impl Table for InformationTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_info(&self) -> table::metadata::TableInfoRef { + unreachable!("Should not call table_info() of InformationTable directly") + } + + /// Scan the table and returns a SendableRecordBatchStream. + async fn scan( + &self, + projection: Option<&Vec>, + filters: &[Expr], + // limit can be used to reduce the amount scanned + // from the datasource as a performance optimization. + // If set, it contains the amount of rows needed by the `LogicalPlan`, + // The datasource should return *at least* this number of rows if available. + limit: Option, + ) -> TableResult { + unimplemented!() + } + + async fn scan_to_stream( + &self, + _request: ScanRequest, + ) -> TableResult { + // TODO(ruihang): remove the second unwrap + let stream = self.stream.lock().unwrap().take().unwrap(); + Ok(stream) + } +} diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 760dd75fb2f9..7b5b4a55ab79 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -18,8 +18,10 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::{ SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX, }; +use common_error::prelude::BoxedError; use common_query::physical_plan::TaskContext; -use common_recordbatch::RecordBatch; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::datasource::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; @@ -29,7 +31,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::ResultExt; -use crate::error::{CreateRecordBatchSnafu, Result}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::CatalogProviderRef; pub(super) struct InformationSchemaColumns { @@ -69,6 +71,28 @@ impl InformationSchemaColumns { self.catalog_provider.clone(), ) } + + pub fn to_stream(&self) -> Result { + let schema = self.schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::once(async 
move { + builder + .make_tables() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(Box::pin(DfRecordBatchStreamAdapter::new( + schema, stream, + ))) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } } struct InformationSchemaColumnsBuilder { diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 1a5afa9d3e6e..f630af833e1d 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -16,8 +16,10 @@ use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::INFORMATION_SCHEMA_NAME; +use common_error::prelude::BoxedError; use common_query::physical_plan::TaskContext; -use common_recordbatch::RecordBatch; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::datasource::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; @@ -27,7 +29,7 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::ResultExt; use table::metadata::TableType; -use crate::error::{CreateRecordBatchSnafu, Result}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::CatalogProviderRef; pub(super) struct InformationSchemaTables { @@ -60,6 +62,28 @@ impl InformationSchemaTables { self.catalog_provider.clone(), ) } + + pub fn to_stream(&self) -> Result { + let schema = self.schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::once(async move { + builder + .make_tables() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(Box::pin(DfRecordBatchStreamAdapter::new( + schema, stream, + ))) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } } /// Builds the `information_schema.TABLE` table row by row diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 1d8c9649356e..854602b54fff 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -30,12 +30,13 @@ use datatypes::schema::{ColumnSchema, RawSchema, SchemaRef}; use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::ScanRequest; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableInfoRef}; use table::requests::{ CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest, TableOptions, }; -use table::{Table, TableRef}; +use table::{Result as TableResult, Table, TableRef}; use crate::error::{ self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, @@ -68,8 +69,12 @@ impl Table for SystemCatalogTable { self.0.scan(projection, filters, limit).await } + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + self.0.scan_to_stream(request).await + } + /// Insert values into table. 
- async fn insert(&self, request: InsertRequest) -> table::error::Result { + async fn insert(&self, request: InsertRequest) -> TableResult { self.0.insert(request).await } @@ -77,7 +82,7 @@ impl Table for SystemCatalogTable { self.0.table_info() } - async fn delete(&self, request: DeleteRequest) -> table::Result { + async fn delete(&self, request: DeleteRequest) -> TableResult { self.0.delete(request).await } diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index dc5b8fec2dd5..42e381517b99 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -459,26 +459,20 @@ fn is_null(null_mask: &BitVec, idx: usize) -> Option { #[cfg(test)] mod tests { - use std::any::Any; use std::sync::Arc; - use std::{assert_eq, unimplemented, vec}; + use std::{assert_eq, vec}; use api::helper::ColumnDataTypeWrapper; use api::v1::column::{self, SemanticType, Values}; use api::v1::{Column, ColumnDataType}; use common_base::BitVec; use common_catalog::consts::MITO_ENGINE; - use common_query::physical_plan::PhysicalPlanRef; - use common_query::prelude::Expr; use common_time::timestamp::Timestamp; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; + use datatypes::schema::{ColumnSchema, SchemaBuilder}; use datatypes::types::{TimestampMillisecondType, TimestampSecondType, TimestampType}; use datatypes::value::Value; use snafu::ResultExt; - use table::error::Result as TableResult; - use table::metadata::TableInfoRef; - use table::Table; use super::*; use crate::error; @@ -733,49 +727,6 @@ mod tests { assert_eq!(None, is_null(&null_mask, 99)); } - struct DemoTable; - - #[async_trait::async_trait] - impl Table for DemoTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true), - ]; - - Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .unwrap(), - ) - } - - fn table_info(&self) -> TableInfoRef { - unimplemented!() - } - - async fn scan( - &self, - _projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> TableResult { - unimplemented!(); - } - } - fn mock_insert_batch() -> (Vec, u32) { let row_count = 2; diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs index b089fe5ef9d9..eecb2e336faa 100644 --- a/src/file-table-engine/src/table/format.rs +++ b/src/file-table-engine/src/table/format.rs @@ -20,7 +20,9 @@ use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, P use common_datasource::file_format::Format; use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef}; use common_query::prelude::Expr; +use common_query::DfPhysicalPlan; use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream}; use datafusion::common::ToDFSchema; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -116,6 +118,38 @@ fn build_scan_plan( Ok(Arc::new(SimpleTableScan::new(Box::pin(adapter)))) } +fn build_record_batch_stream( + opener: T, + file_schema: Arc, + 
files: &[String], + projection: Option<&Vec>, + limit: Option, +) -> Result { + let stream = FileStream::new( + &FileScanConfig { + object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used + file_schema, + file_groups: vec![files + .iter() + .map(|filename| PartitionedFile::new(filename.to_string(), 0)) + .collect::>()], + statistics: Default::default(), + projection: projection.cloned(), + limit, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + 0, // partition: hard-code + opener, + &ExecutionPlanMetricsSet::new(), + ) + .context(error::BuildStreamSnafu)?; + let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) + .context(error::BuildStreamAdapterSnafu)?; + Ok(Box::pin(adapter)) +} + fn new_csv_scan_plan( _ctx: &CreateScanPlanContext, config: &ScanPlanConfig, @@ -132,6 +166,22 @@ fn new_csv_scan_plan( ) } +fn new_csv_stream( + _ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, + format: &CsvFormat, +) -> Result { + let file_schema = config.file_schema.arrow_schema().clone(); + let opener = build_csv_opener(file_schema.clone(), config, format)?; + build_record_batch_stream( + opener, + file_schema, + config.files, + config.projection, + config.limit, + ) +} + fn new_json_scan_plan( _ctx: &CreateScanPlanContext, config: &ScanPlanConfig, @@ -148,6 +198,22 @@ fn new_json_scan_plan( ) } +fn new_json_stream( + _ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, + format: &JsonFormat, +) -> Result { + let file_schema = config.file_schema.arrow_schema().clone(); + let opener = build_json_opener(file_schema.clone(), config, format)?; + build_record_batch_stream( + opener, + file_schema, + config.files, + config.projection, + config.limit, + ) +} + fn new_parquet_scan_plan( _ctx: &CreateScanPlanContext, config: &ScanPlanConfig, @@ -218,6 +284,76 @@ fn new_parquet_scan_plan( ))) } +fn new_parquet_stream( + _ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, + _format: &ParquetFormat, +) -> Result { + let file_schema = config.file_schema.arrow_schema().clone(); + let ScanPlanConfig { + files, + projection, + limit, + filters, + store, + .. 
+ } = config; + + let scan_config = FileScanConfig { + object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used + file_schema: file_schema.clone(), + file_groups: vec![files + .iter() + .map(|filename| PartitionedFile::new(filename.to_string(), 0)) + .collect::>()], + statistics: Default::default(), + projection: projection.cloned(), + limit: *limit, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }; + + let filters = filters + .iter() + .map(|f| f.df_expr().clone()) + .collect::>(); + + let filters = if let Some(expr) = conjunction(filters) { + let df_schema = file_schema + .clone() + .to_dfschema_ref() + .context(error::ParquetScanPlanSnafu)?; + + let filters = create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new()) + .context(error::ParquetScanPlanSnafu)?; + Some(filters) + } else { + None + }; + + let exec = ParquetExec::new(scan_config, filters, None).with_parquet_file_reader_factory( + Arc::new(DefaultParquetFileReaderFactory::new(store.clone())), + ); + + let projected_schema = if let Some(projection) = config.projection { + Arc::new( + file_schema + .project(projection) + .context(error::ProjectSchemaSnafu)?, + ) + } else { + file_schema + }; + + // let schema = Schema::try_from(projected_schema).context(error::ConvertSchemaSnafu)?; + // let stream = exec.execute(partition, context) + // let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) + // .context(error::BuildStreamAdapterSnafu)?; + // Ok(Box::pin(adapter)) + todo!() +} + #[derive(Debug, Clone)] pub struct ScanPlanConfig<'a> { pub file_schema: SchemaRef, diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index 980c4d9b02a6..cea04873614b 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -21,11 +21,12 @@ use common_datasource::object_store::build_backend; use common_error::prelude::BoxedError; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; +use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; use object_store::ObjectStore; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use table::error::{self as table_error, Result as TableResult}; use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType}; use table::{requests, Table}; @@ -96,6 +97,10 @@ impl Table for ImmutableFileTable { .context(table_error::TableOperationSnafu) } + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + todo!() + } + async fn flush( &self, _region_number: Option, diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 2b9660a16e7c..3f28709136ed 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -39,7 +39,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use partition::manager::PartitionRuleManagerRef; use partition::splitter::WriteSplitter; use snafu::prelude::*; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; @@ -155,6 +155,14 @@ impl Table for DistTable { Ok(Arc::new(dist_scan)) } + // TODO(ruihang): DistTable should not call this method directly + async 
fn scan_to_stream( + &self, + request: ScanRequest, + ) -> table::Result { + unimplemented!() + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index a4c2b502e54b..f41ccba44d52 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -27,7 +27,7 @@ use common_error::ext::BoxedError; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; -use common_recordbatch::{RecordBatch, RecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_telemetry::{logging, warn}; use datatypes::schema::Schema; use futures::task::{Context, Poll}; @@ -214,6 +214,10 @@ impl Table for MitoTable { Ok(Arc::new(SimpleTableScan::new(stream))) } + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + todo!() + } + fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult> { Ok(vec![FilterPushDownType::Inexact; filters.len()]) } diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 1a9195d16cf8..14339b03945d 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -54,5 +54,6 @@ paste = "1.0" rand.workspace = true statrs = "0.16" stats-cli = "3.0" +store-api = { path = "../store-api" } streaming-stats = "0.2" tokio-stream = "0.1" diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index e133ce4032a7..0cbc26e9b004 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -18,13 +18,14 @@ use std::sync::Arc; use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; -use common_recordbatch::RecordBatch; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; +use store_api::storage::ScanRequest; use table::metadata::{FilterPushDownType, TableInfoRef}; use table::predicate::TimeRangePredicateBuilder; use table::test_util::MemTable; @@ -69,6 +70,13 @@ impl Table for MemTableWrapper { self.inner.scan(projection, filters, limit).await } + async fn scan_to_stream( + &self, + request: ScanRequest, + ) -> table::Result { + self.inner.scan_to_stream(request).await + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 9338ff88d8d2..2ea3c2b26f2f 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -49,6 +49,13 @@ pub struct ScanRequest { pub projection: Option>, /// Filters pushed down pub filters: Vec, + /// Expected output ordering. This is only a hint and isn't guaranteed. + pub output_ordering: Option>, + /// limit can be used to reduce the amount scanned + /// from the datasource as a performance optimization. + /// If set, it contains the amount of rows needed by the caller, + /// The data source should return *at least* this number of rows if available. 
+ pub limit: Option, } #[derive(Debug)] diff --git a/src/table/src/table.rs b/src/table/src/table.rs index b2bd036f5755..3dd38bfe083f 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -22,8 +22,9 @@ use std::sync::Arc; use async_trait::async_trait; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; +use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use crate::error::{Result, UnsupportedSnafu}; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; @@ -73,6 +74,8 @@ pub trait Table: Send + Sync { limit: Option, ) -> Result; + async fn scan_to_stream(&self, request: ScanRequest) -> Result; + /// Tests whether the table provider can make use of any or all filter expressions /// to optimise data retrieval. fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 15c915fe9cec..8f7b9ad7f461 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -113,58 +113,57 @@ impl TableAdapter { } } -#[async_trait::async_trait] -impl Table for TableAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> TableSchemaRef { - self.schema.clone() - } - - fn table_info(&self) -> TableInfoRef { - unreachable!("Should not call table_info of TableAdaptor directly") - } - - fn table_type(&self) -> TableType { - match self.table_provider.table_type() { - DfTableType::Base => TableType::Base, - DfTableType::View => TableType::View, - DfTableType::Temporary => TableType::Temporary, - } - } - - async fn scan( - &self, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result { - let ctx = SessionContext::new(); - let filters: Vec = filters.iter().map(|e| e.df_expr().clone()).collect(); - debug!("TableScan filter size: {}", filters.len()); - let execution_plan = self - .table_provider - .scan(&ctx.state(), projection, &filters, limit) - .await - .context(error::DatafusionSnafu)?; - let schema: SchemaRef = Arc::new( - execution_plan - .schema() - .try_into() - .context(error::SchemaConversionSnafu)?, - ); - Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan))) - } - - fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { - self.table_provider - .supports_filters_pushdown(&filters.iter().map(|x| x.df_expr()).collect::>()) - .context(error::DatafusionSnafu) - .map(|v| v.into_iter().map(Into::into).collect::>()) - } -} +// #[async_trait::async_trait] +// impl Table for TableAdapter { +// fn as_any(&self) -> &dyn Any { +// self +// } + +// fn schema(&self) -> TableSchemaRef { +// self.schema.clone() +// } + +// fn table_info(&self) -> TableInfoRef { +// unreachable!("Should not call table_info of TableAdaptor directly") +// } + +// fn table_type(&self) -> TableType { +// match self.table_provider.table_type() { +// DfTableType::Base => TableType::Base, +// DfTableType::View => TableType::View, +// DfTableType::Temporary => TableType::Temporary, +// } +// } + +// async fn scan( +// &self, +// projection: Option<&Vec>, +// filters: &[Expr], +// limit: Option, +// ) -> Result { +// let ctx = SessionContext::new(); +// let filters: Vec = filters.iter().map(|e| e.df_expr().clone()).collect(); +// let execution_plan = self +// .table_provider +// .scan(&ctx.state(), projection, &filters, limit) +// .await +// .context(error::DatafusionSnafu)?; +// 
let schema: SchemaRef = Arc::new( +// execution_plan +// .schema() +// .try_into() +// .context(error::SchemaConversionSnafu)?, +// ); +// Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan))) +// } + +// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { +// self.table_provider +// .supports_filters_pushdown(&filters.iter().map(|x| x.df_expr()).collect::>()) +// .context(error::DatafusionSnafu) +// .map(|v| v.into_iter().map(Into::into).collect::>()) +// } +// } #[cfg(test)] mod tests { @@ -174,18 +173,18 @@ mod tests { use super::*; use crate::metadata::TableType::Base; - #[test] - #[should_panic] - fn test_table_adaptor_info() { - let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); - let table_adapter = TableAdapter::new(df_table).unwrap(); - let _ = table_adapter.table_info(); - } - - #[test] - fn test_table_adaptor_type() { - let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); - let table_adapter = TableAdapter::new(df_table).unwrap(); - assert_eq!(Base, table_adapter.table_type()); - } + // #[test] + // #[should_panic] + // fn test_table_adaptor_info() { + // let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); + // let table_adapter = TableAdapter::new(df_table).unwrap(); + // let _ = table_adapter.table_info(); + // } + + // #[test] + // fn test_table_adaptor_type() { + // let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); + // let table_adapter = TableAdapter::new(df_table).unwrap(); + // assert_eq!(Base, table_adapter.table_type()); + // } } diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 304734640910..3b04e67900ec 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch; use datafusion_common::from_slice::FromSlice; @@ -29,7 +29,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::task::{Context, Poll}; use futures::Stream; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use crate::error::Result; use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; @@ -136,6 +136,14 @@ impl Table for NumbersTable { )) } + async fn scan_to_stream(&self, request: ScanRequest) -> Result { + Ok(Box::pin(NumbersStream { + limit: request.limit.unwrap_or(100) as u32, + schema: self.schema.clone(), + already_run: false, + })) + } + async fn flush(&self, _region_number: Option, _wait: Option) -> Result<()> { Ok(()) } diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index 7fd81682355a..acc2710b5b11 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -16,7 +16,8 @@ use std::sync::Arc; use async_trait::async_trait; use common_query::physical_plan::PhysicalPlanRef; -use common_recordbatch::EmptyRecordBatchStream; +use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; +use store_api::storage::ScanRequest; use 
crate::metadata::{TableInfo, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; use crate::requests::{CreateTableRequest, InsertRequest}; @@ -85,4 +86,8 @@ impl Table for EmptyTable { let scan = SimpleTableScan::new(Box::pin(EmptyRecordBatchStream::new(self.schema()))); Ok(Arc::new(scan)) } + + async fn scan_to_stream(&self, request: ScanRequest) -> Result { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))) + } } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 8aa5f3dd2d19..0a6cc272961f 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -21,14 +21,14 @@ use common_error::prelude::BoxedError; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::UInt32Vector; use futures::task::{Context, Poll}; use futures::Stream; use snafu::prelude::*; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu}; use crate::metadata::{ @@ -173,6 +173,37 @@ impl Table for MemTable { })))) } + async fn scan_to_stream(&self, request: ScanRequest) -> Result { + let df_recordbatch = if let Some(indices) = request.projection { + self.recordbatch + .df_record_batch() + .project(&indices) + .context(TableProjectionSnafu)? + } else { + self.recordbatch.df_record_batch().clone() + }; + + let rows = df_recordbatch.num_rows(); + let limit = if let Some(limit) = request.limit { + limit.min(rows) + } else { + rows + }; + let df_recordbatch = df_recordbatch.slice(0, limit); + + let recordbatch = RecordBatch::try_from_df_record_batch( + Arc::new(Schema::try_from(df_recordbatch.schema()).context(SchemaConversionSnafu)?), + df_recordbatch, + ) + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)?; + + Ok(Box::pin(MemtableStream { + schema: recordbatch.schema.clone(), + recordbatch: Some(recordbatch), + })) + } + fn statistics(&self) -> Option { let df_recordbatch = self.recordbatch.df_record_batch(); let num_rows = df_recordbatch.num_rows(); From b7f52ee47dc851af69f23737a35f4f49d840842e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 22 May 2023 20:45:10 +0800 Subject: [PATCH 02/17] impl parquet stream Signed-off-by: Ruihang Xia --- Cargo.lock | 18 +++--- Cargo.toml | 14 ++--- src/file-table-engine/src/error.rs | 2 +- src/file-table-engine/src/table/format.rs | 62 +++++++++++++------- src/file-table-engine/src/table/immutable.rs | 16 ++++- 5 files changed, 74 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10b5471caba0..ca3ebeffa5c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2334,7 +2334,7 @@ dependencies = [ [[package]] name = "datafusion" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "ahash 0.8.3", "arrow", @@ -2383,7 +2383,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "22.0.0" -source = 
"git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "arrow-array", @@ -2397,7 +2397,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "dashmap", "datafusion-common", @@ -2414,7 +2414,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "ahash 0.8.3", "arrow", @@ -2425,7 +2425,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "async-trait", @@ -2442,7 +2442,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "ahash 0.8.3", "arrow", @@ -2473,7 +2473,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "datafusion-common", @@ -2484,7 +2484,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "arrow-schema", @@ -2497,7 +2497,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "async-recursion", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 1cd431c9c50e..021e64138037 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ 
-62,13 +62,13 @@ async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } # TODO(ruihang): use arrow-datafusion when it contains https://github.com/apache/arrow-datafusion/pull/6032 -datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } +datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } futures = "0.3" futures-util = "0.3" parquet = "37.0" diff --git a/src/file-table-engine/src/error.rs b/src/file-table-engine/src/error.rs index 7fc4c91d224e..ec45857fe2b1 100644 --- a/src/file-table-engine/src/error.rs +++ b/src/file-table-engine/src/error.rs @@ -142,7 +142,7 @@ pub enum Error { #[snafu(display("Failed to build stream: {}", source))] BuildStream { - source: datafusion::error::DataFusionError, + source: DataFusionError, location: Location, }, diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs index eecb2e336faa..6503c182a384 100644 --- a/src/file-table-engine/src/table/format.rs +++ b/src/file-table-engine/src/table/format.rs @@ -29,7 +29,9 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::optimizer::utils::conjunction; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_expr::execution_props::ExecutionProps; -use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec}; +use datafusion::physical_plan::file_format::{ + FileOpener, FileScanConfig, FileStream, ParquetExec, ParquetOpener, +}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::schema::{Schema, SchemaRef}; @@ -37,7 +39,7 @@ use object_store::ObjectStore; use snafu::ResultExt; use table::table::scan::SimpleTableScan; -use crate::error::{self, Result}; +use 
crate::error::{self, BuildStreamSnafu, Result}; const DEFAULT_BATCH_SIZE: usize = 8192; @@ -332,26 +334,34 @@ fn new_parquet_stream( None }; - let exec = ParquetExec::new(scan_config, filters, None).with_parquet_file_reader_factory( - Arc::new(DefaultParquetFileReaderFactory::new(store.clone())), - ); - - let projected_schema = if let Some(projection) = config.projection { - Arc::new( - file_schema - .project(projection) - .context(error::ProjectSchemaSnafu)?, - ) - } else { - file_schema + let parquet_opener = ParquetOpener { + partition_index: 0, // partition: hard-code. This is only for statistics purpose + projection: Arc::from(projection.cloned().unwrap_or_default()), + batch_size: DEFAULT_BATCH_SIZE, + limit: *limit, + predicate: filters, + pruning_predicate: None, + page_pruning_predicate: None, + table_schema: file_schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(store.clone())), + pushdown_filters: true, + reorder_filters: true, + enable_page_index: true, }; - // let schema = Schema::try_from(projected_schema).context(error::ConvertSchemaSnafu)?; - // let stream = exec.execute(partition, context) - // let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) - // .context(error::BuildStreamAdapterSnafu)?; - // Ok(Box::pin(adapter)) - todo!() + let stream = FileStream::new( + &scan_config, + 0, + parquet_opener, + &ExecutionPlanMetricsSet::new(), + ) + .context(BuildStreamSnafu)?; + + let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) + .context(error::BuildStreamAdapterSnafu)?; + Ok(Box::pin(adapter)) } #[derive(Debug, Clone)] @@ -375,3 +385,15 @@ pub fn create_physical_plan( Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format), } } + +pub fn create_stream( + format: &Format, + ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, +) -> Result { + match format { + Format::Csv(format) => new_csv_stream(ctx, config, format), + Format::Json(format) => new_json_stream(ctx, config, format), + Format::Parquet(format) => new_parquet_stream(ctx, config, format), + } +} diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index cea04873614b..9171f3dd1385 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -31,6 +31,7 @@ use table::error::{self as table_error, Result as TableResult}; use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType}; use table::{requests, Table}; +use super::format::create_stream; use crate::error::{self, ConvertRawSnafu, Result}; use crate::manifest::immutable::{ read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION, @@ -98,7 +99,20 @@ impl Table for ImmutableFileTable { } async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { - todo!() + create_stream( + &self.format, + &CreateScanPlanContext::default(), + &ScanPlanConfig { + file_schema: self.schema(), + files: &self.files, + projection: request.projection.as_ref(), + filters: &request.filters, + limit: request.limit, + store: self.object_store.clone(), + }, + ) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) } async fn flush( From 5b079a2b2da71f2f987fb8ae17e2d4baabc5127d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 23 May 2023 11:28:02 +0800 Subject: [PATCH 03/17] reorganise adapters Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 1 - 
src/file-table-engine/src/table/format.rs | 4 +- src/mito/src/table.rs | 4 +- src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/requests.rs | 11 +- src/table/src/table/adapter.rs | 134 ++++------------------ src/table/src/table/numbers.rs | 4 +- src/table/src/table/scan.rs | 13 ++- src/table/src/test_util/empty_table.rs | 6 +- src/table/src/test_util/memtable.rs | 4 +- 10 files changed, 53 insertions(+), 130 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index ca8181ae912b..494f0b0ff7b3 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -26,7 +26,6 @@ use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; use datatypes::schema::SchemaRef; use snafu::ResultExt; use store_api::storage::ScanRequest; -use table::table::adapter::TableAdapter; use table::{Result as TableResult, Table, TableRef}; use self::columns::InformationSchemaColumns; diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs index 6503c182a384..a548cc51a955 100644 --- a/src/file-table-engine/src/table/format.rs +++ b/src/file-table-engine/src/table/format.rs @@ -37,7 +37,7 @@ use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::schema::{Schema, SchemaRef}; use object_store::ObjectStore; use snafu::ResultExt; -use table::table::scan::SimpleTableScan; +use table::table::scan::StreamScanAdapter; use crate::error::{self, BuildStreamSnafu, Result}; @@ -117,7 +117,7 @@ fn build_scan_plan( .context(error::BuildStreamSnafu)?; let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) .context(error::BuildStreamAdapterSnafu)?; - Ok(Arc::new(SimpleTableScan::new(Box::pin(adapter)))) + Ok(Arc::new(StreamScanAdapter::new(Box::pin(adapter)))) } fn build_record_batch_stream( diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index f41ccba44d52..0debccb53c2e 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -48,7 +48,7 @@ use table::metadata::{ use table::requests::{ AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest, }; -use table::table::scan::SimpleTableScan; +use table::table::scan::StreamScanAdapter; use table::table::{AlterContext, Table}; use table::{error as table_error, RegionStat}; use tokio::sync::Mutex; @@ -211,7 +211,7 @@ impl Table for MitoTable { }); let stream = Box::pin(ChunkStream { schema, stream }); - Ok(Arc::new(SimpleTableScan::new(stream))) + Ok(Arc::new(StreamScanAdapter::new(stream))) } async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index d8978db21f48..b089a2573f13 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -36,7 +36,7 @@ pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine} pub use self::metadata::RegionMeta; pub use self::region::{FlushContext, FlushReason, Region, RegionStat, WriteContext}; pub use self::requests::{ - AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest, + AddColumn, AlterOperation, AlterRequest, GetRequest, OrderOption, ScanRequest, WriteRequest, }; pub use self::responses::{GetResponse, ScanResponse, WriteResponse}; pub use self::snapshot::{ReadContext, Snapshot}; diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 2ea3c2b26f2f..8fa9de1c38d1 100644 --- a/src/store-api/src/storage/requests.rs +++ 
b/src/store-api/src/storage/requests.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use common_error::ext::ErrorExt; use common_query::logical_plan::Expr; +use datatypes::arrow::compute::SortOptions; use datatypes::vectors::VectorRef; use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber}; @@ -38,7 +39,7 @@ pub trait WriteRequest: Send { fn delete(&mut self, keys: HashMap) -> Result<(), Self::Error>; } -#[derive(Default)] +#[derive(Default, Clone, Debug)] pub struct ScanRequest { /// Max sequence number to read, None for latest sequence. /// @@ -50,7 +51,7 @@ pub struct ScanRequest { /// Filters pushed down pub filters: Vec, /// Expected output ordering. This is only a hint and isn't guaranteed. - pub output_ordering: Option>, + pub output_ordering: Option>, /// limit can be used to reduce the amount scanned /// from the datasource as a performance optimization. /// If set, it contains the amount of rows needed by the caller, @@ -58,6 +59,12 @@ pub struct ScanRequest { pub limit: Option, } +#[derive(Debug, Clone, Copy)] +pub struct OrderOption { + pub index: usize, + pub options: SortOptions, +} + #[derive(Debug)] pub struct GetRequest {} diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 8f7b9ad7f461..fc519233f7a9 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -13,39 +13,43 @@ // limitations under the License. use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use common_query::logical_plan::Expr; -use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlanAdapter, PhysicalPlanRef}; +use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::DfPhysicalPlan; -use common_telemetry::debug; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::datasource::datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown; use datafusion::datasource::{TableProvider, TableType as DfTableType}; use datafusion::error::Result as DfResult; use datafusion::execution::context::SessionState; -use datafusion::prelude::SessionContext; use datafusion_expr::expr::Expr as DfExpr; -use datatypes::schema::{SchemaRef as TableSchemaRef, SchemaRef}; -use snafu::prelude::*; +use store_api::storage::{OrderOption, ScanRequest}; -use crate::error::{self, Result}; -use crate::metadata::TableInfoRef; -use crate::table::{FilterPushDownType, Table, TableRef, TableType}; +use super::scan::StreamScanAdapter; +use crate::table::{TableRef, TableType}; -/// Greptime Table -> datafusion TableProvider +/// Adapt greptime's [Table] to DataFusion's [TableProvider]. 
pub struct DfTableProviderAdapter { table: TableRef, + scan_req: Arc>, } impl DfTableProviderAdapter { pub fn new(table: TableRef) -> Self { - Self { table } + Self { + table, + scan_req: Arc::default(), + } } pub fn table(&self) -> TableRef { self.table.clone() } + + pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) { + self.scan_req.lock().unwrap().output_ordering = Some(order_opts.to_vec()); + } } #[async_trait::async_trait] @@ -74,8 +78,16 @@ impl TableProvider for DfTableProviderAdapter { limit: Option, ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); - let inner = self.table.scan(projection, &filters, limit).await?; - Ok(Arc::new(DfPhysicalPlanAdapter(inner))) + let request = { + let mut request = self.scan_req.lock().unwrap(); + request.filters = filters; + request.projection = projection.cloned(); + request.limit = limit; + request.clone() + }; + let stream = self.table.scan_to_stream(request).await?; + let stream_adapter = Arc::new(StreamScanAdapter::new(stream)); + Ok(Arc::new(DfPhysicalPlanAdapter(stream_adapter))) } fn supports_filters_pushdown( @@ -92,99 +104,3 @@ impl TableProvider for DfTableProviderAdapter { .map(|v| v.into_iter().map(Into::into).collect::>())?) } } - -/// Datafusion TableProvider -> greptime Table -pub struct TableAdapter { - schema: TableSchemaRef, - table_provider: Arc, -} - -impl TableAdapter { - pub fn new(table_provider: Arc) -> Result { - Ok(Self { - schema: Arc::new( - table_provider - .schema() - .try_into() - .context(error::SchemaConversionSnafu)?, - ), - table_provider, - }) - } -} - -// #[async_trait::async_trait] -// impl Table for TableAdapter { -// fn as_any(&self) -> &dyn Any { -// self -// } - -// fn schema(&self) -> TableSchemaRef { -// self.schema.clone() -// } - -// fn table_info(&self) -> TableInfoRef { -// unreachable!("Should not call table_info of TableAdaptor directly") -// } - -// fn table_type(&self) -> TableType { -// match self.table_provider.table_type() { -// DfTableType::Base => TableType::Base, -// DfTableType::View => TableType::View, -// DfTableType::Temporary => TableType::Temporary, -// } -// } - -// async fn scan( -// &self, -// projection: Option<&Vec>, -// filters: &[Expr], -// limit: Option, -// ) -> Result { -// let ctx = SessionContext::new(); -// let filters: Vec = filters.iter().map(|e| e.df_expr().clone()).collect(); -// let execution_plan = self -// .table_provider -// .scan(&ctx.state(), projection, &filters, limit) -// .await -// .context(error::DatafusionSnafu)?; -// let schema: SchemaRef = Arc::new( -// execution_plan -// .schema() -// .try_into() -// .context(error::SchemaConversionSnafu)?, -// ); -// Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan))) -// } - -// fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { -// self.table_provider -// .supports_filters_pushdown(&filters.iter().map(|x| x.df_expr()).collect::>()) -// .context(error::DatafusionSnafu) -// .map(|v| v.into_iter().map(Into::into).collect::>()) -// } -// } - -#[cfg(test)] -mod tests { - use datafusion::arrow; - use datafusion::datasource::empty::EmptyTable; - - use super::*; - use crate::metadata::TableType::Base; - - // #[test] - // #[should_panic] - // fn test_table_adaptor_info() { - // let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); - // let table_adapter = TableAdapter::new(df_table).unwrap(); - // let _ = table_adapter.table_info(); - // } - - // #[test] - // fn test_table_adaptor_type() { - // let df_table = 
Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); - // let table_adapter = TableAdapter::new(df_table).unwrap(); - // assert_eq!(Base, table_adapter.table_type()); - // } -} diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 3b04e67900ec..4ee32e40c827 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -33,7 +33,7 @@ use store_api::storage::{RegionNumber, ScanRequest}; use crate::error::Result; use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; -use crate::table::scan::SimpleTableScan; +use crate::table::scan::StreamScanAdapter; use crate::table::{Expr, Table}; const NUMBER_COLUMN: &str = "number"; @@ -132,7 +132,7 @@ impl Table for NumbersTable { ) .into()]; Ok(Arc::new( - SimpleTableScan::new(stream).with_output_ordering(output_ordering), + StreamScanAdapter::new(stream).with_output_ordering(output_ordering), )) } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 01acd5854af0..c510dbdde8b7 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -25,22 +25,23 @@ use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; use snafu::OptionExt; -pub struct SimpleTableScan { +/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan]. +pub struct StreamScanAdapter { stream: Mutex>, schema: SchemaRef, output_ordering: Option>, } -impl Debug for SimpleTableScan { +impl Debug for StreamScanAdapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SimpleTableScan") + f.debug_struct("StreamScanAdapter") .field("stream", &"") .field("schema", &self.schema) .finish() } } -impl SimpleTableScan { +impl StreamScanAdapter { pub fn new(stream: SendableRecordBatchStream) -> Self { let schema = stream.schema(); @@ -57,7 +58,7 @@ impl SimpleTableScan { } } -impl PhysicalPlan for SimpleTableScan { +impl PhysicalPlan for StreamScanAdapter { fn as_any(&self) -> &dyn Any { self } @@ -126,7 +127,7 @@ mod test { RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); let stream = recordbatches.as_stream(); - let scan = SimpleTableScan::new(stream); + let scan = StreamScanAdapter::new(stream); assert_eq!(scan.schema(), schema); diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index acc2710b5b11..c2388dc2922e 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -21,7 +21,7 @@ use store_api::storage::ScanRequest; use crate::metadata::{TableInfo, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; use crate::requests::{CreateTableRequest, InsertRequest}; -use crate::table::scan::SimpleTableScan; +use crate::table::scan::StreamScanAdapter; use crate::{Result, Table}; pub struct EmptyTable { @@ -83,11 +83,11 @@ impl Table for EmptyTable { _filters: &[common_query::prelude::Expr], _limit: Option, ) -> Result { - let scan = SimpleTableScan::new(Box::pin(EmptyRecordBatchStream::new(self.schema()))); + let scan = StreamScanAdapter::new(Box::pin(EmptyRecordBatchStream::new(self.schema()))); Ok(Arc::new(scan)) } - async fn scan_to_stream(&self, request: ScanRequest) -> Result { + async fn scan_to_stream(&self, _: ScanRequest) -> Result { Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))) } } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 0a6cc272961f..ecb888f8d09a 100644 --- 
a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -34,7 +34,7 @@ use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRe use crate::metadata::{ TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion, }; -use crate::table::scan::SimpleTableScan; +use crate::table::scan::StreamScanAdapter; use crate::{ColumnStatistics, Table, TableStatistics}; #[derive(Debug, Clone)] @@ -167,7 +167,7 @@ impl Table for MemTable { ) .map_err(BoxedError::new) .context(TablesRecordBatchSnafu)?; - Ok(Arc::new(SimpleTableScan::new(Box::pin(MemtableStream { + Ok(Arc::new(StreamScanAdapter::new(Box::pin(MemtableStream { schema: recordbatch.schema.clone(), recordbatch: Some(recordbatch), })))) From 2101b3a130c1ed597795e9509232967ec5bf0bcb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 23 May 2023 11:34:50 +0800 Subject: [PATCH 04/17] implement scan_to_stream for mito table Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 5 +-- src/mito/src/table.rs | 58 ++++++++++++++++++++++++++- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 494f0b0ff7b3..c0ce8183735f 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -21,10 +21,9 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; -use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream}; -use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; +use common_recordbatch::SendableRecordBatchStream; +use datafusion::datasource::streaming::PartitionStream; use datatypes::schema::SchemaRef; -use snafu::ResultExt; use store_api::storage::ScanRequest; use table::{Result as TableResult, Table, TableRef}; diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 0debccb53c2e..e3994ea1e373 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -215,7 +215,63 @@ impl Table for MitoTable { } async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { - todo!() + let read_ctx = ReadContext::default(); + let mut readers = Vec::with_capacity(self.regions.len()); + let mut first_schema: Option> = None; + + let table_info = self.table_info.load(); + // TODO(hl): Currently the API between frontend and datanode is under refactoring in + // https://github.com/GreptimeTeam/greptimedb/issues/597 . Once it's finished, query plan + // can carry filtered region info to avoid scanning all regions on datanode. + for region in self.regions.values() { + let snapshot = region + .snapshot(&read_ctx) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + let reader = snapshot + .scan(&read_ctx, request.clone()) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)? + .reader; + + let schema = reader.user_schema().clone(); + if let Some(first_schema) = &first_schema { + // TODO(hl): we assume all regions' schemas are the same, but undergoing table altering + // may make these schemas inconsistent. 
+ ensure!( + first_schema.version() == schema.version(), + RegionSchemaMismatchSnafu { + table: common_catalog::format_full_table_name( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name + ) + } + ); + } else { + first_schema = Some(schema); + } + readers.push(reader); + } + + // TODO(hl): we assume table contains at least one region, but with region migration this + // assumption may become invalid. + let stream_schema = first_schema.context(InvalidTableSnafu { + table_id: table_info.ident.table_id, + })?; + + let schema = stream_schema.clone(); + let stream = Box::pin(async_stream::try_stream! { + for mut reader in readers { + while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? { + let chunk = reader.project_chunk(chunk); + yield RecordBatch::new(stream_schema.clone(), chunk.columns)? + } + } + }); + + Ok(Box::pin(ChunkStream { schema, stream })) } fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult> { From 8441330413639fd348117b36c747af2ef7270c0c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 23 May 2023 14:31:05 +0800 Subject: [PATCH 05/17] clean up Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 9 ++++----- src/file-table-engine/src/table/format.rs | 3 +-- src/frontend/src/table.rs | 2 +- src/table/src/table/adapter.rs | 2 +- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index c0ce8183735f..9cc8b6636845 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -22,13 +22,12 @@ use async_trait::async_trait; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; use common_recordbatch::SendableRecordBatchStream; -use datafusion::datasource::streaming::PartitionStream; use datatypes::schema::SchemaRef; use store_api::storage::ScanRequest; use table::{Result as TableResult, Table, TableRef}; use self::columns::InformationSchemaColumns; -use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu}; +use crate::error::Result; use crate::information_schema::tables::InformationSchemaTables; use crate::{CatalogProviderRef, SchemaProvider}; @@ -119,13 +118,13 @@ impl Table for InformationTable { /// Scan the table and returns a SendableRecordBatchStream. async fn scan( &self, - projection: Option<&Vec>, - filters: &[Expr], + _projection: Option<&Vec>, + _filters: &[Expr], // limit can be used to reduce the amount scanned // from the datasource as a performance optimization. // If set, it contains the amount of rows needed by the `LogicalPlan`, // The datasource should return *at least* this number of rows if available. 
- limit: Option, + _limit: Option, ) -> TableResult { unimplemented!() } diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs index a548cc51a955..47d58ee69c1c 100644 --- a/src/file-table-engine/src/table/format.rs +++ b/src/file-table-engine/src/table/format.rs @@ -20,9 +20,8 @@ use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, P use common_datasource::file_format::Format; use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef}; use common_query::prelude::Expr; -use common_query::DfPhysicalPlan; use common_recordbatch::adapter::RecordBatchStreamAdapter; -use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::SendableRecordBatchStream; use datafusion::common::ToDFSchema; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 3f28709136ed..3254f77c859b 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -158,7 +158,7 @@ impl Table for DistTable { // TODO(ruihang): DistTable should not call this method directly async fn scan_to_stream( &self, - request: ScanRequest, + _request: ScanRequest, ) -> table::Result { unimplemented!() } diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index fc519233f7a9..53f3a9600e2a 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -29,7 +29,7 @@ use store_api::storage::{OrderOption, ScanRequest}; use super::scan::StreamScanAdapter; use crate::table::{TableRef, TableType}; -/// Adapt greptime's [Table] to DataFusion's [TableProvider]. +/// Adapt greptime's [TableRef] to DataFusion's [TableProvider]. pub struct DfTableProviderAdapter { table: TableRef, scan_req: Arc>, From 8546bf422c6029f064e015c0da70d4069803eb87 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 23 May 2023 17:06:32 +0800 Subject: [PATCH 06/17] add location info Signed-off-by: Ruihang Xia --- src/common/query/src/physical_plan.rs | 1 + src/query/src/error.rs | 60 +++++++++++++++++---------- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index c949cdf414e6..83affe52e75d 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -71,6 +71,7 @@ pub trait PhysicalPlan: Debug + Send + Sync { ) -> Result; } +/// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`]. 
#[derive(Debug)] pub struct PhysicalPlanAdapter { schema: SchemaRef, diff --git a/src/query/src/error.rs b/src/query/src/error.rs index b45045a4edfd..3f658e20a391 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -34,8 +34,8 @@ pub enum Error { #[snafu(display("General catalog error: {}", source))] Catalog { - #[snafu(backtrace)] source: catalog::error::Error, + location: Location, }, #[snafu(display("Catalog not found: {}", catalog))] @@ -49,35 +49,49 @@ pub enum Error { #[snafu(display("Failed to do vector computation, source: {}", source))] VectorComputation { - #[snafu(backtrace)] source: datatypes::error::Error, + location: Location, }, #[snafu(display("Failed to create RecordBatch, source: {}", source))] CreateRecordBatch { - #[snafu(backtrace)] source: common_recordbatch::error::Error, + location: Location, }, #[snafu(display("Failure during query execution, source: {}", source))] - QueryExecution { source: BoxedError }, + QueryExecution { + source: BoxedError, + location: Location, + }, #[snafu(display("Failure during query planning, source: {}", source))] - QueryPlan { source: BoxedError }, + QueryPlan { + source: BoxedError, + location: Location, + }, #[snafu(display("Failure during query parsing, query: {}, source: {}", query, source))] - QueryParse { query: String, source: BoxedError }, + QueryParse { + query: String, + source: BoxedError, + location: Location, + }, #[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))] - QueryAccessDenied { catalog: String, schema: String }, + QueryAccessDenied { + catalog: String, + schema: String, + location: Location, + }, #[snafu(display("The SQL string has multiple statements, query: {}", query))] MultipleStatements { query: String, location: Location }, #[snafu(display("Failed to convert Datafusion schema: {}", source))] ConvertDatafusionSchema { - #[snafu(backtrace)] source: datatypes::error::Error, + location: Location, }, #[snafu(display("Failed to parse timestamp `{}`: {}", raw, source))] @@ -102,7 +116,7 @@ pub enum Error { #[snafu(display("General SQL error: {}", source))] Sql { - #[snafu(backtrace)] + location: Location, source: sql::error::Error, }, @@ -122,21 +136,21 @@ pub enum Error { #[snafu(display("Failed to convert value to sql value: {}", value))] ConvertSqlValue { value: Value, - #[snafu(backtrace)] source: sql::error::Error, + location: Location, }, #[snafu(display("Failed to convert concrete type to sql type: {:?}", datatype))] ConvertSqlType { datatype: ConcreteDataType, - #[snafu(backtrace)] source: sql::error::Error, + location: Location, }, #[snafu(display("Failed to parse SQL, source: {}", source))] ParseSql { - #[snafu(backtrace)] source: sql::error::Error, + location: Location, }, #[snafu(display("Missing required field: {}", name))] @@ -150,32 +164,32 @@ pub enum Error { #[snafu(display("Failed to build data source backend, source: {}", source))] BuildBackend { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to list objects, source: {}", source))] ListObjects { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to parse file format: {}", source))] ParseFileFormat { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to infer schema: {}", source))] InferSchema { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to convert 
datafusion schema, source: {}", source))] ConvertSchema { - #[snafu(backtrace)] source: datatypes::error::Error, + location: Location, }, } @@ -201,15 +215,15 @@ impl ErrorExt for Error { ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(), QueryAccessDenied { .. } => StatusCode::AccessDenied, - Catalog { source } => source.status_code(), - VectorComputation { source } | ConvertDatafusionSchema { source } => { + Catalog { source, .. } => source.status_code(), + VectorComputation { source, .. } | ConvertDatafusionSchema { source, .. } => { source.status_code() } - ParseSql { source } => source.status_code(), - CreateRecordBatch { source } => source.status_code(), - QueryExecution { source } | QueryPlan { source } => source.status_code(), + ParseSql { source, .. } => source.status_code(), + CreateRecordBatch { source, .. } => source.status_code(), + QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(), DataFusion { .. } | MissingTimestampColumn { .. } => StatusCode::Internal, - Sql { source } => source.status_code(), + Sql { source, .. } => source.status_code(), PlanSql { .. } => StatusCode::PlanQuery, ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(), } From bacefd6d1571583d8b168200c8cf5d5a935c2677 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 24 May 2023 16:44:10 +0800 Subject: [PATCH 07/17] fix: table scan --- src/mito/src/table.rs | 21 ++++++++++++++++--- src/query/src/tests/time_range_filter_test.rs | 1 + 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index e3994ea1e373..0742a6ba532a 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -216,20 +216,34 @@ impl Table for MitoTable { async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { let read_ctx = ReadContext::default(); - let mut readers = Vec::with_capacity(self.regions.len()); + let regions = self.regions.load(); + let mut readers = Vec::with_capacity(regions.len()); let mut first_schema: Option> = None; let table_info = self.table_info.load(); // TODO(hl): Currently the API between frontend and datanode is under refactoring in // https://github.com/GreptimeTeam/greptimedb/issues/597 . Once it's finished, query plan // can carry filtered region info to avoid scanning all regions on datanode. - for region in self.regions.values() { + for region in regions.values() { let snapshot = region .snapshot(&read_ctx) .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; + + let projection = self + .transform_projection(region, request.projection.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + let filters = request.filters.clone().into(); + + let scan_request = ScanRequest { + projection, + filters, + ..Default::default() + }; + let reader = snapshot - .scan(&read_ctx, request.clone()) + .scan(&read_ctx, scan_request) .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)? @@ -262,6 +276,7 @@ impl Table for MitoTable { })?; let schema = stream_schema.clone(); + let stream = Box::pin(async_stream::try_stream! { for mut reader in readers { while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? 
{ diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 0cbc26e9b004..73431ed2484b 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -74,6 +74,7 @@ impl Table for MemTableWrapper { &self, request: ScanRequest, ) -> table::Result { + *self.filter.write().await = request.filters.clone(); self.inner.scan_to_stream(request).await } From a08fe403999090187ebfb6f7050c0bd07c37f06d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 24 May 2023 19:17:22 +0800 Subject: [PATCH 08/17] UT pass Signed-off-by: Ruihang Xia --- src/common/recordbatch/src/adapter.rs | 1 + src/common/recordbatch/src/lib.rs | 20 ++++++ src/frontend/src/table.rs | 70 ++++++++++++++++++- src/mito/src/table.rs | 27 ++----- .../common/optimizer/order_by.result | 24 ++++--- 5 files changed, 106 insertions(+), 36 deletions(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 4be42c1175d5..f9da1e422289 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -160,6 +160,7 @@ enum AsyncRecordBatchStreamAdapterState { Failed, } +// TODO(ruihang): figure out what this adapter adapts to. pub struct AsyncRecordBatchStreamAdapter { schema: SchemaRef, state: AsyncRecordBatchStreamAdapterState, diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 477b8527bd49..c38d36305622 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -181,6 +181,26 @@ impl Stream for SimpleRecordBatchStream { } } +/// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream]. +pub struct RecordBatchStreamAdaptor { + pub schema: SchemaRef, + pub stream: Pin> + Send>>, +} + +impl RecordBatchStream for RecordBatchStreamAdaptor { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for RecordBatchStreamAdaptor { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(ctx) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 3254f77c859b..e6428253d5ea 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::iter; +use std::pin::Pin; use std::sync::Arc; use api::v1::AlterExpr; @@ -28,7 +29,12 @@ use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; use common_query::Output; use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter; -use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use common_recordbatch::error::{ + InitRecordbatchStreamSnafu, PollStreamSnafu, Result as RecordBatchResult, +}; +use common_recordbatch::{ + RecordBatch, RecordBatchStreamAdaptor, RecordBatches, SendableRecordBatchStream, +}; use common_telemetry::debug; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::{ @@ -36,6 +42,7 @@ use datafusion::physical_plan::{ }; use datafusion_common::DataFusionError; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use futures_util::{Stream, StreamExt}; use partition::manager::PartitionRuleManagerRef; use partition::splitter::WriteSplitter; use snafu::prelude::*; @@ -43,6 +50,7 @@ use store_api::storage::{RegionNumber, ScanRequest}; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, 
TableInfoRef}; use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; +use table::table::scan::StreamScanAdapter; use table::table::AlterContext; use table::{meter_insert_request, Table}; use tokio::sync::RwLock; @@ -158,9 +166,65 @@ impl Table for DistTable { // TODO(ruihang): DistTable should not call this method directly async fn scan_to_stream( &self, - _request: ScanRequest, + request: ScanRequest, ) -> table::Result { - unimplemented!() + let partition_rule = self + .partition_manager + .find_table_partition_rule(&self.table_name) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + + let regions = self + .partition_manager + .find_regions_by_filters(partition_rule, &request.filters) + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + let datanodes = self + .partition_manager + .find_region_datanodes(&self.table_name, regions) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + + let table_name = &self.table_name; + let mut partition_execs = Vec::with_capacity(datanodes.len()); + for (datanode, _regions) in datanodes.iter() { + let client = self.datanode_clients.get_client(datanode).await; + let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); + let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); + + partition_execs.push(Arc::new(PartitionExec { + table_name: table_name.clone(), + datanode_instance, + projection: request.projection.clone(), + filters: request.filters.clone(), + limit: request.limit, + batches: Arc::new(RwLock::new(None)), + })); + } + + let schema = project_schema(self.schema(), request.projection.as_ref()); + let schema_to_move = schema.clone(); + let stream: Pin> + Send>> = Box::pin( + async_stream::try_stream! { + for partition_exec in partition_execs { + partition_exec + .maybe_init() + .await + .map_err(|e| DataFusionError::External(Box::new(e))) + .context(InitRecordbatchStreamSnafu)?; + let mut stream = partition_exec.as_stream().await.context(InitRecordbatchStreamSnafu)?; + + while let Some(batch) = stream.next().await{ + yield RecordBatch::try_from_df_record_batch(schema_to_move.clone(),batch.context(PollStreamSnafu)?)? 
+ } + } + }, + ); + let record_batch_stream = RecordBatchStreamAdaptor { schema, stream }; + + Ok(Box::pin(record_batch_stream)) } fn supports_filters_pushdown( diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 0742a6ba532a..d5918e418970 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -27,7 +27,9 @@ use common_error::ext::BoxedError; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{ + RecordBatch, RecordBatchStream, RecordBatchStreamAdaptor, SendableRecordBatchStream, +}; use common_telemetry::{logging, warn}; use datatypes::schema::Schema; use futures::task::{Context, Poll}; @@ -210,7 +212,7 @@ impl Table for MitoTable { } }); - let stream = Box::pin(ChunkStream { schema, stream }); + let stream = Box::pin(RecordBatchStreamAdaptor { schema, stream }); Ok(Arc::new(StreamScanAdapter::new(stream))) } @@ -286,7 +288,7 @@ impl Table for MitoTable { } }); - Ok(Box::pin(ChunkStream { schema, stream })) + Ok(Box::pin(RecordBatchStreamAdaptor { schema, stream })) } fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult> { @@ -416,25 +418,6 @@ impl Table for MitoTable { } } -struct ChunkStream { - schema: SchemaRef, - stream: Pin> + Send>>, -} - -impl RecordBatchStream for ChunkStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -impl Stream for ChunkStream { - type Item = RecordBatchResult; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(ctx) - } -} - #[inline] fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str) -> String { format!("{table_name}.{region_name}.{column_name}") diff --git a/tests/cases/standalone/common/optimizer/order_by.result b/tests/cases/standalone/common/optimizer/order_by.result index d661c83068ff..87f57f31e7c6 100644 --- a/tests/cases/standalone/common/optimizer/order_by.result +++ b/tests/cases/standalone/common/optimizer/order_by.result @@ -27,7 +27,8 @@ explain select * from numbers order by number asc; +---------------+------------------------------------------+ | logical_plan | Sort: numbers.number ASC NULLS LAST | | | TableScan: numbers projection=[number] | -| physical_plan | ExecutionPlan(PlaceHolder) | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | ExecutionPlan(PlaceHolder) | | | | +---------------+------------------------------------------+ @@ -47,14 +48,15 @@ explain select * from numbers order by number desc limit 10; explain select * from numbers order by number asc limit 10; -+---------------+-------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------+ -| logical_plan | Limit: skip=0, fetch=10 | -| | Sort: numbers.number ASC NULLS LAST, fetch=10 | -| | TableScan: numbers projection=[number] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | ExecutionPlan(PlaceHolder) | -| | | -+---------------+-------------------------------------------------+ ++---------------+------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------+ +| logical_plan | Limit: skip=0, fetch=10 | +| | Sort: numbers.number ASC NULLS LAST, fetch=10 | +| | TableScan: numbers projection=[number] | +| 
physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] | +| | ExecutionPlan(PlaceHolder) | +| | | ++---------------+------------------------------------------------------+ From 3714414d60b1e747306ecdcaa10c220a358729ec Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 24 May 2023 20:15:20 +0800 Subject: [PATCH 09/17] impl project record batch Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 36 +++++++++++++---- src/catalog/src/lib.rs | 1 + src/common/recordbatch/src/adapter.rs | 2 +- src/common/recordbatch/src/error.rs | 9 ++++- src/common/recordbatch/src/recordbatch.rs | 24 +++++++++++- src/datatypes/src/error.rs | 6 +++ src/datatypes/src/lib.rs | 2 + src/datatypes/src/schema.rs | 47 +++++++++++++++++------ 8 files changed, 104 insertions(+), 23 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 9cc8b6636845..5e03c635ad20 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -21,8 +21,9 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; +use futures_util::StreamExt; use store_api::storage::ScanRequest; use table::{Result as TableResult, Table, TableRef}; @@ -129,12 +130,31 @@ impl Table for InformationTable { unimplemented!() } - async fn scan_to_stream( - &self, - _request: ScanRequest, - ) -> TableResult { - // TODO(ruihang): remove the second unwrap - let stream = self.stream.lock().unwrap().take().unwrap(); - Ok(stream) + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + let projection = request.projection; + let stream = self + .stream + .lock() + .unwrap() + .take() + .unwrap() + .map(move |batch| { + batch + .map(|batch| { + if let Some(projection) = &projection { + let projected = batch.try_project(projection); + println!("{:?}", projected); + projected + } else { + Ok(batch) + } + }) + .flatten() + }); + let stream = RecordBatchStreamAdaptor { + schema: self.schema(), + stream: Box::pin(stream), + }; + Ok(Box::pin(stream)) } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index ad8a1735ccad..5f4358e7df9f 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. 
#![feature(assert_matches)] +#![feature(result_flattening)] use std::any::Any; use std::collections::HashMap; diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index f9da1e422289..2d3fb7a47d94 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -111,7 +111,7 @@ impl Stream for DfRecordBatchStreamAdapter { } } -/// DataFusion SendableRecordBatchStream -> Greptime RecordBatchStream +/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream] pub struct RecordBatchStreamAdapter { schema: SchemaRef, stream: DfSendableRecordBatchStream, diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 53a63ded23d8..41b76f55d260 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -70,6 +70,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to project Arrow RecordBatch, source: {}", source))] + ProjectArrowRecordBatch { + source: datatypes::arrow::error::ArrowError, + location: Location, + }, + #[snafu(display("Column {} not exists in table {}", column_name, table_name))] ColumnNotExists { column_name: String, @@ -101,7 +107,8 @@ impl ErrorExt for Error { | Error::PollStream { .. } | Error::Format { .. } | Error::InitRecordbatchStream { .. } - | Error::ColumnNotExists { .. } => StatusCode::Internal, + | Error::ColumnNotExists { .. } + | Error::ProjectArrowRecordBatch { .. } => StatusCode::Internal, Error::External { source } => source.status_code(), diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 721accaf1039..4a9dd6b8731d 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use datatypes::schema::SchemaRef; use datatypes::value::Value; @@ -21,7 +22,10 @@ use serde::ser::{Error, SerializeStruct}; use serde::{Serialize, Serializer}; use snafu::{OptionExt, ResultExt}; -use crate::error::{self, CastVectorSnafu, ColumnNotExistsSnafu, Result}; +use crate::error::{ + self, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu, + Result, +}; use crate::DfRecordBatch; /// A two-dimensional batch of column-oriented data with a defined schema. @@ -51,6 +55,24 @@ impl RecordBatch { }) } + pub fn try_project(&self, indices: &[usize]) -> Result { + let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?); + let mut columns = Vec::with_capacity(indices.len()); + for index in indices { + columns.push(self.columns[*index].clone()); + } + let df_record_batch = self + .df_record_batch + .project(indices) + .context(ProjectArrowRecordBatchSnafu)?; + + Ok(Self { + schema, + columns, + df_record_batch, + }) + } + /// Create a new [`RecordBatch`] from `schema` and `df_record_batch`. /// /// This method doesn't check the schema. 
diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 0caec0fed5e3..9fad402dd1e6 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -89,6 +89,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to project arrow schema, source: {}", source))] + ProjectArrowSchema { + source: arrow::error::ArrowError, + location: Location, + }, + #[snafu(display("Unsupported column default constraint expression: {}", expr))] UnsupportedDefaultExpr { expr: String, location: Location }, diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 60f3b853159f..1721ee8b2964 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(let_chains)] + pub mod arrow_array; pub mod data_type; pub mod error; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index dde4f8e9a026..4a30996a2395 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -24,7 +24,7 @@ use datafusion_common::DFSchemaRef; use snafu::{ensure, ResultExt}; use crate::data_type::DataType; -use crate::error::{self, Error, Result}; +use crate::error::{self, Error, ProjectArrowSchemaSnafu, Result}; pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY}; pub use crate::schema::constraint::ColumnDefaultConstraint; pub use crate::schema::raw::RawSchema; @@ -70,12 +70,10 @@ impl Schema { SchemaBuilder::try_from(column_schemas)?.build() } - #[inline] pub fn arrow_schema(&self) -> &Arc { &self.arrow_schema } - #[inline] pub fn column_schemas(&self) -> &[ColumnSchema] { &self.column_schemas } @@ -89,51 +87,76 @@ impl Schema { /// Retrieve the column's name by index /// # Panics /// This method **may** panic if the index is out of range of column schemas. - #[inline] pub fn column_name_by_index(&self, idx: usize) -> &str { &self.column_schemas[idx].name } - #[inline] pub fn column_index_by_name(&self, name: &str) -> Option { self.name_to_index.get(name).copied() } - #[inline] pub fn contains_column(&self, name: &str) -> bool { self.name_to_index.contains_key(name) } - #[inline] pub fn num_columns(&self) -> usize { self.column_schemas.len() } - #[inline] pub fn is_empty(&self) -> bool { self.column_schemas.is_empty() } /// Returns index of the timestamp key column. 
-    #[inline]
     pub fn timestamp_index(&self) -> Option<usize> {
         self.timestamp_index
     }

-    #[inline]
     pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
         self.timestamp_index.map(|idx| &self.column_schemas[idx])
     }

-    #[inline]
     pub fn version(&self) -> u32 {
         self.version
     }

-    #[inline]
     pub fn metadata(&self) -> &HashMap<String, String> {
         &self.arrow_schema.metadata
     }
+
+    /// Generate a new projected schema
+    ///
+    /// # Panics
+    ///
+    /// Panics if any index is out of bounds.
+    pub fn try_project(&self, indices: &[usize]) -> Result<Schema> {
+        let mut column_schemas = Vec::with_capacity(indices.len());
+        let mut timestamp_index = None;
+        for index in indices {
+            column_schemas.push(self.column_schemas[*index].clone());
+            if let Some(ts_index) = self.timestamp_index && ts_index == *index {
+                // Safety: column_schemas won't be empty
+                timestamp_index = Some(column_schemas.len() - 1);
+            }
+        }
+        let arrow_schema = self
+            .arrow_schema
+            .project(indices)
+            .context(ProjectArrowSchemaSnafu)?;
+        let name_to_index = column_schemas
+            .iter()
+            .enumerate()
+            .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
+            .collect();
+
+        Ok(Self {
+            column_schemas,
+            name_to_index,
+            arrow_schema: Arc::new(arrow_schema),
+            timestamp_index,
+            version: self.version,
+        })
+    }
 }

 #[derive(Default)]

From 2b78f639a8133e03e9180272e3ea9b13204a5a98 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Wed, 24 May 2023 20:27:30 +0800
Subject: [PATCH 10/17] fix information schema

Signed-off-by: Ruihang Xia
---
 src/catalog/src/information_schema.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index 5e03c635ad20..f82b14db3df3 100644
---
a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -141,7 +141,7 @@ impl Table for InformationTable { .context(SchemaConversionSnafu)?, ) } else { - self.schema().clone() + self.schema() }; let stream = self .stream @@ -150,16 +150,13 @@ impl Table for InformationTable { .take() .unwrap() .map(move |batch| { - batch - .map(|batch| { - if let Some(projection) = &projection { - let projected = batch.try_project(projection); - projected - } else { - Ok(batch) - } - }) - .flatten() + batch.and_then(|batch| { + if let Some(projection) = &projection { + batch.try_project(projection) + } else { + Ok(batch) + } + }) }); let stream = RecordBatchStreamAdaptor { schema: projected_schema, diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 5f4358e7df9f..ad8a1735ccad 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(assert_matches)] -#![feature(result_flattening)] use std::any::Any; use std::collections::HashMap; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index e6428253d5ea..fe9c4b91c20b 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -50,7 +50,6 @@ use store_api::storage::{RegionNumber, ScanRequest}; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; -use table::table::scan::StreamScanAdapter; use table::table::AlterContext; use table::{meter_insert_request, Table}; use tokio::sync::RwLock; diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index d5918e418970..244dd56a1644 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -17,7 +17,6 @@ pub mod test_util; use std::any::Any; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use arc_swap::ArcSwap; @@ -26,14 +25,10 @@ use common_datasource::compression::CompressionType; use common_error::ext::BoxedError; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; -use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; -use common_recordbatch::{ - RecordBatch, RecordBatchStream, RecordBatchStreamAdaptor, SendableRecordBatchStream, -}; +use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream}; use common_telemetry::{logging, warn}; use datatypes::schema::Schema; -use futures::task::{Context, Poll}; -use futures::Stream; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; @@ -236,7 +231,7 @@ impl Table for MitoTable { .transform_projection(region, request.projection.clone()) .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; - let filters = request.filters.clone().into(); + let filters = request.filters.clone(); let scan_request = ScanRequest { projection, From ae0488bc7c34660f9218b610b3818f75b86c5033 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 May 2023 11:00:49 +0800 Subject: [PATCH 12/17] resolve CR comments Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 6 +++--- src/catalog/src/information_schema/columns.rs | 8 +++----- src/catalog/src/information_schema/tables.rs | 8 +++----- src/datatypes/src/schema.rs | 5 ++--- src/table/src/error.rs | 7 ++++++- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git 
a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 277c5a3e3f0c..3b2abf19fb23 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -24,9 +24,9 @@ use common_query::prelude::Expr; use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; use futures_util::StreamExt; -use snafu::ResultExt; +use snafu::{ResultExt, OptionExt}; use store_api::storage::ScanRequest; -use table::error::SchemaConversionSnafu; +use table::error::{DuplicatedExecuteCallSnafu, SchemaConversionSnafu}; use table::{Result as TableResult, Table, TableRef}; use self::columns::InformationSchemaColumns; @@ -148,7 +148,7 @@ impl Table for InformationTable { .lock() .unwrap() .take() - .unwrap() + .context(DuplicatedExecuteCallSnafu)? .map(move |batch| { batch.and_then(|batch| { if let Some(projection) = &projection { diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 7b5b4a55ab79..60b19b817e82 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -86,11 +86,9 @@ impl InformationSchemaColumns { }), )); Ok(Box::pin( - RecordBatchStreamAdapter::try_new(Box::pin(DfRecordBatchStreamAdapter::new( - schema, stream, - ))) - .map_err(BoxedError::new) - .context(InternalSnafu)?, + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, )) } } diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index f630af833e1d..544a74bdfd2a 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -77,11 +77,9 @@ impl InformationSchemaTables { }), )); Ok(Box::pin( - RecordBatchStreamAdapter::try_new(Box::pin(DfRecordBatchStreamAdapter::new( - schema, stream, - ))) - .map_err(BoxedError::new) - .context(InternalSnafu)?, + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, )) } } diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 4a30996a2395..6372a9f16487 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -133,11 +133,10 @@ impl Schema { let mut column_schemas = Vec::with_capacity(indices.len()); let mut timestamp_index = None; for index in indices { - column_schemas.push(self.column_schemas[*index].clone()); if let Some(ts_index) = self.timestamp_index && ts_index == *index { - // Safety: column_schemas won't be empty - timestamp_index = Some(column_schemas.len() - 1); + timestamp_index = Some(column_schemas.len()); } + column_schemas.push(self.column_schemas[*index].clone()); } let arrow_schema = self .arrow_schema diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 31e411a4cd8e..0d6ec18e8c19 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -79,6 +79,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Duplicated call to plan execute method"))] + DuplicatedExecuteCall { location: Location }, + #[snafu(display( "Not allowed to remove index column {} from table {}", column_name, @@ -141,7 +144,9 @@ impl ErrorExt for Error { Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => { StatusCode::InvalidArguments } - Error::TablesRecordBatch { .. } => StatusCode::Unexpected, + Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. 
} => { + StatusCode::Unexpected + } Error::ColumnExists { .. } => StatusCode::TableColumnExists, Error::SchemaBuild { source, .. } => source.status_code(), Error::TableOperation { source } => source.status_code(), From 80cd8f9c2559f581d025bf6bb38697e71c4a5f92 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 May 2023 11:03:04 +0800 Subject: [PATCH 13/17] remove one todo Signed-off-by: Ruihang Xia --- src/common/recordbatch/src/adapter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 2d3fb7a47d94..4e7c90fcb5ab 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -160,7 +160,6 @@ enum AsyncRecordBatchStreamAdapterState { Failed, } -// TODO(ruihang): figure out what this adapter adapts to. pub struct AsyncRecordBatchStreamAdapter { schema: SchemaRef, state: AsyncRecordBatchStreamAdapterState, From 07a24f109f36cc1abc4488edbf68018a0749dd68 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 May 2023 11:16:49 +0800 Subject: [PATCH 14/17] fix errors generated by merge commit Signed-off-by: Ruihang Xia --- Cargo.lock | 6 +++--- src/catalog/src/information_schema/columns.rs | 2 +- src/catalog/src/information_schema/tables.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2dce7e8c6e03..7cbe0b0adf45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5221,7 +5221,7 @@ dependencies = [ "num_cpus", "once_cell", "parking_lot", - "quanta 0.11.0", + "quanta 0.11.1", "rustc_version 0.4.0", "scheduled-thread-pool", "skeptic", @@ -6700,9 +6700,9 @@ dependencies = [ [[package]] name = "quanta" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cc73c42f9314c4bdce450c77e6f09ecbddefbeddb1b5979ded332a3913ded33" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" dependencies = [ "crossbeam-utils", "libc", diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 60b19b817e82..56c9e7105887 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -76,7 +76,7 @@ impl InformationSchemaColumns { let schema = self.schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( - schema.clone(), + schema, futures::stream::once(async move { builder .make_tables() diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 544a74bdfd2a..a7fb51d37be0 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -67,7 +67,7 @@ impl InformationSchemaTables { let schema = self.schema().clone(); let mut builder = self.builder(); let stream = Box::pin(DfRecordBatchStreamAdapter::new( - schema.clone(), + schema, futures::stream::once(async move { builder .make_tables() From 96236b5ccf29d5822467ce3f1c8b0abffa53fbd5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 May 2023 11:47:34 +0800 Subject: [PATCH 15/17] add output_ordering method to record batch stream Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/common/recordbatch/src/lib.rs | 11 ++++++++++ src/store-api/Cargo.toml | 1 + src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/requests.rs | 8 +------- src/table/src/table/adapter.rs | 29 ++++++++++++++++++++++++--- 6 files changed, 41 insertions(+), 11 deletions(-) diff 
--git a/Cargo.lock b/Cargo.lock index 7cbe0b0adf45..10f761dbc235 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8654,6 +8654,7 @@ dependencies = [ "common-base", "common-error", "common-query", + "common-recordbatch", "common-time", "datatypes", "derive_builder 0.11.2", diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index c38d36305622..632a88267eb6 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use datafusion::physical_plan::memory::MemoryStream; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::arrow::compute::SortOptions; pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::util::pretty; use datatypes::prelude::VectorRef; @@ -34,10 +35,20 @@ use snafu::{ensure, ResultExt}; pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } } pub type SendableRecordBatchStream = Pin>; +#[derive(Debug, Clone, Copy)] +pub struct OrderOption { + pub index: usize, + pub options: SortOptions, +} + /// EmptyRecordBatchStream can be used to create a RecordBatchStream /// that will produce no results pub struct EmptyRecordBatchStream { diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 03bd025810bc..4834af37bb94 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -10,6 +10,7 @@ bytes = "1.1" common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } +common-recordbatch = { path = "../common/recordbatch" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } derive_builder = "0.11" diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index b089a2573f13..d8978db21f48 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -36,7 +36,7 @@ pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine} pub use self::metadata::RegionMeta; pub use self::region::{FlushContext, FlushReason, Region, RegionStat, WriteContext}; pub use self::requests::{ - AddColumn, AlterOperation, AlterRequest, GetRequest, OrderOption, ScanRequest, WriteRequest, + AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest, }; pub use self::responses::{GetResponse, ScanResponse, WriteResponse}; pub use self::snapshot::{ReadContext, Snapshot}; diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 8fa9de1c38d1..159f7b983a28 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use common_error::ext::ErrorExt; use common_query::logical_plan::Expr; -use datatypes::arrow::compute::SortOptions; +use common_recordbatch::OrderOption; use datatypes::vectors::VectorRef; use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber}; @@ -59,12 +59,6 @@ pub struct ScanRequest { pub limit: Option, } -#[derive(Debug, Clone, Copy)] -pub struct OrderOption { - pub index: usize, - pub options: SortOptions, -} - #[derive(Debug)] pub struct GetRequest {} diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 53f3a9600e2a..a9e1b5a4c5f0 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -18,13 +18,16 @@ use std::sync::{Arc, Mutex}; use 
common_query::logical_plan::Expr; use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::DfPhysicalPlan; +use common_recordbatch::OrderOption; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::datasource::datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown; use datafusion::datasource::{TableProvider, TableType as DfTableType}; use datafusion::error::Result as DfResult; use datafusion::execution::context::SessionState; use datafusion_expr::expr::Expr as DfExpr; -use store_api::storage::{OrderOption, ScanRequest}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::PhysicalSortExpr; +use store_api::storage::ScanRequest; use super::scan::StreamScanAdapter; use crate::table::{TableRef, TableType}; @@ -86,8 +89,28 @@ impl TableProvider for DfTableProviderAdapter { request.clone() }; let stream = self.table.scan_to_stream(request).await?; - let stream_adapter = Arc::new(StreamScanAdapter::new(stream)); - Ok(Arc::new(DfPhysicalPlanAdapter(stream_adapter))) + + // build sort physical expr + let schema = stream.schema(); + let sort_expr = stream.output_ordering().map(|order_opts| { + order_opts + .iter() + .map(|order_opt| { + let col_name = schema.column_name_by_index(order_opt.index); + let col_expr = Arc::new(Column::new(col_name, order_opt.index)); + PhysicalSortExpr { + expr: col_expr, + options: order_opt.options, + } + }) + .collect::>() + }); + + let mut stream_adapter = StreamScanAdapter::new(stream); + if let Some(sort_expr) = sort_expr { + stream_adapter = stream_adapter.with_output_ordering(sort_expr); + } + Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(stream_adapter)))) } fn supports_filters_pushdown( From 6de07077b7bff29f51ce407162c3af1b3e888897 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 May 2023 14:53:37 +0800 Subject: [PATCH 16/17] fix rustfmt Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 3b2abf19fb23..bd42cf37cb69 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -24,7 +24,7 @@ use common_query::prelude::Expr; use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; use futures_util::StreamExt; -use snafu::{ResultExt, OptionExt}; +use snafu::{OptionExt, ResultExt}; use store_api::storage::ScanRequest; use table::error::{DuplicatedExecuteCallSnafu, SchemaConversionSnafu}; use table::{Result as TableResult, Table, TableRef}; From 892d9f57d16a6d9b39833b769dcbd4d68f1cb753 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 May 2023 18:04:13 +0800 Subject: [PATCH 17/17] enhance error types Signed-off-by: Ruihang Xia --- src/catalog/src/information_schema.rs | 4 +++- src/common/recordbatch/src/error.rs | 9 ++++++++- src/common/recordbatch/src/recordbatch.rs | 10 ++++++---- src/table/src/error.rs | 4 ++-- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index bd42cf37cb69..95f253da1d08 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -148,7 +148,9 @@ impl Table for InformationTable { .lock() .unwrap() .take() - .context(DuplicatedExecuteCallSnafu)? + .with_context(|| DuplicatedExecuteCallSnafu { + table: self.table_info().name.clone(), + })? 
.map(move |batch| { batch.and_then(|batch| { if let Some(projection) = &projection { diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 41b76f55d260..10fee35e54bb 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -70,10 +70,17 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to project Arrow RecordBatch, source: {}", source))] + #[snafu(display( + "Failed to project Arrow RecordBatch with schema {:?} and projection {:?}, source: {}", + schema, + projection, + source + ))] ProjectArrowRecordBatch { source: datatypes::arrow::error::ArrowError, location: Location, + schema: datatypes::schema::SchemaRef, + projection: Vec, }, #[snafu(display("Column {} not exists in table {}", column_name, table_name))] diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 4a9dd6b8731d..c524840ff5c6 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -61,10 +61,12 @@ impl RecordBatch { for index in indices { columns.push(self.columns[*index].clone()); } - let df_record_batch = self - .df_record_batch - .project(indices) - .context(ProjectArrowRecordBatchSnafu)?; + let df_record_batch = self.df_record_batch.project(indices).with_context(|_| { + ProjectArrowRecordBatchSnafu { + schema: self.schema.clone(), + projection: indices.to_vec(), + } + })?; Ok(Self { schema, diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 0d6ec18e8c19..e377099c0027 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -79,8 +79,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Duplicated call to plan execute method"))] - DuplicatedExecuteCall { location: Location }, + #[snafu(display("Duplicated call to plan execute method. table: {}", table))] + DuplicatedExecuteCall { location: Location, table: String }, #[snafu(display( "Not allowed to remove index column {} from table {}",