Skip to content

Commit

Permalink
feat: external table for cassandra (#2361)
Browse files Browse the repository at this point in the history
closes #2345
  • Loading branch information
universalmind303 authored Jan 8, 2024
1 parent f12fb20 commit 0b4b1fc
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 16 deletions.
2 changes: 2 additions & 0 deletions crates/datasources/src/cassandra/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub enum CassandraError {
QueryError(#[from] scylla::transport::errors::QueryError),
#[error("Unsupported DataType: {0}")]
UnsupportedDataType(String),
#[error("Table not found: {0}")]
TableNotFound(String),
}

pub type Result<T, E = CassandraError> = std::result::Result<T, E>;
33 changes: 26 additions & 7 deletions crates/datasources/src/cassandra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use self::exec::CassandraExec;

struct CassandraAccess {
pub struct CassandraAccess {
session: Session,
}

Expand All @@ -67,8 +68,12 @@ fn try_convert_dtype(ty: &ColumnType) -> Result<DataType> {
}

impl CassandraAccess {
pub async fn try_new(conn_str: impl AsRef<str>) -> Result<Self> {
let session = SessionBuilder::new().known_node(conn_str).build().await?;
pub async fn try_new(host: impl AsRef<str>) -> Result<Self> {
let session = SessionBuilder::new()
.known_node(host)
.connection_timeout(Duration::from_secs(10))
.build()
.await?;
Ok(Self { session })
}
async fn get_schema(&self, ks: &str, table: &str) -> Result<ArrowSchema> {
Expand All @@ -86,6 +91,17 @@ impl CassandraAccess {
.collect::<Result<_>>()?;
Ok(ArrowSchema::new(fields))
}
pub async fn validate_table_access(&self, ks: &str, table: &str) -> Result<()> {
let query = format!("SELECT * FROM {ks}.{table} LIMIT 1");
let res = self.session.query(query, &[]).await?;
if res.col_specs.is_empty() {
return Err(CassandraError::TableNotFound(format!(
"table {} not found in keyspace {}",
table, ks
)));
}
Ok(())
}
}

#[derive(Debug, Clone)]
Expand All @@ -97,8 +113,8 @@ pub struct CassandraTableProvider {
}

impl CassandraTableProvider {
pub async fn try_new(conn_str: String, ks: String, table: String) -> Result<Self> {
let access = CassandraAccess::try_new(conn_str).await?;
pub async fn try_new(host: String, ks: String, table: String) -> Result<Self> {
let access = CassandraAccess::try_new(host).await?;
let schema = access.get_schema(&ks, &table).await?;
Ok(Self {
schema: Arc::new(schema),
Expand Down Expand Up @@ -135,7 +151,7 @@ impl TableProvider for CassandraTableProvider {
_ctx: &SessionState,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
limit: Option<usize>,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(projection) => Arc::new(self.schema.project(projection)?),
Expand All @@ -150,10 +166,13 @@ impl TableProvider for CassandraTableProvider {
.map(|f| f.name().clone())
.collect::<Vec<_>>()
.join(",");
let query = format!(
let mut query = format!(
"SELECT {} FROM {}.{}",
projection_string, self.ks, self.table
);
if let Some(limit) = limit {
query.push_str(&format!(" LIMIT {}", limit));
}

let exec = CassandraExec::new(projected_schema, query, self.session.clone());
Ok(Arc::new(exec))
Expand Down
3 changes: 2 additions & 1 deletion crates/datasources/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ pub fn init_session_registry<'a>(
| TableOptions::MongoDb(_)
| TableOptions::Snowflake(_)
| TableOptions::SqlServer(_)
| TableOptions::Clickhouse(_) => continue,
| TableOptions::Clickhouse(_)
| TableOptions::Cassandra(_) => continue,
};

let base_url = access.base_url()?;
Expand Down
7 changes: 7 additions & 0 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message TableOptions {
TableOptionsObjectStore lance = 15;
TableOptionsObjectStore bson = 16;
TableOptionsClickhouse clickhouse = 17;
TableOptionsCassandra cassandra = 18;
}
// next: 18
}
Expand Down Expand Up @@ -230,6 +231,12 @@ message TableOptionsClickhouse {
optional string database = 3;
}

message TableOptionsCassandra {
string host = 1;
string keyspace = 2;
string table = 3;
}

// Tunnel options

message TunnelOptions {
Expand Down
33 changes: 33 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ pub enum TableOptions {
Lance(TableOptionsObjectStore),
Bson(TableOptionsObjectStore),
Clickhouse(TableOptionsClickhouse),
Cassandra(TableOptionsCassandra),
}

impl TableOptions {
Expand All @@ -569,6 +570,7 @@ impl TableOptions {
pub const LANCE: &'static str = "lance";
pub const BSON: &'static str = "bson";
pub const CLICKHOUSE: &'static str = "clickhouse";
pub const CASSANDRA: &'static str = "cassandra";

pub const fn new_internal(columns: Vec<InternalColumnDefinition>) -> TableOptions {
TableOptions::Internal(TableOptionsInternal { columns })
Expand All @@ -593,6 +595,7 @@ impl TableOptions {
TableOptions::Lance(_) => Self::LANCE,
TableOptions::Bson(_) => Self::BSON,
TableOptions::Clickhouse(_) => Self::CLICKHOUSE,
TableOptions::Cassandra(_) => Self::CASSANDRA,
}
}
}
Expand Down Expand Up @@ -626,6 +629,7 @@ impl TryFrom<options::table_options::Options> for TableOptions {
options::table_options::Options::Clickhouse(v) => {
TableOptions::Clickhouse(v.try_into()?)
}
options::table_options::Options::Cassandra(v) => TableOptions::Cassandra(v.try_into()?),
})
}
}
Expand Down Expand Up @@ -658,6 +662,7 @@ impl TryFrom<TableOptions> for options::table_options::Options {
TableOptions::Lance(v) => options::table_options::Options::Lance(v.into()),
TableOptions::Bson(v) => options::table_options::Options::Bson(v.into()),
TableOptions::Clickhouse(v) => options::table_options::Options::Clickhouse(v.into()),
TableOptions::Cassandra(v) => options::table_options::Options::Cassandra(v.into()),
})
}
}
Expand Down Expand Up @@ -1028,6 +1033,34 @@ impl From<TableOptionsClickhouse> for options::TableOptionsClickhouse {
}
}

#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
pub struct TableOptionsCassandra {
pub host: String,
pub keyspace: String,
pub table: String,
}

impl TryFrom<options::TableOptionsCassandra> for TableOptionsCassandra {
type Error = ProtoConvError;
fn try_from(value: options::TableOptionsCassandra) -> Result<Self, Self::Error> {
Ok(TableOptionsCassandra {
host: value.host,
keyspace: value.keyspace,
table: value.table,
})
}
}

impl From<TableOptionsCassandra> for options::TableOptionsCassandra {
fn from(value: TableOptionsCassandra) -> Self {
options::TableOptionsCassandra {
host: value.host,
keyspace: value.keyspace,
table: value.table,
}
}
}

#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
pub struct TableOptionsSnowflake {
pub account_name: String,
Expand Down
20 changes: 16 additions & 4 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion::prelude::SessionContext;
use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue};
use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess};
use datasources::bson::table::bson_streaming_table;
use datasources::cassandra::CassandraTableProvider;
use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableProvider, OwnedClickhouseTableRef};
use datasources::common::url::DatasourceUrl;
use datasources::debug::DebugTableType;
Expand All @@ -36,10 +37,10 @@ use protogen::metastore::types::options::{
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsDebug,
DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions,
TableOptionsBigQuery, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql,
TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake,
TableOptionsSqlServer, TunnelOptions,
TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug,
TableOptionsGcs, TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb,
TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3,
TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
};
use sqlbuiltins::builtins::DEFAULT_CATALOG;
use sqlbuiltins::functions::FUNCTION_REGISTRY;
Expand Down Expand Up @@ -501,6 +502,17 @@ impl<'a> ExternalDispatcher<'a> {
.await?,
)
}
TableOptions::Cassandra(TableOptionsCassandra {
host,
keyspace,
table,
}) => {
let table =
CassandraTableProvider::try_new(host.clone(), keyspace.clone(), table.clone())
.await?;

Ok(Arc::new(table))
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sqlexec/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub enum DispatchError {
SshKey(#[from] datasources::common::ssh::key::SshKeyError),
#[error(transparent)]
ExtensionError(#[from] datafusion_ext::errors::ExtensionError),
#[error(transparent)]
CassandraDatasource(#[from] datasources::cassandra::CassandraError),

#[error("{0}")]
String(String),
Expand Down
1 change: 1 addition & 0 deletions crates/sqlexec/src/planner/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl_from_dispatch_variant!(datasources::lake::iceberg::errors::IcebergError);
impl_from_dispatch_variant!(datasources::object_store::errors::ObjectStoreSourceError);
impl_from_dispatch_variant!(datasources::sqlserver::errors::SqlServerError);
impl_from_dispatch_variant!(datasources::clickhouse::errors::ClickhouseError);
impl_from_dispatch_variant!(datasources::cassandra::CassandraError);

#[allow(unused_macros)]
macro_rules! internal {
Expand Down
22 changes: 18 additions & 4 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::sql::TableReference;
use datafusion_ext::planner::SqlQueryPlanner;
use datafusion_ext::AsyncContextProvider;
use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess};
use datasources::cassandra::CassandraAccess;
use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableRef};
use datasources::common::ssh::{key::SshKey, SshConnection, SshConnectionParameters};
use datasources::common::url::{DatasourceUrl, DatasourceUrlType};
Expand Down Expand Up @@ -46,10 +47,10 @@ use protogen::metastore::types::options::{
DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog,
DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery,
TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal,
TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore,
TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer,
TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
};
use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation};
use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG};
Expand Down Expand Up @@ -533,6 +534,19 @@ impl<'a> SessionPlanner<'a> {
database: database_name,
})
}
TableOptions::CASSANDRA => {
let host: String = m.remove_required("host")?;
let keyspace: String = m.remove_required("keyspace")?;
let table: String = m.remove_required("table")?;
let access = CassandraAccess::try_new(host.clone()).await?;
access.validate_table_access(&keyspace, &table).await?;

TableOptions::Cassandra(TableOptionsCassandra {
host,
keyspace,
table,
})
}
TableOptions::LOCAL => {
let location: String = m.remove_required("location")?;

Expand Down
12 changes: 12 additions & 0 deletions testdata/sqllogictests_cassandra/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Basic tests for external tables.

statement ok
CREATE EXTERNAL TABLE basic
FROM cassandra
OPTIONS (
host = '${CASSANDRA_CONN_STRING}',
keyspace = 'test',
table = 'bikeshare_stations'
);

include ${PWD}/testdata/sqllogictests_datasources_common/include/basic.slti
19 changes: 19 additions & 0 deletions testdata/sqllogictests_cassandra/external_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Basic tests for external tables.

statement ok
CREATE EXTERNAL TABLE external_table
FROM cassandra
OPTIONS (
host = '127.0.0.1:9042',
keyspace = 'test',
table = 'bikeshare_stations',
);

query I
SELECT count(*) FROM external_table;
----
102

statement ok
DROP TABLE external_table;

21 changes: 21 additions & 0 deletions testdata/sqllogictests_cassandra/validation.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Validation tests for clickhouse external database and external tables

# External database validation

statement error
CREATE EXTERNAL DATABASE wrong_host
FROM cassandra
OPTIONS (
host = '127.0.0.1:9876',
);

# Validation test error with the wrong table name
statement error
CREATE EXTERNAL TABLE missing_table
FROM cassandra
OPTIONS (
host = '${CASSANDRA_CONN_STRING}',
keyspace = 'test',
table = 'missing_table'
);

0 comments on commit 0b4b1fc

Please sign in to comment.