From af0743d1f650b28924be6028fdcf46f896eddc02 Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Wed, 14 Feb 2024 12:35:46 -0500
Subject: [PATCH] fix: make insert plan schema correct and clear (#2642)

---
 crates/datasources/src/lance/mod.rs           |  6 +++---
 crates/datasources/src/mongodb/exec.rs        | 14 +++++++-------
 crates/datasources/src/mongodb/insert.rs      |  7 +++----
 crates/datasources/src/mongodb/mod.rs         |  4 ++--
 crates/datasources/src/native/insert.rs       | 21 +++++----------------
 crates/datasources/src/postgres/mod.rs        |  4 ++--
 crates/datasources/src/postgres/query_exec.rs | 12 ++++++------
 7 files changed, 28 insertions(+), 40 deletions(-)

diff --git a/crates/datasources/src/lance/mod.rs b/crates/datasources/src/lance/mod.rs
index 73d26042e..cb825ced6 100644
--- a/crates/datasources/src/lance/mod.rs
+++ b/crates/datasources/src/lance/mod.rs
@@ -29,7 +29,7 @@ use lance::dataset::{WriteMode, WriteParams};
 use lance::Dataset;
 use protogen::metastore::types::options::StorageOptions;
 
-use crate::common::util::create_count_record_batch;
+use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};
 
 pub struct LanceTable {
     dataset: Dataset,
@@ -117,7 +117,7 @@ impl ExecutionPlan for LanceInsertExecPlan {
     }
 
     fn schema(&self) -> SchemaRef {
-        TableProvider::schema(&self.dataset)
+        COUNT_SCHEMA.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -152,7 +152,7 @@ impl ExecutionPlan for LanceInsertExecPlan {
     ) -> datafusion::error::Result<SendableRecordBatchStream> {
         let mut stream = execute_stream(self.input.clone(), ctx)?.chunks(32);
         let mut ds = self.dataset.clone();
-        let schema = self.schema().clone();
+        let schema = self.input.schema();
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             schema.clone(),
diff --git a/crates/datasources/src/mongodb/exec.rs b/crates/datasources/src/mongodb/exec.rs
index 41aeccb15..617f9a058 100644
--- a/crates/datasources/src/mongodb/exec.rs
+++ b/crates/datasources/src/mongodb/exec.rs
@@ -31,7 +31,7 @@ use super::errors::{MongoDbError, Result};
 use crate::bson::builder::RecordStructBuilder;
 
 #[derive(Debug)]
-pub struct MongoDbBsonExec {
+pub struct MongoDbQueryExecPlan {
     cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
     schema: Arc<ArrowSchema>,
     limit: Option<usize>,
@@ -39,14 +39,14 @@ pub struct MongoDbBsonExec {
     estimated_rows: u64,
 }
 
-impl MongoDbBsonExec {
+impl MongoDbQueryExecPlan {
     pub fn new(
         cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
         schema: Arc<ArrowSchema>,
         limit: Option<usize>,
         estimated_rows: u64,
-    ) -> MongoDbBsonExec {
-        MongoDbBsonExec {
+    ) -> MongoDbQueryExecPlan {
+        MongoDbQueryExecPlan {
             cursor,
             schema,
             limit,
@@ -56,7 +56,7 @@ impl MongoDbBsonExec {
     }
 }
 
-impl ExecutionPlan for MongoDbBsonExec {
+impl ExecutionPlan for MongoDbQueryExecPlan {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -130,9 +130,9 @@ impl ExecutionPlan for MongoDbBsonExec {
     }
 }
 
-impl DisplayAs for MongoDbBsonExec {
+impl DisplayAs for MongoDbQueryExecPlan {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "MongoBsonExec")
+        write!(f, "MongoDbQueryExec")
     }
 }
 
diff --git a/crates/datasources/src/mongodb/insert.rs b/crates/datasources/src/mongodb/insert.rs
index 6c59087a5..73f735be6 100644
--- a/crates/datasources/src/mongodb/insert.rs
+++ b/crates/datasources/src/mongodb/insert.rs
@@ -21,7 +21,7 @@ use futures::StreamExt;
 use mongodb::bson::RawDocumentBuf;
 use mongodb::Collection;
 
-use crate::common::util::create_count_record_batch;
+use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};
 
 pub struct MongoDbInsertExecPlan {
     collection: Collection<RawDocumentBuf>,
@@ -57,7 +57,7 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.input.schema()
+        COUNT_SCHEMA.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -97,11 +97,10 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
         }
 
         let mut stream = execute_stream(self.input.clone(), ctx)?;
-        let schema = self.input.schema().clone();
         let coll = self.collection.clone();
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
-            schema.clone(),
+            self.input.schema(),
             futures::stream::once(async move {
                 let mut count: u64 = 0;
                 while let Some(batch) = stream.next().await {
diff --git a/crates/datasources/src/mongodb/mod.rs b/crates/datasources/src/mongodb/mod.rs
index 85631d227..0a2c58140 100644
--- a/crates/datasources/src/mongodb/mod.rs
+++ b/crates/datasources/src/mongodb/mod.rs
@@ -29,7 +29,7 @@ use tracing::debug;
 
 use crate::bson::array_to_bson;
 use crate::mongodb::errors::{MongoDbError, Result};
-use crate::mongodb::exec::MongoDbBsonExec;
+use crate::mongodb::exec::MongoDbQueryExecPlan;
 use crate::mongodb::infer::TableSampler;
 
 /// Field name in mongo for uniquely identifying a record. Some special handling
@@ -326,7 +326,7 @@ impl TableProvider for MongoDbTableProvider {
                 .await
                 .map_err(|e| DataFusionError::External(Box::new(e)))?,
         ));
-        Ok(Arc::new(MongoDbBsonExec::new(
+        Ok(Arc::new(MongoDbQueryExecPlan::new(
             cursor,
             schema,
             limit,
diff --git a/crates/datasources/src/native/insert.rs b/crates/datasources/src/native/insert.rs
index 936fe882d..9bb79ff0b 100644
--- a/crates/datasources/src/native/insert.rs
+++ b/crates/datasources/src/native/insert.rs
@@ -1,9 +1,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use datafusion::arrow::array::UInt64Array;
-use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef};
-use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::context::SessionState;
 use datafusion::execution::TaskContext;
@@ -24,6 +22,8 @@ use deltalake::protocol::SaveMode;
 use deltalake::table::state::DeltaTableState;
 use futures::StreamExt;
 
+use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};
+
 /// An execution plan for inserting data into a delta table.
 #[derive(Debug)]
 pub struct NativeTableInsertExec {
@@ -49,21 +49,13 @@ impl NativeTableInsertExec {
     }
 }
 
-fn output_schema() -> Arc<ArrowSchema> {
-    Arc::new(ArrowSchema::new(vec![Field::new(
-        "count",
-        DataType::UInt64,
-        false,
-    )]))
-}
-
 impl ExecutionPlan for NativeTableInsertExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
     fn schema(&self) -> SchemaRef {
-        output_schema()
+        COUNT_SCHEMA.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -135,10 +127,7 @@ impl ExecutionPlan for NativeTableInsertExec {
                     .map(|metrics| metrics.output_rows().unwrap_or_default())
                     .unwrap_or_default();
 
-                let arr = UInt64Array::from_value(count as u64, 1);
-                let batch = RecordBatch::try_new(output_schema(), vec![Arc::new(arr)])?;
-
-                Ok(batch)
+                Ok(create_count_record_batch(count as u64))
             })
             .boxed();
 
diff --git a/crates/datasources/src/postgres/mod.rs b/crates/datasources/src/postgres/mod.rs
index 0bbdfe480..f86ce8e5b 100644
--- a/crates/datasources/src/postgres/mod.rs
+++ b/crates/datasources/src/postgres/mod.rs
@@ -75,7 +75,7 @@ use tokio_postgres::types::{FromSql, Type as PostgresType};
 use tokio_postgres::{Client, Config, Connection, CopyOutStream, NoTls, Socket};
 use tracing::{debug, warn};
 
-use self::query_exec::PostgresQueryExec;
+use self::query_exec::PostgresInsertExec;
 use crate::common::ssh::key::SshKey;
 use crate::common::ssh::session::{SshTunnelAccess, SshTunnelSession};
 use crate::common::util::{self, create_count_record_batch};
@@ -736,7 +736,7 @@ impl TableProvider for PostgresTableProvider {
 
         debug!(%query, "inserting into postgres datasource");
 
-        let exec = PostgresQueryExec::new(query, self.state.clone());
+        let exec = PostgresInsertExec::new(query, self.state.clone());
         Ok(Arc::new(exec))
     }
 }
diff --git a/crates/datasources/src/postgres/query_exec.rs b/crates/datasources/src/postgres/query_exec.rs
index 4d07ec72b..e6466c2ea 100644
--- a/crates/datasources/src/postgres/query_exec.rs
+++ b/crates/datasources/src/postgres/query_exec.rs
@@ -27,15 +27,15 @@ use super::PostgresAccessState;
 use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};
 
 #[derive(Debug)]
-pub struct PostgresQueryExec {
+pub struct PostgresInsertExec {
     query: String,
     state: Arc<PostgresAccessState>,
     metrics: ExecutionPlanMetricsSet,
 }
 
-impl PostgresQueryExec {
+impl PostgresInsertExec {
     pub fn new(query: String, state: Arc<PostgresAccessState>) -> Self {
-        PostgresQueryExec {
+        PostgresInsertExec {
             query,
             state,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -43,7 +43,7 @@ impl PostgresQueryExec {
     }
 }
 
-impl ExecutionPlan for PostgresQueryExec {
+impl ExecutionPlan for PostgresInsertExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -105,9 +105,9 @@ impl ExecutionPlan for PostgresQueryExec {
     }
 }
 
-impl DisplayAs for PostgresQueryExec {
+impl DisplayAs for PostgresInsertExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "PostgresQueryExec(query = {})", self.query)
+        write!(f, "PostgresInsertExec(query = {})", self.query)
     }
 }
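
Note (editorial context, not part of the patch): the COUNT_SCHEMA constant and the create_count_record_batch helper that this change leans on live in crate::common::util and are not shown in the diff. Judging from the output_schema() function and the batch construction removed from native/insert.rs above, they presumably look roughly like the sketch below; the once_cell::sync::Lazy static and the exact signature are assumptions, not the verified implementation.

// Sketch of the crate::common::util helpers assumed by this patch.
// Mirrors the logic removed from native/insert.rs: a single non-nullable
// "count" column (UInt64) plus a one-row batch carrying the insert count.
// The once_cell::sync::Lazy wrapper and the u64 parameter are assumptions.
use std::sync::Arc;

use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use once_cell::sync::Lazy;

/// Shared output schema for insert plans: one non-nullable `count` column.
pub static COUNT_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
});

/// Build a single-row batch reporting how many rows were inserted.
pub fn create_count_record_batch(count: u64) -> RecordBatch {
    RecordBatch::try_new(
        COUNT_SCHEMA.clone(),
        vec![Arc::new(UInt64Array::from_value(count, 1))],
    )
    .expect("count batch must match COUNT_SCHEMA")
}

With shared helpers of this shape, every insert plan touched here (Lance, MongoDB, the native Delta table, and Postgres) reports the same single-column count schema from ExecutionPlan::schema(), which is the consistency the patch title refers to.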