Skip to content

Commit

Permalink
fix: make insert plan schema correct and clear (#2642)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish authored Feb 14, 2024
1 parent 564a3f0 commit af0743d
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 40 deletions.
6 changes: 3 additions & 3 deletions crates/datasources/src/lance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use lance::dataset::{WriteMode, WriteParams};
use lance::Dataset;
use protogen::metastore::types::options::StorageOptions;

use crate::common::util::create_count_record_batch;
use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

pub struct LanceTable {
dataset: Dataset,
Expand Down Expand Up @@ -117,7 +117,7 @@ impl ExecutionPlan for LanceInsertExecPlan {
}

fn schema(&self) -> SchemaRef {
TableProvider::schema(&self.dataset)
COUNT_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl ExecutionPlan for LanceInsertExecPlan {
) -> datafusion::error::Result<SendableRecordBatchStream> {
let mut stream = execute_stream(self.input.clone(), ctx)?.chunks(32);
let mut ds = self.dataset.clone();
let schema = self.schema().clone();
let schema = self.input.schema();

Ok(Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
Expand Down
14 changes: 7 additions & 7 deletions crates/datasources/src/mongodb/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ use super::errors::{MongoDbError, Result};
use crate::bson::builder::RecordStructBuilder;

#[derive(Debug)]
pub struct MongoDbBsonExec {
pub struct MongoDbQueryExecPlan {
cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
schema: Arc<ArrowSchema>,
limit: Option<usize>,
metrics: ExecutionPlanMetricsSet,
estimated_rows: u64,
}

impl MongoDbBsonExec {
impl MongoDbQueryExecPlan {
pub fn new(
cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
schema: Arc<ArrowSchema>,
limit: Option<usize>,
estimated_rows: u64,
) -> MongoDbBsonExec {
MongoDbBsonExec {
) -> MongoDbQueryExecPlan {
MongoDbQueryExecPlan {
cursor,
schema,
limit,
Expand All @@ -56,7 +56,7 @@ impl MongoDbBsonExec {
}
}

impl ExecutionPlan for MongoDbBsonExec {
impl ExecutionPlan for MongoDbQueryExecPlan {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -130,9 +130,9 @@ impl ExecutionPlan for MongoDbBsonExec {
}
}

impl DisplayAs for MongoDbBsonExec {
impl DisplayAs for MongoDbQueryExecPlan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MongoBsonExec")
write!(f, "MongoDbQueryExec")
}
}

Expand Down
7 changes: 3 additions & 4 deletions crates/datasources/src/mongodb/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::StreamExt;
use mongodb::bson::RawDocumentBuf;
use mongodb::Collection;

use crate::common::util::create_count_record_batch;
use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

pub struct MongoDbInsertExecPlan {
collection: Collection<RawDocumentBuf>,
Expand Down Expand Up @@ -57,7 +57,7 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
}

fn schema(&self) -> SchemaRef {
self.input.schema()
COUNT_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -97,11 +97,10 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
}

let mut stream = execute_stream(self.input.clone(), ctx)?;
let schema = self.input.schema().clone();
let coll = self.collection.clone();

Ok(Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
self.input.schema(),
futures::stream::once(async move {
let mut count: u64 = 0;
while let Some(batch) = stream.next().await {
Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::debug;

use crate::bson::array_to_bson;
use crate::mongodb::errors::{MongoDbError, Result};
use crate::mongodb::exec::MongoDbBsonExec;
use crate::mongodb::exec::MongoDbQueryExecPlan;
use crate::mongodb::infer::TableSampler;

/// Field name in mongo for uniquely identifying a record. Some special handling
Expand Down Expand Up @@ -326,7 +326,7 @@ impl TableProvider for MongoDbTableProvider {
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?,
));
Ok(Arc::new(MongoDbBsonExec::new(
Ok(Arc::new(MongoDbQueryExecPlan::new(
cursor,
schema,
limit,
Expand Down
21 changes: 5 additions & 16 deletions crates/datasources/src/native/insert.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::TaskContext;
Expand All @@ -24,6 +22,8 @@ use deltalake::protocol::SaveMode;
use deltalake::table::state::DeltaTableState;
use futures::StreamExt;

use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

/// An execution plan for inserting data into a delta table.
#[derive(Debug)]
pub struct NativeTableInsertExec {
Expand All @@ -49,21 +49,13 @@ impl NativeTableInsertExec {
}
}

fn output_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![Field::new(
"count",
DataType::UInt64,
false,
)]))
}

impl ExecutionPlan for NativeTableInsertExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
output_schema()
COUNT_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -135,10 +127,7 @@ impl ExecutionPlan for NativeTableInsertExec {
.map(|metrics| metrics.output_rows().unwrap_or_default())
.unwrap_or_default();

let arr = UInt64Array::from_value(count as u64, 1);
let batch = RecordBatch::try_new(output_schema(), vec![Arc::new(arr)])?;

Ok(batch)
Ok(create_count_record_batch(count as u64))
})
.boxed();

Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use tokio_postgres::types::{FromSql, Type as PostgresType};
use tokio_postgres::{Client, Config, Connection, CopyOutStream, NoTls, Socket};
use tracing::{debug, warn};

use self::query_exec::PostgresQueryExec;
use self::query_exec::PostgresInsertExec;
use crate::common::ssh::key::SshKey;
use crate::common::ssh::session::{SshTunnelAccess, SshTunnelSession};
use crate::common::util::{self, create_count_record_batch};
Expand Down Expand Up @@ -736,7 +736,7 @@ impl TableProvider for PostgresTableProvider {

debug!(%query, "inserting into postgres datasource");

let exec = PostgresQueryExec::new(query, self.state.clone());
let exec = PostgresInsertExec::new(query, self.state.clone());
Ok(Arc::new(exec))
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/datasources/src/postgres/query_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ use super::PostgresAccessState;
use crate::common::util::{create_count_record_batch, COUNT_SCHEMA};

#[derive(Debug)]
pub struct PostgresQueryExec {
pub struct PostgresInsertExec {
query: String,
state: Arc<PostgresAccessState>,
metrics: ExecutionPlanMetricsSet,
}

impl PostgresQueryExec {
impl PostgresInsertExec {
pub fn new(query: String, state: Arc<PostgresAccessState>) -> Self {
PostgresQueryExec {
PostgresInsertExec {
query,
state,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}

impl ExecutionPlan for PostgresQueryExec {
impl ExecutionPlan for PostgresInsertExec {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -105,9 +105,9 @@ impl ExecutionPlan for PostgresQueryExec {
}
}

impl DisplayAs for PostgresQueryExec {
impl DisplayAs for PostgresInsertExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "PostgresQueryExec(query = {})", self.query)
write!(f, "PostgresInsertExec(query = {})", self.query)
}
}

Expand Down

0 comments on commit af0743d

Please sign in to comment.