Skip to content

Commit

Permalink
refactor: remove Table::scan method (#1855)
Browse files Browse the repository at this point in the history
* remove scan method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Jun 30, 2023
1 parent 605776f commit c77b946
Show file tree
Hide file tree
Showing 16 changed files with 49 additions and 696 deletions.
16 changes: 0 additions & 16 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use std::sync::{Arc, Weak};

use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
Expand Down Expand Up @@ -104,20 +102,6 @@ impl Table for InformationTable {
unreachable!("Should not call table_info() of InformationTable directly")
}

/// Scan the table and returns a SendableRecordBatchStream.
async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
// limit can be used to reduce the amount scanned
// from the datasource as a performance optimization.
// If set, it contains the amount of rows needed by the `LogicalPlan`,
// The datasource should return *at least* this number of rows if available.
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
unimplemented!()
}

async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
let projection = request.projection;
let projected_schema = if let Some(projection) = &projection {
Expand Down
26 changes: 9 additions & 17 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE,
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME,
};
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlanRef, SessionContext};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::util;
Expand Down Expand Up @@ -60,15 +58,6 @@ impl Table for SystemCatalogTable {
self.0.schema()
}

async fn scan(
&self,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
self.0.scan(projection, filters, limit).await
}

async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
self.0.scan_to_stream(request).await
}
Expand Down Expand Up @@ -136,14 +125,17 @@ impl SystemCatalogTable {
/// Create a stream of all entries inside system catalog table
pub async fn records(&self) -> Result<SendableRecordBatchStream> {
let full_projection = None;
let ctx = SessionContext::new();
let scan = self
.scan(full_projection, &[], None)
let scan_req = ScanRequest {
sequence: None,
projection: full_projection,
filters: vec![],
output_ordering: None,
limit: None,
};
let stream = self
.scan_to_stream(scan_req)
.await
.context(error::SystemCatalogTableScanSnafu)?;
let stream = scan
.execute(0, ctx.task_ctx())
.context(error::SystemCatalogTableScanExecSnafu)?;
Ok(stream)
}
}
Expand Down
26 changes: 1 addition & 25 deletions src/file-table-engine/src/table/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use async_trait::async_trait;
use common_datasource::file_format::Format;
use common_datasource::object_store::build_backend;
use common_error::prelude::BoxedError;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
Expand All @@ -37,7 +35,7 @@ use crate::manifest::immutable::{
read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION,
};
use crate::manifest::table_manifest_dir;
use crate::table::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig};
use crate::table::format::{CreateScanPlanContext, ScanPlanConfig};

#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
Expand Down Expand Up @@ -76,28 +74,6 @@ impl Table for ImmutableFileTable {
self.table_info().table_type
}

async fn scan(
&self,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
create_physical_plan(
&self.format,
&CreateScanPlanContext::default(),
&ScanPlanConfig {
file_schema: self.schema(),
files: &self.files,
projection,
filters,
limit,
store: self.object_store.clone(),
},
)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}

async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
create_stream(
&self.format,
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/src/statement/copy_table_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_query::physical_plan::SessionContext;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use storage::sst::SstInfo;
use storage::{ParquetWriter, Source};
use store_api::storage::ScanRequest;
use table::engine::TableReference;
use table::requests::CopyTableRequest;

Expand Down Expand Up @@ -94,18 +94,18 @@ impl StatementExecutor {
.into_iter()
.collect::<Vec<_>>();

let scan_req = ScanRequest {
filters,
..Default::default()
};
let stream =
table
.scan(None, &filters, None)
.scan_to_stream(scan_req)
.await
.with_context(|_| error::CopyTableSnafu {
table_name: table_ref.to_string(),
})?;

let stream = stream
.execute(0, SessionContext::default().task_ctx())
.context(error::TableScanExecSnafu)?;

let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
let object_store =
build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;
Expand Down
49 changes: 0 additions & 49 deletions src/frontend/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,55 +99,6 @@ impl Table for DistTable {
Ok(affected_rows as usize)
}

async fn scan(
&self,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
let partition_manager = self.catalog_manager.partition_manager();
let datanode_clients = self.catalog_manager.datanode_clients();

let partition_rule = partition_manager
.find_table_partition_rule(&self.table_name)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;

let regions = partition_manager
.find_regions_by_filters(partition_rule, filters)
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let datanodes = partition_manager
.find_region_datanodes(&self.table_name, regions)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;

let table_name = &self.table_name;
let mut partition_execs = Vec::with_capacity(datanodes.len());
for (datanode, _regions) in datanodes.iter() {
let client = datanode_clients.get_client(datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);

partition_execs.push(Arc::new(PartitionExec {
table_name: table_name.clone(),
datanode_instance,
projection: projection.cloned(),
filters: filters.to_vec(),
limit,
batches: Arc::new(RwLock::new(None)),
}));
}

let dist_scan = DistTableScan {
schema: project_schema(self.schema(), projection),
partition_execs,
};
Ok(Arc::new(dist_scan))
}

// TODO(ruihang): DistTable should not call this method directly
async fn scan_to_stream(
&self,
Expand Down
37 changes: 16 additions & 21 deletions src/mito/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! Tests for mito table engine.
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::physical_plan::SessionContext;
use common_recordbatch::util;
use common_test_util::temp_dir::TempDir;
use datatypes::prelude::ConcreteDataType;
Expand All @@ -30,7 +29,7 @@ use storage::config::EngineConfig as StorageEngineConfig;
use storage::region::RegionImpl;
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use store_api::storage::{ReadContext, ScanRequest};
use table::engine::region_id;
use table::metadata::TableType;
use table::requests::{
Expand Down Expand Up @@ -125,9 +124,7 @@ async fn test_column_default_constraint() {
let insert_req = new_insert_request(table_name.to_string(), columns_values);
assert_eq!(2, table.insert(insert_req).await.unwrap());

let session_ctx = SessionContext::new();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());

Expand Down Expand Up @@ -157,9 +154,7 @@ async fn test_insert_with_column_default_constraint() {
let insert_req = new_insert_request(table_name.to_string(), columns_values);
assert_eq!(2, table.insert(insert_req).await.unwrap());

let session_ctx = SessionContext::new();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());

Expand Down Expand Up @@ -257,9 +252,7 @@ async fn test_create_table_insert_scan() {
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(2, table.insert(insert_req).await.unwrap());

let session_ctx = SessionContext::new();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
assert_eq!(batches[0].num_columns(), 4);
Expand All @@ -279,8 +272,11 @@ async fn test_create_table_insert_scan() {
assert_eq!(tss, *batch.column(3));

// Scan with projections: cpu and memory
let stream = table.scan(Some(&vec![1, 2]), &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let scan_req = ScanRequest {
projection: Some(vec![1, 2]),
..Default::default()
};
let stream = table.scan_to_stream(scan_req).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
assert_eq!(batches[0].num_columns(), 2);
Expand All @@ -297,8 +293,11 @@ async fn test_create_table_insert_scan() {
assert_eq!(memories, *batch.column(1));

// Scan with projections: only ts
let stream = table.scan(Some(&vec![3]), &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let scan_req = ScanRequest {
projection: Some(vec![3]),
..Default::default()
};
let stream = table.scan_to_stream(scan_req).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert_eq!(1, batches.len());
assert_eq!(batches[0].num_columns(), 1);
Expand Down Expand Up @@ -344,9 +343,7 @@ async fn test_create_table_scan_batches() {
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap());

let session_ctx = SessionContext::new();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect(stream).await.unwrap();
let mut total = 0;
for batch in batches {
Expand Down Expand Up @@ -856,9 +853,7 @@ async fn test_table_delete_rows() {
let del_req = DeleteRequest { key_column_values };
let _ = table.delete(del_req).await.unwrap();

let session_ctx = SessionContext::new();
let stream = table.scan(None, &[], None).await.unwrap();
let stream = stream.execute(0, session_ctx.task_ctx()).unwrap();
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect_batches(stream).await.unwrap();

assert_eq!(
Expand Down
Loading

0 comments on commit c77b946

Please sign in to comment.