Skip to content

Commit

Permalink
Add clickhouse replicated cluster name setting and integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chapsuk committed Sep 21, 2024
1 parent cbe9e6f commit cf87405
Show file tree
Hide file tree
Showing 17 changed files with 364 additions and 40 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Examples:
GOOSE_DRIVER=mysql GOOSE_DBSTRING="user:password@/dbname" goose status
GOOSE_DRIVER=redshift GOOSE_DBSTRING="postgres://user:password@qwerty.us-east-1.redshift.amazonaws.com:5439/db" goose status
GOOSE_DRIVER=clickhouse GOOSE_DBSTRING="clickhouse://user:password@qwerty.clickhouse.cloud:9440/dbname?secure=true&skip_verify=false" goose status
GOOSE_DRIVER=clickhouse-replicated GOOSE_CLICKHOUSE_CLUSTER_NAME=example GOOSE_DBSTRING="clickhouse://user:password@qwerty.clickhouse.cloud:9440/dbname?secure=true&skip_verify=false" goose status
Options:
Expand Down
22 changes: 12 additions & 10 deletions cmd/goose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,19 @@ Examples:
goose tidb "user:password@/dbname?parseTime=true" status
goose mssql "sqlserver://user:password@dbname:1433?database=master" status
goose clickhouse "tcp://127.0.0.1:9000" status
goose clickhouse-replicated "tcp://127.0.0.1:9000" status
goose vertica "vertica://user:password@localhost:5433/dbname?connection_load_balance=1" status
goose ydb "grpcs://localhost:2135/local?go_query_mode=scripting&go_fake_tx=scripting&go_query_bind=declare,numeric" status
goose turso "libsql://dbname.turso.io?authToken=token" status
goose turso "libsql://dbname.turso.io?authToken=token" status
GOOSE_DRIVER=sqlite3 GOOSE_DBSTRING=./foo.db goose status
GOOSE_DRIVER=sqlite3 GOOSE_DBSTRING=./foo.db goose create init sql
GOOSE_DRIVER=postgres GOOSE_DBSTRING="user=postgres dbname=postgres sslmode=disable" goose status
GOOSE_DRIVER=mysql GOOSE_DBSTRING="user:password@/dbname" goose status
GOOSE_DRIVER=redshift GOOSE_DBSTRING="postgres://user:password@qwerty.us-east-1.redshift.amazonaws.com:5439/db" goose status
GOOSE_DRIVER=turso GOOSE_DBSTRING="libsql://dbname.turso.io?authToken=token" goose status
GOOSE_DRIVER=clickhouse GOOSE_DBSTRING="clickhouse://user:password@qwerty.clickhouse.cloud:9440/dbname?secure=true&skip_verify=false" goose status
GOOSE_DRIVER=clickhouse GOOSE_DBSTRING="clickhouse://user:password@qwerty.clickhouse.cloud:9440/dbname?secure=true&skip_verify=false" goose status
GOOSE_DRIVER=clickhouse-replicated GOOSE_CLICKHOUSE_CLUSTER_NAME=example GOOSE_DBSTRING="clickhouse://user:password@qwerty.clickhouse.cloud:9440/dbname?secure=true&skip_verify=false" goose status
Options:
`
Expand All @@ -302,23 +304,23 @@ Commands:
)

var sqlMigrationTemplate = template.Must(template.New("goose.sql-migration").Parse(`-- Thank you for giving goose a try!
--
--
-- This file was automatically created running goose init. If you're familiar with goose
-- feel free to remove/rename this file, write some SQL and goose up. Briefly,
--
--
-- Documentation can be found here: https://pressly.github.io/goose
--
-- A single goose .sql file holds both Up and Down migrations.
--
--
-- All goose .sql files are expected to have a -- +goose Up annotation.
-- The -- +goose Down annotation is optional, but recommended, and must come after the Up annotation.
--
-- The -- +goose NO TRANSACTION annotation may be added to the top of the file to run statements
--
-- The -- +goose NO TRANSACTION annotation may be added to the top of the file to run statements
-- outside a transaction. Both Up and Down migrations within this file will be run without a transaction.
--
-- More complex statements that have semicolons within them must be annotated with
--
-- More complex statements that have semicolons within them must be annotated with
-- the -- +goose StatementBegin and -- +goose StatementEnd annotations to be properly recognized.
--
--
-- Use GitHub issues for reporting bugs and requesting features, enjoy!
-- +goose Up
Expand Down
47 changes: 26 additions & 21 deletions database/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@ import (
"errors"
"fmt"

"github.com/pressly/goose/v3/internal/cfg"
"github.com/pressly/goose/v3/internal/dialect/dialectquery"
)

// Dialect is the type of database dialect.
type Dialect string

const (
DialectClickHouse Dialect = "clickhouse"
DialectMSSQL Dialect = "mssql"
DialectMySQL Dialect = "mysql"
DialectPostgres Dialect = "postgres"
DialectRedshift Dialect = "redshift"
DialectSQLite3 Dialect = "sqlite3"
DialectTiDB Dialect = "tidb"
DialectTurso Dialect = "turso"
DialectVertica Dialect = "vertica"
DialectYdB Dialect = "ydb"
DialectStarrocks Dialect = "starrocks"
DialectClickHouse Dialect = "clickhouse"
DialectClickHouseReplicated Dialect = "clickhouse-replicated"
DialectMSSQL Dialect = "mssql"
DialectMySQL Dialect = "mysql"
DialectPostgres Dialect = "postgres"
DialectRedshift Dialect = "redshift"
DialectSQLite3 Dialect = "sqlite3"
DialectTiDB Dialect = "tidb"
DialectTurso Dialect = "turso"
DialectVertica Dialect = "vertica"
DialectYdB Dialect = "ydb"
DialectStarrocks Dialect = "starrocks"
)

// NewStore returns a new [Store] implementation for the given dialect.
Expand All @@ -36,16 +38,19 @@ func NewStore(dialect Dialect, tablename string) (Store, error) {
}
lookup := map[Dialect]dialectquery.Querier{
DialectClickHouse: &dialectquery.Clickhouse{},
DialectMSSQL: &dialectquery.Sqlserver{},
DialectMySQL: &dialectquery.Mysql{},
DialectPostgres: &dialectquery.Postgres{},
DialectRedshift: &dialectquery.Redshift{},
DialectSQLite3: &dialectquery.Sqlite3{},
DialectTiDB: &dialectquery.Tidb{},
DialectVertica: &dialectquery.Vertica{},
DialectYdB: &dialectquery.Ydb{},
DialectTurso: &dialectquery.Turso{},
DialectStarrocks: &dialectquery.Starrocks{},
DialectClickHouseReplicated: &dialectquery.ClickhouseReplicated{
ClusterName: cfg.GOOSECLICKHOUSECLUSTERNAME,
},
DialectMSSQL: &dialectquery.Sqlserver{},
DialectMySQL: &dialectquery.Mysql{},
DialectPostgres: &dialectquery.Postgres{},
DialectRedshift: &dialectquery.Redshift{},
DialectSQLite3: &dialectquery.Sqlite3{},
DialectTiDB: &dialectquery.Tidb{},
DialectVertica: &dialectquery.Vertica{},
DialectYdB: &dialectquery.Ydb{},
DialectTurso: &dialectquery.Turso{},
DialectStarrocks: &dialectquery.Starrocks{},
}
querier, ok := lookup[dialect]
if !ok {
Expand Down
8 changes: 5 additions & 3 deletions internal/cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package cfg
import "os"

var (
GOOSEDRIVER = envOr("GOOSE_DRIVER", "")
GOOSEDBSTRING = envOr("GOOSE_DBSTRING", "")
GOOSEMIGRATIONDIR = envOr("GOOSE_MIGRATION_DIR", DefaultMigrationDir)
GOOSECLICKHOUSECLUSTERNAME = envOr("GOOSE_CLICKHOUSE_CLUSTER_NAME", "{cluster}")
GOOSEDRIVER = envOr("GOOSE_DRIVER", "")
GOOSEDBSTRING = envOr("GOOSE_DBSTRING", "")
GOOSEMIGRATIONDIR = envOr("GOOSE_MIGRATION_DIR", DefaultMigrationDir)
// https://no-color.org/
GOOSENOCOLOR = envOr("NO_COLOR", "false")
)
Expand All @@ -22,6 +23,7 @@ type EnvVar struct {

func List() []EnvVar {
return []EnvVar{
{Name: "GOOSE_CLICKHOUSER_CLUSTER_NAME", Value: GOOSECLICKHOUSECLUSTERNAME},
{Name: "GOOSE_DRIVER", Value: GOOSEDRIVER},
{Name: "GOOSE_DBSTRING", Value: GOOSEDBSTRING},
{Name: "GOOSE_MIGRATION_DIR", Value: GOOSEMIGRATIONDIR},
Expand Down
8 changes: 5 additions & 3 deletions internal/dialect/dialectquery/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,22 @@ func (c *Clickhouse) GetLatestVersion(tableName string) string {
return fmt.Sprintf(q, tableName)
}

type ClickhouseReplicated struct{}
type ClickhouseReplicated struct {
ClusterName string
}

var _ Querier = (*ClickhouseReplicated)(nil)

func (c *ClickhouseReplicated) CreateTable(tableName string) string {
q := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '{cluster}' (
q := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
)
ENGINE = ReplicatedMergeTree()
ORDER BY (date)`
return fmt.Sprintf(q, tableName)
return fmt.Sprintf(q, tableName, c.ClusterName)
}

func (c *ClickhouseReplicated) InsertVersion(tableName string) string {
Expand Down
5 changes: 4 additions & 1 deletion internal/dialect/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/pressly/goose/v3/internal/cfg"
"github.com/pressly/goose/v3/internal/dialect/dialectquery"
)

Expand Down Expand Up @@ -64,7 +65,9 @@ func NewStore(d Dialect) (Store, error) {
case Clickhouse:
querier = &dialectquery.Clickhouse{}
case ClickhouseReplicated:
querier = &dialectquery.ClickhouseReplicated{}
querier = &dialectquery.ClickhouseReplicated{
ClusterName: cfg.GOOSECLICKHOUSECLUSTERNAME,
}
case Vertica:
querier = &dialectquery.Vertica{}
case Ydb:
Expand Down
21 changes: 21 additions & 0 deletions internal/testing/integration/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ func TestClickhouseRemote(t *testing.T) {
require.Equal(t, 265, count)
}

func TestClickhouseReplicated(t *testing.T) {
t.Parallel()

db0, db1, cleanup, err := testdb.NewClickHouseReplicated(testdb.WithDebug(false))
require.NoError(t, err)
t.Cleanup(cleanup)

testDatabase(t, database.DialectClickHouseReplicated, db0, "testdata/migrations/clickhouse-replicated")

rows, err := db1.Query(`SELECT count(*) FROM clickstream`)
require.NoError(t, err)
var result int
for rows.Next() {
err = rows.Scan(&result)
require.NoError(t, err)
}
require.Equal(t, result, 3)
require.NoError(t, rows.Close())
require.NoError(t, rows.Err())
}

func TestMySQL(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS trips ON CLUSTER '{cluster}'
(
`trip_id` UInt32,
`vendor_id` Enum8('1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15),
`pickup_date` Date,
`pickup_datetime` DateTime,
`dropoff_date` Date,
`dropoff_datetime` DateTime,
`store_and_fwd_flag` UInt8,
`rate_code_id` UInt8,
`pickup_longitude` Float64,
`pickup_latitude` Float64,
`dropoff_longitude` Float64,
`dropoff_latitude` Float64,
`passenger_count` UInt8,
`trip_distance` Float64,
`fare_amount` Float32,
`extra` Float32,
`mta_tax` Float32,
`tip_amount` Float32,
`tolls_amount` Float32,
`ehail_fee` Float32,
`improvement_surcharge` Float32,
`total_amount` Float32,
`payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
`trip_type` UInt8,
`pickup` FixedString(25),
`dropoff` FixedString(25),
`cab_type` Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
`pickup_nyct2010_gid` Int8,
`pickup_ctlabel` Float32,
`pickup_borocode` Int8,
`pickup_ct2010` String,
`pickup_boroct2010` FixedString(7),
`pickup_cdeligibil` String,
`pickup_ntacode` FixedString(4),
`pickup_ntaname` String,
`pickup_puma` UInt16,
`dropoff_nyct2010_gid` UInt8,
`dropoff_ctlabel` Float32,
`dropoff_borocode` UInt8,
`dropoff_ct2010` String,
`dropoff_boroct2010` FixedString(7),
`dropoff_cdeligibil` String,
`dropoff_ntacode` FixedString(4),
`dropoff_ntaname` String,
`dropoff_puma` UInt16
)
ENGINE = ReplicatedMergeTree()
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;

-- +goose Down
DROP TABLE IF EXISTS trips ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS clickstream ON CLUSTER '{cluster}' (
customer_id String,
time_stamp Date,
click_event_type String,
country_code FixedString(2),
source_id UInt64
)
ENGINE = ReplicatedMergeTree()
ORDER BY (time_stamp);

-- +goose Down
DROP TABLE IF EXISTS clickstream ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- +goose Up
INSERT INTO clickstream VALUES ('customer1', '2021-10-02', 'add_to_cart', 'US', 568239 );

INSERT INTO clickstream (customer_id, time_stamp, click_event_type) VALUES ('customer2', '2021-10-30', 'remove_from_cart' );

INSERT INTO clickstream (* EXCEPT(country_code)) VALUES ('customer3', '2021-11-07', 'checkout', 307493 );

-- +goose Down
Loading

0 comments on commit cf87405

Please sign in to comment.