feat: support inserts into mongodb #2605

Merged 20 commits on Feb 9, 2024
7 changes: 6 additions & 1 deletion .github/workflows/ci.yaml
@@ -615,7 +615,8 @@ jobs:
path: "sqllogictests_mongodb/*"
prepare: |
./scripts/prepare-testdata.sh
export MONGO_CONN_STRING=$(./scripts/create-test-mongo-db.sh)
export MONGO_CONN_STRING=$(./scripts/create-test-mongodb.sh)
./scripts/create-fixture-mongodb.sh
- name: Sqlserver
path: "sqllogictests_sqlserver/*"
prepare: |
@@ -641,8 +642,12 @@ jobs:

just slt-bin ${{matrix.settings.path}}

[ "${{matrix.settings.name}}" = "MongoDB" ] && ./scripts/create-fixture-mongodb.sh

just slt-bin --protocol=rpc --exclude '*/tunnels/ssh' ${{matrix.settings.path}}

[ "${{matrix.settings.name}}" = "MongoDB" ] && ./scripts/create-fixture-mongodb.sh

# for sqlserver, skip flightsql because the suite takes 4-5
# minutes, and Sqlserver is the longest/last task to finish
[ "${{matrix.settings.name}}" = "Sqlserver" ] && exit 0
2 changes: 1 addition & 1 deletion README.md
@@ -205,7 +205,7 @@ DROP DATABASE my_pg;
| PostgreSQL | ✅ | ✅ | ✅ | ✅ | ✅ |
| MariaDB _(via mysql)_ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Microsoft SQL Server | ✅ | 🚧 | ✅ | ✅ | ✅ |
| MongoDB | ✅ | 🚧 | ✅ | ✅ | ✅ |
| MongoDB | ✅ | ✅ | ✅ | ✅ | ✅ |
| Snowflake | ✅ | 🚧 | ✅ | ✅ | ✅ |
| BigQuery | ✅ | 🚧 | ✅ | ✅ | ✅ |
| Cassandra/ScyllaDB | ✅ | 🚧 | ✅ | ✅ | ✅ |
48 changes: 39 additions & 9 deletions crates/datasources/src/bson/mod.rs
@@ -45,6 +45,7 @@ use datafusion::arrow::datatypes::{
TimestampNanosecondType,
};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;

pub struct BsonBatchConverter {
batch: StructArray,
@@ -70,12 +71,24 @@ impl BsonBatchConverter {
}
}

pub fn from_record_batch(value: RecordBatch) -> Self {
Self::new(
StructArray::new(
value.schema().fields.clone(),
value.columns().to_vec(),
None,
),
value.schema().fields.clone(),
)
}

fn setup(&mut self) -> Result<(), ArrowError> {
for col in self.batch.columns().iter() {
self.columns
.push(array_to_bson(col).map_err(|e| ArrowError::from_external_error(Box::new(e)))?)
if !self.started {
for col in self.batch.columns().iter() {
self.columns.push(array_to_bson(col)?)
}
self.started = true
}
self.started = true;
Ok(())
}
}
@@ -84,10 +97,8 @@ impl Iterator for BsonBatchConverter {
type Item = Result<bson::Document, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
if !self.started {
if let Err(e) = self.setup() {
return Some(Err(e));
}
if let Err(e) = self.setup() {
return Some(Err(e));
}

if self.row >= self.batch.len() {
@@ -96,7 +107,26 @@

let mut doc = bson::Document::new();
for (i, field) in self.schema.iter().enumerate() {
doc.insert(field.to_string(), self.columns[i][self.row].to_owned());
let value = &self.columns[i][self.row];
match (field.as_str(), value) {
("_id", bson::Bson::Binary(v)) => {
// If we have a binary-typed _id field whose value is empty, this
// is almost certainly an ObjectId field left unpopulated during a
// round trip through MongoDB, where an insert in GlareDB omitted
// generating an object id.
if v.as_raw_binary().bytes.is_empty() {
doc.insert(
field.to_string(),
bson::Bson::ObjectId(bson::oid::ObjectId::new()),
);
} else {
doc.insert(field.to_string(), value.to_owned());
}
}
_ => {
doc.insert(field.to_string(), value.to_owned());
}
}
}

self.row += 1;
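A minimal usage sketch of the new `from_record_batch` constructor: it wraps a `RecordBatch` into a `StructArray` and then iterates row by row, yielding one `bson::Document` per row. The `datasources::bson` import path and the surrounding scaffolding are assumptions for illustration, not part of this diff.

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

// Assumed import path; within this repo the type lives at crate::bson.
use datasources::bson::BsonBatchConverter;

fn batch_to_docs() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // The converter implements Iterator<Item = Result<bson::Document, ArrowError>>.
    for doc in BsonBatchConverter::from_record_batch(batch) {
        println!("{}", doc?);
    }
    Ok(())
}
```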
141 changes: 141 additions & 0 deletions crates/datasources/src/mongodb/insert.rs
@@ -0,0 +1,141 @@
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
execute_stream,
DisplayAs,
DisplayFormatType,
ExecutionPlan,
Partitioning,
SendableRecordBatchStream,
Statistics,
};
use futures::StreamExt;
use mongodb::bson::RawDocumentBuf;
use mongodb::Collection;

use crate::common::util::create_count_record_batch;

pub struct MongoDbInsertExecPlan {
collection: Collection<RawDocumentBuf>,
input: Arc<dyn ExecutionPlan>,
metrics: ExecutionPlanMetricsSet,
}

impl MongoDbInsertExecPlan {
pub fn new(collection: Collection<RawDocumentBuf>, input: Arc<dyn ExecutionPlan>) -> Self {
Self {
collection,
input,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}

impl DisplayAs for MongoDbInsertExecPlan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MongoDbInsertExecPlan")
}
}

impl std::fmt::Debug for MongoDbInsertExecPlan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MongoDbInsertExecPlan: {:?}", self.schema())
}
}

impl ExecutionPlan for MongoDbInsertExecPlan {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.input.schema()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Vec::new()
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(self)
} else {
Err(DataFusionError::Execution(
"cannot replace children for MongoDbInsertExecPlan".to_string(),
))
}
}

fn execute(
&self,
partition: usize,
ctx: Arc<TaskContext>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Execution(
"cannot partition mongodb insert exec".to_string(),
));
}

let mut stream = execute_stream(self.input.clone(), ctx)?;
let schema = self.input.schema().clone();
let coll = self.collection.clone();

Ok(Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
futures::stream::once(async move {
let mut count: u64 = 0;
while let Some(batch) = stream.next().await {
let rb = batch?;

let mut docs = Vec::with_capacity(rb.num_rows());
let converted = crate::bson::BsonBatchConverter::from_record_batch(rb);

for d in converted.into_iter() {
let doc = d.map_err(|e| DataFusionError::Execution(e.to_string()))?;

docs.push(
RawDocumentBuf::from_document(&doc)
.map_err(|e| DataFusionError::Execution(e.to_string()))?,
);
}

count += coll
.insert_many(docs, None)
.await
.map(|res| res.inserted_ids.len())
.map_err(|e| DataFusionError::External(Box::new(e)))?
as u64;
}
Ok::<RecordBatch, DataFusionError>(create_count_record_batch(count))
}),
)))
}

fn statistics(&self) -> datafusion::error::Result<Statistics> {
Ok(Statistics::new_unknown(self.schema().as_ref()))
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}
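The stream returned by `execute` drains the input and then emits a single count batch. `create_count_record_batch` comes from `crate::common::util` and is not shown in this diff; below is a plausible sketch of its shape, where the column name and type are assumptions.

```rust
use std::sync::Arc;

use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

// Assumed shape: a one-row, one-column batch carrying the number of
// documents inserted across all input batches.
fn create_count_record_batch(count: u64) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]));
    RecordBatch::try_new(schema, vec![Arc::new(UInt64Array::from(vec![count]))])
        .expect("single-column count batch is always valid")
}
```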
23 changes: 20 additions & 3 deletions crates/datasources/src/mongodb/mod.rs
@@ -3,6 +3,7 @@ pub mod errors;

mod exec;
mod infer;
mod insert;

use std::any::Any;
use std::fmt::{Display, Write};
@@ -20,16 +21,16 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_ext::errors::ExtensionError;
use datafusion_ext::functions::VirtualLister;
use errors::{MongoDbError, Result};
use exec::MongoDbBsonExec;
use infer::TableSampler;
use mongodb::bson::spec::BinarySubtype;
use mongodb::bson::{bson, Binary, Bson, Document, RawDocumentBuf};
use mongodb::options::{ClientOptions, FindOptions};
use mongodb::{Client, Collection};
use tracing::debug;

use crate::bson::array_to_bson;
use crate::mongodb::errors::{MongoDbError, Result};
use crate::mongodb::exec::MongoDbBsonExec;
use crate::mongodb::infer::TableSampler;

/// Field name in mongo for uniquely identifying a record. Some special handling
/// needs to be done with the field when projecting.
Expand Down Expand Up @@ -332,6 +333,22 @@ impl TableProvider for MongoDbTableProvider {
self.estimated_count,
)))
}

async fn insert_into(
&self,
_state: &SessionState,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
if overwrite {
return Err(DataFusionError::Execution("cannot overwrite".to_string()));
}

Ok(Arc::new(insert::MongoDbInsertExecPlan::new(
self.collection.clone(),
input.clone(),
)))
}
}

fn exprs_to_mdb_query(exprs: &[Expr]) -> Result<Document, ExtensionError> {
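For context, a hedged sketch of the DataFusion flow that reaches the new `insert_into`: once the provider is registered, a plain `INSERT` statement is planned by calling `insert_into` with `overwrite = false`. The table name and setup here are illustrative, not taken from this PR.

```rust
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

async fn run_insert(provider: Arc<dyn TableProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table("insert_test", provider)?;

    // DataFusion plans this statement by calling the provider's insert_into;
    // a plain INSERT passes overwrite = false, so the guard above is not hit.
    ctx.sql("INSERT INTO insert_test (a, b, c) VALUES (1, 2, 3)")
        .await?
        .collect()
        .await?;
    Ok(())
}
```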
11 changes: 11 additions & 0 deletions scripts/create-fixture-mongodb.sh
@@ -0,0 +1,11 @@
#!/usr/bin/env bash

set -e

CONTAINER_NAME="glaredb_mongodb_test"

CONTAINER_ID=$(docker ps -aqf "name=${CONTAINER_NAME}")

docker exec "${CONTAINER_ID}" mongosh \
"${MONGO_CONN_STRING}" \
--quiet /tmp/mdb-fixture.js 1>&2
scripts/create-test-mongodb.sh
@@ -30,7 +30,9 @@ REPO_ROOT="$(git rev-parse --show-toplevel)"
# Copy in test data.
docker cp \
${REPO_ROOT}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv \
${CONTAINER_ID}:/tmp/.
${CONTAINER_ID}:/tmp/

docker cp ${REPO_ROOT}/scripts/mdb-fixture.js ${CONTAINER_ID}:/tmp/mdb-fixture.js

# Exec into container to load test data.
docker exec $CONTAINER_ID mongoimport \
@@ -40,11 +42,6 @@ docker exec $CONTAINER_ID mongoimport \
"mongodb://localhost:27017/${DB_NAME}" \
/tmp/bikeshare_stations.csv 1>&2

# insert fixture data for a null handling regression test.
docker exec $CONTAINER_ID mongosh \
"mongodb://localhost:27017/${DB_NAME}" \
--eval "db.null_test.insertMany([{a:1},{a:null}])" 1>&2

# The mongod docker container is kinda bad. The MONGO_INITDB_... environment vars
# might look like the obvious solution, but they don't work as you would expect.
#
9 changes: 9 additions & 0 deletions scripts/mdb-fixture.js
@@ -0,0 +1,9 @@
print("--- null_test fixture ---");
db.null_test.drop();
db.null_test.insertMany([{a:1},{a:null}]);
printjson(db.null_test.find());
print("--- insert_test fixture ---");
db.insert_test.drop();
db.insert_test.insertOne({"a":0,"b":0,"c":0});
printjson(db.insert_test.find());
print("---");
39 changes: 39 additions & 0 deletions testdata/sqllogictests_mongodb/insert.slt
@@ -0,0 +1,39 @@
statement ok
CREATE EXTERNAL TABLE insert_test
FROM mongo
OPTIONS (
connection_string = '${MONGO_CONN_STRING}',
database = 'test',
collection = 'insert_test',
);

statement ok
ALTER TABLE insert_test SET ACCESS_MODE TO READ_WRITE;

query I
SELECT count(*) FROM insert_test;
----
1

statement ok
INSERT INTO insert_test (a, b, c) VALUES (1, 2, 3);

query I
SELECT count(*) FROM insert_test;
----
2

statement ok
INSERT INTO insert_test (a,b,c) VALUES (4, 5, 6);

query I
SELECT count(*) FROM insert_test;
----
3

query III
SELECT a, b, c FROM insert_test;
----
0 0 0
1 2 3
4 5 6