diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 93ed20609..a7899011f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -286,6 +286,10 @@ jobs: # Prepare SLT (MongoDB) export MONGO_CONN_STRING=$(./scripts/create-test-mongo-db.sh) + # Prepare SLT (Clickhouse) + source ./scripts/ci-install-clickhouse.sh + export CLICKHOUSE_CONN_STRING=$(./scripts/create-test-clickhouse-db.sh) + # Prepare SLT (SQL Server) export SQL_SERVER_CONN_STRING=$(./scripts/create-test-sqlserver-db.sh) @@ -313,6 +317,7 @@ jobs: just sql-logic-tests --protocol=rpc 'sqllogictests_native/*' just sql-logic-tests --protocol=rpc 'sqllogictests_object_store/*' just sql-logic-tests --protocol=rpc 'sqllogictests_sqlserver/*' + just sql-logic-tests --protocol=rpc 'sqllogictests_clickhouse/*' just sql-logic-tests --protocol=rpc --exclude '*/tunnels/ssh' 'sqllogictests_mongodb/*' just sql-logic-tests --protocol=rpc --exclude '*/tunnels/ssh' 'sqllogictests_mysql/*' just sql-logic-tests --protocol=rpc --exclude '*/tunnels/ssh' 'sqllogictests_postgres/*' @@ -320,7 +325,7 @@ jobs: echo "-------------------------- REMOTE DATA STORAGE TESTS --------------------------------" # Test using a remote object store for storing databases and catalog # MinIO (S3) - just sql-logic-tests --location http://localhost:9000 \ + just sql-logic-tests --location http://localhost:9100 \ --option access_key_id=$MINIO_ACCESS_KEY \ --option secret_access_key=$MINIO_SECRET_KEY \ --option bucket=$TEST_BUCKET \ @@ -328,7 +333,7 @@ jobs: 'sqllogictests_native/*' # MinIO (S3) but with a sub-directory path - just slt -l http://localhost:9000/$TEST_BUCKET/path/to/folder \ + just slt -l http://localhost:9100/$TEST_BUCKET/path/to/folder \ -o access_key_id=$MINIO_ACCESS_KEY \ -o secret_access_key=$MINIO_SECRET_KEY \ 'sqllogictests/*' diff --git a/Cargo.lock b/Cargo.lock index 95c5ee906..1b8ff28b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1567,6 +1567,43 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +[[package]] +name = "clickhouse-rs" +version = "1.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "802fe62a5480415bcdbb5217b3ea029d748c9a3ce3b884767cf58888e33e7f65" +dependencies = [ + "byteorder", + "chrono", + "chrono-tz", + "clickhouse-rs-cityhash-sys", + "combine", + "crossbeam", + "either", + "futures-core", + "futures-sink", + "futures-util", + "hostname", + "lazy_static", + "log", + "lz4", + "percent-encoding", + "pin-project", + "thiserror", + "tokio", + "url", + "uuid", +] + +[[package]] +name = "clickhouse-rs-cityhash-sys" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4baf9d4700a28d6cb600e17ed6ae2b43298a5245f1f76b4eab63027ebfd592b9" +dependencies = [ + "cc", +] + [[package]] name = "cmake" version = "0.1.50" @@ -1601,6 +1638,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "comfy-table" version = "7.1.0" @@ -2325,6 +2372,8 @@ dependencies = [ "bytes", "calamine", "chrono", + "chrono-tz", + "clickhouse-rs", "dashmap", "datafusion", "datafusion_ext", @@ -4053,6 +4102,9 @@ name = "log" version = "0.4.20" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "serde", +] [[package]] name = "logutil" diff --git a/crates/datasources/Cargo.toml b/crates/datasources/Cargo.toml index af16fac19..dbc559f67 100644 --- a/crates/datasources/Cargo.toml +++ b/crates/datasources/Cargo.toml @@ -16,6 +16,8 @@ bitflags = "2.4" bitvec = "1" bytes = "1.4.0" chrono = { workspace = true } +chrono-tz = "0.8.4" +clickhouse-rs = { version = "1.1.0-alpha.1"} datafusion = { workspace = true } decimal = { path = "../decimal" } deltalake = { workspace = true } diff --git a/crates/datasources/src/clickhouse/errors.rs b/crates/datasources/src/clickhouse/errors.rs new file mode 100644 index 000000000..dbe7a861a --- /dev/null +++ b/crates/datasources/src/clickhouse/errors.rs @@ -0,0 +1,13 @@ +#[derive(Debug, thiserror::Error)] +pub enum ClickhouseError { + #[error(transparent)] + Clickhouse(#[from] clickhouse_rs::errors::Error), + #[error(transparent)] + UrlParse(#[from] url::ParseError), + #[error(transparent)] + Arrow(#[from] datafusion::arrow::error::ArrowError), + #[error("{0}")] + String(String), +} + +pub type Result = std::result::Result; diff --git a/crates/datasources/src/clickhouse/mod.rs b/crates/datasources/src/clickhouse/mod.rs new file mode 100644 index 000000000..2f8d683e5 --- /dev/null +++ b/crates/datasources/src/clickhouse/mod.rs @@ -0,0 +1,363 @@ +pub mod errors; + +mod stream; + +use clickhouse_rs::types::DateTimeType; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion_ext::metrics::DataSourceMetricsStreamAdapter; +use errors::{ClickhouseError, Result}; +use parking_lot::Mutex; + +use async_trait::async_trait; +use clickhouse_rs::{ClientHandle, Options, Pool}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, +}; +use datafusion::datasource::TableProvider; +use datafusion::error::{DataFusionError, Result as DatafusionResult}; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, +}; +use futures::StreamExt; +use std::any::Any; +use std::fmt; +use std::sync::Arc; +use url::Url; + +use crate::clickhouse::stream::BlockStream; + +#[derive(Debug, Clone)] +pub struct ClickhouseAccess { + conn_string: String, +} + +impl ClickhouseAccess { + /// Create access configuration from a connection string. + /// + /// Format: clickhouse://user:password@host:9000/db + pub fn new_from_connection_string(conn_string: String) -> Self { + ClickhouseAccess { conn_string } + } + + /// Validate connection to the clickhouse server. + pub async fn validate_access(&self) -> Result<()> { + let _state = ClickhouseAccessState::connect(&self.conn_string).await?; + Ok(()) + } + + /// Validate that we have access to a specific table. + pub async fn validate_table_access(&self, table: &str) -> Result<()> { + let state = ClickhouseAccessState::connect(&self.conn_string).await?; + let _schema = state.get_table_schema(table).await?; + Ok(()) + } +} + +struct ClickhouseAccessState { + // TODO: We currently limit the pool to 1 connection to have it behave + // similarly to our other data sources. 
We will likely want to actually make + // use of a connection pool to avoid creating connections on every query. + // + // A deprecated `connect` method does return us a client directly, unsure + // if we want to use that or not. + pool: Pool, +} + +impl ClickhouseAccessState { + async fn connect(conn_str: &str) -> Result<Self> { + let pool = Pool::new(Options::new(Url::parse(conn_str)?).pool_min(1).pool_max(1)); + let mut client = pool.get_handle().await?; + client.ping().await?; + + Ok(ClickhouseAccessState { pool }) + } + + async fn get_table_schema(&self, name: &str) -> Result<ArrowSchema> { + let mut client = self.pool.get_handle().await?; + // TODO: Does clickhouse actually return blocks for empty data sets? + let mut blocks = client + .query(format!("SELECT * FROM {name} LIMIT 1")) + .stream_blocks(); + + let block = match blocks.next().await { + Some(block) => block?, + None => { + return Err(ClickhouseError::String( + "unable to determine schema for table, no blocks returned".to_string(), + )) + } + }; + + let mut fields = Vec::with_capacity(block.columns().len()); + for col in block.columns() { + use clickhouse_rs::types::SqlType; + + /// Convert a clickhouse sql type to an arrow data type. + /// + /// Clickhouse type reference: + /// + /// TODO: Support more types. Note that adding a type here requires + /// implementing the appropriate conversion in `BlockStream`. + fn to_data_type(sql_type: &SqlType) -> Result<DataType> { + Ok(match sql_type { + SqlType::Bool => DataType::Boolean, + SqlType::UInt8 => DataType::UInt8, + SqlType::UInt16 => DataType::UInt16, + SqlType::UInt32 => DataType::UInt32, + SqlType::UInt64 => DataType::UInt64, + SqlType::Int8 => DataType::Int8, + SqlType::Int16 => DataType::Int16, + SqlType::Int32 => DataType::Int32, + SqlType::Int64 => DataType::Int64, + SqlType::Float32 => DataType::Float32, + SqlType::Float64 => DataType::Float64, + SqlType::String | SqlType::FixedString(_) => DataType::Utf8, + // Clickhouse has both a 'Date' type (2 bytes) and a + // 'Date32' type (4 bytes). I think they're both represented + // with this one variant. + SqlType::Date => DataType::Date32, + SqlType::DateTime(DateTimeType::DateTime32) => { + DataType::Timestamp(TimeUnit::Second, None) + } + SqlType::DateTime(DateTimeType::DateTime64(precision, tz)) => { + // Precision represents the tick size, computed as + // 10^-precision seconds. 
+ // + // + let unit = match *precision { + 0 => TimeUnit::Second, + 3 => TimeUnit::Millisecond, + 6 => TimeUnit::Microsecond, + 9 => TimeUnit::Nanosecond, + other => { + return Err(ClickhouseError::String(format!( + "unsupported time precision: {other}" + ))) + } + }; + + let tz: Arc = tz.to_string().into_boxed_str().into(); + + DataType::Timestamp(unit, Some(tz)) + } + other => { + return Err(ClickhouseError::String(format!( + "unsupported Clickhouse type: {other:?}" + ))) + } + }) + } + + let (arrow_typ, nullable) = match col.sql_type() { + SqlType::Nullable(typ) => (to_data_type(typ)?, true), + typ => (to_data_type(&typ)?, false), + }; + + let field = Field::new(col.name(), arrow_typ, nullable); + fields.push(field); + } + + Ok(ArrowSchema::new(fields)) + } +} + +pub struct ClickhouseTableProvider { + state: Arc, + table: String, + schema: Arc, +} + +impl ClickhouseTableProvider { + pub async fn try_new(access: ClickhouseAccess, table: impl Into) -> Result { + let table = table.into(); + let state = Arc::new(ClickhouseAccessState::connect(&access.conn_string).await?); + let schema = Arc::new(state.get_table_schema(&table).await?); + + Ok(ClickhouseTableProvider { + state, + table, + schema, + }) + } +} + +#[async_trait] +impl TableProvider for ClickhouseTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> DatafusionResult { + Ok(TableProviderFilterPushDown::Inexact) + } + + async fn scan( + &self, + _ctx: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DatafusionResult> { + let projected_schema = match projection { + Some(projection) => Arc::new(self.schema.project(projection)?), + None => self.schema.clone(), + }; + + // Get the projected columns, joined by a ','. This will be put in the + // 'SELECT ...' portion of the query. + let projection_string = projected_schema + .fields + .iter() + .map(|f| f.name().clone()) + .collect::>() + .join(","); + + // TODO: Where, Limit + + let query = format!("SELECT {} FROM {};", projection_string, self.table); + + let client = + self.state.pool.get_handle().await.map_err(|e| { + DataFusionError::Execution(format!("failed to get client handle: {e}")) + })?; + + Ok(Arc::new(ClickhouseExec::new( + projected_schema, + query, + client, + ))) + } + + async fn insert_into( + &self, + _state: &SessionState, + _input: Arc, + _overwrite: bool, + ) -> DatafusionResult> { + Err(DataFusionError::Execution( + "inserts not yet supported for Clickhouse".to_string(), + )) + } +} + +struct ClickhouseExec { + /// Output schema. + schema: ArrowSchemaRef, + /// A single-use client handle to clickhouse. + handle: Mutex>, + /// Query to run against clickhouse. + query: String, + /// Execution metrics. 
+ metrics: ExecutionPlanMetricsSet, +} + +impl ClickhouseExec { + fn new(schema: ArrowSchemaRef, query: String, handle: ClientHandle) -> ClickhouseExec { + ClickhouseExec { + schema, + handle: Mutex::new(Some(handle)), + query, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl ExecutionPlan for ClickhouseExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + Vec::new() + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DatafusionResult> { + Err(DataFusionError::Execution( + "cannot replace children for ClickhouseExec".to_string(), + )) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DatafusionResult { + if partition != 0 { + return Err(DataFusionError::Execution( + "only single partition supported".to_string(), + )); + } + + // This would need to be updated for if/when we do multiple partitions + // (1 client handle per partition). + let client = match self.handle.lock().take() { + Some(client) => client, + None => { + return Err(DataFusionError::Execution( + "client handle already taken".to_string(), + )) + } + }; + + let stream = BlockStream::execute(client, self.query.clone(), self.schema()); + + Ok(Box::pin(DataSourceMetricsStreamAdapter::new( + stream, + partition, + &self.metrics, + ))) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +impl DisplayAs for ClickhouseExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ClickhouseExec") + } +} + +impl fmt::Debug for ClickhouseExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClickhouseExec") + .field("schema", &self.schema) + .field("query", &self.query) + .finish_non_exhaustive() + } +} diff --git a/crates/datasources/src/clickhouse/stream.rs b/crates/datasources/src/clickhouse/stream.rs new file mode 100644 index 000000000..9cbf78233 --- /dev/null +++ b/crates/datasources/src/clickhouse/stream.rs @@ -0,0 +1,229 @@ +use chrono::{DateTime, NaiveDate}; +use chrono_tz::Tz; +use clickhouse_rs::{ + types::{column::iter::Iterable, Column, Simple}, + Block, ClientHandle, +}; +use datafusion::{arrow::array::Date32Array, error::DataFusionError}; +use datafusion::{ + arrow::{ + array::{ + Array, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, + Int8Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, + UInt8Array, + }, + datatypes::{DataType, Schema, TimeUnit}, + record_batch::RecordBatch, + }, + physical_plan::RecordBatchStream, +}; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use std::str; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::sync::mpsc; +use tracing::trace; + +use crate::clickhouse::errors::ClickhouseError; + +use super::errors::Result; + +/// A stream that converts blocks from clickhouse into a stream of record +/// batches. +#[derive(Debug)] +pub struct BlockStream { + /// Schema of the output batches. + schema: Arc, + /// Receiver side for getting blocks from the clickhouse client. 
+ receiver: mpsc::Receiver>, + _handle: tokio::task::JoinHandle<()>, +} + +impl BlockStream { + /// Execute a query against a client, and return a stream of record batches. + /// The provided schema should match the output of the query. + /// + /// This will spin up a separate tokio thread in the background to satisfy + /// lifetime requirements of the stream and client. + pub fn execute(mut handle: ClientHandle, query: String, schema: Arc) -> BlockStream { + let (sender, receiver) = mpsc::channel(1); + + let thread_handle = tokio::spawn(async move { + let mut stream = handle.query(query).stream_blocks(); + while let Some(block) = stream.next().await { + if sender.send(block).await.is_err() { + // This is fine, receiver side was dropped due to a global + // limit, or a query execution error. + trace!("block receiver closed"); + } + } + }); + + BlockStream { + schema, + receiver, + _handle: thread_handle, + } + } +} + +impl Stream for BlockStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.receiver.poll_recv(cx) { + Poll::Ready(Some(result)) => match result { + Ok(block) => Poll::Ready(Some( + block_to_batch(self.schema.clone(), block) + .map_err(|e| DataFusionError::Execution(e.to_string())), + )), + Err(e) => Poll::Ready(Some(Err(DataFusionError::Execution(format!( + "failed to convert block to batch: {e}" + ))))), + }, + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl RecordBatchStream for BlockStream { + fn schema(&self) -> Arc { + self.schema.clone() + } +} + +/// Convert a block to a record batch. +fn block_to_batch(schema: Arc, block: Block) -> Result { + let mut arrs = Vec::with_capacity(schema.fields.len()); + for (field, col) in schema.fields.iter().zip(block.columns()) { + let arr = column_to_array(field.data_type().clone(), col, field.is_nullable())?; + arrs.push(arr); + } + + Ok(RecordBatch::try_new(schema, arrs)?) +} + +/// Converts a column from a block into an arrow array. +/// +/// The column's data type should be known beforehand. +fn column_to_array( + datatype: DataType, + column: &Column, + nullable: bool, +) -> Result> { + // TODO: This could be a function, but I'm not too keen on figuring out the + // types right now. + macro_rules! make_primitive_array { + ($primitive:ty, $arr_type:ty, $nullable:expr) => {{ + if nullable { + let vals: Vec<_> = column + .iter::>()? 
+ .map(|opt| opt.cloned()) + .collect(); + Arc::new(<$arr_type>::from(vals)) + } else { + let vals: Vec<_> = column.iter::<$primitive>()?.cloned().collect(); + Arc::new(<$arr_type>::from(vals)) + } + }}; + } + + let arr: Arc = match datatype { + DataType::Boolean => make_primitive_array!(bool, BooleanArray, nullable), + DataType::UInt8 => make_primitive_array!(u8, UInt8Array, nullable), + DataType::UInt16 => make_primitive_array!(u16, UInt16Array, nullable), + DataType::UInt32 => make_primitive_array!(u32, UInt32Array, nullable), + DataType::UInt64 => make_primitive_array!(u64, UInt64Array, nullable), + DataType::Int8 => make_primitive_array!(i8, Int8Array, nullable), + DataType::Int16 => make_primitive_array!(i16, Int16Array, nullable), + DataType::Int32 => make_primitive_array!(i32, Int32Array, nullable), + DataType::Int64 => make_primitive_array!(i64, Int64Array, nullable), + DataType::Float32 => make_primitive_array!(f32, Float32Array, nullable), + DataType::Float64 => make_primitive_array!(f64, Float64Array, nullable), + DataType::Utf8 => { + if nullable { + let vals: Vec<_> = + as Iterable>::iter(column, column.sql_type())? + .map(|bs| bs.map(|bs| str::from_utf8(bs).unwrap())) + .collect(); + Arc::new(StringArray::from(vals)) + } else { + let vals: Vec<_> = <&[u8]>::iter(column, column.sql_type())? + .map(|bs| str::from_utf8(bs).unwrap()) + .collect(); + Arc::new(StringArray::from(vals)) + } + } + DataType::Date32 => { + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + + if nullable { + let iter = + as Iterable>::iter(column, column.sql_type())?; + Arc::new(Date32Array::from( + iter.map(|date| { + date.map(|date| date.signed_duration_since(epoch).num_days() as i32) + }) + .collect::>(), + )) + } else { + let iter = ::iter(column, column.sql_type())?; + Arc::new(Date32Array::from( + iter.map(|date| date.signed_duration_since(epoch).num_days() as i32) + .collect::>(), + )) + } + } + DataType::Timestamp(unit, _tz) => { + if nullable { + let iter = + > as Iterable>::iter(column, column.sql_type())?; + match unit { + TimeUnit::Second => Arc::new(TimestampSecondArray::from( + iter.map(|time| time.map(|time| time.timestamp())) + .collect::>(), + )), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from( + iter.map(|time| time.map(|time| time.timestamp_millis())) + .collect::>(), + )), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from( + iter.map(|time| time.map(|time| time.timestamp_micros())) + .collect::>(), + )), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from( + iter.map(|time| time.map(|time| time.timestamp_nanos_opt().unwrap())) + .collect::>(), + )), + } + } else { + let iter = >::iter(column, column.sql_type())?; + match unit { + TimeUnit::Second => Arc::new(TimestampSecondArray::from( + iter.map(|time| time.timestamp()).collect::>(), + )), + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from( + iter.map(|time| time.timestamp_millis()).collect::>(), + )), + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from( + iter.map(|time| time.timestamp_micros()).collect::>(), + )), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from( + iter.map(|time| time.timestamp_nanos_opt().unwrap()) + .collect::>(), + )), + } + } + } + other => { + return Err(ClickhouseError::String(format!( + "unhandled data type trying to convert to arrow array: {other}" + ))) + } + }; + + Ok(arr) +} diff --git a/crates/datasources/src/lib.rs b/crates/datasources/src/lib.rs index 3de5320ae..de820148f 100644 --- 
a/crates/datasources/src/lib.rs +++ b/crates/datasources/src/lib.rs @@ -1,6 +1,7 @@ //! Data source implementations. pub mod bigquery; pub mod bson; +pub mod clickhouse; pub mod common; pub mod debug; pub mod excel; diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index 0948a8482..e8b0872fd 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -402,7 +402,8 @@ pub fn init_session_registry<'a>( | TableOptions::Mysql(_) | TableOptions::Mongo(_) | TableOptions::Snowflake(_) - | TableOptions::SqlServer(_) => continue, + | TableOptions::SqlServer(_) + | TableOptions::Clickhouse(_) => continue, }; let base_url = access.base_url()?; diff --git a/crates/protogen/proto/metastore/catalog.proto b/crates/protogen/proto/metastore/catalog.proto index e3ec71aae..0fba26e1f 100644 --- a/crates/protogen/proto/metastore/catalog.proto +++ b/crates/protogen/proto/metastore/catalog.proto @@ -14,6 +14,7 @@ // limited to a single method. For example, an S3 data source can only be added // as an external table. // +// TODO: Update me // Support matrix: // // | Data source | External Database? | External Table? | @@ -256,13 +257,7 @@ message UniformSignature { repeated common.arrow.ArrowType args = 2; } -message ExactSignature { - repeated common.arrow.ArrowType args = 1; -} -message AnySignature { - uint32 num_args = 1; -} +message ExactSignature { repeated common.arrow.ArrowType args = 1; } +message AnySignature { uint32 num_args = 1; } -message OneOfSignature { - repeated TypeSignature args = 1; -} +message OneOfSignature { repeated TypeSignature args = 1; } diff --git a/crates/protogen/proto/metastore/options.proto b/crates/protogen/proto/metastore/options.proto index a01047d87..927209594 100644 --- a/crates/protogen/proto/metastore/options.proto +++ b/crates/protogen/proto/metastore/options.proto @@ -46,8 +46,9 @@ message DatabaseOptions { DatabaseOptionsSnowflake snowflake = 7; DatabaseOptionsDeltaLake delta = 8; DatabaseOptionsSqlServer sql_server = 9; + DatabaseOptionsClickhouse clickhouse = 10; } - // next: 10 + // next: 11 } message DatabaseOptionsInternal {} @@ -75,6 +76,10 @@ message DatabaseOptionsSqlServer { string connection_string = 1; } +message DatabaseOptionsClickhouse { + string connection_string = 1; +} + message DatabaseOptionsSnowflake { string account_name = 1; string login_name = 2; @@ -122,8 +127,9 @@ message TableOptions { TableOptionsSqlServer sql_server = 14; TableOptionsObjectStore lance = 15; TableOptionsObjectStore bson = 16; + TableOptionsClickhouse clickhouse = 17; } - // next: 17 + // next: 18 } message TableOptionsInternal { @@ -218,6 +224,11 @@ message TableOptionsSqlServer { string table = 3; } +message TableOptionsClickhouse { + string connection_string = 1; + string table = 2; +} + // Tunnel options message TunnelOptions { diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs index a2157d515..1b6509839 100644 --- a/crates/protogen/src/metastore/types/options.rs +++ b/crates/protogen/src/metastore/types/options.rs @@ -92,6 +92,7 @@ pub enum DatabaseOptions { Snowflake(DatabaseOptionsSnowflake), Delta(DatabaseOptionsDeltaLake), SqlServer(DatabaseOptionsSqlServer), + Clickhouse(DatabaseOptionsClickhouse), } impl DatabaseOptions { @@ -104,6 +105,7 @@ impl DatabaseOptions { pub const SNOWFLAKE: &'static str = "snowflake"; pub const DELTA: &'static str = "delta"; pub const SQL_SERVER: &'static str = "sql_server"; + pub 
const CLICKHOUSE: &'static str = "clickhouse"; pub fn as_str(&self) -> &'static str { match self { @@ -116,6 +118,7 @@ impl DatabaseOptions { DatabaseOptions::Snowflake(_) => Self::SNOWFLAKE, DatabaseOptions::Delta(_) => Self::DELTA, DatabaseOptions::SqlServer(_) => Self::SQL_SERVER, + DatabaseOptions::Clickhouse(_) => Self::CLICKHOUSE, } } } @@ -149,6 +152,9 @@ impl TryFrom for DatabaseOptions { options::database_options::Options::SqlServer(v) => { DatabaseOptions::SqlServer(v.try_into()?) } + options::database_options::Options::Clickhouse(v) => { + DatabaseOptions::Clickhouse(v.try_into()?) + } }) } } @@ -176,6 +182,9 @@ impl From for options::database_options::Options { DatabaseOptions::SqlServer(v) => { options::database_options::Options::SqlServer(v.into()) } + DatabaseOptions::Clickhouse(v) => { + options::database_options::Options::Clickhouse(v.into()) + } } } } @@ -333,6 +342,28 @@ impl From for options::DatabaseOptionsSqlServer { } } +#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] +pub struct DatabaseOptionsClickhouse { + pub connection_string: String, +} + +impl TryFrom for DatabaseOptionsClickhouse { + type Error = ProtoConvError; + fn try_from(value: options::DatabaseOptionsClickhouse) -> Result { + Ok(DatabaseOptionsClickhouse { + connection_string: value.connection_string, + }) + } +} + +impl From for options::DatabaseOptionsClickhouse { + fn from(value: DatabaseOptionsClickhouse) -> Self { + options::DatabaseOptionsClickhouse { + connection_string: value.connection_string, + } + } +} + #[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] pub struct DatabaseOptionsSnowflake { pub account_name: String, @@ -515,6 +546,7 @@ pub enum TableOptions { SqlServer(TableOptionsSqlServer), Lance(TableOptionsObjectStore), Bson(TableOptionsObjectStore), + Clickhouse(TableOptionsClickhouse), } impl TableOptions { @@ -534,6 +566,7 @@ impl TableOptions { pub const SQL_SERVER: &'static str = "sql_server"; pub const LANCE: &'static str = "lance"; pub const BSON: &'static str = "bson"; + pub const CLICKHOUSE: &'static str = "clickhouse"; pub const fn new_internal(columns: Vec) -> TableOptions { TableOptions::Internal(TableOptionsInternal { columns }) @@ -557,6 +590,7 @@ impl TableOptions { TableOptions::SqlServer(_) => Self::SQL_SERVER, TableOptions::Lance(_) => Self::LANCE, TableOptions::Bson(_) => Self::BSON, + TableOptions::Clickhouse(_) => Self::CLICKHOUSE, } } } @@ -587,6 +621,9 @@ impl TryFrom for TableOptions { options::table_options::Options::SqlServer(v) => TableOptions::SqlServer(v.try_into()?), options::table_options::Options::Lance(v) => TableOptions::Lance(v.try_into()?), options::table_options::Options::Bson(v) => TableOptions::Bson(v.try_into()?), + options::table_options::Options::Clickhouse(v) => { + TableOptions::Clickhouse(v.try_into()?) 
+ } }) } } @@ -618,6 +655,7 @@ impl TryFrom for options::table_options::Options { TableOptions::SqlServer(v) => options::table_options::Options::SqlServer(v.into()), TableOptions::Lance(v) => options::table_options::Options::Lance(v.into()), TableOptions::Bson(v) => options::table_options::Options::Bson(v.into()), + TableOptions::Clickhouse(v) => options::table_options::Options::Clickhouse(v.into()), }) } } @@ -960,6 +998,31 @@ impl From for options::TableOptionsSqlServer { } } +#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] +pub struct TableOptionsClickhouse { + pub connection_string: String, + pub table: String, +} + +impl TryFrom for TableOptionsClickhouse { + type Error = ProtoConvError; + fn try_from(value: options::TableOptionsClickhouse) -> Result { + Ok(TableOptionsClickhouse { + connection_string: value.connection_string, + table: value.table, + }) + } +} + +impl From for options::TableOptionsClickhouse { + fn from(value: TableOptionsClickhouse) -> Self { + options::TableOptionsClickhouse { + connection_string: value.connection_string, + table: value.table, + } + } +} + #[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] pub struct TableOptionsSnowflake { pub account_name: String, diff --git a/crates/sqlbuiltins/src/functions/table/clickhouse.rs b/crates/sqlbuiltins/src/functions/table/clickhouse.rs new file mode 100644 index 000000000..001e83857 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/clickhouse.rs @@ -0,0 +1,67 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::DataType; +use datafusion::datasource::TableProvider; +use datafusion::logical_expr::{Signature, Volatility}; +use datafusion_ext::errors::{ExtensionError, Result}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; +use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableProvider}; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use super::TableFunc; +use crate::functions::ConstBuiltinFunction; + +#[derive(Debug, Clone, Copy)] +pub struct ReadClickhouse; + +impl ConstBuiltinFunction for ReadClickhouse { + const NAME: &'static str = "read_clickhouse"; + const DESCRIPTION: &'static str = "Read a Clickhouse table"; + const EXAMPLE: &'static str = + "SELECT * FROM read_clickhouse('clickhouse://user:password@localhost:9000/database', 'table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; + + fn signature(&self) -> Option { + Some(Signature::uniform( + 2, + vec![DataType::Utf8], + Volatility::Stable, + )) + } +} + +#[async_trait] +impl TableFunc for ReadClickhouse { + fn detect_runtime( + &self, + _args: &[FuncParamValue], + _parent: RuntimePreference, + ) -> Result { + Ok(RuntimePreference::Remote) + } + + async fn create_provider( + &self, + _: &dyn TableFuncContextProvider, + args: Vec, + _opts: HashMap, + ) -> Result> { + match args.len() { + 2 => { + let mut args = args.into_iter(); + let conn_string: String = args.next().unwrap().try_into()?; + let table: String = args.next().unwrap().try_into()?; + + let access = ClickhouseAccess::new_from_connection_string(conn_string); + let prov = ClickhouseTableProvider::try_new(access, &table) + .await + .map_err(|e| ExtensionError::Access(Box::new(e)))?; + + Ok(Arc::new(prov)) + } + _ => Err(ExtensionError::InvalidNumArgs), + } + } +} diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs index 382466c10..37a1cb099 100644 --- 
a/crates/sqlbuiltins/src/functions/table/mod.rs +++ b/crates/sqlbuiltins/src/functions/table/mod.rs @@ -1,6 +1,7 @@ //! Builtin table returning functions. mod bigquery; mod bson; +mod clickhouse; mod delta; mod excel; mod generate_series; @@ -30,6 +31,7 @@ use std::sync::Arc; use self::bigquery::ReadBigQuery; use self::bson::BsonScan; +use self::clickhouse::ReadClickhouse; use self::delta::DeltaScan; use self::excel::ExcelScan; use self::generate_series::GenerateSeries; @@ -82,6 +84,7 @@ impl BuiltinTableFuncs { Arc::new(ReadMongoDb), Arc::new(ReadMysql), Arc::new(ReadSnowflake), + Arc::new(ReadClickhouse), Arc::new(ReadSqlServer), // Object store Arc::new(PARQUET_SCAN), diff --git a/crates/sqlbuiltins/src/functions/table/postgres.rs b/crates/sqlbuiltins/src/functions/table/postgres.rs index d9e7b236a..34f676c8d 100644 --- a/crates/sqlbuiltins/src/functions/table/postgres.rs +++ b/crates/sqlbuiltins/src/functions/table/postgres.rs @@ -18,7 +18,7 @@ pub struct ReadPostgres; impl ConstBuiltinFunction for ReadPostgres { const NAME: &'static str = "read_postgres"; - const DESCRIPTION: &'static str = "Reads a Postgres table"; + const DESCRIPTION: &'static str = "Read a Postgres table"; const EXAMPLE: &'static str = "SELECT * FROM read_postgres('postgres://localhost:5432', 'database', 'table')"; const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; diff --git a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs index f3a158962..891e067dc 100644 --- a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs +++ b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs @@ -341,6 +341,11 @@ pub(crate) async fn get_virtual_lister_for_external_db( let state = access.connect().await.map_err(ExtensionError::access)?; Box::new(state) } + DatabaseOptions::Clickhouse(_) => { + return Err(ExtensionError::Unimplemented( + "Clickhouse information listing", + )) + } DatabaseOptions::Delta(_) => { return Err(ExtensionError::Unimplemented( "deltalake information listing", diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index 616e25981..8ac3d15ae 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -13,6 +13,7 @@ use datafusion::prelude::SessionContext; use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue}; use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess}; use datasources::bson::table::bson_streaming_table; +use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableProvider}; use datasources::common::url::DatasourceUrl; use datasources::debug::DebugTableType; use datasources::lake::delta::access::{load_table_direct, DeltaLakeAccessor}; @@ -32,12 +33,13 @@ use datasources::sqlserver::{ }; use protogen::metastore::types::catalog::{CatalogEntry, DatabaseEntry, FunctionEntry, TableEntry}; use protogen::metastore::types::options::{ - DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsDebug, DatabaseOptionsDeltaLake, - DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, - DatabaseOptionsSqlServer, TableOptions, TableOptionsBigQuery, TableOptionsDebug, - TableOptionsGcs, TableOptionsInternal, TableOptionsLocal, TableOptionsMongo, TableOptionsMysql, - TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, - TableOptionsSqlServer, TunnelOptions, + DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, 
DatabaseOptionsDebug, + DatabaseOptionsDeltaLake, DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres, + DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions, TableOptionsBigQuery, + TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsInternal, + TableOptionsLocal, TableOptionsMongo, TableOptionsMysql, TableOptionsObjectStore, + TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, + TunnelOptions, }; use sqlbuiltins::builtins::DEFAULT_CATALOG; use sqlbuiltins::functions::FUNCTION_REGISTRY; @@ -218,6 +220,12 @@ impl<'a> ExternalDispatcher<'a> { .await?; Ok(Arc::new(table)) } + DatabaseOptions::Clickhouse(DatabaseOptionsClickhouse { connection_string }) => { + let access = + ClickhouseAccess::new_from_connection_string(connection_string.clone()); + let table = ClickhouseTableProvider::try_new(access, name).await?; + Ok(Arc::new(table)) + } } } @@ -456,6 +464,15 @@ impl<'a> ExternalDispatcher<'a> { .await?; Ok(Arc::new(table)) } + TableOptions::Clickhouse(TableOptionsClickhouse { + connection_string, + table, + }) => { + let access = + ClickhouseAccess::new_from_connection_string(connection_string.clone()); + let table = ClickhouseTableProvider::try_new(access, table).await?; + Ok(Arc::new(table)) + } TableOptions::Lance(TableOptionsObjectStore { location, storage_options, diff --git a/crates/sqlexec/src/dispatch/mod.rs b/crates/sqlexec/src/dispatch/mod.rs index 03e88c0fd..e8527134c 100644 --- a/crates/sqlexec/src/dispatch/mod.rs +++ b/crates/sqlexec/src/dispatch/mod.rs @@ -88,6 +88,8 @@ pub enum DispatchError { #[error(transparent)] BsonDatasource(#[from] datasources::bson::errors::BsonError), #[error(transparent)] + ClickhouseDatasource(#[from] datasources::clickhouse::errors::ClickhouseError), + #[error(transparent)] NativeDatasource(#[from] datasources::native::errors::NativeError), #[error(transparent)] CommonDatasource(#[from] datasources::common::errors::DatasourceCommonError), diff --git a/crates/sqlexec/src/planner/errors.rs b/crates/sqlexec/src/planner/errors.rs index e8053995c..04f9eeb7b 100644 --- a/crates/sqlexec/src/planner/errors.rs +++ b/crates/sqlexec/src/planner/errors.rs @@ -128,6 +128,7 @@ impl_from_dispatch_variant!(datasources::lake::delta::errors::DeltaError); impl_from_dispatch_variant!(datasources::lake::iceberg::errors::IcebergError); impl_from_dispatch_variant!(datasources::object_store::errors::ObjectStoreSourceError); impl_from_dispatch_variant!(datasources::sqlserver::errors::SqlServerError); +impl_from_dispatch_variant!(datasources::clickhouse::errors::ClickhouseError); #[allow(unused_macros)] macro_rules! 
internal { diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 800c7d0bd..e0c13f16a 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -14,6 +14,7 @@ use datafusion::sql::TableReference; use datafusion_ext::planner::SqlQueryPlanner; use datafusion_ext::AsyncContextProvider; use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess}; +use datasources::clickhouse::ClickhouseAccess; use datasources::common::ssh::{key::SshKey, SshConnection, SshConnectionParameters}; use datasources::common::url::{DatasourceUrl, DatasourceUrlType}; use datasources::debug::DebugTableType; @@ -41,13 +42,14 @@ use protogen::metastore::types::options::{ CopyToDestinationOptionsLocal, CopyToDestinationOptionsS3, CopyToFormatOptions, CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsParquet, CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure, CredentialsOptionsDebug, - CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsDebug, - DatabaseOptionsDeltaLake, DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres, - DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, DeltaLakeUnityCatalog, - StorageOptions, TableOptions, TableOptionsBigQuery, TableOptionsDebug, TableOptionsGcs, - TableOptionsLocal, TableOptionsMongo, TableOptionsMysql, TableOptionsObjectStore, - TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, - TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh, + CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, + DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongo, DatabaseOptionsMysql, + DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, + DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery, + TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal, + TableOptionsMongo, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres, + TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions, + TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh, }; use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation}; use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG}; @@ -305,6 +307,16 @@ impl<'a> SessionPlanner<'a> { DatabaseOptions::SqlServer(DatabaseOptionsSqlServer { connection_string }) } + DatabaseOptions::CLICKHOUSE => { + let connection_string: String = m.remove_required("connection_string")?; + + // Validate + let access = + ClickhouseAccess::new_from_connection_string(connection_string.clone()); + access.validate_access().await?; + + DatabaseOptions::Clickhouse(DatabaseOptionsClickhouse { connection_string }) + } DatabaseOptions::DEBUG => { datasources::debug::validate_tunnel_connections(tunnel_options.as_ref())?; DatabaseOptions::Debug(DatabaseOptionsDebug {}) @@ -498,7 +510,20 @@ impl<'a> SessionPlanner<'a> { table: table_name, }) } + TableOptions::CLICKHOUSE => { + let connection_string: String = m.remove_required("connection_string")?; + let table_name: String = m.remove_required("table")?; + + // Validate + let access = + ClickhouseAccess::new_from_connection_string(connection_string.clone()); + access.validate_table_access(&table_name).await?; + TableOptions::Clickhouse(TableOptionsClickhouse { + connection_string, 
+ table: table_name, + }) + } TableOptions::LOCAL => { + let location: String = m.remove_required("location")?; diff --git a/scripts/ci-install-clickhouse.sh b/scripts/ci-install-clickhouse.sh new file mode 100755 index 000000000..577d1f07b --- /dev/null +++ b/scripts/ci-install-clickhouse.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +# Script for installing clickhouse in CI. This script is meant to be `source`d. +# +# Usage: source ./scripts/ci-install-clickhouse.sh + +curl https://clickhouse.com/ | sh +mkdir -p ${HOME}/bin +cp ./clickhouse ${HOME}/bin/. + +export PATH=$HOME/bin:$PATH + diff --git a/scripts/create-test-clickhouse-db.sh b/scripts/create-test-clickhouse-db.sh new file mode 100755 index 000000000..1677f6acd --- /dev/null +++ b/scripts/create-test-clickhouse-db.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash + +# Spins up a test clickhouse docker container and loads it with data. +# +# By default, the container will start up a 'default' database and not require a +# username or password. +# +# Requires `clickhouse`: + +set -e + +CONTAINER_NAME="glaredb_clickhouse_test" + +# Remove container if it exists +if [[ -n "$(docker ps -a -q -f name=$CONTAINER_NAME)" ]]; then + docker rm -f $CONTAINER_NAME > /dev/null +fi + +# Start container. +docker run \ + --name $CONTAINER_NAME \ + -p 9000:9000 \ + -d \ + --rm \ + clickhouse/clickhouse-server:23 &> /dev/null + +# Wait until clickhouse is ready. +INIT_TIME=$(date +%s) +EXIT_CODE=1 +while [[ $EXIT_CODE -ne 0 ]]; do + set +e + clickhouse client --query "select 1" &> /dev/null + EXIT_CODE=$? + set -e + + CURRENT_TIME=$(date +%s) + CURRENT_TIME=$((CURRENT_TIME - 60)) + if [[ "$CURRENT_TIME" -gt "$INIT_TIME" ]]; then + echo "Timed out waiting for Clickhouse to start!" + exit 1 + fi +done + +# Create tables. +clickhouse client --multiquery < ./testdata/sqllogictests_clickhouse/data/setup-clickhouse.sql + +# Load data into tables. +clickhouse client \ + --query="INSERT INTO bikeshare_stations FORMAT CSVWithNames" < ./testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv +clickhouse client \ + --query="INSERT INTO bikeshare_trips FORMAT CSVWithNames" < ./testdata/sqllogictests_datasources_common/data/gcs-artifacts/bikeshare_trips.csv + +echo "clickhouse://localhost:9000/default" diff --git a/scripts/create-test-minio-store.sh b/scripts/create-test-minio-store.sh index 3a0755b65..e0cd8d7be 100755 --- a/scripts/create-test-minio-store.sh +++ b/scripts/create-test-minio-store.sh @@ -7,7 +7,7 @@ set -e MINIO_IMAGE="minio/minio:latest" CONTAINER_NAME="glaredb_minio_test" -MINIO_CONSOLE_ADDRESS=:9001 +MINIO_CONSOLE_ADDRESS=:9101 # Remove container if it exists if [[ -n "$(docker ps -a -q -f name=$CONTAINER_NAME)" ]]; then @@ -16,8 +16,8 @@ fi # Start minio. CONTAINER_ID="$(docker run \ - -p 9000:9000 \ - -p 9001:9001 \ + -p 9100:9000 \ + -p 9101:9101 \ -e MINIO_ACCESS_KEY="${MINIO_ACCESS_KEY}" \ -e MINIO_SECRET_KEY="${MINIO_SECRET_KEY}" \ -e MINIO_CONSOLE_ADDRESS="${MINIO_CONSOLE_ADDRESS}" \ @@ -29,10 +29,10 @@ CONTAINER_ID="$(docker run \ # Create the test container using the minio client docker run --rm --net=host --entrypoint=/bin/sh -i minio/mc:latest <
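A rough sketch of how the new data source is expected to be exercised against the test container above, using the connection string echoed by create-test-clickhouse-db.sh and the bikeshare tables it loads. The read_clickhouse call mirrors the function's EXAMPLE constant; the CREATE EXTERNAL statements assume GlareDB's existing FROM <datasource> OPTIONS (...) DDL with the connection_string and table options read by the planner changes in this diff, so the exact DDL shape is an assumption rather than something added here.

-- Table function form (from the read_clickhouse EXAMPLE constant).
SELECT * FROM read_clickhouse('clickhouse://localhost:9000/default', 'bikeshare_stations');

-- External table bound to a single Clickhouse table (assumed DDL shape).
CREATE EXTERNAL TABLE stations
    FROM clickhouse
    OPTIONS (
        connection_string = 'clickhouse://localhost:9000/default',
        table = 'bikeshare_stations'
    );

-- External database exposing the whole Clickhouse database (assumed DDL shape).
CREATE EXTERNAL DATABASE ch
    FROM clickhouse
    OPTIONS (connection_string = 'clickhouse://localhost:9000/default');

SELECT count(*) FROM stations;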