Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make insert plan schema correct and clear #2642

Merged: 5 commits were merged on Feb 14, 2024.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/datasources/src/lance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use lance::dataset::{WriteMode, WriteParams};
use lance::Dataset;
use protogen::metastore::types::options::StorageOptions;

use crate::common::util::create_count_record_batch;
use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

pub struct LanceTable {
dataset: Dataset,
Expand Down Expand Up @@ -117,7 +117,7 @@ impl ExecutionPlan for LanceInsertExecPlan {
}

fn schema(&self) -> SchemaRef {
TableProvider::schema(&self.dataset)
COUNT_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl ExecutionPlan for LanceInsertExecPlan {
) -> datafusion::error::Result<SendableRecordBatchStream> {
let mut stream = execute_stream(self.input.clone(), ctx)?.chunks(32);
let mut ds = self.dataset.clone();
let schema = self.schema().clone();
let schema = self.input.schema();

Ok(Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
Expand Down
14 changes: 7 additions & 7 deletions crates/datasources/src/mongodb/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ use super::errors::{MongoDbError, Result};
use crate::bson::builder::RecordStructBuilder;

#[derive(Debug)]
pub struct MongoDbBsonExec {
pub struct MongoDbQueryExecPlan {
cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
schema: Arc<ArrowSchema>,
limit: Option<usize>,
metrics: ExecutionPlanMetricsSet,
estimated_rows: u64,
}

impl MongoDbBsonExec {
impl MongoDbQueryExecPlan {
pub fn new(
cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
schema: Arc<ArrowSchema>,
limit: Option<usize>,
estimated_rows: u64,
) -> MongoDbBsonExec {
MongoDbBsonExec {
) -> MongoDbQueryExecPlan {
MongoDbQueryExecPlan {
cursor,
schema,
limit,
Expand All @@ -56,7 +56,7 @@ impl MongoDbBsonExec {
}
}

impl ExecutionPlan for MongoDbBsonExec {
impl ExecutionPlan for MongoDbQueryExecPlan {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -130,9 +130,9 @@ impl ExecutionPlan for MongoDbBsonExec {
}
}

impl DisplayAs for MongoDbBsonExec {
impl DisplayAs for MongoDbQueryExecPlan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MongoBsonExec")
write!(f, "MongoDbQueryExec")
}
}

Expand Down
7 changes: 3 additions & 4 deletions crates/datasources/src/mongodb/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::StreamExt;
use mongodb::bson::RawDocumentBuf;
use mongodb::Collection;

use crate::common::util::create_count_record_batch;
use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

pub struct MongoDbInsertExecPlan {
collection: Collection<RawDocumentBuf>,
Expand Down Expand Up @@ -57,7 +57,7 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
}

fn schema(&self) -> SchemaRef {
self.input.schema()
COUNT_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -97,11 +97,10 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
}

let mut stream = execute_stream(self.input.clone(), ctx)?;
let schema = self.input.schema().clone();
let coll = self.collection.clone();

Ok(Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
self.input.schema(),
futures::stream::once(async move {
let mut count: u64 = 0;
while let Some(batch) = stream.next().await {
Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::debug;

use crate::bson::array_to_bson;
use crate::mongodb::errors::{MongoDbError, Result};
use crate::mongodb::exec::MongoDbBsonExec;
use crate::mongodb::exec::MongoDbQueryExecPlan;
use crate::mongodb::infer::TableSampler;

/// Field name in mongo for uniquely identifying a record. Some special handling
Expand Down Expand Up @@ -326,7 +326,7 @@ impl TableProvider for MongoDbTableProvider {
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?,
));
Ok(Arc::new(MongoDbBsonExec::new(
Ok(Arc::new(MongoDbQueryExecPlan::new(
cursor,
schema,
limit,
Expand Down
21 changes: 5 additions & 16 deletions crates/datasources/src/native/insert.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::TaskContext;
Expand All @@ -24,6 +22,8 @@ use deltalake::protocol::SaveMode;
use deltalake::table::state::DeltaTableState;
use futures::StreamExt;

use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

/// An execution plan for inserting data into a delta table.
#[derive(Debug)]
pub struct NativeTableInsertExec {
Expand All @@ -49,21 +49,13 @@ impl NativeTableInsertExec {
}
}

fn output_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![Field::new(
"count",
DataType::UInt64,
false,
)]))
}

impl ExecutionPlan for NativeTableInsertExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
output_schema()
COUNT_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -135,10 +127,7 @@ impl ExecutionPlan for NativeTableInsertExec {
.map(|metrics| metrics.output_rows().unwrap_or_default())
.unwrap_or_default();

let arr = UInt64Array::from_value(count as u64, 1);
let batch = RecordBatch::try_new(output_schema(), vec![Arc::new(arr)])?;

Ok(batch)
Ok(create_count_record_batch(count as u64))
})
.boxed();

Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use tokio_postgres::types::{FromSql, Type as PostgresType};
use tokio_postgres::{Client, Config, Connection, CopyOutStream, NoTls, Socket};
use tracing::{debug, warn};

use self::query_exec::PostgresQueryExec;
use self::query_exec::PostgresInsertExec;
use crate::common::ssh::key::SshKey;
use crate::common::ssh::session::{SshTunnelAccess, SshTunnelSession};
use crate::common::util::{self, create_count_record_batch};
Expand Down Expand Up @@ -736,7 +736,7 @@ impl TableProvider for PostgresTableProvider {

debug!(%query, "inserting into postgres datasource");

let exec = PostgresQueryExec::new(query, self.state.clone());
let exec = PostgresInsertExec::new(query, self.state.clone());
Ok(Arc::new(exec))
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/datasources/src/postgres/query_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ use super::PostgresAccessState;
use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

#[derive(Debug)]
pub struct PostgresQueryExec {
pub struct PostgresInsertExec {
query: String,
state: Arc<PostgresAccessState>,
metrics: ExecutionPlanMetricsSet,
}

impl PostgresQueryExec {
impl PostgresInsertExec {
pub fn new(query: String, state: Arc<PostgresAccessState>) -> Self {
PostgresQueryExec {
PostgresInsertExec {
query,
state,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}

impl ExecutionPlan for PostgresQueryExec {
impl ExecutionPlan for PostgresInsertExec {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -105,9 +105,9 @@ impl ExecutionPlan for PostgresQueryExec {
}
}

impl DisplayAs for PostgresQueryExec {
impl DisplayAs for PostgresInsertExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "PostgresQueryExec(query = {})", self.query)
write!(f, "PostgresInsertExec(query = {})", self.query)
}
}

Expand Down
Loading