diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go
index 5125d1e..8cd63c1 100644
--- a/bulkerlib/implementations/sql/abstract.go
+++ b/bulkerlib/implementations/sql/abstract.go
@@ -23,6 +23,7 @@ type AbstractSQLStream struct {
 	mode        bulker.BulkMode
 	options     bulker.StreamOptions
 	tableName   string
+	namespace   string
 	merge       bool
 	mergeWindow int
 	omitNils    bool
@@ -63,6 +64,10 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
 	if ps.timestampColumn != "" {
 		ps.timestampColumn = p.ColumnName(ps.timestampColumn)
 	}
+	ps.namespace = bulker.NamespaceOption.Get(&ps.options)
+	if ps.namespace != "" {
+		ps.namespace = p.TableName(ps.namespace)
+	}
 
 	ps.omitNils = OmitNilsOption.Get(&ps.options)
 	ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options)
@@ -98,7 +103,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje
 	if err != nil {
 		return nil, nil, err
 	}
-	table, processedObject := ps.sqlAdapter.TableHelper().MapTableSchema(ps.sqlAdapter, batchHeader, processedObject, ps.pkColumns, ps.timestampColumn)
+	table, processedObject := ps.sqlAdapter.TableHelper().MapTableSchema(ps.sqlAdapter, batchHeader, processedObject, ps.pkColumns, ps.timestampColumn, ps.namespace)
 	ps.state.ProcessedRows++
 	return table, processedObject, nil
 }
diff --git a/bulkerlib/implementations/sql/autocommit_stream.go b/bulkerlib/implementations/sql/autocommit_stream.go
index d6b213f..641c1f3 100644
--- a/bulkerlib/implementations/sql/autocommit_stream.go
+++ b/bulkerlib/implementations/sql/autocommit_stream.go
@@ -53,7 +53,7 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
 	if err != nil {
 		return
 	}
-	existingTable, err := ps.sqlAdapter.TableHelper().Get(ctx, ps.sqlAdapter, table.Name, true)
+	existingTable, err := ps.sqlAdapter.TableHelper().Get(ctx, ps.sqlAdapter, table.Namespace, table.Name, true)
 	if err != nil {
 		err = errorj.Decorate(err, "failed to get current table table")
 		return
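The abstract stream is where the new `namespace` field enters the pipeline: it is read from the stream options and, when non-empty, run through the same `TableName` identifier normalization as table names. A minimal sketch of that resolution logic — `normalizeIdentifier` is a hypothetical stand-in for `SQLAdapter.TableName`, not the real implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeIdentifier stands in for SQLAdapter.TableName, which adapts an
// identifier to the dialect's naming rules (simplified here).
func normalizeIdentifier(s string) string {
	return strings.ToLower(strings.TrimSpace(s))
}

// resolveNamespace mirrors the logic added to newAbstractStream: an empty
// option value means "use the adapter's default namespace"; otherwise the
// value is normalized like any other identifier.
func resolveNamespace(optionValue string) string {
	if optionValue == "" {
		return "" // adapters later fall back to their configured dataset/database
	}
	return normalizeIdentifier(optionValue)
}

func main() {
	fmt.Printf("%q\n", resolveNamespace(""))       // "" -> default namespace
	fmt.Printf("%q\n", resolveNamespace(" Bnsp ")) // "bnsp"
}
```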
diff --git a/bulkerlib/implementations/sql/bigdata_test.go b/bulkerlib/implementations/sql/bigdata_test.go
index 19a464a..e43c97a 100644
--- a/bulkerlib/implementations/sql/bigdata_test.go
+++ b/bulkerlib/implementations/sql/bigdata_test.go
@@ -99,13 +99,13 @@ func testLotOfEvents(t *testing.T, testConfig bulkerTestConfig, mode bulker.Bulk
 	PostStep("init_database", testConfig, mode, reqr, err)
 	//clean up in case of previous test failure
 	if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
-		err = sqlAdapter.DropTable(ctx, tableName, true)
+		err = sqlAdapter.DropTable(ctx, "", tableName, true)
 		PostStep("pre_cleanup", testConfig, mode, reqr, err)
 	}
 	//clean up after test run
 	if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
 		defer func() {
-			_ = sqlAdapter.DropTable(ctx, tableName, true)
+			_ = sqlAdapter.DropTable(ctx, "", tableName, true)
 		}()
 	}
 	stream, err := blk.CreateStream(t.Name(), tableName, mode, testConfig.streamOptions...)
@@ -161,14 +161,14 @@ func testLotOfEvents(t *testing.T, testConfig bulkerTestConfig, mode bulker.Bulk
 	if testConfig.expectedTable.Columns != nil && testConfig.expectedTable.Columns.Len() > 0 {
 		//Check table schema
-		table, err := sqlAdapter.GetTableSchema(ctx, tableName)
+		table, err := sqlAdapter.GetTableSchema(ctx, "", tableName)
 		PostStep("get_table", testConfig, mode, reqr, err)
 		reqr.Equal(testConfig.expectedTable, table)
 	}
 	if testConfig.expectedRowsCount != nil {
 		time.Sleep(1 * time.Second)
 		//Check rows count and rows data when provided
-		count, err := sqlAdapter.Count(ctx, tableName, nil)
+		count, err := sqlAdapter.Count(ctx, "", tableName, nil)
 		PostStep("select_count", testConfig, mode, reqr, err)
 		reqr.Equal(testConfig.expectedRowsCount, count)
 	}
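These test changes illustrate the widened adapter surface: every table-addressing method now takes a namespace argument before the table name, and `""` selects the adapter's configured default. A trimmed-down sketch of the interface as the tests exercise it — the types here are stand-ins, not the real bulkerlib declarations:

```go
package main

import "context"

// Placeholder types standing in for bulkerlib's Table and WhenConditions.
type Table struct{ Name, Namespace string }
type WhenConditions struct{}

// sqlAdapter sketches the widened method set: namespace comes first,
// and an empty string means "the adapter's default dataset/database".
type sqlAdapter interface {
	DropTable(ctx context.Context, namespace, tableName string, ifExists bool) error
	GetTableSchema(ctx context.Context, namespace, tableName string) (*Table, error)
	Count(ctx context.Context, namespace, tableName string, when *WhenConditions) (int, error)
}

// cleanup mirrors the updated bigdata_test.go call sites, which pass ""
// because those tests only use the default namespace.
func cleanup(ctx context.Context, a sqlAdapter, tableName string) error {
	return a.DropTable(ctx, "", tableName, true)
}

func main() {}
```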
diff --git a/bulkerlib/implementations/sql/bigquery.go b/bulkerlib/implementations/sql/bigquery.go
index 8c3407e..c8937e3 100644
--- a/bulkerlib/implementations/sql/bigquery.go
+++ b/bulkerlib/implementations/sql/bigquery.go
@@ -194,6 +194,7 @@ func (bq *BigQuery) validateOptions(streamOptions []bulker.StreamOption) error {
 }
 
 func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error) {
+    namespace := bq.namespaceName(targetTable.Namespace)
     if mergeWindow <= 0 {
         defer func() {
             if err != nil {
@@ -206,19 +207,19 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
                 })
             }
         }()
-        dataset := bq.client.Dataset(bq.config.Dataset)
+        dataset := bq.client.Dataset(namespace)
         copier := dataset.Table(targetTable.Name).CopierFrom(dataset.Table(sourceTable.Name))
         copier.WriteDisposition = bigquery.WriteAppend
         copier.CreateDisposition = bigquery.CreateIfNeeded
-        _, state, err = bq.RunJob(ctx, copier, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
+        _, state, err = bq.RunJob(ctx, copier, fmt.Sprintf("copy data from %s to %s", bq.fullTableName(sourceTable.Namespace, sourceTable.Name), bq.fullTableName(targetTable.Namespace, targetTable.Name)))
         state.Name = "copy"
         if err != nil {
             // try to insert from select as a fallback
             quotedColumns := sourceTable.MappedColumnNames(bq.quotedColumnName)
             columnsString := strings.Join(quotedColumns, ",")
-            insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Name), columnsString, columnsString, bq.fullTableName(sourceTable.Name))
+            insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Namespace, targetTable.Name), columnsString, columnsString, bq.fullTableName(sourceTable.Namespace, sourceTable.Name))
             query := bq.client.Query(insertFromSelectStatement)
-            _, state2, err := bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
+            _, state2, err := bq.RunJob(ctx, query, fmt.Sprintf("copy data from %s to %s", bq.fullTableName(sourceTable.Namespace, sourceTable.Name), bq.fullTableName(targetTable.Namespace, targetTable.Name)))
             state2.Name = "insert_from_select"
             state.Merge(state2)
             return state, err
@@ -230,7 +231,7 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
     if err != nil {
         err = errorj.BulkMergeError.Wrap(err, "failed to run merge").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: namespace,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   targetTable.Name,
@@ -252,7 +253,7 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
         monthBefore := timestamp.Now().Add(time.Duration(mergeWindow) * -24 * time.Hour).Format("2006-01-02")
         joinConditions = append(joinConditions, fmt.Sprintf("T.%s >= '%s'", bq.quotedColumnName(targetTable.TimestampColumn), monthBefore))
     }
-    insertFromSelectStatement := fmt.Sprintf(bigqueryMergeTemplate, bq.fullTableName(targetTable.Name), bq.fullTableName(sourceTable.Name),
+    insertFromSelectStatement := fmt.Sprintf(bigqueryMergeTemplate, bq.fullTableName(targetTable.Namespace, targetTable.Name), bq.fullTableName(sourceTable.Namespace, sourceTable.Name),
         strings.Join(joinConditions, " AND "), strings.Join(updateSet, ", "), columnsString, columnsString)
     query := bq.client.Query(insertFromSelectStatement)
@@ -285,11 +286,11 @@ func (bq *BigQuery) Ping(ctx context.Context) error {
 }
 
 // GetTableSchema return google BigQuery table (name,columns) representation wrapped in Table struct
-func (bq *BigQuery) GetTableSchema(ctx context.Context, tableName string) (*Table, error) {
+func (bq *BigQuery) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) {
     tableName = bq.TableName(tableName)
-    table := &Table{Name: tableName, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()}
-
-    bqTable := bq.client.Dataset(bq.config.Dataset).Table(tableName)
+    namespace = bq.namespaceName(namespace)
+    table := &Table{Name: tableName, Namespace: namespace, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()}
+    bqTable := bq.client.Dataset(namespace).Table(tableName)
 
     meta, err := bqTable.Metadata(ctx)
     if err != nil {
@@ -299,7 +300,7 @@ func (bq *BigQuery) GetTableSchema(ctx context.Context, tableName string) (*Tabl
         return nil, errorj.GetTableError.Wrap(err, "failed to get table").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: namespace,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   tableName,
@@ -340,8 +341,13 @@ func (bq *BigQuery) GetTableSchema(ctx context.Context, tableName string) (*Tabl
 
 // CreateTable creates google BigQuery table from Table
 func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error) {
+    err = bq.createDatasetIfNotExists(ctx, table.Namespace)
+    if err != nil {
+        return err
+    }
     tableName := bq.TableName(table.Name)
-    bqTable := bq.client.Dataset(bq.config.Dataset).Table(tableName)
+    namespace := bq.namespaceName(table.Namespace)
+    bqTable := bq.client.Dataset(namespace).Table(tableName)
 
     _, err = bqTable.Metadata(ctx)
     if err == nil {
@@ -352,7 +358,7 @@ func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error) {
     if !isNotFoundErr(err) {
         return errorj.GetTableError.Wrap(err, "failed to get table metadata").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: namespace,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   tableName,
@@ -401,12 +407,12 @@ func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error) {
     if table.Temporary {
         tableMetaData.ExpirationTime = time.Now().Add(time.Hour)
     }
-    bq.logQuery("CREATE table for schema: ", tableMetaData, nil)
+    bq.logQuery(fmt.Sprintf("CREATE table %s with schema: ", bq.fullTableName(table.Namespace, table.Name)), tableMetaData, nil)
     if err := bqTable.Create(ctx, &tableMetaData); err != nil {
         schemaJson, _ := bqSchema.ToJSONFields()
         return errorj.GetTableError.Wrap(err, "failed to create table").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: namespace,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   tableName,
@@ -417,9 +423,14 @@ func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error) {
     return nil
 }
 
-// InitDatabase creates google BigQuery Dataset if doesn't exist
-func (bq *BigQuery) InitDatabase(ctx context.Context) error {
-    dataset := bq.config.Dataset
+func (bq *BigQuery) createDatasetIfNotExists(ctx context.Context, dataset string) error {
+    if dataset == "" {
+        return nil
+    }
+    dataset = bq.namespaceName(dataset)
+    if dataset == "" {
+        return nil
+    }
     bqDataset := bq.client.Dataset(dataset)
     if _, err := bqDataset.Metadata(ctx); err != nil {
         if isNotFoundErr(err) {
@@ -442,14 +453,20 @@ func (bq *BigQuery) InitDatabase(ctx context.Context) error {
     return nil
 }
 
+// InitDatabase creates google BigQuery Dataset if doesn't exist
+func (bq *BigQuery) InitDatabase(ctx context.Context) error {
+    return bq.createDatasetIfNotExists(ctx, bq.config.Dataset)
+}
+
 // PatchTableSchema adds Table columns to google BigQuery table
 func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) error {
-    bqTable := bq.client.Dataset(bq.config.Dataset).Table(patchSchema.Name)
+    namespace := bq.namespaceName(patchSchema.Namespace)
+    bqTable := bq.client.Dataset(namespace).Table(patchSchema.Name)
     metadata, err := bqTable.Metadata(ctx)
     if err != nil {
         return errorj.PatchTableError.Wrap(err, "failed to get table metadata").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: namespace,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   patchSchema.Name,
@@ -490,7 +507,7 @@ func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) er
         schemaJson, _ := metadata.Schema.ToJSONFields()
         return errorj.PatchTableError.Wrap(err, "failed to patch table").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: namespace,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   patchSchema.Name,
@@ -501,13 +518,14 @@ func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) er
     return nil
 }
 
-func (bq *BigQuery) DeletePartition(ctx context.Context, tableName string, datePartiton *DatePartition) error {
+func (bq *BigQuery) DeletePartition(ctx context.Context, namespace, tableName string, datePartiton *DatePartition) error {
     tableName = bq.TableName(tableName)
+    namespace = bq.namespaceName(namespace)
     partitions := GranularityToPartitionIds(datePartiton.Granularity, datePartiton.Value)
     for _, partition := range partitions {
         bq.logQuery("DELETE partition "+partition+" in table"+tableName, "", nil)
         bq.Infof("Deletion partition %s in table %s", partition, tableName)
-        if err := bq.client.Dataset(bq.config.Dataset).Table(tableName + "$" + partition).Delete(ctx); err != nil {
+        if err := bq.client.Dataset(namespace).Table(tableName + "$" + partition).Delete(ctx); err != nil {
             gerr, ok := err.(*googleapi.Error)
             if ok && gerr.Code == 404 {
                 bq.Infof("Partition %s$%s was not found", tableName, partition)
@@ -515,7 +533,7 @@ func (bq *BigQuery) DeletePartition(ctx context.Context, tableName string, dateP
             }
             return errorj.DeleteFromTableError.Wrap(err, "failed to delete partition").
                 WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                    Dataset: bq.config.Dataset,
+                    Dataset: namespace,
                     Bucket:  bq.config.Bucket,
                     Project: bq.config.Project,
                     Table:   tableName,
@@ -557,7 +575,8 @@ func GranularityToPartitionIds(g Granularity, t time.Time) []string {
 }
 
 func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) (err error) {
-    inserter := bq.client.Dataset(bq.config.Dataset).Table(table.Name).Inserter()
+    namespace := bq.namespaceName(table.Namespace)
+    inserter := bq.client.Dataset(namespace).Table(table.Name).Inserter()
     bq.logQuery(fmt.Sprintf("Inserting [%d] values to table %s using BigQuery Streaming API with chunks [%d]: ", len(objects), table.Name, bigqueryRowsLimitPerInsertOperation), objects, nil)
 
     items := make([]*BQItem, 0, bigqueryRowsLimitPerInsertOperation)
@@ -569,7 +588,7 @@ func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, object
             if err := bq.insertItems(ctx, inserter, items); err != nil {
                 return errorj.ExecuteInsertInBatchError.Wrap(err, "failed to execute middle insert %d of %d in batch", operation, operations).
                     WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                        Dataset: bq.config.Dataset,
+                        Dataset: namespace,
                         Bucket:  bq.config.Bucket,
                         Project: bq.config.Project,
                         Table:   table.Name,
@@ -600,11 +619,12 @@ func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, object
 
 func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) {
     tableName := bq.TableName(targetTable.Name)
+    dataset := bq.namespaceName(targetTable.Namespace)
     defer func() {
         if err != nil {
             err = errorj.ExecuteInsertInBatchError.Wrap(err, "failed to execute middle insert batch").
                 WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                    Dataset: bq.config.Dataset,
+                    Dataset: dataset,
                     Bucket:  bq.config.Bucket,
                     Project: bq.config.Project,
                     Table:   tableName,
@@ -621,8 +641,11 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc
     defer func() {
         _ = file.Close()
     }()
-    bqTable := bq.client.Dataset(bq.config.Dataset).Table(tableName)
+    bqTable := bq.client.Dataset(dataset).Table(tableName)
     meta, err := bqTable.Metadata(ctx)
+    if err != nil {
+        return state, err
+    }
     if loadSource.Format == types2.FileFormatCSV {
         //sort meta schema field order to match csv file column order
@@ -660,20 +683,21 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc
             UseAvroLogicalTypes: true,
         }
     }
-    loader := bq.client.Dataset(bq.config.Dataset).Table(tableName).LoaderFrom(source)
+    loader := bq.client.Dataset(dataset).Table(tableName).LoaderFrom(source)
     loader.CreateDisposition = bigquery.CreateIfNeeded
     loader.WriteDisposition = bigquery.WriteAppend
-    _, state, err = bq.RunJob(ctx, loader, fmt.Sprintf("load into table '%s'", tableName))
+    _, state, err = bq.RunJob(ctx, loader, fmt.Sprintf("load into table %s", bq.fullTableName(dataset, tableName)))
     state.Name = "load"
     return state, err
 }
 
 // DropTable drops table from BigQuery
-func (bq *BigQuery) DropTable(ctx context.Context, tableName string, ifExists bool) error {
+func (bq *BigQuery) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error {
     tableName = bq.TableName(tableName)
-    bq.logQuery(fmt.Sprintf("DROP table '%s' if exists: %t", tableName, ifExists), nil, nil)
+    dataset := bq.namespaceName(namespace)
+    bq.logQuery(fmt.Sprintf("DROP table %s if exists: %t", bq.fullTableName(dataset, tableName), ifExists), nil, nil)
 
-    bqTable := bq.client.Dataset(bq.config.Dataset).Table(tableName)
+    bqTable := bq.client.Dataset(dataset).Table(tableName)
     _, err := bqTable.Metadata(ctx)
     gerr, ok := err.(*googleapi.Error)
     if ok && gerr.Code == 404 && ifExists {
@@ -682,7 +706,7 @@ func (bq *BigQuery) DropTable(ctx context.Context, tableName string, ifExists bo
     if err := bqTable.Delete(ctx); err != nil {
         return errorj.DropError.Wrap(err, "failed to drop table").
             WithProperty(errorj.DBInfo, &types2.ErrorPayload{
-                Dataset: bq.config.Dataset,
+                Dataset: dataset,
                 Bucket:  bq.config.Bucket,
                 Project: bq.config.Project,
                 Table:   tableName,
@@ -693,11 +717,12 @@ func (bq *BigQuery) DropTable(ctx context.Context, tableName string, ifExists bo
 }
 
 func (bq *BigQuery) Drop(ctx context.Context, table *Table, ifExists bool) error {
-    return bq.DropTable(ctx, table.Name, ifExists)
+    return bq.DropTable(ctx, table.Namespace, table.Name, ifExists)
 }
 
 func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error) {
     targetTableName = bq.TableName(targetTableName)
+    namespace := bq.namespaceName(replacementTable.Namespace)
     replacementTableName := bq.TableName(replacementTable.Name)
     defer func() {
         if err != nil {
@@ -710,7 +735,7 @@ func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, re
             })
         }
     }()
-    dataset := bq.client.Dataset(bq.config.Dataset)
+    dataset := bq.client.Dataset(namespace)
     copier := dataset.Table(targetTableName).CopierFrom(dataset.Table(replacementTableName))
     copier.WriteDisposition = bigquery.WriteTruncate
     copier.CreateDisposition = bigquery.CreateIfNeeded
@@ -719,16 +744,16 @@ func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, re
         return err
     }
     if dropOldTable {
-        return bq.DropTable(ctx, replacementTableName, false)
+        return bq.DropTable(ctx, replacementTable.Namespace, replacementTableName, false)
     } else {
         return nil
     }
 }
 
 // TruncateTable deletes all records in tableName table
-func (bq *BigQuery) TruncateTable(ctx context.Context, tableName string) error {
+func (bq *BigQuery) TruncateTable(ctx context.Context, namespace string, tableName string) error {
     tableName = bq.TableName(tableName)
-    query := fmt.Sprintf(bigqueryTruncateTemplate, bq.fullTableName(tableName))
+    query := fmt.Sprintf(bigqueryTruncateTemplate, bq.fullTableName(namespace, tableName))
     bq.logQuery(query, nil, nil)
     if _, err := bq.client.Query(query).Read(ctx); err != nil {
         extraText := ""
@@ -820,7 +845,7 @@ func (bqi *BQItem) Save() (row map[string]bigquery.Value, insertID string, err e
     return
 }
 
-func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2.Object, whenConditions *WhenConditions) (err error) {
+func (bq *BigQuery) Update(ctx context.Context, namespace, tableName string, object types2.Object, whenConditions *WhenConditions) (err error) {
     tableName = bq.TableName(tableName)
     updateCondition, updateValues := bq.toWhenConditions(whenConditions)
@@ -835,7 +860,7 @@ func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2.
     for a := 0; a < len(updateValues); a++ {
         values[i+a] = updateValues[a]
     }
-    updateQuery := fmt.Sprintf(bigqueryUpdateTemplate, bq.fullTableName(tableName), strings.Join(columns, ", "), updateCondition)
+    updateQuery := fmt.Sprintf(bigqueryUpdateTemplate, bq.fullTableName(namespace, tableName), strings.Join(columns, ", "), updateCondition)
     defer func() {
         v := make([]any, len(values))
         for i, value := range values {
@@ -858,10 +883,10 @@ func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2.
     return err
 }
 
-func (bq *BigQuery) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
-    return bq.selectFrom(ctx, tableName, "*", whenConditions, orderBy)
+func (bq *BigQuery) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
+    return bq.selectFrom(ctx, namespace, tableName, "*", whenConditions, orderBy)
 }
-func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpression string, whenConditions *WhenConditions, orderBy []string) (res []map[string]any, err error) {
+func (bq *BigQuery) selectFrom(ctx context.Context, namespace, tableName string, selectExpression string, whenConditions *WhenConditions, orderBy []string) (res []map[string]any, err error) {
     tableName = bq.TableName(tableName)
     whenCondition, values := bq.toWhenConditions(whenConditions)
     if whenCondition != "" {
@@ -875,7 +900,7 @@ func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpr
         }
         orderByClause = " ORDER BY " + strings.Join(quotedOrderByColumns, ", ")
     }
-    selectQuery := fmt.Sprintf(bigquerySelectTemplate, selectExpression, bq.fullTableName(tableName), whenCondition, orderByClause)
+    selectQuery := fmt.Sprintf(bigquerySelectTemplate, selectExpression, bq.fullTableName(namespace, tableName), whenCondition, orderByClause)
 
     defer func() {
         v := make([]any, len(values))
@@ -895,7 +920,7 @@ func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpr
 
     query := bq.client.Query(selectQuery)
     query.Parameters = values
-    job, _, err := bq.RunJob(ctx, query, fmt.Sprintf("select from table '%s'", tableName))
+    job, _, err := bq.RunJob(ctx, query, fmt.Sprintf("select from table %s", bq.fullTableName(namespace, tableName)))
     if err != nil {
         return nil, err
     }
@@ -928,8 +953,8 @@ func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpr
     return result, nil
 }
 
-func (bq *BigQuery) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error) {
-    res, err := bq.selectFrom(ctx, tableName, "count(*) as jitsu_count", whenConditions, nil)
+func (bq *BigQuery) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error) {
+    res, err := bq.selectFrom(ctx, namespace, tableName, "count(*) as jitsu_count", whenConditions, nil)
     if err != nil {
         return -1, err
     }
@@ -959,13 +984,13 @@ func (bq *BigQuery) toWhenConditions(conditions *WhenConditions) (string, []bigq
     return strings.Join(queryConditions, " "+conditions.JoinCondition+" "), values
 }
 
-func (bq *BigQuery) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) (err error) {
+func (bq *BigQuery) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) (err error) {
     tableName = bq.TableName(tableName)
     whenCondition, values := bq.toWhenConditions(deleteConditions)
     if len(whenCondition) == 0 {
         return errors.New("delete conditions are empty")
     }
-    deleteQuery := fmt.Sprintf(bigqueryDeleteTemplate, bq.fullTableName(tableName), whenCondition)
+    deleteQuery := fmt.Sprintf(bigqueryDeleteTemplate, bq.fullTableName(namespace, tableName), whenCondition)
     defer func() {
         v := make([]any, len(values))
         for i, value := range values {
@@ -998,8 +1023,8 @@ func (bq *BigQuery) OpenTx(ctx context.Context) (*TxSQLAdapter, error) {
     return &TxSQLAdapter{sqlAdapter: bq, tx: NewDummyTxWrapper(bq.Type())}, nil
 }
 
-func (bq *BigQuery) fullTableName(tableName string) string {
-    return fmt.Sprintf("`%s`.`%s`", bq.config.Project, bq.config.Dataset) + "." + bq.tableHelper.quotedTableName(tableName)
+func (bq *BigQuery) fullTableName(namespace, tableName string) string {
+    return fmt.Sprintf("`%s`.`%s`", bq.config.Project, bq.namespaceName(namespace)) + "." + bq.tableHelper.quotedTableName(tableName)
 }
 
 func tableNameFunc(identifier string, alphanumeric bool) (adapted string, needQuote bool) {
@@ -1089,6 +1114,18 @@ func (bq *BigQuery) TableHelper() *TableHelper {
     return &bq.tableHelper
 }
 
+func (bq *BigQuery) DefaultNamespace() string {
+    return bq.config.Dataset
+}
+
+func (bq *BigQuery) TmpNamespace(namespace string) string {
+    return namespace
+}
+
+func (bq *BigQuery) namespaceName(namespace string) string {
+    return bq.tableHelper.TableName(utils.DefaultString(namespace, bq.config.Dataset))
+}
+
 type JobRunner interface {
     Run(ctx context.Context) (*bigquery.Job, error)
 }
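Two helpers carry most of this file's changes: `namespaceName` falls back to the configured dataset when a table has no explicit namespace, and `fullTableName` now renders project, resolved dataset, and table. A runnable approximation of that defaulting behavior — identifier normalization via `tableHelper.TableName` is simplified away here:

```go
package main

import "fmt"

// defaultString mimics utils.DefaultString: the first non-empty value wins.
func defaultString(s, def string) string {
	if s != "" {
		return s
	}
	return def
}

// bigQueryCfg is a stand-in for the adapter's config; the real helpers also
// normalize identifiers through tableHelper.TableName.
type bigQueryCfg struct{ Project, Dataset string }

// namespaceName resolves an explicit table namespace or the default dataset.
func (c bigQueryCfg) namespaceName(namespace string) string {
	return defaultString(namespace, c.Dataset)
}

// fullTableName renders `project`.`dataset`.table, as the new two-argument
// signature in the diff does.
func (c bigQueryCfg) fullTableName(namespace, table string) string {
	return fmt.Sprintf("`%s`.`%s`.%s", c.Project, c.namespaceName(namespace), table)
}

func main() {
	cfg := bigQueryCfg{Project: "proj", Dataset: "default_ds"}
	fmt.Println(cfg.fullTableName("", "events"))     // `proj`.`default_ds`.events
	fmt.Println(cfg.fullTableName("bnsp", "events")) // `proj`.`bnsp`.events
}
```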
diff --git a/bulkerlib/implementations/sql/bulker_test.go b/bulkerlib/implementations/sql/bulker_test.go
index 99bc421..6138e91 100644
--- a/bulkerlib/implementations/sql/bulker_test.go
+++ b/bulkerlib/implementations/sql/bulker_test.go
@@ -44,9 +44,10 @@ type StepFunction func(testConfig bulkerTestConfig, mode bulker.BulkMode) error
 var configRegistry = map[string]any{}
 
 type ExpectedTable struct {
-    Name     string
-    PKFields []string
-    Columns  Columns
+    Name      string
+    Namespace string
+    PKFields  []string
+    Columns   Columns
 }
 
 var postgresContainer *testcontainers2.PostgresContainer
@@ -212,6 +213,7 @@ type bulkerTestConfig struct {
     name string
     //tableName name of the destination table. Leave empty generate automatically
     tableName string
+    namespace string
     //bulker config
     config *bulker.Config
     //for which bulker predefined configurations to run test
@@ -412,6 +414,28 @@ func TestBasics(t *testing.T) {
             },
         })},
     },
+    {
+        name:              "namespace",
+        modes:             []bulker.BulkMode{bulker.Batch, bulker.Stream, bulker.ReplaceTable, bulker.ReplacePartition},
+        expectPartitionId: true,
+        namespace:         "bnsp",
+        dataFile:          "test_data/columns_added.ndjson",
+        expectedTable: ExpectedTable{
+            Columns: justColumns("_timestamp", "id", "name", "column1", "column2", "column3"),
+        },
+        expectedRowsCount: 6,
+        expectedRows: []map[string]any{
+            {"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "column2": nil, "column3": nil},
+            {"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "column2": nil, "column3": nil},
+            {"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "column2": "data", "column3": nil},
+            {"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "column2": nil, "column3": nil},
+            {"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "column2": nil, "column3": nil},
+            {"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "column2": "data", "column3": "data"},
+        },
+        streamOptions:  []bulker.StreamOption{bulker.WithNamespace("bnsp")},
+        expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported},
+        configIds:      allBulkerConfigs,
+    },
 }
 for _, tt := range tests {
     tt := tt
@@ -471,17 +495,18 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
     reqr.True(ok)
     ctx := context.Background()
     id, tableName := testConfig.getIdAndTableName(mode)
+    namespace := utils.DefaultString(testConfig.namespace, sqlAdapter.DefaultNamespace())
     err = sqlAdapter.InitDatabase(ctx)
     PostStep("init_database", testConfig, mode, reqr, err)
     //clean up in case of previous test failure
     if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
-        err = sqlAdapter.DropTable(ctx, tableName, true)
-        PostStep("pre_cleanup", testConfig, mode, reqr, err)
+        _ = sqlAdapter.DropTable(ctx, namespace, tableName, true)
+        //PostStep("pre_cleanup", testConfig, mode, reqr, err)
     }
     //clean up after test run
     if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
         defer func() {
-            _ = sqlAdapter.DropTable(ctx, tableName, true)
+            _ = sqlAdapter.DropTable(ctx, namespace, tableName, true)
         }()
     }
     stream, err := blk.CreateStream(id, tableName, mode, testConfig.streamOptions...)
@@ -540,7 +565,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
     //PostStep("state_lasterror", testConfig, mode, reqr, state.LastError)
     if testConfig.expectedTable.Columns.Len() > 0 {
         //Check table schema
-        table, err := sqlAdapter.GetTableSchema(ctx, tableName)
+        table, err := sqlAdapter.GetTableSchema(ctx, namespace, tableName)
         PostStep("get_table", testConfig, mode, reqr, err)
         switch testConfig.expectedTableTypeChecking {
         case TypeCheckingDisabled:
@@ -604,6 +629,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
     table.PrimaryKeyName = ""
     expectedTable := &Table{
         Name:           testConfig.expectedTable.Name,
+        Namespace:      utils.Nvl(testConfig.expectedTable.Namespace, namespace),
         PrimaryKeyName: "",
         PKFields:       expectedPKFields,
         Columns:        testConfig.expectedTable.Columns,
@@ -623,7 +649,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
     if testConfig.expectedRowsCount != nil || testConfig.expectedRows != nil {
         time.Sleep(1 * time.Second)
         //Check rows count and rows data when provided
-        rows, err := sqlAdapter.Select(ctx, tableName, nil, testConfig.orderBy)
+        rows, err := sqlAdapter.Select(ctx, namespace, tableName, nil, testConfig.orderBy)
         PostStep("select_result", testConfig, mode, reqr, err)
         if testConfig.expectedRows == nil {
             reqr.Equal(testConfig.expectedRowsCount, len(rows))
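The new `namespace` test case drives everything through `bulker.WithNamespace("bnsp")`. A sketch of how such a stream option can work, using simplified functional options rather than the real `bulker.StreamOption`/`NamespaceOption` registry machinery:

```go
package main

import "fmt"

// streamOptions is a cut-down stand-in for bulker.StreamOptions.
type streamOptions struct{ namespace string }

// streamOption mimics bulker.StreamOption as a plain functional option;
// the real implementation stores values in an option registry instead.
type streamOption func(*streamOptions)

// withNamespace sketches bulker.WithNamespace: it routes the stream's
// tables into the given dataset/database instead of the configured default.
func withNamespace(ns string) streamOption {
	return func(o *streamOptions) { o.namespace = ns }
}

func main() {
	opts := &streamOptions{}
	for _, o := range []streamOption{withNamespace("bnsp")} {
		o(opts)
	}
	fmt.Println(opts.namespace) // bnsp
}
```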
diff --git a/bulkerlib/implementations/sql/clickhouse.go b/bulkerlib/implementations/sql/clickhouse.go
index 5875e79..ff4fad6 100644
--- a/bulkerlib/implementations/sql/clickhouse.go
+++ b/bulkerlib/implementations/sql/clickhouse.go
@@ -36,29 +36,29 @@ const (
 	chDatabaseQuery = "SELECT name FROM system.databases where name = ?"
 	chClusterQuery  = "SELECT max(shard_num) > 1 FROM system.clusters where cluster = ?"
 
-	chCreateDatabaseTemplate = `CREATE DATABASE IF NOT EXISTS ? %s`
+	chCreateDatabaseTemplate = `CREATE DATABASE IF NOT EXISTS %s %s`
 
 	chTableSchemaQuery      = `SELECT name, type FROM system.columns WHERE database = ? and table = ? and default_kind not in ('MATERIALIZED', 'ALIAS', 'EPHEMERAL') order by position`
 	chPrimaryKeyFieldsQuery = `SELECT primary_key FROM system.tables WHERE database = ?
 		and table = ?`
 	chOnClusterClauseTemplate = " ON CLUSTER `%s` "
 	chNullableColumnTemplate  = ` Nullable(%s) `
 
-	chCreateDistributedTableTemplate = `CREATE TABLE %s %s AS %s ENGINE = Distributed(%s,%s,%s,%s)`
-	chAlterTableTemplate             = `ALTER TABLE %s %s %s`
-	chDeleteBeforeBulkMergeUsing     = `ALTER TABLE %s %s DELETE WHERE %s in (select %s from %s)`
+	chCreateDistributedTableTemplate = `CREATE TABLE %s%s %s AS %s%s ENGINE = Distributed(%s,%s,%s,%s)`
+	chAlterTableTemplate             = `ALTER TABLE %s%s %s %s`
+	chDeleteBeforeBulkMergeUsing     = `ALTER TABLE %s%s %s DELETE WHERE %s in (select %s from %s)`
 	//chDeleteBeforeBulkMergeUsing = `DELETE FROM %s %s WHERE %s in (select %s from %s)`
-	chDeleteQueryTemplate = `ALTER TABLE %s %s DELETE WHERE %s`
+	chDeleteQueryTemplate = `ALTER TABLE %s%s %s DELETE WHERE %s`
 
-	chCreateTableTemplate   = `CREATE TABLE %s %s (%s) %s %s %s %s`
-	chDropTableTemplate     = `DROP TABLE %s%s %s`
-	chTruncateTableTemplate = `TRUNCATE TABLE IF EXISTS %s %s`
-	chExchangeTableTemplate = `EXCHANGE TABLES %s AND %s %s`
-	chRenameTableTemplate   = `RENAME TABLE %s TO %s %s`
+	chCreateTableTemplate   = `CREATE TABLE %s%s %s (%s) %s %s %s %s`
+	chDropTableTemplate     = `DROP TABLE %s %s%s %s`
+	chTruncateTableTemplate = `TRUNCATE TABLE IF EXISTS %s%s %s`
+	chExchangeTableTemplate = `EXCHANGE TABLES %s%s AND %s%s %s`
+	chRenameTableTemplate   = `RENAME TABLE %s%s TO %s%s %s`
 
-	chSelectFinalStatement = `SELECT %s FROM %s FINAL %s%s`
-	chLoadStatement        = `INSERT INTO %s (%s) VALUES %s`
-	chLoadJSONStatement    = `INSERT INTO %s format JSONEachRow`
+	chSelectFinalStatement = `SELECT %s FROM %s%s FINAL %s%s`
+	chLoadStatement        = `INSERT INTO %s%s (%s) VALUES %s`
+	chLoadJSONStatement    = `INSERT INTO %s%s format JSONEachRow`
 
 	chDateFormat = `2006-01-02 15:04:05.000000`
 )
@@ -222,7 +222,7 @@ func NewClickHouse(bulkerConfig bulkerlib.Config) (bulkerlib.Bulker, error) {
     if bulkerConfig.LogLevel == bulkerlib.Verbose {
         queryLogger = logging.NewQueryLogger(bulkerConfig.Id, os.Stderr, os.Stderr)
     }
-    sqlAdapterBase, err := newSQLAdapterBase(bulkerConfig.Id, ClickHouseBulkerTypeId, config, dbConnectFunction, clickhouseTypes, queryLogger, chTypecastFunc, QuestionMarkParameterPlaceholder, columnDDlFunc, chReformatValue, checkErr, false)
+    sqlAdapterBase, err := newSQLAdapterBase(bulkerConfig.Id, ClickHouseBulkerTypeId, config, config.Database, dbConnectFunction, clickhouseTypes, queryLogger, chTypecastFunc, QuestionMarkParameterPlaceholder, columnDDlFunc, chReformatValue, checkErr, false)
     sqlAdapterBase.batchFileFormat = types.FileFormatNDJSON
 
     c := &ClickHouse{
@@ -330,25 +330,40 @@ func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error) {
     return &TxSQLAdapter{sqlAdapter: ch, tx: NewDbWrapper(ch.Type(), db, ch.queryLogger, ch.checkErrFunc, true)}, nil
 }
 
-// InitDatabase create database instance if doesn't exist
-func (ch *ClickHouse) InitDatabase(ctx context.Context) error {
+func (ch *ClickHouse) createDatabaseIfNotExists(ctx context.Context, db string) error {
+    if db == "" {
+        return nil
+    }
     var dbname string
-    row := ch.txOrDb(ctx).QueryRowContext(ctx, chDatabaseQuery, ch.config.Database)
+    db = ch.namespaceName(db)
+    if db == "" {
+        return nil
+    }
+    row := ch.txOrDb(ctx).QueryRowContext(ctx, chDatabaseQuery, db)
     if row != nil {
         _ = row.Scan(&dbname)
     }
     if dbname == "" {
-        query := fmt.Sprintf(chCreateDatabaseTemplate, ch.getOnClusterClause())
+        query := fmt.Sprintf(chCreateDatabaseTemplate, db, ch.getOnClusterClause())
 
-        if _, err := ch.txOrDb(ctx).ExecContext(ctx, query, ch.config.Database); err != nil {
+        if _, err := ch.txOrDb(ctx).ExecContext(ctx, query); err != nil {
             return errorj.CreateSchemaError.Wrap(err, "failed to create db schema").
                 WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                    Database:  ch.config.Database,
+                    Database:  db,
                     Cluster:   ch.config.Cluster,
                     Statement: query,
                 })
         }
     }
+    return nil
+}
+
+// InitDatabase create database instance if doesn't exist
+func (ch *ClickHouse) InitDatabase(ctx context.Context) error {
+    err := ch.createDatabaseIfNotExists(ctx, ch.config.Database)
+    if err != nil {
+        return err
+    }
     if ch.config.Cluster != "" {
         var distributed bool
         err := ch.txOrDb(ctx).QueryRowContext(ctx, chClusterQuery, ch.config.Cluster).Scan(&distributed)
@@ -371,6 +386,12 @@ func (ch *ClickHouse) InitDatabase(ctx context.Context) error {
 // CreateTable create database table with name,columns provided in Table representation
 // New tables will have MergeTree() or ReplicatedMergeTree() engine depends on config.cluster empty or not
 func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error {
+    err := ch.createDatabaseIfNotExists(ctx, table.Namespace)
+    if err != nil {
+        return err
+    }
+    quotedSchema := ch.namespacePrefix(table.Namespace)
+
     if table.Temporary {
         table := table.Clone()
         table.PKFields = types2.NewOrderedSet[string]()
@@ -378,7 +399,7 @@ func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error {
             return ch.columnDDL(name, table, column)
         })
 
-        query := fmt.Sprintf(createTableTemplate, "TEMPORARY", ch.quotedTableName(table.Name), strings.Join(columnsDDL, ", "))
+        query := fmt.Sprintf(createTableTemplate, "TEMPORARY", quotedSchema, ch.quotedTableName(table.Name), strings.Join(columnsDDL, ", "))
 
         if _, err := ch.txOrDb(ctx).ExecContext(ctx, query); err != nil {
             return errorj.CreateTableError.Wrap(err, "failed to create table").
@@ -393,7 +414,7 @@ func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error {
         return ch.columnDDL(name, table, column)
     })
 
-    statementStr := ch.tableStatementFactory.CreateTableStatement(ch.quotedLocalTableName(table.Name), ch.TableName(table.Name), strings.Join(columnsDDL, ","), table)
+    statementStr := ch.tableStatementFactory.CreateTableStatement(quotedSchema, ch.quotedLocalTableName(table.Name), ch.TableName(table.Name), strings.Join(columnsDDL, ","), table)
 
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, statementStr); err != nil {
         return errorj.CreateTableError.Wrap(err, "failed to create table").
@@ -415,16 +436,17 @@ func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error {
 }
 
 // GetTableSchema return table (name,columns with name and types) representation wrapped in Table struct
-func (ch *ClickHouse) GetTableSchema(ctx context.Context, tableName string) (*Table, error) {
+func (ch *ClickHouse) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) {
     //local table name since schema of distributed table lacks primary keys
     queryTableName := ch.TableName(ch.localTableName(tableName))
+    namespace = ch.namespaceName(namespace)
     tableName = ch.TableName(tableName)
-    table := &Table{Name: tableName, Columns: NewColumns(), PKFields: types2.NewOrderedSet[string]()}
-    rows, err := ch.txOrDb(ctx).QueryContext(ctx, chTableSchemaQuery, ch.config.Database, queryTableName)
+    table := &Table{Name: tableName, Namespace: namespace, Columns: NewColumns(), PKFields: types2.NewOrderedSet[string]()}
+    rows, err := ch.txOrDb(ctx).QueryContext(ctx, chTableSchemaQuery, namespace, queryTableName)
     if err != nil {
         return nil, errorj.GetTableError.Wrap(err, "failed to get table columns").
             WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                Database:    ch.config.Database,
+                Database:    namespace,
                 Cluster:     ch.config.Cluster,
                 Table:       tableName,
                 PrimaryKeys: table.GetPKFields(),
@@ -439,7 +461,7 @@ func (ch *ClickHouse) GetTableSchema(ctx context.Context, tableName string) (*Ta
         if err := rows.Scan(&columnName, &columnClickhouseType); err != nil {
             return nil, errorj.GetTableError.Wrap(err, "failed to scan result").
                 WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                    Database:    ch.config.Database,
+                    Database:    namespace,
                     Cluster:     ch.config.Cluster,
                     Table:       tableName,
                     PrimaryKeys: table.GetPKFields(),
@@ -453,7 +475,7 @@ func (ch *ClickHouse) GetTableSchema(ctx context.Context, tableName string) (*Ta
     if err := rows.Err(); err != nil {
         return nil, errorj.GetTableError.Wrap(err, "failed read last row").
             WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                Database:    ch.config.Database,
+                Database:    namespace,
                 Cluster:     ch.config.Cluster,
                 Table:       tableName,
                 PrimaryKeys: table.GetPKFields(),
                 Statement:   chTableSchemaQuery,
                 Values:      []interface{}{ch.config.Database, queryTableName},
             })
     }
-    primaryKeyName, pkFields, err := ch.getPrimaryKey(ctx, tableName)
+    primaryKeyName, pkFields, err := ch.getPrimaryKey(ctx, namespace, tableName)
     if err != nil {
         return nil, err
     }
@@ -474,14 +496,15 @@ func (ch *ClickHouse) GetTableSchema(ctx context.Context, tableName string) (*Ta
 }
 
 // getPrimaryKey returns primary key name and fields
-func (ch *ClickHouse) getPrimaryKey(ctx context.Context, tableName string) (string, types2.OrderedSet[string], error) {
+func (ch *ClickHouse) getPrimaryKey(ctx context.Context, namespace, tableName string) (string, types2.OrderedSet[string], error) {
     tableName = ch.TableName(tableName)
+    namespace = ch.namespaceName(namespace)
     queryTableName := ch.TableName(ch.localTableName(tableName))
-    pkFieldsRows, err := ch.txOrDb(ctx).QueryContext(ctx, chPrimaryKeyFieldsQuery, ch.config.Database, queryTableName)
+    pkFieldsRows, err := ch.txOrDb(ctx).QueryContext(ctx, chPrimaryKeyFieldsQuery, namespace, queryTableName)
     if err != nil {
         return "", types2.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to get primary key").
             WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                Database:  ch.config.Database,
+                Database:  namespace,
                 Cluster:   ch.config.Cluster,
                 Table:     tableName,
                 Statement: chPrimaryKeyFieldsQuery,
@@ -494,7 +517,7 @@ func (ch *ClickHouse) getPrimaryKey(ctx context.Context, tableName string) (stri
         if err = pkFieldsRows.Scan(&pkString); err != nil {
             return "", types2.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to scan result").
                 WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                    Database:  ch.config.Database,
+                    Database:  namespace,
                     Cluster:   ch.config.Cluster,
                     Table:     tableName,
                     Statement: chPrimaryKeyFieldsQuery,
@@ -520,8 +543,9 @@ func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table)
     addedColumnsDDL := patchSchema.MappedColumns(func(columnName string, column types.SQLColumn) string {
         return "ADD COLUMN " + ch.columnDDL(columnName, patchSchema, column)
     })
+    namespace := ch.namespacePrefix(patchSchema.Namespace)
 
-    query := fmt.Sprintf(chAlterTableTemplate, ch.quotedLocalTableName(patchSchema.Name), ch.getOnClusterClause(), strings.Join(addedColumnsDDL, ", "))
+    query := fmt.Sprintf(chAlterTableTemplate, namespace, ch.quotedLocalTableName(patchSchema.Name), ch.getOnClusterClause(), strings.Join(addedColumnsDDL, ", "))
 
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, query); err != nil {
         return errorj.PatchTableError.Wrap(err, "failed to patch table").
@@ -535,13 +559,13 @@ func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table)
     }
 
     if ch.distributed.Load() {
-        query := fmt.Sprintf(chAlterTableTemplate, ch.quotedTableName(patchSchema.Name), ch.getOnClusterClause(), strings.Join(addedColumnsDDL, ", "))
+        query := fmt.Sprintf(chAlterTableTemplate, namespace, ch.quotedTableName(patchSchema.Name), ch.getOnClusterClause(), strings.Join(addedColumnsDDL, ", "))
 
         _, err := ch.txOrDb(ctx).ExecContext(ctx, query)
         if err != nil {
             ch.Errorf("Error altering distributed table for [%s] with statement [%s]: %v", patchSchema.Name, query, err)
             // fallback for older clickhouse versions: drop and create distributed table if ReplicatedMergeTree engine
-            ch.dropTable(ctx, ch.quotedTableName(patchSchema.Name), ch.getOnClusterClause(), true)
+            ch.dropTable(ctx, namespace, ch.quotedTableName(patchSchema.Name), ch.getOnClusterClause(), true)
             return ch.createDistributedTableInTransaction(ctx, patchSchema)
         }
     }
@@ -550,32 +574,32 @@ func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table)
     return nil
 }
 
-func (ch *ClickHouse) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
+func (ch *ClickHouse) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
     tableName = ch.TableName(tableName)
-    table, err := ch.GetTableSchema(ctx, tableName)
+    table, err := ch.GetTableSchema(ctx, namespace, tableName)
     if err != nil {
         return nil, err
     }
     if table.PKFields.Size() > 0 {
-        return ch.selectFrom(ctx, chSelectFinalStatement, tableName, "*", whenConditions, orderBy)
+        return ch.selectFrom(ctx, chSelectFinalStatement, namespace, tableName, "*", whenConditions, orderBy)
     } else {
-        return ch.selectFrom(ctx, selectQueryTemplate, tableName, "*", whenConditions, orderBy)
+        return ch.selectFrom(ctx, selectQueryTemplate, namespace, tableName, "*", whenConditions, orderBy)
     }
 }
 
-func (ch *ClickHouse) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error) {
+func (ch *ClickHouse) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error) {
     tableName = ch.TableName(tableName)
-    table, err := ch.GetTableSchema(ctx, tableName)
+    table, err := ch.GetTableSchema(ctx, namespace, tableName)
     if err != nil {
         return -1, err
     }
     var res []map[string]any
     if table.PKFields.Size() > 0 {
-        res, err = ch.selectFrom(ctx, chSelectFinalStatement, tableName, "count(*) as jitsu_count", whenConditions, nil)
+        res, err = ch.selectFrom(ctx, chSelectFinalStatement, namespace, tableName, "count(*) as jitsu_count", whenConditions, nil)
     } else {
-        res, err = ch.selectFrom(ctx, selectQueryTemplate, tableName, "count(*) as jitsu_count", whenConditions, nil)
+        res, err = ch.selectFrom(ctx, selectQueryTemplate, namespace, tableName, "count(*) as jitsu_count", whenConditions, nil)
    }
     if err != nil {
         return -1, err
@@ -601,6 +625,7 @@ func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSou
     }
     startTime := time.Now()
     tableName := ch.quotedTableName(targetTable.Name)
+    namespace := ch.namespacePrefix(targetTable.Namespace)
 
     var copyStatement string
     defer func() {
@@ -628,7 +653,7 @@ func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSou
         if err != nil {
             return state, err
         }
-        copyStatement = fmt.Sprintf(chLoadJSONStatement+"\n", tableName)
+        copyStatement = fmt.Sprintf(chLoadJSONStatement+"\n", namespace, tableName)
         builder := strings.Builder{}
         builder.Grow(int(stat.Size()) + len(copyStatement))
         builder.WriteString(copyStatement)
@@ -689,7 +714,7 @@ func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSou
         Name:            "clickhouse_prepare_data",
         TimeProcessedMs: loadTime.Sub(startTime).Milliseconds(),
     }
-    copyStatement = fmt.Sprintf(chLoadStatement, tableName, strings.Join(columnNames, ", "), placeholdersBuilder.String()[1:])
+    copyStatement = fmt.Sprintf(chLoadStatement, namespace, tableName, strings.Join(columnNames, ", "), placeholdersBuilder.String()[1:])
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, copyStatement, args...); err != nil {
         return state, checkErr(err)
     }
@@ -706,14 +731,15 @@ func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, source
     return ch.copy(ctx, targetTable, sourceTable)
 }
 
-func (ch *ClickHouse) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error {
+func (ch *ClickHouse) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error {
     deleteCondition, values := ch.ToWhenConditions(deleteConditions, ch.parameterPlaceholder, 0)
-    deleteQuery := fmt.Sprintf(chDeleteQueryTemplate, ch.quotedLocalTableName(tableName), ch.getOnClusterClause(), deleteCondition)
+    quotedSchema := ch.namespacePrefix(namespace)
+    deleteQuery := fmt.Sprintf(chDeleteQueryTemplate, quotedSchema, ch.quotedLocalTableName(tableName), ch.getOnClusterClause(), deleteCondition)
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, deleteQuery, values...); err != nil {
         return errorj.DeleteFromTableError.Wrap(err, "failed to delete data").
             WithProperty(errorj.DBInfo, &types.ErrorPayload{
-                Database:  ch.config.Database,
+                Database:  quotedSchema,
                 Cluster:   ch.config.Cluster,
                 Table:     tableName,
                 Statement: deleteQuery,
@@ -724,9 +750,9 @@ func (ch *ClickHouse) Delete(ctx context.Context, tableName string, deleteCondit
 }
 
 // TruncateTable deletes all records in tableName table
-func (ch *ClickHouse) TruncateTable(ctx context.Context, tableName string) error {
+func (ch *ClickHouse) TruncateTable(ctx context.Context, namespace string, tableName string) error {
     tableName = ch.TableName(tableName)
-    statement := fmt.Sprintf(chTruncateTableTemplate, ch.quotedLocalTableName(tableName), ch.getOnClusterClause())
+    statement := fmt.Sprintf(chTruncateTableTemplate, ch.namespacePrefix(namespace), ch.quotedLocalTableName(tableName), ch.getOnClusterClause())
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, statement); err != nil {
         return errorj.TruncateError.Wrap(err, "failed to truncate table").
             WithProperty(errorj.DBInfo, &types.ErrorPayload{
@@ -738,31 +764,31 @@ func (ch *ClickHouse) TruncateTable(ctx context.Context, tableName string) error
     return nil
 }
 
-func (ch *ClickHouse) DropTable(ctx context.Context, tableName string, ifExists bool) error {
-    err := ch.dropTable(ctx, ch.quotedTableName(tableName), ch.getOnClusterClause(), ifExists)
+func (ch *ClickHouse) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error {
+    err := ch.dropTable(ctx, ch.namespacePrefix(namespace), ch.quotedTableName(tableName), ch.getOnClusterClause(), ifExists)
     if err != nil {
         return err
     }
     if ch.distributed.Load() {
-        return ch.dropTable(ctx, ch.quotedLocalTableName(tableName), ch.getOnClusterClause(), true)
+        return ch.dropTable(ctx, ch.namespacePrefix(namespace), ch.quotedLocalTableName(tableName), ch.getOnClusterClause(), true)
     }
     return nil
 }
 
 func (ch *ClickHouse) Drop(ctx context.Context, table *Table, ifExists bool) error {
     if table.Temporary {
-        return ch.dropTable(ctx, ch.quotedTableName(table.Name), "", ifExists)
+        return ch.dropTable(ctx, ch.namespacePrefix(table.Namespace), ch.quotedTableName(table.Name), "", ifExists)
     } else {
-        return ch.DropTable(ctx, table.Name, ifExists)
+        return ch.DropTable(ctx, table.Namespace, table.Name, ifExists)
     }
 }
 
-func (ch *ClickHouse) dropTable(ctx context.Context, fullTableName string, onClusterClause string, ifExists bool) error {
+func (ch *ClickHouse) dropTable(ctx context.Context, namespacePrefix, fullTableName string, onClusterClause string, ifExists bool) error {
     ifExs := ""
     if ifExists {
         ifExs = "IF EXISTS "
     }
-    query := fmt.Sprintf(chDropTableTemplate, ifExs, fullTableName, onClusterClause)
+    query := fmt.Sprintf(chDropTableTemplate, ifExs, namespacePrefix, fullTableName, onClusterClause)
 
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, query); err != nil {
@@ -779,7 +805,8 @@ func (ch *ClickHouse) dropTable(ctx context.Context, fullTableName string, onClu
 }
 
 func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error) {
-    targetTable, err := ch.GetTableSchema(ctx, targetTableName)
+    targetTable, err := ch.GetTableSchema(ctx, replacementTable.Namespace, targetTableName)
+    namespace := ch.namespacePrefix(replacementTable.Namespace)
     if err != nil {
         return fmt.Errorf("failed to check existence of target table: %s : %v", targetTableName, err)
     }
@@ -795,13 +822,13 @@ func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string,
     }
     //exchange local tables only.
     //For cluster no need to exchange distribute tables. they are linked by name and will represent new data
-        query := fmt.Sprintf(chExchangeTableTemplate, ch.quotedLocalTableName(targetTableName), ch.quotedLocalTableName(replacementTable.Name), ch.getOnClusterClause())
+        query := fmt.Sprintf(chExchangeTableTemplate, namespace, ch.quotedLocalTableName(targetTableName), namespace, ch.quotedLocalTableName(replacementTable.Name), ch.getOnClusterClause())
         if _, err := ch.txOrDb(ctx).ExecContext(ctx, query); err != nil {
             return fmt.Errorf("error replacing [%s] table: %v", targetTableName, err)
         }
     } else {
         //if target table does not exist yet, just rename replacement table to target one
-        query := fmt.Sprintf(chRenameTableTemplate, ch.quotedLocalTableName(replacementTable.Name), ch.quotedLocalTableName(targetTableName), ch.getOnClusterClause())
+        query := fmt.Sprintf(chRenameTableTemplate, namespace, ch.quotedLocalTableName(replacementTable.Name), namespace, ch.quotedLocalTableName(targetTableName), ch.getOnClusterClause())
         if _, err := ch.txOrDb(ctx).ExecContext(ctx, query); err != nil {
             return fmt.Errorf("error renaming [%s] table: %v", replacementTable.Name, err)
         }
@@ -815,7 +842,7 @@ func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string,
     }
 
     if targetTable.Exists() && dropOldTable {
-        return ch.DropTable(ctx, replacementTable.Name, true)
+        return ch.DropTable(ctx, replacementTable.Namespace, replacementTable.Name, true)
     } else {
         return nil
     }
@@ -834,12 +861,13 @@ func (ch *ClickHouse) getOnClusterClause() string {
 // create distributed table, ignore errors
 func (ch *ClickHouse) createDistributedTableInTransaction(ctx context.Context, originTable *Table) error {
     originTableName := originTable.Name
+    namespace := ch.namespacePrefix(originTable.Namespace)
     shardingKey := "rand()"
     if originTable.PKFields.Size() > 0 {
         shardingKey = "halfMD5(" + strings.Join(originTable.GetPKFields(), ",") + ")"
     }
     statement := fmt.Sprintf(chCreateDistributedTableTemplate,
-        ch.quotedTableName(originTable.Name), ch.getOnClusterClause(), ch.quotedLocalTableName(originTableName), ch.config.Cluster, ch.config.Database, ch.quotedLocalTableName(originTableName), shardingKey)
+        namespace, ch.quotedTableName(originTable.Name), ch.getOnClusterClause(), namespace, ch.quotedLocalTableName(originTableName), ch.config.Cluster, ch.namespaceName(originTable.Namespace), ch.quotedLocalTableName(originTableName), shardingKey)
 
     if _, err := ch.txOrDb(ctx).ExecContext(ctx, statement); err != nil {
         return fmt.Errorf("error creating distributed table statement with statement [%s] for [%s] : %v", statement, ch.quotedTableName(originTableName), err)
@@ -1067,10 +1095,10 @@ func NewTableStatementFactory(ch ClickHouseCluster) *TableStatementFactory {
 }
 
 // CreateTableStatement return clickhouse DDL for creating table statement
-func (tsf TableStatementFactory) CreateTableStatement(quotedTableName, tableName, columnsClause string, table *Table) string {
+func (tsf TableStatementFactory) CreateTableStatement(namespacePrefix, quotedTableName, tableName, columnsClause string, table *Table) string {
     config := tsf.ch.Config()
     if config.Engine != nil && len(config.Engine.RawStatement) > 0 {
-        return fmt.Sprintf(chCreateTableTemplate, quotedTableName, tsf.onClusterClause, columnsClause, config.Engine.RawStatement,
+        return fmt.Sprintf(chCreateTableTemplate, namespacePrefix, quotedTableName, tsf.onClusterClause, columnsClause, config.Engine.RawStatement,
             "", "", "")
     }
     var engineStatement string
@@ -1119,10 +1147,14 @@ func (tsf TableStatementFactory) CreateTableStatement(quotedTableName, tableName
         keeperPath = fmt.Sprintf("%s_%x", keeperPath, utils.HashString(tableName))
         engineStatement = fmt.Sprintf(engineStatement, keeperPath)
     }
-    return fmt.Sprintf(chCreateTableTemplate, quotedTableName, tsf.onClusterClause, columnsClause, engineStatement,
+    return fmt.Sprintf(chCreateTableTemplate, namespacePrefix, quotedTableName, tsf.onClusterClause, columnsClause, engineStatement,
         partitionClause, orderByClause, primaryKeyClause)
 }
 
+func (ch *ClickHouse) TmpNamespace(string) string {
+    return NoNamespaceValue
+}
+
 func (ch *ClickHouse) Ping(_ context.Context) error {
     if ch.httpMode {
         //not sure Ping is necessary in httpMode
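All the ClickHouse statement templates switched from a single `%s` to back-to-back `%s%s` verbs so a namespace prefix can be prepended to the table name without changing spacing. Assuming `namespacePrefix` yields either an empty string or a quoted `` `db`. `` prefix (its definition is outside this diff, so this is an inference from how the templates are filled), the rendering looks like:

```go
package main

import "fmt"

// Template copied verbatim from the diff: prefix, table, ON CLUSTER clause.
const chTruncateTableTemplate = `TRUNCATE TABLE IF EXISTS %s%s %s`

// namespacePrefix is an assumed reconstruction: "" for the default database,
// otherwise a quoted "`db`." prefix that glues onto the quoted table name.
func namespacePrefix(db string) string {
	if db == "" {
		return ""
	}
	return fmt.Sprintf("`%s`.", db)
}

func main() {
	onCluster := " ON CLUSTER `main` "
	fmt.Printf(chTruncateTableTemplate+"\n", namespacePrefix(""), "`events_local`", onCluster)
	fmt.Printf(chTruncateTableTemplate+"\n", namespacePrefix("bnsp"), "`events_local`", onCluster)
}
```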
+ WithProperty(errorj.DBInfo, &types2.ErrorPayload{ + Database: m.config.Db, + Statement: query, + }) + } + return nil +} + // InitDatabase creates database instance if doesn't exist func (m *MySQL) InitDatabase(ctx context.Context) error { - //query := fmt.Sprintf(mySQLCreateDBIfNotExistsTemplate, m.config.Db) - //if _, err := m.txOrDb(ctx).ExecContext(ctx, query); err != nil { - // return errorj.CreateSchemaError.Wrap(err, "failed to create db schema"). - // WithProperty(errorj.DBInfo, &types.ErrorPayload{ - // Database: m.config.Db, - // Statement: query, - // }) - //} - + _ = m.createDatabaseIfNotExists(ctx, m.config.Db) return nil } @@ -204,6 +215,7 @@ func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := m.quotedTableName(targetTable.Name) + quotedNamespace := m.namespacePrefix(targetTable.Namespace) if loadSource.Type != LocalFile { return state, fmt.Errorf("LoadTable: only local file is supported") @@ -221,7 +233,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L defer mysql.DeregisterLocalFile(loadSource.Path) header := targetTable.MappedColumnNames(m.quotedColumnName) - loadStatement := fmt.Sprintf(mySQLLoadTemplate, loadSource.Path, quotedTableName, strings.Join(header, ", ")) + loadStatement := fmt.Sprintf(mySQLLoadTemplate, loadSource.Path, quotedNamespace, quotedTableName, strings.Join(header, ", ")) if _, err := m.txOrDb(ctx).ExecContext(ctx, loadStatement); err != nil { return state, errorj.LoadError.Wrap(err, "failed to load data from local file system"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ @@ -239,6 +251,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L placeholders[i] = m.typecastFunc(m.parameterPlaceholder(i+1, name), col) }) insertPayload := QueryPayload{ + Namespace: quotedNamespace, TableName: quotedTableName, Columns: strings.Join(columnNames, ", "), Placeholders: strings.Join(placeholders, ", "), @@ -306,8 +319,8 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L } // GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct -func (m *MySQL) GetTableSchema(ctx context.Context, tableName string) (*Table, error) { - table, err := m.getTable(ctx, tableName) +func (m *MySQL) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) { + table, err := m.getTable(ctx, namespace, tableName) if err != nil { return nil, err } @@ -317,7 +330,7 @@ func (m *MySQL) GetTableSchema(ctx context.Context, tableName string) (*Table, e return table, nil } - pkFields, err := m.getPrimaryKeys(ctx, tableName) + pkFields, err := m.getPrimaryKeys(ctx, namespace, tableName) if err != nil { return nil, err } @@ -334,16 +347,17 @@ func (m *MySQL) BuildConstraintName(tableName string) string { return "PRIMARY" } -func (m *MySQL) getTable(ctx context.Context, tableName string) (*Table, error) { +func (m *MySQL) getTable(ctx context.Context, namespace, tableName string) (*Table, error) { tableName = m.TableName(tableName) - table := &Table{Name: tableName, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()} + namespace = m.namespaceName(namespace) + table := &Table{Name: tableName, Namespace: namespace, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()} ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) 
defer cancel() - rows, err := m.dataSource.QueryContext(ctx, mySQLTableSchemaQuery, m.config.Db, tableName) + rows, err := m.dataSource.QueryContext(ctx, mySQLTableSchemaQuery, namespace, tableName) if err != nil { return nil, errorj.GetTableError.Wrap(err, "failed to get table columns"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Database: m.config.Db, + Database: namespace, Table: tableName, PrimaryKeys: table.GetPKFields(), Statement: mySQLTableSchemaQuery, @@ -357,7 +371,7 @@ func (m *MySQL) getTable(ctx context.Context, tableName string) (*Table, error) if err := rows.Scan(&columnName, &columnType); err != nil { return nil, errorj.GetTableError.Wrap(err, "failed to scan result"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Database: m.config.Db, + Database: namespace, Table: tableName, PrimaryKeys: table.GetPKFields(), Statement: mySQLTableSchemaQuery, @@ -375,7 +389,7 @@ func (m *MySQL) getTable(ctx context.Context, tableName string) (*Table, error) if err := rows.Err(); err != nil { return nil, errorj.GetTableError.Wrap(err, "failed read last row"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Database: m.config.Db, + Database: namespace, Table: tableName, PrimaryKeys: table.GetPKFields(), Statement: mySQLTableSchemaQuery, @@ -386,13 +400,14 @@ func (m *MySQL) getTable(ctx context.Context, tableName string) (*Table, error) return table, nil } -func (m *MySQL) getPrimaryKeys(ctx context.Context, tableName string) (types.OrderedSet[string], error) { +func (m *MySQL) getPrimaryKeys(ctx context.Context, namespace, tableName string) (types.OrderedSet[string], error) { tableName = m.TableName(tableName) - pkFieldsRows, err := m.dataSource.QueryContext(ctx, mySQLPrimaryKeyFieldsQuery, m.config.Db, tableName) + namespace = m.namespaceName(namespace) + pkFieldsRows, err := m.dataSource.QueryContext(ctx, mySQLPrimaryKeyFieldsQuery, namespace, tableName) if err != nil { return types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to get primary key"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Database: m.config.Db, + Database: namespace, Table: tableName, Statement: mySQLPrimaryKeyFieldsQuery, Values: []any{m.config.Db, tableName}, @@ -407,7 +422,7 @@ func (m *MySQL) getPrimaryKeys(ctx context.Context, tableName string) (types.Ord if err := pkFieldsRows.Scan(&fieldName); err != nil { return types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to scan result"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Database: m.config.Db, + Database: namespace, Table: tableName, Statement: mySQLPrimaryKeyFieldsQuery, Values: []any{m.config.Db, tableName}, @@ -418,7 +433,7 @@ func (m *MySQL) getPrimaryKeys(ctx context.Context, tableName string) (types.Ord if err := pkFieldsRows.Err(); err != nil { return types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed read last row"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Database: m.config.Db, + Database: namespace, Table: tableName, Statement: mySQLPrimaryKeyFieldsQuery, Values: []any{m.config.Db, tableName}, @@ -430,18 +445,19 @@ func (m *MySQL) getPrimaryKeys(ctx context.Context, tableName string) (types.Ord func (m *MySQL) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error) { tmpTable := "deprecated_" + targetTableName + time.Now().Format("_20060102_150405") - err1 := m.renameTable(ctx, true, targetTableName, tmpTable) - err = m.renameTable(ctx, false, replacementTable.Name, targetTableName) + err1 := m.renameTable(ctx, true, replacementTable.Namespace, targetTableName, tmpTable) + err = m.renameTable(ctx, false, replacementTable.Namespace, replacementTable.Name, targetTableName) if dropOldTable && err1 == nil && err == nil { - return m.DropTable(ctx, tmpTable, true) + return m.DropTable(ctx, replacementTable.Namespace, tmpTable, true) } return } -func (m *MySQL) renameTable(ctx context.Context, ifExists bool, tableName, newTableName string) error { +func (m *MySQL) renameTable(ctx context.Context, ifExists bool, namespace, tableName, newTableName string) error { if ifExists { + db := m.TableName(utils.DefaultString(namespace, m.namespace)) tableName = m.TableName(tableName) - row := m.txOrDb(ctx).QueryRowContext(ctx, fmt.Sprintf(`SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s')`, m.config.Db, tableName)) + row := m.txOrDb(ctx).QueryRowContext(ctx, fmt.Sprintf(`SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s')`, db, tableName)) exists := false err := row.Scan(&exists) if err != nil { @@ -451,7 +467,7 @@ func (m *MySQL) renameTable(ctx context.Context, ifExists bool, tableName, newTa return nil } } - return m.SQLAdapterBase.renameTable(ctx, false, tableName, newTableName) + return m.SQLAdapterBase.renameTable(ctx, false, namespace, tableName, newTableName) } func mySQLDriverConnectionString(config *DataSourceConfig) string { @@ -499,14 +515,18 @@ func mySQLMapColumnValue(value any, valuePresent bool, column types2.SQLColumn) } func (m *MySQL) CreateTable(ctx context.Context, schemaToCreate *Table) error { - err := m.SQLAdapterBase.CreateTable(ctx, schemaToCreate) + err := m.createDatabaseIfNotExists(ctx, schemaToCreate.Namespace) + if err != nil { + return err + } + err = m.SQLAdapterBase.CreateTable(ctx, schemaToCreate) if err != nil { return err } if !schemaToCreate.Temporary && schemaToCreate.TimestampColumn != "" { err = m.createIndex(ctx, schemaToCreate) if err != nil { - m.DropTable(ctx, schemaToCreate.Name, true) + m.DropTable(ctx, schemaToCreate.Namespace, schemaToCreate.Name, true) return fmt.Errorf("failed to create sort key: %v", err) } } @@ -518,9 +538,10 @@ func (m *MySQL) createIndex(ctx context.Context, table *Table) error { return nil } quotedTableName := m.quotedTableName(table.Name) + quotedNamespace := m.namespacePrefix(table.Namespace) statement := fmt.Sprintf(mySQLIndexTemplate, "bulker_timestamp_index", - quotedTableName, m.quotedColumnName(table.TimestampColumn)) + quotedNamespace, quotedTableName, m.quotedColumnName(table.TimestampColumn)) if _, err := m.txOrDb(ctx).ExecContext(ctx, statement); err != nil { return errorj.AlterTableError.Wrap(err, "failed to set sort key"). 
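Reviewer note: the MySQL changes above all follow one pattern: each SQL template gains an extra %s slot, filled by a namespace-prefix helper that renders the quoted database name plus a trailing dot, or an empty string when no namespace is set, so statements keep targeting the connection's default database. Below is a minimal, self-contained sketch of that contract; the helper names and the template body are illustrative assumptions, not the actual code from mysql.go:

```go
package main

import "fmt"

// quoteIdentifier is a stand-in for the adapter's quotedTableName helper:
// MySQL identifiers are wrapped in backticks.
func quoteIdentifier(name string) string {
	return "`" + name + "`"
}

// namespacePrefix mirrors the contract the diff relies on: a non-empty
// namespace becomes a "`db`." prefix, an empty namespace contributes nothing.
func namespacePrefix(namespace string) string {
	if namespace == "" {
		return ""
	}
	return quoteIdentifier(namespace) + "."
}

func main() {
	// Illustrative LOAD DATA template with the extra %s slot for the
	// namespace (the real mySQLLoadTemplate lives in mysql.go and may differ).
	const loadTemplate = "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s%s (%s)"

	fmt.Printf(loadTemplate+"\n", "/tmp/batch.ndjson",
		namespacePrefix("mynamespace"), quoteIdentifier("events"), "`id`, `name`")
	// -> ... INTO TABLE `mynamespace`.`events` (`id`, `name`)

	fmt.Printf(loadTemplate+"\n", "/tmp/batch.ndjson",
		namespacePrefix(""), quoteIdentifier("events"), "`id`, `name`")
	// -> ... INTO TABLE `events` (`id`, `name`)
}
```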
diff --git a/bulkerlib/implementations/sql/namespace_test.go b/bulkerlib/implementations/sql/namespace_test.go new file mode 100644 index 0000000..873f950 --- /dev/null +++ b/bulkerlib/implementations/sql/namespace_test.go @@ -0,0 +1,282 @@ +package sql + +import ( + bulker "github.com/jitsucom/bulker/bulkerlib" + "sync" + "testing" +) + +func TestTransactionalSequentialRepeatPKWithNamespace(t *testing.T) { + t.Parallel() + tests := []bulkerTestConfig{ + { + //deletes any table leftovers from previous tests + name: "dummy_test_table_cleanup", + namespace: "mynamespace2", + tableName: "transactional_test_pk_namespace", + modes: []bulker.BulkMode{bulker.Batch}, + dataFile: "test_data/empty.ndjson", + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate(), bulker.WithNamespace("mynamespace2")}, + expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, + configIds: allBulkerConfigs, + }, + { + name: "first_run_batch", + namespace: "mynamespace2", + tableName: "transactional_test_pk_namespace", + modes: []bulker.BulkMode{bulker.Batch}, + leaveResultingTable: true, + dataFile: "test_data/repeated_ids.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 1, "name": "test7"}, + {"_timestamp": constantTime, "id": 2, "name": "test1"}, + {"_timestamp": constantTime, "id": 3, "name": "test6"}, + {"_timestamp": constantTime, "id": 4, "name": "test5"}, + }, + configIds: allBulkerConfigs, + expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate(), bulker.WithNamespace("mynamespace2")}, + }, + { + name: "second_run_batch", + namespace: "mynamespace2", + tableName: "transactional_test_pk_namespace", + modes: []bulker.BulkMode{bulker.Batch}, + leaveResultingTable: true, + dataFile: "test_data/repeated_ids2.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 1, "name": "test7"}, + {"_timestamp": constantTime, "id": 2, "name": "test1"}, + {"_timestamp": constantTime, "id": 3, "name": "test13"}, + {"_timestamp": constantTime, "id": 4, "name": "test14"}, + {"_timestamp": constantTime, "id": 5, "name": "test15"}, + }, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate(), bulker.WithNamespace("mynamespace2")}, + expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, + configIds: allBulkerConfigs, + }, + { + name: "dummy_test_table_cleanup", + namespace: "mynamespace2", + tableName: "transactional_test_pk_namespace", + modes: []bulker.BulkMode{bulker.Batch}, + dataFile: "test_data/empty.ndjson", + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate(), bulker.WithNamespace("mynamespace2")}, + expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, + configIds: allBulkerConfigs, + }, + } + sequentialGroup := sync.WaitGroup{} + sequentialGroup.Add(1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + runTestConfig(t, tt, testStream) + sequentialGroup.Done() + }) + sequentialGroup.Wait() + sequentialGroup.Add(1) + } +} + +// TestReplaceTableStream sequentially runs 3 replace table streams without dropping table in between +func TestReplaceTableStreamWithNamespace(t *testing.T) { + t.Parallel() + tests := []bulkerTestConfig{ + { + //delete any table leftovers from previous tests + name: 
"dummy_test_table_cleanup", + namespace: "mynamespace", + tableName: "replace_table_test_nsp", + modes: []bulker.BulkMode{bulker.ReplaceTable}, + dataFile: "test_data/empty.ndjson", + configIds: allBulkerConfigs, + streamOptions: []bulker.StreamOption{bulker.WithNamespace("mynamespace")}, + }, + { + name: "first_run", + namespace: "mynamespace", + tableName: "replace_table_test_nsp", + modes: []bulker.BulkMode{bulker.ReplaceTable}, + leaveResultingTable: true, + dataFile: "test_data/partition1.ndjson", + expectedRowsCount: 5, + configIds: allBulkerConfigs, + streamOptions: []bulker.StreamOption{bulker.WithNamespace("mynamespace")}, + }, + { + name: "second_run", + namespace: "mynamespace", + tableName: "replace_table_test_nsp", + modes: []bulker.BulkMode{bulker.ReplaceTable}, + leaveResultingTable: true, + dataFile: "test_data/replace_table.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 11, "name": "test11", "name2": "a"}, + {"_timestamp": constantTime, "id": 12, "name": "test12", "name2": "a"}, + {"_timestamp": constantTime, "id": 13, "name": "test13", "name2": "a"}, + {"_timestamp": constantTime, "id": 14, "name": "test14", "name2": "a"}, + {"_timestamp": constantTime, "id": 15, "name": "test15", "name2": "a"}, + {"_timestamp": constantTime, "id": 16, "name": "test16", "name2": "a"}, + {"_timestamp": constantTime, "id": 17, "name": "test17", "name2": "a"}, + {"_timestamp": constantTime, "id": 18, "name": "test18", "name2": "a"}, + {"_timestamp": constantTime, "id": 19, "name": "test19", "name2": "a"}, + {"_timestamp": constantTime, "id": 20, "name": "test20", "name2": "a"}, + }, + configIds: allBulkerConfigs, + streamOptions: []bulker.StreamOption{bulker.WithNamespace("mynamespace")}, + }, + { + name: "empty_run", + namespace: "mynamespace", + tableName: "replace_table_test_nsp", + leaveResultingTable: true, + modes: []bulker.BulkMode{bulker.ReplaceTable}, + dataFile: "test_data/empty.ndjson", + expectedRowsCount: 0, + configIds: allBulkerConfigs, + streamOptions: []bulker.StreamOption{bulker.WithNamespace("mynamespace")}, + }, + { + name: "dummy_test_table_cleanup", + namespace: "mynamespace", + tableName: "replace_table_test_nsp", + modes: []bulker.BulkMode{bulker.ReplaceTable}, + dataFile: "test_data/empty.ndjson", + configIds: allBulkerConfigs, + streamOptions: []bulker.StreamOption{bulker.WithNamespace("mynamespace")}, + }, + } + sequentialGroup := sync.WaitGroup{} + sequentialGroup.Add(1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + runTestConfig(t, tt, testStream) + sequentialGroup.Done() + }) + sequentialGroup.Wait() + sequentialGroup.Add(1) + } +} + +func TestReplacePartitionStreamWithNamespace(t *testing.T) { + t.Parallel() + tests := []bulkerTestConfig{ + { + //delete any table leftovers from previous tests + name: "dummy_test_table_cleanup", + namespace: "mynsp3", + tableName: "replace_partition_test_namespace", + modes: []bulker.BulkMode{bulker.ReplacePartition}, + dataFile: "test_data/empty.ndjson", + streamOptions: []bulker.StreamOption{bulker.WithPartition("1"), bulker.WithNamespace("mynsp3")}, + configIds: allBulkerConfigs, + }, + { + name: "first_partition", + namespace: "mynsp3", + tableName: "replace_partition_test_namespace", + modes: []bulker.BulkMode{bulker.ReplacePartition}, + leaveResultingTable: true, + dataFile: "test_data/partition1.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 1, "name": "test", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 2, "name": 
"test2", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 3, "name": "test3", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 4, "name": "test4", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 5, "name": "test5", "__partition_id": "1"}, + }, + streamOptions: []bulker.StreamOption{bulker.WithPartition("1"), bulker.WithNamespace("mynsp3")}, + configIds: allBulkerConfigs, + }, + { + name: "second_partition", + namespace: "mynsp3", + tableName: "replace_partition_test_namespace", + modes: []bulker.BulkMode{bulker.ReplacePartition}, + leaveResultingTable: true, + dataFile: "test_data/partition2.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 1, "name": "test", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 2, "name": "test2", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 3, "name": "test3", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 4, "name": "test4", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 5, "name": "test5", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 11, "name": "test11", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 12, "name": "test12", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 13, "name": "test13", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 14, "name": "test14", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 15, "name": "test15", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 16, "name": "test16", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 17, "name": "test17", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 18, "name": "test18", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 19, "name": "test19", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 20, "name": "test20", "__partition_id": "2"}, + }, + streamOptions: []bulker.StreamOption{bulker.WithPartition("2"), bulker.WithNamespace("mynsp3")}, + configIds: allBulkerConfigs, + }, + { + name: "first_partition_reprocess", + namespace: "mynsp3", + tableName: "replace_partition_test_namespace", + modes: []bulker.BulkMode{bulker.ReplacePartition}, + leaveResultingTable: true, + dataFile: "test_data/partition1_1.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 6, "name": "test6", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 7, "name": "test7", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 8, "name": "test8", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 9, "name": "test9", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 10, "name": "test10", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 11, "name": "test11", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 12, "name": "test12", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 13, "name": "test13", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 14, "name": "test14", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 15, "name": "test15", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 16, "name": "test16", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 17, "name": "test17", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 18, "name": "test18", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 19, "name": "test19", "__partition_id": "2"}, + {"_timestamp": constantTime, "id": 20, "name": "test20", 
"__partition_id": "2"}, + }, + streamOptions: []bulker.StreamOption{bulker.WithPartition("1"), bulker.WithNamespace("mynsp3")}, + configIds: allBulkerConfigs, + }, + { + name: "second_partition_empty", + namespace: "mynsp3", + tableName: "replace_partition_test_namespace", + leaveResultingTable: true, + modes: []bulker.BulkMode{bulker.ReplacePartition}, + dataFile: "test_data/empty.ndjson", + expectedRows: []map[string]any{ + {"_timestamp": constantTime, "id": 6, "name": "test6", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 7, "name": "test7", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 8, "name": "test8", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 9, "name": "test9", "__partition_id": "1"}, + {"_timestamp": constantTime, "id": 10, "name": "test10", "__partition_id": "1"}, + }, + streamOptions: []bulker.StreamOption{bulker.WithPartition("2"), bulker.WithNamespace("mynsp3")}, + configIds: allBulkerConfigs, + }, + { + name: "dummy_test_table_cleanup", + namespace: "mynsp3", + tableName: "replace_partition_test_namespace", + modes: []bulker.BulkMode{bulker.ReplacePartition}, + dataFile: "test_data/empty.ndjson", + streamOptions: []bulker.StreamOption{bulker.WithPartition("1"), bulker.WithNamespace("mynsp3")}, + configIds: allBulkerConfigs, + }, + } + sequentialGroup := sync.WaitGroup{} + sequentialGroup.Add(1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + runTestConfig(t, tt, testStream) + sequentialGroup.Done() + }) + sequentialGroup.Wait() + sequentialGroup.Add(1) + } +} diff --git a/bulkerlib/implementations/sql/postgres.go b/bulkerlib/implementations/sql/postgres.go index a3f5506..e53e8be 100644 --- a/bulkerlib/implementations/sql/postgres.go +++ b/bulkerlib/implementations/sql/postgres.go @@ -50,15 +50,14 @@ FROM information_schema.table_constraints tco WHERE tco.constraint_type = 'PRIMARY KEY' AND kcu.table_schema ilike $1 AND kcu.table_name = $2 order by kcu.ordinal_position` - pgSetSearchPath = `SET search_path TO "%s";` - pgCreateDbSchemaIfNotExistsTemplate = `CREATE SCHEMA IF NOT EXISTS "%s"; SET search_path TO "%s";` - pgCreateIndexTemplate = `CREATE INDEX ON %s (%s);` + pgCreateDbSchemaIfNotExistsTemplate = `CREATE SCHEMA IF NOT EXISTS "%s"` + pgCreateIndexTemplate = `CREATE INDEX ON %s%s (%s);` - pgMergeQuery = `INSERT INTO {{.TableName}}({{.Columns}}) VALUES ({{.Placeholders}}) ON CONFLICT ON CONSTRAINT {{.PrimaryKeyName}} DO UPDATE set {{.UpdateSet}}` + pgMergeQuery = `INSERT INTO {{.Namespace}}{{.TableName}}({{.Columns}}) VALUES ({{.Placeholders}}) ON CONFLICT ON CONSTRAINT {{.PrimaryKeyName}} DO UPDATE set {{.UpdateSet}}` - pgCopyTemplate = `COPY %s(%s) FROM STDIN` + pgCopyTemplate = `COPY %s%s(%s) FROM STDIN` - pgBulkMergeQuery = `INSERT INTO {{.TableTo}}({{.Columns}}) SELECT {{.Columns}} FROM {{.TableFrom}} ON CONFLICT ON CONSTRAINT {{.PrimaryKeyName}} DO UPDATE SET {{.UpdateSet}}` + pgBulkMergeQuery = `INSERT INTO {{.Namespace}}{{.TableTo}}({{.Columns}}) SELECT {{.Columns}} FROM {{.NamespaceFrom}}{{.TableFrom}} ON CONFLICT ON CONSTRAINT {{.PrimaryKeyName}} DO UPDATE SET {{.UpdateSet}}` pgBulkMergeSourceAlias = `excluded` ) @@ -159,8 +158,9 @@ func NewPostgres(bulkerConfig bulker.Config) (bulker.Bulker, error) { dataSource.SetMaxIdleConns(10) return dataSource, nil } - sqlAdapterBase, err := newSQLAdapterBase(bulkerConfig.Id, PostgresBulkerTypeId, config, dbConnectFunction, postgresDataTypes, queryLogger, typecastFunc, IndexParameterPlaceholder, pgColumnDDL, valueMappingFunc, checkErr, true) + 
sqlAdapterBase, err := newSQLAdapterBase(bulkerConfig.Id, PostgresBulkerTypeId, config, config.Schema, dbConnectFunction, postgresDataTypes, queryLogger, typecastFunc, IndexParameterPlaceholder, pgColumnDDL, valueMappingFunc, checkErr, true) p := &Postgres{sqlAdapterBase, tmpDir} + // some clients have no permission to create tmp tables p.temporaryTables = false p.tableHelper = NewTableHelper(63, '"') return p, err @@ -198,24 +198,36 @@ func (p *Postgres) OpenTx(ctx context.Context) (*TxSQLAdapter, error) { return p.openTx(ctx, p) } -// InitDatabase creates database schema instance if doesn't exist -func (p *Postgres) InitDatabase(ctx context.Context) error { - query := fmt.Sprintf(pgCreateDbSchemaIfNotExistsTemplate, p.config.Schema, p.config.Schema) +func (p *Postgres) createSchemaIfNotExists(ctx context.Context, schema string) error { + if schema == "" { + return nil + } + n := p.namespaceName(schema) + if n == "" { + return nil + } + query := fmt.Sprintf(pgCreateDbSchemaIfNotExistsTemplate, n) if _, err := p.txOrDb(ctx).ExecContext(ctx, query); err != nil { - //return errorj.CreateSchemaError.Wrap(err, "failed to create db schema"). - // WithProperty(errorj.DBInfo, &types.ErrorPayload{ - // Schema: p.config.Schema, - // Statement: query, - // }) + return errorj.CreateSchemaError.Wrap(err, "failed to create db schema"). + WithProperty(errorj.DBInfo, &types2.ErrorPayload{ + Schema: n, + Statement: query, + }) } + return nil +} + +// InitDatabase creates database schema instance if doesn't exist +func (p *Postgres) InitDatabase(ctx context.Context) error { + _ = p.createSchemaIfNotExists(ctx, p.config.Schema) return nil } // GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct -func (p *Postgres) GetTableSchema(ctx context.Context, tableName string) (*Table, error) { - table, err := p.getTable(ctx, tableName) +func (p *Postgres) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) { + table, err := p.getTable(ctx, namespace, tableName) if err != nil { return nil, err } @@ -225,7 +237,7 @@ func (p *Postgres) GetTableSchema(ctx context.Context, tableName string) (*Table return table, nil } - primaryKeyName, pkFields, err := p.getPrimaryKey(ctx, tableName) + primaryKeyName, pkFields, err := p.getPrimaryKey(ctx, namespace, tableName) if err != nil { return nil, err } @@ -239,14 +251,15 @@ func (p *Postgres) GetTableSchema(ctx context.Context, tableName string) (*Table return table, nil } -func (p *Postgres) getTable(ctx context.Context, tableName string) (*Table, error) { +func (p *Postgres) getTable(ctx context.Context, namespace string, tableName string) (*Table, error) { tableName = p.TableName(tableName) - table := &Table{Name: tableName, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()} - rows, err := p.txOrDb(ctx).QueryContext(ctx, pgTableSchemaQuery, p.config.Schema, tableName) + namespace = p.namespaceName(namespace) + table := &Table{Name: tableName, Namespace: namespace, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()} + rows, err := p.txOrDb(ctx).QueryContext(ctx, pgTableSchemaQuery, namespace, tableName) if err != nil { return nil, errorj.GetTableError.Wrap(err, "failed to get table columns"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, PrimaryKeys: table.GetPKFields(), Statement: pgTableSchemaQuery, @@ -260,7 +273,7 @@ func (p *Postgres) getTable(ctx context.Context, tableName string) (*Table, erro if err := rows.Scan(&columnName, &columnPostgresType); err != nil { return nil, errorj.GetTableError.Wrap(err, "failed to scan result"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, PrimaryKeys: table.GetPKFields(), Statement: pgTableSchemaQuery, @@ -278,7 +291,7 @@ func (p *Postgres) getTable(ctx context.Context, tableName string) (*Table, erro if err := rows.Err(); err != nil { return nil, errorj.GetTableError.Wrap(err, "failed read last row"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, PrimaryKeys: table.GetPKFields(), Statement: pgTableSchemaQuery, @@ -307,6 +320,7 @@ func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTab func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := p.quotedTableName(targetTable.Name) + quotedNamespace := p.namespacePrefix(targetTable.Namespace) if loadSource.Type != LocalFile { return state, fmt.Errorf("LoadTable: only local file is supported") } @@ -314,12 +328,12 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource return state, fmt.Errorf("LoadTable: only %s format is supported", p.batchFileFormat) } columnNames := targetTable.MappedColumnNames(p.quotedColumnName) - copyStatement := fmt.Sprintf(pgCopyTemplate, quotedTableName, strings.Join(columnNames, ", ")) + copyStatement := fmt.Sprintf(pgCopyTemplate, quotedNamespace, quotedTableName, strings.Join(columnNames, ", ")) defer func() { if err != nil { err = errorj.LoadError.Wrap(err, "failed to load table"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: quotedNamespace, Table: quotedTableName, PrimaryKeys: targetTable.GetPKFields(), Statement: copyStatement, @@ -405,14 +419,15 @@ func getDefaultValueStatement(sqlType string) string { } // getPrimaryKey returns primary key name and fields -func (p *Postgres) getPrimaryKey(ctx context.Context, tableName string) (string, types.OrderedSet[string], error) { +func (p *Postgres) getPrimaryKey(ctx context.Context, namespace string, tableName string) (string, types.OrderedSet[string], error) { tableName = p.TableName(tableName) + namespace = p.namespaceName(namespace) primaryKeys := types.NewOrderedSet[string]() - pkFieldsRows, err := p.txOrDb(ctx).QueryContext(ctx, pgPrimaryKeyFieldsQuery, p.config.Schema, tableName) + pkFieldsRows, err := p.txOrDb(ctx).QueryContext(ctx, pgPrimaryKeyFieldsQuery, namespace, tableName) if err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to get primary key"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, Statement: pgPrimaryKeyFieldsQuery, Values: []any{p.config.Schema, tableName}, @@ -427,7 +442,7 @@ func (p *Postgres) getPrimaryKey(ctx context.Context, tableName string) (string, if err := pkFieldsRows.Scan(&constraintName, &keyColumn); err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to scan result").
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, Statement: pgPrimaryKeyFieldsQuery, Values: []any{p.config.Schema, tableName}, @@ -443,7 +458,7 @@ func (p *Postgres) getPrimaryKey(ctx context.Context, tableName string) (string, if err := pkFieldsRows.Err(); err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed read last row"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, Statement: pgPrimaryKeyFieldsQuery, Values: []any{p.config.Schema, tableName}, @@ -456,14 +471,18 @@ func (p *Postgres) getPrimaryKey(ctx context.Context, tableName string) (string, } func (p *Postgres) CreateTable(ctx context.Context, schemaToCreate *Table) error { - err := p.SQLAdapterBase.CreateTable(ctx, schemaToCreate) + err := p.createSchemaIfNotExists(ctx, schemaToCreate.Namespace) + if err != nil { + return err + } + err = p.SQLAdapterBase.CreateTable(ctx, schemaToCreate) if err != nil { return err } if !schemaToCreate.Temporary && schemaToCreate.TimestampColumn != "" { err = p.createIndex(ctx, schemaToCreate) if err != nil { - p.DropTable(ctx, schemaToCreate.Name, true) + p.DropTable(ctx, schemaToCreate.Namespace, schemaToCreate.Name, true) return fmt.Errorf("failed to create sort key: %v", err) } } @@ -480,7 +499,7 @@ func (p *Postgres) ReplaceTable(ctx context.Context, targetTableName string, rep if err != nil { return err } - err = p.TruncateTable(ctx, targetTableName) + err = p.TruncateTable(ctx, replacementTable.Namespace, targetTableName) if err != nil { return err } @@ -489,7 +508,7 @@ func (p *Postgres) ReplaceTable(ctx context.Context, targetTableName string, rep return err } if dropOldTable { - err = p.DropTable(ctx, replacementTable.Name, true) + err = p.DropTable(ctx, replacementTable.Namespace, replacementTable.Name, true) if err != nil { return err } @@ -502,14 +521,16 @@ func (p *Postgres) createIndex(ctx context.Context, table *Table) error { return nil } quotedTableName := p.quotedTableName(table.Name) + quotedSchema := p.namespacePrefix(table.Namespace) - statement := fmt.Sprintf(pgCreateIndexTemplate, + statement := fmt.Sprintf(pgCreateIndexTemplate, quotedSchema, quotedTableName, p.quotedColumnName(table.TimestampColumn)) if _, err := p.txOrDb(ctx).ExecContext(ctx, statement); err != nil { return errorj.AlterTableError.Wrap(err, "failed to set sort key"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Table: quotedTableName, + Schema: quotedSchema, PrimaryKeys: table.GetPKFields(), Statement: statement, }) @@ -522,12 +543,13 @@ func (p *Postgres) Ping(ctx context.Context) error { if err != nil { return err } - if _, err = p.txOrDb(ctx).ExecContext(ctx, fmt.Sprintf(pgSetSearchPath, p.config.Schema)); err != nil { - return err - } return nil } +//func (p *Postgres) TmpNamespace(namespace string) string { +// return NoNamespaceValue +//} + // Close underlying sql.DB func (p *Postgres) Close() error { if p.tmpDir != "" { diff --git a/bulkerlib/implementations/sql/reconnect_test.go b/bulkerlib/implementations/sql/reconnect_test.go index 751ef12..8b0f868 100644 --- a/bulkerlib/implementations/sql/reconnect_test.go +++ b/bulkerlib/implementations/sql/reconnect_test.go @@ -132,7 +132,7 @@ func _TestReconnect(t *testing.T) { "create_bulker": "connection refused", "consume_object_0": "connection refused", "consume_object_1": "connection refused", - "pre_cleanup": "database connection is not initialized", + "create_stream": "database connection is not initialized", "init_database_clickhouse": "database connection is not initialized", }, postStepFunctions: map[string]StepFunction{ diff --git a/bulkerlib/implementations/sql/redshift.go b/bulkerlib/implementations/sql/redshift.go index 258e1c2..8d9cc2f 100644 --- a/bulkerlib/implementations/sql/redshift.go +++ b/bulkerlib/implementations/sql/redshift.go @@ -20,7 +20,7 @@ func init() { const ( RedshiftBulkerTypeId = "redshift" - redshiftCopyTemplate = `copy %s (%s) + redshiftCopyTemplate = `copy %s%s (%s) from 's3://%s/%s' ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s' @@ -31,8 +31,8 @@ const ( dateformat 'auto' timeformat 'auto'` - redshiftAlterSortKeyTemplate = `ALTER TABLE %s ALTER SORTKEY (%s)` - redshiftDeleteBeforeBulkMergeUsing = `DELETE FROM %s using %s where %s` + redshiftAlterSortKeyTemplate = `ALTER TABLE %s%s ALTER SORTKEY (%s)` + redshiftDeleteBeforeBulkMergeUsing = `DELETE FROM %s%s using %s where %s` redshiftPrimaryKeyFieldsQuery = `select tco.constraint_name as constraint_name, kcu.column_name as key_column from information_schema.table_constraints tco @@ -92,6 +92,7 @@ func NewRedshift(bulkerConfig bulker.Config) (bulker.Bulker, error) { r.typesMapping, r.reverseTypesMapping = InitTypes(redshiftTypes, false) r.tableHelper = NewTableHelper(127, '"') r.temporaryTables = true + r.renameToSchemaless = true //// Redshift is case insensitive by default //r._columnNameFunc = strings.ToLower //r._tableNameFunc = func(config *DataSourceConfig, tableName string) string { return tableName } @@ -151,7 +152,7 @@ func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects pkMatchConditions = pkMatchConditions.Add(pkColumn, "=", value) } } - res, err := p.Select(ctx, table.Name, pkMatchConditions, nil) + res, err := p.Select(ctx, table.Namespace, table.Name, pkMatchConditions, nil) if err != nil { return errorj.ExecuteInsertError.Wrap(err, "failed check primary key collision"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ @@ -172,6 +173,7 @@ func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects // LoadTable copy transfer data from s3 to redshift by passing COPY request to redshift func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := p.quotedTableName(targetTable.Name) + namespace := p.namespacePrefix(targetTable.Namespace) if loadSource.Type != AmazonS3 { return state, fmt.Errorf("LoadTable: only Amazon S3 file is supported") } @@ -185,13 +187,13 @@ func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource if s3Config.Folder != "" { fileKey = s3Config.Folder + "/" + fileKey } - statement := fmt.Sprintf(redshiftCopyTemplate, quotedTableName, strings.Join(columnNames, ","), s3Config.Bucket, fileKey, s3Config.AccessKeyID, s3Config.SecretKey, s3Config.Region) + statement := fmt.Sprintf(redshiftCopyTemplate, namespace, quotedTableName, strings.Join(columnNames, ","), s3Config.Bucket, fileKey, s3Config.AccessKeyID, s3Config.SecretKey, s3Config.Region) if _, err := p.txOrDb(ctx).ExecContext(ctx, statement); err != nil { return state, errorj.CopyError.Wrap(err, "failed to copy data from s3"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Schema: p.config.Schema, Table: quotedTableName, - Statement: fmt.Sprintf(redshiftCopyTemplate, quotedTableName, strings.Join(columnNames, ","), s3Config.Bucket, fileKey, credentialsMask, credentialsMask, s3Config.Region), + Statement: fmt.Sprintf(redshiftCopyTemplate, namespace, quotedTableName, strings.Join(columnNames, ","), s3Config.Bucket, fileKey, credentialsMask, credentialsMask, s3Config.Region), }) } @@ -200,6 +202,7 @@ func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error) { quotedTargetTableName := p.quotedTableName(targetTable.Name) + namespace := p.namespacePrefix(targetTable.Namespace) quotedSourceTableName := p.quotedTableName(sourceTable.Name) if mergeWindow > 0 && targetTable.PKFields.Size() > 0 { @@ -211,7 +214,7 @@ func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTab } pkMatchConditions += fmt.Sprintf(`%s.%s = %s.%s`, quotedTargetTableName, pkColumn, quotedSourceTableName, pkColumn) } - deleteStatement := fmt.Sprintf(redshiftDeleteBeforeBulkMergeUsing, quotedTargetTableName, quotedSourceTableName, pkMatchConditions) + deleteStatement := fmt.Sprintf(redshiftDeleteBeforeBulkMergeUsing, namespace, quotedTargetTableName, quotedSourceTableName, pkMatchConditions) if _, err = p.txOrDb(ctx).ExecContext(ctx, deleteStatement); err != nil { @@ -229,17 +232,18 @@ func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTab func (p *Redshift) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error) { tmpTable := "deprecated_" + targetTableName + time.Now().Format("_20060102_150405") - err1 := p.renameTable(ctx, true, targetTableName, tmpTable) - err = p.renameTable(ctx, false, replacementTable.Name, targetTableName) + err1 := p.renameTable(ctx, true, replacementTable.Namespace, targetTableName, tmpTable) + err = p.renameTable(ctx, false, replacementTable.Namespace, replacementTable.Name, targetTableName) if dropOldTable && err1 == nil && err == nil { - return p.DropTable(ctx, tmpTable, true) + 
return p.DropTable(ctx, replacementTable.Namespace, tmpTable, true) } return } -func (p *Redshift) renameTable(ctx context.Context, ifExists bool, tableName, newTableName string) error { +func (p *Redshift) renameTable(ctx context.Context, ifExists bool, namespace, tableName, newTableName string) error { if ifExists { - row := p.txOrDb(ctx).QueryRowContext(ctx, fmt.Sprintf(`SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_schema ilike '%s' AND table_name = '%s')`, p.config.Schema, tableName)) + schema := p.TableName(utils.DefaultString(namespace, p.namespace)) + row := p.txOrDb(ctx).QueryRowContext(ctx, fmt.Sprintf(`SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_schema ilike '%s' AND table_name = '%s')`, schema, tableName)) exists := false err := row.Scan(&exists) if err != nil { @@ -249,12 +253,12 @@ func (p *Redshift) renameTable(ctx context.Context, ifExists bool, tableName, ne return nil } } - return p.SQLAdapterBase.renameTable(ctx, false, tableName, newTableName) + return p.SQLAdapterBase.renameTable(ctx, false, namespace, tableName, newTableName) } // GetTableSchema return table (name,columns, primary key) representation wrapped in Table struct -func (p *Redshift) GetTableSchema(ctx context.Context, tableName string) (*Table, error) { - table, err := p.getTable(ctx, strings.ToLower(tableName)) +func (p *Redshift) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) { + table, err := p.getTable(ctx, namespace, strings.ToLower(tableName)) if err != nil { return nil, err } @@ -264,7 +268,7 @@ func (p *Redshift) GetTableSchema(ctx context.Context, tableName string) (*Table return table, nil } - primaryKeyName, pkFields, err := p.getPrimaryKeys(ctx, table.Name) + primaryKeyName, pkFields, err := p.getPrimaryKeys(ctx, table.Namespace, table.Name) if err != nil { return nil, err } @@ -278,14 +282,15 @@ func (p *Redshift) GetTableSchema(ctx context.Context, tableName string) (*Table return table, nil } -func (p *Redshift) getPrimaryKeys(ctx context.Context, tableName string) (string, types.OrderedSet[string], error) { +func (p *Redshift) getPrimaryKeys(ctx context.Context, namespace, tableName string) (string, types.OrderedSet[string], error) { tableName = p.TableName(tableName) + namespace = p.namespaceName(namespace) primaryKeys := types.NewOrderedSet[string]() - pkFieldsRows, err := p.txOrDb(ctx).QueryContext(ctx, redshiftPrimaryKeyFieldsQuery, p.config.Schema, tableName) + pkFieldsRows, err := p.txOrDb(ctx).QueryContext(ctx, redshiftPrimaryKeyFieldsQuery, namespace, tableName) if err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to get primary key"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, Statement: redshiftPrimaryKeyFieldsQuery, Values: []any{p.config.Schema, tableName}, @@ -300,7 +305,7 @@ func (p *Redshift) getPrimaryKeys(ctx context.Context, tableName string) (string if err := pkFieldsRows.Scan(&constraintName, &fieldName); err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to scan result"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, Statement: redshiftPrimaryKeyFieldsQuery, Values: []any{p.config.Schema, tableName}, @@ -314,7 +319,7 @@ func (p *Redshift) getPrimaryKeys(ctx context.Context, tableName string) (string if err := pkFieldsRows.Err(); err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed read last row"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: p.config.Schema, + Schema: namespace, Table: tableName, Statement: redshiftPrimaryKeyFieldsQuery, Values: []any{p.config.Schema, tableName}, @@ -326,16 +331,23 @@ func (p *Redshift) getPrimaryKeys(ctx context.Context, tableName string) (string return primaryKeyName, primaryKeys, nil } +func (p *Redshift) TmpNamespace(string) string { + return NoNamespaceValue +} func (p *Redshift) CreateTable(ctx context.Context, schemaToCreate *Table) error { - err := p.SQLAdapterBase.CreateTable(ctx, schemaToCreate) + err := p.createSchemaIfNotExists(ctx, schemaToCreate.Namespace) + if err != nil { + return err + } + err = p.SQLAdapterBase.CreateTable(ctx, schemaToCreate) if err != nil { return err } if !schemaToCreate.Temporary && schemaToCreate.TimestampColumn != "" { err = p.createSortKey(ctx, schemaToCreate) if err != nil { - p.DropTable(ctx, schemaToCreate.Name, true) + p.DropTable(ctx, schemaToCreate.Namespace, schemaToCreate.Name, true) return fmt.Errorf("failed to create sort key: %v", err) } } @@ -347,9 +359,10 @@ func (p *Redshift) createSortKey(ctx context.Context, table *Table) error { return nil } quotedTableName := p.quotedTableName(table.Name) + namespace := p.namespacePrefix(table.Namespace) statement := fmt.Sprintf(redshiftAlterSortKeyTemplate, - quotedTableName, p.quotedColumnName(table.TimestampColumn)) + namespace, quotedTableName, p.quotedColumnName(table.TimestampColumn)) if _, err := p.txOrDb(ctx).ExecContext(ctx, statement); err != nil { return errorj.AlterTableError.Wrap(err, "failed to set sort key"). 
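Reviewer note: the TmpNamespace override added for Redshift above is the key design point of this file. Redshift creates temporary tables in a session-scoped space that cannot be schema-qualified, so the adapter maps every target namespace to the NoNamespaceValue sentinel; adapters without that restriction pass the target namespace through. A hedged sketch of that contract follows; the sentinel's concrete value and the helper names here are assumptions, not the definitions from the sql package:

```go
package main

import "fmt"

// NoNamespaceValue marks "emit no schema prefix at all"; the real constant is
// defined in the sql package (its concrete value here is an assumption).
const NoNamespaceValue = "#no_namespace#"

// namespacePrefix renders a namespace as a quoted `"schema".` prefix and
// treats both "" and the sentinel as "no prefix".
func namespacePrefix(namespace string) string {
	if namespace == "" || namespace == NoNamespaceValue {
		return ""
	}
	return `"` + namespace + `".`
}

// redshiftTmpNamespace mirrors Redshift's override: temp tables live in a
// session-scoped space and must not be schema-qualified.
func redshiftTmpNamespace(string) string { return NoNamespaceValue }

// passthroughTmpNamespace mirrors adapters whose temp tables can stay in the
// stream's target namespace.
func passthroughTmpNamespace(target string) string { return target }

func main() {
	const target = "mynamespace"
	fmt.Printf("CREATE TEMPORARY TABLE %s\"events_tmp\" (...)\n",
		namespacePrefix(redshiftTmpNamespace(target))) // no prefix at all
	fmt.Printf("CREATE TEMPORARY TABLE %s\"events_tmp\" (...)\n",
		namespacePrefix(passthroughTmpNamespace(target))) // "mynamespace"."events_tmp"
}
```

The stream code below picks the temp-table namespace through this hook, so the same batch pipeline works whether or not the warehouse tolerates qualified temporary tables.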
diff --git a/bulkerlib/implementations/sql/replacepartition_stream.go b/bulkerlib/implementations/sql/replacepartition_stream.go index 4710175..ea2f7f2 100644 --- a/bulkerlib/implementations/sql/replacepartition_stream.go +++ b/bulkerlib/implementations/sql/replacepartition_stream.go @@ -32,7 +32,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream return nil, err } ps.partitionId = partitionId - ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.tableName) + ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName) ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) { dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable, tableForObject).Clone() ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object) @@ -41,6 +41,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream } tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405")) return &Table{ + Namespace: p.TmpNamespace(ps.namespace), Name: tmpTableName, Columns: dstTable.Columns, Temporary: true, @@ -118,7 +119,7 @@ func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.St func (ps *ReplacePartitionStream) clearPartition(ctx context.Context, tx *TxSQLAdapter) error { //check if destination table already exists - table, err := tx.GetTableSchema(ctx, ps.tableName) + table, err := tx.GetTableSchema(ctx, ps.namespace, ps.tableName) if err != nil { return fmt.Errorf("couldn't start ReplacePartitionStream: failed to check existence of table: %s error: %s", ps.tableName, err) } @@ -130,7 +131,7 @@ func (ps *ReplacePartitionStream) clearPartition(ctx context.Context, tx *TxSQLA return fmt.Errorf("couldn't start ReplacePartitionStream: destination table [%s] exist but it is not managed by ReplacePartitionStream: %s column is missing", ps.tableName, tx.ColumnName(PartitonIdKeyword)) } //delete previous data by provided partition id - err = tx.Delete(ctx, ps.tableName, ByPartitionId(ps.partitionId)) + err = tx.Delete(ctx, ps.namespace, ps.tableName, ByPartitionId(ps.partitionId)) if err != nil { return fmt.Errorf("couldn't start ReplacePartitionStream: failed to delete data for partitionId: %s error: %s", ps.partitionId, err) } diff --git a/bulkerlib/implementations/sql/replacetable_stream.go b/bulkerlib/implementations/sql/replacetable_stream.go index 9ded219..b611ce7 100644 --- a/bulkerlib/implementations/sql/replacetable_stream.go +++ b/bulkerlib/implementations/sql/replacetable_stream.go @@ -28,6 +28,7 @@ func newReplaceTableStream(id string, p SQLAdapter, tableName string, streamOpti } ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) { tmpTable := &Table{ + Namespace: ps.namespace, Name: fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405")), PrimaryKeyName: tableForObject.PrimaryKeyName, //PrimaryKeyName: fmt.Sprintf("%s_%s", tableForObject.PrimaryKeyName, time.Now().Format("060102_150405")), @@ -89,17 +90,17 @@ func (ps *ReplaceTableStream) Complete(ctx context.Context) (state bulker.State, err = ps.init(ctx) if err == nil { var table *Table - table, err = ps.tx.GetTableSchema(ctx, ps.tableName) + table, err = ps.tx.GetTableSchema(ctx, ps.namespace, ps.tableName) if table.Exists() { - err = ps.tx.TruncateTable(ctx, ps.tableName) + err = 
ps.tx.TruncateTable(ctx, ps.namespace, ps.tableName) } } } else { //no transaction was opened yet and not needed that is why we use ps.sqlAdapter instead of tx var table *Table - table, err = ps.sqlAdapter.GetTableSchema(ctx, ps.tableName) + table, err = ps.sqlAdapter.GetTableSchema(ctx, ps.namespace, ps.tableName) if table.Exists() { - err = ps.sqlAdapter.TruncateTable(ctx, ps.tableName) + err = ps.sqlAdapter.TruncateTable(ctx, ps.namespace, ps.tableName) } } } diff --git a/bulkerlib/implementations/sql/snowflake.go b/bulkerlib/implementations/sql/snowflake.go index 93d2849..07135df 100644 --- a/bulkerlib/implementations/sql/snowflake.go +++ b/bulkerlib/implementations/sql/snowflake.go @@ -32,16 +32,16 @@ const ( SnowflakeBulkerTypeId = "snowflake" sfTableExistenceQuery = `SELECT count(*) from INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA = ? and TABLE_NAME = ?` - sfDescTableQuery = `desc table %s` - sfAlterClusteringKeyTemplate = `ALTER TABLE %s CLUSTER BY (DATE_TRUNC('MONTH', %s))` + sfDescTableQuery = `desc table %s%s` + sfAlterClusteringKeyTemplate = `ALTER TABLE %s%s CLUSTER BY (DATE_TRUNC('MONTH', %s))` - sfCopyStatement = `COPY INTO %s (%s) from @~/%s FILE_FORMAT=(TYPE= 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY = '"' ESCAPE_UNENCLOSED_FIELD = NONE SKIP_HEADER = 1) ` + sfCopyStatement = `COPY INTO %s%s (%s) from @~/%s FILE_FORMAT=(TYPE= 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY = '"' ESCAPE_UNENCLOSED_FIELD = NONE SKIP_HEADER = 1) ` - sfMergeStatement = `MERGE INTO {{.TableTo}} T USING (SELECT {{.Columns}} FROM {{.TableFrom}} ) S ON {{.JoinConditions}} WHEN MATCHED THEN UPDATE SET {{.UpdateSet}} WHEN NOT MATCHED THEN INSERT ({{.Columns}}) VALUES ({{.SourceColumns}})` + sfMergeStatement = `MERGE INTO {{.Namespace}}{{.TableTo}} T USING (SELECT {{.Columns}} FROM {{.NamespaceFrom}}{{.TableFrom}} ) S ON {{.JoinConditions}} WHEN MATCHED THEN UPDATE SET {{.UpdateSet}} WHEN NOT MATCHED THEN INSERT ({{.Columns}}) VALUES ({{.SourceColumns}})` sfCreateSchemaIfNotExistsTemplate = `CREATE SCHEMA IF NOT EXISTS %s` - sfPrimaryKeyFieldsQuery = `show primary keys in %s` + sfPrimaryKeyFieldsQuery = `show primary keys in %s%s` ) var ( @@ -165,7 +165,7 @@ func NewSnowflake(bulkerConfig bulker.Config) (bulker.Bulker, error) { if bulkerConfig.LogLevel == bulker.Verbose { queryLogger = logging.NewQueryLogger(bulkerConfig.Id, os.Stderr, os.Stderr) } - sqlAdapter, err := newSQLAdapterBase(bulkerConfig.Id, SnowflakeBulkerTypeId, config, dbConnectFunction, snowflakeTypes, queryLogger, typecastFunc, QuestionMarkParameterPlaceholder, sfColumnDDL, unmappedValue, checkErr, false) + sqlAdapter, err := newSQLAdapterBase(bulkerConfig.Id, SnowflakeBulkerTypeId, config, config.Schema, dbConnectFunction, snowflakeTypes, queryLogger, typecastFunc, QuestionMarkParameterPlaceholder, sfColumnDDL, unmappedValue, checkErr, false) s := &Snowflake{sqlAdapter} s.batchFileFormat = types2.FileFormatCSV s.batchFileCompression = types2.FileCompressionGZIP @@ -217,29 +217,39 @@ func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error) { return s.openTx(ctx, s) } -// InitDatabase create database schema instance if doesn't exist -func (s *Snowflake) InitDatabase(ctx context.Context) error { - query := fmt.Sprintf(sfCreateSchemaIfNotExistsTemplate, s.config.Schema) +func (s *Snowflake) createSchemaIfNotExists(ctx context.Context, schema string) error { + if schema == "" { + return nil + } + schema = s.namespaceName(schema) + if schema == "" { + return nil + } + query := fmt.Sprintf(sfCreateSchemaIfNotExistsTemplate, schema) if _, err 
:= s.txOrDb(ctx).ExecContext(ctx, query); err != nil { err = checkErr(err) return errorj.CreateSchemaError.Wrap(err, "failed to create db schema"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: s.config.Schema, + Schema: schema, Statement: query, }) } - return nil } +// InitDatabase create database schema instance if doesn't exist +func (s *Snowflake) InitDatabase(ctx context.Context) error { + return s.createSchemaIfNotExists(ctx, s.config.Schema) +} + // GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct -func (s *Snowflake) GetTableSchema(ctx context.Context, tableName string) (*Table, error) { +func (s *Snowflake) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) { quotedTableName, tableName := s.tableHelper.adaptTableName(tableName) - table := &Table{Name: tableName, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()} + table := &Table{Name: tableName, Namespace: namespace, Columns: NewColumns(), PKFields: types.NewOrderedSet[string]()} - query := fmt.Sprintf(sfDescTableQuery, quotedTableName) + query := fmt.Sprintf(sfDescTableQuery, s.namespacePrefix(namespace), quotedTableName) rows, err := s.txOrDb(ctx).QueryContext(ctx, query) if err != nil { if strings.Contains(err.Error(), "does not exist") { @@ -280,7 +290,7 @@ func (s *Snowflake) GetTableSchema(ctx context.Context, tableName string) (*Tabl }) } - primaryKeyName, pkFields, err := s.getPrimaryKey(ctx, tableName) + primaryKeyName, pkFields, err := s.getPrimaryKey(ctx, namespace, tableName) if err != nil { return nil, err } @@ -301,16 +311,16 @@ func (s *Snowflake) BuildConstraintName(tableName string) string { } // getPrimaryKey returns primary key name and fields -func (s *Snowflake) getPrimaryKey(ctx context.Context, tableName string) (string, types.OrderedSet[string], error) { +func (s *Snowflake) getPrimaryKey(ctx context.Context, namespace, tableName string) (string, types.OrderedSet[string], error) { quotedTableName := s.quotedTableName(tableName) primaryKeys := types.NewOrderedSet[string]() - statement := fmt.Sprintf(sfPrimaryKeyFieldsQuery, quotedTableName) + statement := fmt.Sprintf(sfPrimaryKeyFieldsQuery, s.namespacePrefix(namespace), quotedTableName) pkFieldsRows, err := s.txOrDb(ctx).QueryContext(ctx, statement) if err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to get primary key"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: s.config.Schema, + Schema: namespace, Table: quotedTableName, Statement: statement, }) @@ -325,7 +335,7 @@ func (s *Snowflake) getPrimaryKey(ctx context.Context, tableName string) (string if err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed to get primary key"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: s.config.Schema, + Schema: namespace, Table: quotedTableName, Statement: statement, }) @@ -351,7 +361,7 @@ func (s *Snowflake) getPrimaryKey(ctx context.Context, tableName string) (string if err := pkFieldsRows.Err(); err != nil { return "", types.OrderedSet[string]{}, errorj.GetPrimaryKeysError.Wrap(err, "failed read last row"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: s.config.Schema, + Schema: namespace, Table: quotedTableName, Statement: statement, }) @@ -368,7 +378,7 @@ func (s *Snowflake) getPrimaryKey(ctx context.Context, tableName string) (string // LoadTable transfer data from local file to Snowflake by passing COPY request to Snowflake func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := s.quotedTableName(targetTable.Name) - + namespacePrefix := s.namespacePrefix(targetTable.Namespace) if loadSource.Type != LocalFile { return state, fmt.Errorf("LoadTable: only local file is supported") } @@ -407,12 +417,12 @@ func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSourc }) }() columnNames := targetTable.MappedColumnNames(s.quotedColumnName) - statement := fmt.Sprintf(sfCopyStatement, quotedTableName, strings.Join(columnNames, ","), path.Base(loadSource.Path)) + statement := fmt.Sprintf(sfCopyStatement, namespacePrefix, quotedTableName, strings.Join(columnNames, ","), path.Base(loadSource.Path)) if _, err := s.txOrDb(ctx).ExecContext(ctx, statement); err != nil { return state, errorj.CopyError.Wrap(err, "failed to copy data from stage"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ - Schema: s.config.Schema, + Schema: targetTable.Namespace, Table: quotedTableName, Statement: statement, }) @@ -436,7 +446,7 @@ func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, object pkMatchConditions = pkMatchConditions.Add(pkColumn, "=", value) } } - res, err := s.SQLAdapterBase.Select(ctx, table.Name, pkMatchConditions, nil) + res, err := s.SQLAdapterBase.Select(ctx, table.Namespace, table.Name, pkMatchConditions, nil) if err != nil { return errorj.ExecuteInsertError.Wrap(err, "failed check primary key collision"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ @@ -464,10 +474,10 @@ func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTa func (s *Snowflake) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error { tmpTable := "deprecated_" + targetTableName + time.Now().Format("_20060102_150405") - err1 := s.renameTable(ctx, true, targetTableName, tmpTable) - err := s.renameTable(ctx, false, replacementTable.Name, targetTableName) + err1 := s.renameTable(ctx, true, replacementTable.Namespace, targetTableName, tmpTable) + err := s.renameTable(ctx, false, replacementTable.Namespace, replacementTable.Name, targetTableName) if dropOldTable && err1 == nil && err == nil { - return s.DropTable(ctx, tmpTable, true) + return s.DropTable(ctx, replacementTable.Namespace, tmpTable, true) } return nil } @@ -477,9 +487,9 @@ func sfColumnDDL(quotedName, _ string, _ *Table, column types2.SQLColumn) string return fmt.Sprintf(`%s %s`, quotedName, column.GetDDLType()) } -func (s *Snowflake) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) { +func (s *Snowflake) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) { ctx = sf.WithHigherPrecision(ctx) - return s.SQLAdapterBase.Select(ctx, tableName, whenConditions, orderBy) + return s.SQLAdapterBase.Select(ctx, namespace, tableName, whenConditions, orderBy) } func sfIdentifierFunction(value string, alphanumeric bool) (adapted string, needQuotes bool) { @@ -494,14 +504,18 @@ func sfIdentifierFunction(value string, alphanumeric bool) (adapted string, need } func (s *Snowflake) CreateTable(ctx context.Context, schemaToCreate *Table) error { - err := s.SQLAdapterBase.CreateTable(ctx, schemaToCreate) + err := s.createSchemaIfNotExists(ctx, schemaToCreate.Namespace) + if err != nil { + return err + } + err = s.SQLAdapterBase.CreateTable(ctx, schemaToCreate) if err != nil { return err } if !schemaToCreate.Temporary && schemaToCreate.TimestampColumn != "" { err = s.createClusteringKey(ctx, schemaToCreate) if err != nil { - s.DropTable(ctx, schemaToCreate.Name, true) + s.DropTable(ctx, schemaToCreate.Namespace, schemaToCreate.Name, true) return fmt.Errorf("failed to create sort key: %v", err) } } @@ -513,9 +527,9 @@ func (s *Snowflake) createClusteringKey(ctx context.Context, table *Table) error return nil } quotedTableName := s.quotedTableName(table.Name) - + namespacePrefix := s.namespacePrefix(table.Namespace) statement := fmt.Sprintf(sfAlterClusteringKeyTemplate, - quotedTableName, s.quotedColumnName(table.TimestampColumn)) + namespacePrefix, quotedTableName, s.quotedColumnName(table.TimestampColumn)) if _, err := s.txOrDb(ctx).ExecContext(ctx, statement); err != nil { return errorj.AlterTableError.Wrap(err, "failed to set clustering key").
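Reviewer note: the fmt.Sprintf arguments in createClusteringKey are reordered above to match sfAlterClusteringKeyTemplate (`ALTER TABLE %s%s ...`), where the namespace prefix fills the first slot, as at every other call site in this diff. More broadly, the merge and insert templates touched here (pgMergeQuery, sfMergeStatement, and the insertQuery/insertFromSelectQuery templates in sql_adapter_base.go below) all splice an already-quoted {{.Namespace}} prefix directly in front of the table identifier. A minimal sketch of how such a payload renders; queryPayload is a trimmed stand-in for the adapter's QueryPayload struct, reduced to the fields the namespace change touches:

```go
package main

import (
	"os"
	"text/template"
)

// queryPayload is a trimmed stand-in for the adapter's QueryPayload struct;
// Namespace carries an already-quoted prefix such as `"myschema".`, or "".
type queryPayload struct {
	Namespace    string
	TableName    string
	Columns      string
	Placeholders string
}

func main() {
	// Same shape as the reworked insertQuery template: the namespace prefix
	// is concatenated directly in front of the table name.
	const insertQuery = `INSERT INTO {{.Namespace}}{{.TableName}}({{.Columns}}) VALUES ({{.Placeholders}})`

	tmpl := template.Must(template.New("insert").Parse(insertQuery))
	if err := tmpl.Execute(os.Stdout, queryPayload{
		Namespace:    `"mynamespace".`,
		TableName:    `"events"`,
		Columns:      `"id", "name"`,
		Placeholders: "$1, $2",
	}); err != nil {
		panic(err)
	}
	// Output: INSERT INTO "mynamespace"."events"("id", "name") VALUES ($1, $2)
}
```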
diff --git a/bulkerlib/implementations/sql/sql_adapter.go b/bulkerlib/implementations/sql/sql_adapter.go
index 2ba5640..dbd2d24 100644
--- a/bulkerlib/implementations/sql/sql_adapter.go
+++ b/bulkerlib/implementations/sql/sql_adapter.go
@@ -36,27 +36,31 @@ type SQLAdapter interface {
 	// InitDatabase setups required db objects like 'schema' or 'dataset' if they don't exist
 	InitDatabase(ctx context.Context) error
 	TableHelper() *TableHelper
-	GetTableSchema(ctx context.Context, tableName string) (*Table, error)
+	GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
 	CreateTable(ctx context.Context, schemaToCreate *Table) error
 	CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
 	LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
 	PatchTableSchema(ctx context.Context, patchTable *Table) error
-	TruncateTable(ctx context.Context, tableName string) error
+	TruncateTable(ctx context.Context, namespace string, tableName string) error
 	//(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error
-	Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
-	DropTable(ctx context.Context, tableName string, ifExists bool) error
+	Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
+	DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
 	Drop(ctx context.Context, table *Table, ifExists bool) error
 	ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error
-	Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
-	Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
+	Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
+	Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)
 	// ColumnName adapts column name to sql identifier rules of database
 	ColumnName(rawColumn string) string
 	// TableName adapts table name to sql identifier rules of database
 	TableName(rawTableName string) string
 	BuildConstraintName(tableName string) string
+	DefaultNamespace() string
+	// TmpNamespace returns the namespace used by temporary tables. For warehouses where temporary tables
+	// must not be qualified with a schema or db prefix, the NoNamespaceValue constant must be used.
+	TmpNamespace(targetNamespace string) string
 }
 
 type LoadSourceType string
@@ -79,6 +83,14 @@ type TxSQLAdapter struct {
 	tx *TxWrapper
 }
 
+func (tx *TxSQLAdapter) TmpNamespace(targetNamespace string) string {
+	return tx.sqlAdapter.TmpNamespace(targetNamespace)
+}
+
+func (tx *TxSQLAdapter) DefaultNamespace() string {
+	return tx.sqlAdapter.DefaultNamespace()
+}
+
 func (tx *TxSQLAdapter) Type() string {
 	return tx.sqlAdapter.Type()
 }
@@ -131,9 +143,9 @@ func (tx *TxSQLAdapter) TableHelper() *TableHelper {
 	return tx.sqlAdapter.TableHelper()
 }
-func (tx *TxSQLAdapter) GetTableSchema(ctx context.Context, tableName string) (*Table, error) {
+func (tx *TxSQLAdapter) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error) {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
-	return tx.sqlAdapter.GetTableSchema(ctx, tableName)
+	return tx.sqlAdapter.GetTableSchema(ctx, namespace, tableName)
 }
 func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
@@ -151,22 +163,22 @@ func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table)
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
 	return tx.sqlAdapter.PatchTableSchema(ctx, patchTable)
 }
-func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, tableName string) error {
+func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, namespace string, tableName string) error {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
-	return tx.sqlAdapter.TruncateTable(ctx, tableName)
+	return tx.sqlAdapter.TruncateTable(ctx, namespace, tableName)
 }
 
 // func (tx *TxSQLAdapter) Update(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error {
 //	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
 //	return tx.sqlAdapter.Update(ctx, tableName, object, whenConditions)
 // }
-func (tx *TxSQLAdapter) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error {
+func (tx *TxSQLAdapter) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
-	return tx.sqlAdapter.Delete(ctx, tableName, deleteConditions)
+	return tx.sqlAdapter.Delete(ctx, namespace, tableName, deleteConditions)
 }
-func (tx *TxSQLAdapter) DropTable(ctx context.Context, tableName string, ifExists bool) error {
+func (tx *TxSQLAdapter) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
-	return tx.sqlAdapter.DropTable(ctx, tableName, ifExists)
+	return tx.sqlAdapter.DropTable(ctx, namespace, tableName, ifExists)
 }
 func (tx *TxSQLAdapter) Drop(ctx context.Context, table *Table, ifExists bool) error {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
@@ -177,13 +189,13 @@ func (tx *TxSQLAdapter) ReplaceTable(ctx context.Context, targetTableName string
 	return tx.sqlAdapter.ReplaceTable(ctx, targetTableName, replacementTable, dropOldTable)
 }
-func (tx *TxSQLAdapter) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
+func (tx *TxSQLAdapter) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
-	return tx.sqlAdapter.Select(ctx, tableName, whenConditions, orderBy)
+	return tx.sqlAdapter.Select(ctx, namespace, tableName, whenConditions, orderBy)
 }
-func (tx *TxSQLAdapter) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error) {
+func (tx *TxSQLAdapter) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error) {
 	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
-	return tx.sqlAdapter.Count(ctx, tableName, whenConditions)
+	return tx.sqlAdapter.Count(ctx, namespace, tableName, whenConditions)
 }
 
 func (tx *TxSQLAdapter) Commit() error {
diff --git a/bulkerlib/implementations/sql/sql_adapter_base.go b/bulkerlib/implementations/sql/sql_adapter_base.go
index 7873841..4f45ea9 100644
--- a/bulkerlib/implementations/sql/sql_adapter_base.go
+++ b/bulkerlib/implementations/sql/sql_adapter_base.go
@@ -19,21 +19,25 @@ import (
 )
 
 const (
-	createTableTemplate     = `CREATE %s TABLE %s (%s)`
-	addColumnTemplate       = `ALTER TABLE %s ADD COLUMN %s`
-	dropPrimaryKeyTemplate  = `ALTER TABLE %s DROP CONSTRAINT %s`
-	alterPrimaryKeyTemplate = `ALTER TABLE %s ADD CONSTRAINT %s PRIMARY KEY (%s)`
-
-	deleteQueryTemplate     = `DELETE FROM %s WHERE %s`
-	selectQueryTemplate     = `SELECT %s FROM %s%s%s`
-	insertQuery             = `INSERT INTO {{.TableName}}({{.Columns}}) VALUES ({{.Placeholders}})`
-	insertFromSelectQuery   = `INSERT INTO {{.TableTo}}({{.Columns}}) SELECT {{.Columns}} FROM {{.TableFrom}}`
-	renameTableTemplate     = `ALTER TABLE %s%s RENAME TO %s`
-
-	updateStatementTemplate = `UPDATE %s SET %s WHERE %s`
-	dropTableTemplate       = `DROP TABLE %s%s`
-	truncateTableTemplate   = `TRUNCATE TABLE %s`
-	deleteAllQueryTemplate  = `DELETE FROM %s`
+	createTableTemplate     = `CREATE %s TABLE %s%s (%s)`
+	addColumnTemplate       = `ALTER TABLE %s%s ADD COLUMN %s`
+	dropPrimaryKeyTemplate  = `ALTER TABLE %s%s DROP CONSTRAINT %s`
+	alterPrimaryKeyTemplate = `ALTER TABLE %s%s ADD CONSTRAINT %s PRIMARY KEY (%s)`
+
+	deleteQueryTemplate     = `DELETE FROM %s%s WHERE %s`
+	selectQueryTemplate     = `SELECT %s FROM %s%s%s%s`
+	insertQuery             = `INSERT INTO {{.Namespace}}{{.TableName}}({{.Columns}}) VALUES ({{.Placeholders}})`
+	insertFromSelectQuery   = `INSERT INTO {{.Namespace}}{{.TableTo}}({{.Columns}}) SELECT {{.Columns}} FROM {{.NamespaceFrom}}{{.TableFrom}}`
+	renameTableTemplate     = `ALTER TABLE %s%s%s RENAME TO %s%s`
+
+	updateStatementTemplate = `UPDATE %s%s SET %s WHERE %s`
+	dropTableTemplate       = `DROP TABLE %s%s%s`
+	truncateTableTemplate   = `TRUNCATE TABLE %s%s`
+	deleteAllQueryTemplate  = `DELETE FROM %s%s`
+
+	// NoNamespaceValue indicates that a table must not be qualified with a namespace (schema or db) in queries,
+	// e.g. for Redshift, where temporary tables don't belong to any schema
+	NoNamespaceValue = "__jitsu_no_namespace__"
 )
 
 var (
@@ -62,13 +66,16 @@ type ErrorAdapter func(error) error
 
 type SQLAdapterBase[T any] struct {
 	appbase.Service
-	typeId               string
+	typeId               string
+	// schema, dataset or database
+	namespace            string
 	config               *T
 	dataSource           *sql.DB
 	queryLogger          *logging.QueryLogger
 	batchFileFormat      types2.FileFormat
 	batchFileCompression types2.FileCompression
 	temporaryTables      bool
+	renameToSchemaless   bool
 	// stringifyObjects objects types like JSON, array will be stringified before sent to warehouse (warehouse will parse them back)
 	stringifyObjects bool
@@ -84,11 +91,12 @@ type SQLAdapterBase[T any] struct {
 	checkErrFunc ErrorAdapter
 }
 
-func newSQLAdapterBase[T any](id string, typeId string, config *T, dbConnectFunction DbConnectFunction[T], dataTypes map[types2.DataType][]string, queryLogger *logging.QueryLogger, typecastFunc TypeCastFunction, parameterPlaceholder ParameterPlaceholder, columnDDLFunc ColumnDDLFunction, valueMappingFunction ValueMappingFunction, checkErrFunc ErrorAdapter, supportsJSON bool) (*SQLAdapterBase[T], error) {
+func newSQLAdapterBase[T any](id string, typeId string, config *T, namespace string, dbConnectFunction DbConnectFunction[T], dataTypes map[types2.DataType][]string, queryLogger *logging.QueryLogger, typecastFunc TypeCastFunction, parameterPlaceholder ParameterPlaceholder, columnDDLFunc ColumnDDLFunction, valueMappingFunction ValueMappingFunction, checkErrFunc ErrorAdapter, supportsJSON bool) (*SQLAdapterBase[T], error) {
 	s := SQLAdapterBase[T]{
 		Service:              appbase.NewServiceBase(id),
 		typeId:               typeId,
 		config:               config,
+		namespace:            namespace,
 		dbConnectFunction:    dbConnectFunction,
 		queryLogger:          queryLogger,
 		parameterPlaceholder: parameterPlaceholder,
@@ -193,11 +201,13 @@ func (b *SQLAdapterBase[T]) txOrDb(ctx context.Context) TxOrDB {
 	return txOrDb
 }
 
-func (b *SQLAdapterBase[T]) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
-	return b.selectFrom(ctx, selectQueryTemplate, tableName, "*", whenConditions, orderBy)
+func (b *SQLAdapterBase[T]) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
+	return b.selectFrom(ctx, selectQueryTemplate, namespace, tableName, "*", whenConditions, orderBy)
 }
-func (b *SQLAdapterBase[T]) selectFrom(ctx context.Context, statement string, tableName string, selectExpression string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
+func (b *SQLAdapterBase[T]) selectFrom(ctx context.Context, statement string, namespace string, tableName string, selectExpression string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
 	quotedTableName := b.tableHelper.quotedTableName(tableName)
+	quotedSchema := b.namespacePrefix(namespace)
+
 	whenCondition, values := b.ToWhenConditions(whenConditions, b.parameterPlaceholder, 0)
 	if whenCondition != "" {
 		whenCondition = " WHERE " + whenCondition
@@ -212,7 +222,7 @@ func (b *SQLAdapterBase[T]) selectFrom(ctx context.Context, statement string, ta
 	}
 	var rows *sql.Rows
 	var err error
-	query := fmt.Sprintf(statement, selectExpression, quotedTableName, whenCondition, orderByClause)
+	query := fmt.Sprintf(statement, selectExpression, quotedSchema, quotedTableName, whenCondition, orderByClause)
 	if b.typeId == MySQLBulkerTypeId {
 		//For MySQL using Prepared statement switches mySQL to use Binary protocol that preserves types information
 		var stmt *sql.Stmt
@@ -261,8 +271,8 @@ func (b *SQLAdapterBase[T]) selectFrom(ctx context.Context, statement string, ta
 	return result, nil
 }
 
-func (b *SQLAdapterBase[T]) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error) {
-	res, err := b.selectFrom(ctx, selectQueryTemplate, tableName, "count(*) as jitsu_count", whenConditions, nil)
+func (b *SQLAdapterBase[T]) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error) {
+	res, err := b.selectFrom(ctx, selectQueryTemplate, namespace, tableName, "count(*) as jitsu_count", whenConditions, nil)
 	if err != nil {
 		return -1, err
 	}
@@ -273,11 +283,12 @@ func (b *SQLAdapterBase[T]) Count(ctx context.Context, tableName string, whenCon
 	return strconv.Atoi(fmt.Sprint(scnt))
 }
 
-func (b *SQLAdapterBase[T]) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error {
+func (b *SQLAdapterBase[T]) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error {
 	quotedTableName := b.quotedTableName(tableName)
+	quotedSchema := b.namespacePrefix(namespace)
 
 	deleteCondition, values := b.ToWhenConditions(deleteConditions, b.parameterPlaceholder, 0)
-	query := fmt.Sprintf(deleteQueryTemplate, quotedTableName, deleteCondition)
+	query := fmt.Sprintf(deleteQueryTemplate, quotedSchema, quotedTableName, deleteCondition)
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, query, values...); err != nil {
@@ -293,6 +304,7 @@ func (b *SQLAdapterBase[T]) Delete(ctx context.Context, tableName string, delete
 
 func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types2.Object, whenConditions *WhenConditions) error {
 	quotedTableName := b.quotedTableName(table.Name)
+	quotedSchema := b.namespacePrefix(table.Namespace)
 
 	count := object.Len()
@@ -311,7 +323,7 @@ func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object typ
 		values[i+a] = updateValues[a]
 	}
 
-	statement := fmt.Sprintf(updateStatementTemplate, quotedTableName, strings.Join(columns, ", "), updateCondition)
+	statement := fmt.Sprintf(updateStatementTemplate, quotedSchema, quotedTableName, strings.Join(columns, ", "), updateCondition)
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, statement, values...); err != nil {
@@ -326,14 +338,15 @@ func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object typ
 	return nil
 }
 
-func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, tableName string, ifExists bool) error {
+func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error {
 	quotedTableName := b.quotedTableName(tableName)
+	quotedSchema := b.namespacePrefix(namespace)
 
 	ifExs := ""
 	if ifExists {
 		ifExs = "IF EXISTS "
 	}
-	query := fmt.Sprintf(dropTableTemplate, ifExs, quotedTableName)
+	query := fmt.Sprintf(dropTableTemplate, ifExs, quotedSchema, quotedTableName)
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, query); err != nil {
@@ -348,13 +361,14 @@ func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, tableName string, ifE
 }
 
 func (b *SQLAdapterBase[T]) Drop(ctx context.Context, table *Table, ifExists bool) error {
-	return b.DropTable(ctx, table.Name, ifExists)
+	return b.DropTable(ctx, table.Namespace, table.Name, ifExists)
 }
 
-func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, tableName string) error {
+func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, namespace string, tableName string) error {
 	quotedTableName := b.quotedTableName(tableName)
+	quotedSchema := b.namespacePrefix(namespace)
 
-	statement := fmt.Sprintf(truncateTableTemplate, quotedTableName)
+	statement := fmt.Sprintf(truncateTableTemplate, quotedSchema, quotedTableName)
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, statement); err != nil {
 		return errorj.TruncateError.Wrap(err, "failed to truncate table").
 			WithProperty(errorj.DBInfo, &types2.ErrorPayload{
@@ -367,10 +381,11 @@ func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, tableName string)
 }
 
 // DeleteAll deletes all records in tableName table
-func (b *SQLAdapterBase[T]) DeleteAll(ctx context.Context, tableName string) error {
+func (b *SQLAdapterBase[T]) DeleteAll(ctx context.Context, namespace, tableName string) error {
 	quotedTableName := b.quotedTableName(tableName)
+	quotedSchema := b.namespacePrefix(namespace)
 
-	statement := fmt.Sprintf(deleteAllQueryTemplate, quotedTableName)
+	statement := fmt.Sprintf(deleteAllQueryTemplate, quotedSchema, quotedTableName)
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, statement); err != nil {
 		return errorj.TruncateError.Wrap(err, "failed to delete all from table").
 			WithProperty(errorj.DBInfo, &types2.ErrorPayload{
@@ -383,6 +398,8 @@ func (b *SQLAdapterBase[T]) DeleteAll(ctx context.Context, tableName string) err
 }
 
 type QueryPayload struct {
+	Namespace     string
+	NamespaceFrom string
 	TableName     string
 	Columns       string
 	Placeholders  string
@@ -402,6 +419,7 @@ func (b *SQLAdapterBase[T]) insert(ctx context.Context, table *Table, objects []
 
 // plainInsert inserts provided object into Snowflake
 func (b *SQLAdapterBase[T]) insertOrMerge(ctx context.Context, table *Table, objects []types2.Object, mergeQuery *template.Template) error {
 	quotedTableName := b.quotedTableName(table.Name)
+	quotedSchema := b.namespacePrefix(table.Namespace)
 	count := table.ColumnsCount()
 	columnNames := make([]string, count)
 	placeholders := make([]string, count)
@@ -420,6 +438,7 @@ func (b *SQLAdapterBase[T]) insertOrMerge(ctx context.Context, table *Table, obj
 	})
 
 	insertPayload := QueryPayload{
+		Namespace:    quotedSchema,
 		TableName:    quotedTableName,
 		Columns:      strings.Join(columnNames, ", "),
 		Placeholders: strings.Join(placeholders, ", "),
@@ -466,6 +485,8 @@ func (b *SQLAdapterBase[T]) copy(ctx context.Context, targetTable *Table, source
 
 func (b *SQLAdapterBase[T]) copyOrMerge(ctx context.Context, targetTable *Table, sourceTable *Table, mergeQuery *template.Template, sourceAlias string) (state bulkerlib.WarehouseState, err error) {
 	startTime := time.Now()
+	quotedSchema := b.namespacePrefix(targetTable.Namespace)
+	quotedSchemaFrom := b.namespacePrefix(sourceTable.Namespace)
 	quotedTargetTableName := b.quotedTableName(targetTable.Name)
 	quotedSourceTableName := b.quotedTableName(sourceTable.Name)
@@ -489,6 +510,8 @@ func (b *SQLAdapterBase[T]) copyOrMerge(ctx context.Context, targetTable *Table,
 		})
 	}
 	insertPayload := QueryPayload{
+		Namespace:     quotedSchema,
+		NamespaceFrom: quotedSchemaFrom,
 		TableTo:       quotedTargetTableName,
 		TableFrom:     quotedSourceTableName,
 		Columns:       strings.Join(columnNames, ","),
@@ -528,6 +551,8 @@ func (b *SQLAdapterBase[T]) copyOrMerge(ctx context.Context, targetTable *Table,
 
 // make fields from Table PkFields - 'not null'
 func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error {
 	quotedTableName := b.quotedTableName(schemaToCreate.Name)
+	quotedSchema := b.namespacePrefix(schemaToCreate.Namespace)
+
 	columnsDDL := schemaToCreate.MappedColumns(func(columnName string, column types2.SQLColumn) string {
 		return b.columnDDL(columnName, schemaToCreate, column)
 	})
@@ -536,7 +561,7 @@ func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Tab
 		temporary = "TEMPORARY"
 	}
 
-	query := fmt.Sprintf(createTableTemplate, temporary, quotedTableName, strings.Join(columnsDDL, ", "))
+	query := fmt.Sprintf(createTableTemplate, temporary, quotedSchema, quotedTableName, strings.Join(columnsDDL, ", "))
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, query); err != nil {
 		return errorj.CreateTableError.Wrap(err, "failed to create table").
@@ -558,11 +583,12 @@ func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Tab
 
 // recreate primary key (if not empty) or delete primary key if Table.DeletePrimaryKeyNamed is true
 func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error {
 	quotedTableName := b.quotedTableName(patchTable.Name)
+	quotedSchema := b.namespacePrefix(patchTable.Namespace)
 
 	//patch columns
 	err := patchTable.Columns.ForEachIndexedE(func(_ int, columnName string, column types2.SQLColumn) error {
 		columnDDL := b.columnDDL(columnName, patchTable, column)
-		query := fmt.Sprintf(addColumnTemplate, quotedTableName, columnDDL)
+		query := fmt.Sprintf(addColumnTemplate, quotedSchema, quotedTableName, columnDDL)
 		if _, err := b.txOrDb(ctx).ExecContext(ctx, query); err != nil {
 			return errorj.PatchTableError.Wrap(err, "failed to patch table").
@@ -604,13 +630,14 @@ func (b *SQLAdapterBase[T]) createPrimaryKey(ctx context.Context, table *Table)
 	}
 
 	quotedTableName := b.quotedTableName(table.Name)
+	quotedSchema := b.namespacePrefix(table.Namespace)
 	columnNames := make([]string, table.PKFields.Size())
 	for i, column := range table.GetPKFields() {
 		columnNames[i] = b.quotedColumnName(column)
 	}
 
-	statement := fmt.Sprintf(alterPrimaryKeyTemplate,
+	statement := fmt.Sprintf(alterPrimaryKeyTemplate, quotedSchema,
 		quotedTableName, b.quotedTableName(table.PrimaryKeyName), strings.Join(columnNames, ","))
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, statement); err != nil {
@@ -628,8 +655,9 @@ func (b *SQLAdapterBase[T]) createPrimaryKey(ctx context.Context, table *Table)
 
 // delete primary key
 func (b *SQLAdapterBase[T]) deletePrimaryKey(ctx context.Context, table *Table, pkName string) error {
 	quotedTableName := b.quotedTableName(table.Name)
+	quotedSchema := b.namespacePrefix(table.Namespace)
 
-	query := fmt.Sprintf(dropPrimaryKeyTemplate, quotedTableName, b.quotedTableName(pkName))
+	query := fmt.Sprintf(dropPrimaryKeyTemplate, quotedSchema, quotedTableName, b.quotedTableName(pkName))
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, query); err != nil {
 		return errorj.DeletePrimaryKeysError.Wrap(err, "failed to delete primary key").
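Editor's note: since every statement template above gained a namespace slot, here is one rendered end to end. This is a minimal sketch using the `insertQuery` template from this patch; the payload values are hypothetical and assumed to be pre-quoted, with the namespace already carrying its trailing dot, exactly as `namespacePrefix` produces it. An empty `Namespace` reproduces the old, unqualified statement.

```go
package main

import (
	"os"
	"text/template"
)

// QueryPayload trimmed to the fields used by insertQuery; Namespace is
// one of the two fields added in this patch.
type QueryPayload struct {
	Namespace    string
	TableName    string
	Columns      string
	Placeholders string
}

// insertQuery as declared in this patch: the namespace prefix is rendered
// verbatim in front of the table name.
const insertQuery = `INSERT INTO {{.Namespace}}{{.TableName}}({{.Columns}}) VALUES ({{.Placeholders}})`

func main() {
	tmpl := template.Must(template.New("insert").Parse(insertQuery))
	// Prints: INSERT INTO "analytics"."events"("id", "ts") VALUES ($1, $2)
	_ = tmpl.Execute(os.Stdout, QueryPayload{
		Namespace:    `"analytics".`, // hypothetical, already quoted with trailing dot
		TableName:    `"events"`,
		Columns:      `"id", "ts"`,
		Placeholders: `$1, $2`,
	})
}
```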
@@ -643,15 +671,16 @@ func (b *SQLAdapterBase[T]) deletePrimaryKey(ctx context.Context, table *Table,
 	return nil
 }
 
-func (b *SQLAdapterBase[T]) renameTable(ctx context.Context, ifExists bool, tableName, newTableName string) error {
+func (b *SQLAdapterBase[T]) renameTable(ctx context.Context, ifExists bool, namespace, tableName, newTableName string) error {
 	quotedTableName := b.quotedTableName(tableName)
 	quotedNewTableName := b.quotedTableName(newTableName)
-
+	quotedSchema := b.namespacePrefix(namespace)
+	renameToSchema := utils.Ternary(b.renameToSchemaless, "", quotedSchema)
 	ifExs := ""
 	if ifExists {
 		ifExs = "IF EXISTS "
 	}
-	query := fmt.Sprintf(renameTableTemplate, ifExs, quotedTableName, quotedNewTableName)
+	query := fmt.Sprintf(renameTableTemplate, ifExs, quotedSchema, quotedTableName, renameToSchema, quotedNewTableName)
 	if _, err := b.txOrDb(ctx).ExecContext(ctx, query); err != nil {
 		return errorj.RenameError.Wrap(err, "failed to rename table").
@@ -666,10 +695,10 @@ func (b *SQLAdapterBase[T]) renameTable(ctx context.Context, ifExists bool, tabl
 
 func (b *SQLAdapterBase[T]) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error) {
 	tmpTable := "deprecated_" + targetTableName + time.Now().Format("_20060102_150405")
-	err1 := b.renameTable(ctx, true, targetTableName, tmpTable)
-	err = b.renameTable(ctx, false, replacementTable.Name, targetTableName)
+	err1 := b.renameTable(ctx, true, replacementTable.Namespace, targetTableName, tmpTable)
+	err = b.renameTable(ctx, false, replacementTable.Namespace, replacementTable.Name, targetTableName)
 	if dropOldTable && err1 == nil && err == nil {
-		return b.DropTable(ctx, tmpTable, true)
+		return b.DropTable(ctx, replacementTable.Namespace, tmpTable, true)
 	} else if err != nil {
 		return multierror.Append(err, err1).ErrorOrNil()
 	}
@@ -690,6 +719,20 @@ func (b *SQLAdapterBase[T]) quotedTableName(tableName string) string {
 	return b.tableHelper.quotedTableName(tableName)
 }
 
+func (b *SQLAdapterBase[T]) namespaceName(namespace string) string {
+	if namespace == NoNamespaceValue {
+		return ""
+	}
+	return b.tableHelper.TableName(utils.DefaultString(namespace, b.namespace))
+}
+
+func (b *SQLAdapterBase[T]) namespacePrefix(namespace string) string {
+	if namespace == NoNamespaceValue {
+		return ""
+	}
+	return b.tableHelper.quotedTableName(utils.DefaultString(namespace, b.namespace)) + "."
+}
+
 // quotedColumnName adapts column name to sql identifier rules of database and quotes accordingly (if needed)
 func (b *SQLAdapterBase[T]) quotedColumnName(columnName string) string {
 	return b.tableHelper.quotedColumnName(columnName)
@@ -761,6 +804,14 @@ func (b *SQLAdapterBase[T]) BuildConstraintName(tableName string) string {
 	return fmt.Sprintf("%s%s", BulkerManagedPkConstraintPrefix, uuid.NewLettersNumbers())
 }
 
+func (b *SQLAdapterBase[T]) DefaultNamespace() string {
+	return b.namespace
+}
+
+func (b *SQLAdapterBase[T]) TmpNamespace(targetNamespace string) string {
+	return targetNamespace
+}
+
 func match(target, pattern string) bool {
 	suff := pattern[0] == '%'
 	pref := pattern[len(pattern)-1] == '%'
diff --git a/bulkerlib/implementations/sql/table.go b/bulkerlib/implementations/sql/table.go
index 5ceae4d..92f292f 100644
--- a/bulkerlib/implementations/sql/table.go
+++ b/bulkerlib/implementations/sql/table.go
@@ -38,7 +38,9 @@ type TableField struct {
 
 // Table is a dto for DWH Table representation
 type Table struct {
-	Name      string
+	Name      string
+	// database or schema depending on warehouse
+	Namespace string
 	Temporary bool
 	Cached    bool
@@ -105,6 +107,7 @@ func (t *Table) CleanClone() *Table {
 	clonedPkFields := t.PKFields.Clone()
 
 	return &Table{
+		Namespace: t.Namespace,
 		Name:      t.Name,
 		Columns:   clonedColumns,
 		PKFields:  clonedPkFields,
@@ -141,6 +144,7 @@ func (t *Table) Clone() *Table {
 	clonedPkFields := t.PKFields.Clone()
 
 	return &Table{
+		Namespace: t.Namespace,
 		Name:      t.Name,
 		Columns:   clonedColumns,
 		PKFields:  clonedPkFields,
@@ -168,7 +172,7 @@ func (t *Table) GetPKFieldsSet() types2.OrderedSet[string] {
 // 2) all fields from another schema exist in current schema
 // NOTE: Diff method doesn't take types into account
 func (t *Table) Diff(sqlAdapter SQLAdapter, another *Table) *Table {
-	diff := &Table{Name: t.Name, Columns: NewColumns(), PKFields: types2.NewOrderedSet[string]()}
+	diff := &Table{Name: t.Name, Namespace: t.Namespace, Columns: NewColumns(), PKFields: types2.NewOrderedSet[string]()}
 
 	if !another.Exists() {
 		return diff
diff --git a/bulkerlib/implementations/sql/table_helper.go b/bulkerlib/implementations/sql/table_helper.go
index 68c319a..75bcf1f 100644
--- a/bulkerlib/implementations/sql/table_helper.go
+++ b/bulkerlib/implementations/sql/table_helper.go
@@ -60,13 +60,17 @@ func NewTableHelper(maxIdentifierLength int, identifierQuoteChar rune) TableHelp
 // MapTableSchema maps types.TypesHeader (JSON structure with json data types) into types.Table (structure with SQL types)
 // applies column types mapping
 // adjusts object properties names to column names
-func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesHeader, object types2.Object, pkColumns []string, timestampColumn string) (*Table, types2.Object) {
+func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesHeader, object types2.Object, pkColumns []string, timestampColumn string, namespace string) (*Table, types2.Object) {
 	adaptedPKFields := types.NewOrderedSet[string]()
 	for _, pkField := range pkColumns {
 		adaptedPKFields.Put(pkField)
 	}
+	if namespace != "" {
+		namespace = th.TableName(namespace)
+	}
 	table := &Table{
 		Name:      sqlAdapter.TableName(batchHeader.TableName),
+		Namespace: namespace,
 		Columns:   NewColumns(),
 		Partition: batchHeader.Partition,
 		PKFields:  adaptedPKFields,
@@ -267,7 +271,7 @@ func (th *TableHelper) getOrCreateWithLock(ctx context.Context, sqlAdapter SQLAd
 func (th *TableHelper) getOrCreate(ctx context.Context, sqlAdapter SQLAdapter, dataSchema *Table) (*Table, error) {
 	//Get schema
-	dbTableSchema, err := sqlAdapter.GetTableSchema(ctx, dataSchema.Name)
+	dbTableSchema, err := sqlAdapter.GetTableSchema(ctx, dataSchema.Namespace, dataSchema.Name)
 	if err != nil {
 		return nil, err
 	}
@@ -308,7 +312,7 @@ func (th *TableHelper) getTableIdentifier(destinationID, tableName string) strin
 	return destinationID + "_" + tableName
 }
 
-func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, tableName string, cacheTable bool) (*Table, error) {
+func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, namespace string, tableName string, cacheTable bool) (*Table, error) {
 	var table *Table
 	var ok bool
 	if cacheTable {
@@ -317,7 +321,7 @@ func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, tableName
 			return table, nil
 		}
 	}
-	table, err := sqlAdapter.GetTableSchema(ctx, tableName)
+	table, err := sqlAdapter.GetTableSchema(ctx, namespace, tableName)
 	if err != nil {
 		return nil, err
 	}
diff --git a/bulkerlib/implementations/sql/testcontainers/mysql_container.go b/bulkerlib/implementations/sql/testcontainers/mysql_container.go
index f446068..4a3272b 100644
--- a/bulkerlib/implementations/sql/testcontainers/mysql_container.go
+++ b/bulkerlib/implementations/sql/testcontainers/mysql_container.go
@@ -63,8 +63,8 @@ func NewMySQLContainer(ctx context.Context) (*MySQLContainer, error) {
 	}
 	dbSettings := make(map[string]string, 0)
 	dbSettings["MYSQL_ROOT_PASSWORD"] = mySQLRootPassword
-	dbSettings["MYSQL_USER"] = mySQLUser
-	dbSettings["MYSQL_PASSWORD"] = mySQLPassword
+	//dbSettings["MYSQL_USER"] = mySQLUser
+	//dbSettings["MYSQL_PASSWORD"] = mySQLPassword
 	dbSettings["MYSQL_DATABASE"] = mySQLDatabase
 
 	//exposedPort := fmt.Sprintf("%d:%d", utils.GetPort(), 3306)
@@ -94,11 +94,11 @@ func NewMySQLContainer(ctx context.Context) (*MySQLContainer, error) {
 		container.Terminate(ctx)
 		return nil, err
 	}
-	_, _, err = container.Exec(context.Background(), []string{"mysql", "-uroot", "-p" + mySQLRootPassword, "-Bse", "CREATE USER 'root'@'%' IDENTIFIED BY '" + mySQLRootPassword + "';GRANT ALL ON *.* TO 'root'@'%';FLUSH PRIVILEGES;"})
-	//logging.Infof("Exec result: %v err: %v", res, err)
+	_, _, err = container.Exec(context.Background(), []string{"mysql", "-uroot", "-p" + mySQLRootPassword, "-Bse", fmt.Sprintf("CREATE USER '%s'@'%%' IDENTIFIED BY '%s';GRANT ALL PRIVILEGES ON *.* TO '%s'@'%%';FLUSH PRIVILEGES;", mySQLUser, mySQLPassword, mySQLUser)})
+	logging.Infof("Exec result err: %v", err)
 	// [user[:password]@][net[(addr)]]/dbname[?param1=value1&paramN=valueN]
 	connectionString := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s",
-		"root", mySQLRootPassword, host, port.Int(), mySQLDatabase)
+		mySQLUser, mySQLPassword, host, port.Int(), mySQLDatabase)
 	dataSource, err := sql.Open("mysql", connectionString)
 	if err != nil {
 		container.Terminate(ctx)
diff --git a/bulkerlib/implementations/sql/transactional_stream.go b/bulkerlib/implementations/sql/transactional_stream.go
index a2f9fca..2ef71b5 100644
--- a/bulkerlib/implementations/sql/transactional_stream.go
+++ b/bulkerlib/implementations/sql/transactional_stream.go
@@ -23,7 +23,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
 	if err != nil {
 		return nil, err
 	}
-	ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.tableName)
+	ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
 	ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
 		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable, tableForObject).Clone()
 		ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
@@ -32,6 +32,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
 		}
 		tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
 		return &Table{
+			Namespace: p.TmpNamespace(ps.namespace),
 			Name:      tmpTableName,
 			Columns:   dstTable.Columns,
 			Temporary: true,
diff --git a/bulkerlib/options.go b/bulkerlib/options.go
index c312bce..a17b241 100644
--- a/bulkerlib/options.go
+++ b/bulkerlib/options.go
@@ -88,6 +88,11 @@ var (
 		ParseFunc: utils.ParseString,
 	}
 
+	NamespaceOption = ImplementationOption[string]{
+		Key:       "namespace",
+		ParseFunc: utils.ParseString,
+	}
+
 	// TimestampOption - field name that contains timestamp. For creating sorting indexes or partitions by that field in destination tables
 	TimestampOption = ImplementationOption[string]{
 		Key:       "timestampColumn",
@@ -145,6 +150,7 @@ func init() {
 	RegisterOption(&PartitionIdOption)
 	RegisterOption(&TimestampOption)
 	RegisterOption(&SchemaOption)
+	RegisterOption(&NamespaceOption)
 
 	dummyParse := func(_ any) (any, error) { return nil, nil }
 	for _, ignoredOption := range ignoredOptions {
@@ -264,3 +270,7 @@ func WithDiscriminatorField(discriminatorField []string) StreamOption {
 func WithSchema(schema types.Schema) StreamOption {
 	return WithOption(&SchemaOption, schema)
 }
+
+func WithNamespace(namespace string) StreamOption {
+	return WithOption(&NamespaceOption, namespace)
+}
diff --git a/sync-sidecar/read.go b/sync-sidecar/read.go
index 3ab7b6e..25d7092 100644
--- a/sync-sidecar/read.go
+++ b/sync-sidecar/read.go
@@ -13,6 +13,7 @@ import (
 	"github.com/jitsucom/bulker/jitsubase/utils"
 	"github.com/jitsucom/bulker/sync-sidecar/db"
 	"os"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -398,7 +399,21 @@ func (s *ReadSideCar) openStream(streamName string) (*ActiveStream, error) {
 		s.processedStreams[streamName] = stream
 	}
 	s.lastStream = stream
-	tableName := utils.NvlString(str.TableName, s.tableNamePrefix+streamName)
+	var namespace string
+	tableNamePrefix := strings.ReplaceAll(s.tableNamePrefix, "${SOURCE_NAMESPACE}", str.Namespace)
+	tableName := utils.NvlString(str.TableName, tableNamePrefix+str.Name)
+	switch str.NamespaceMode {
+	case "source":
+		namespace = str.Namespace
+	case "custom":
+		namespace = strings.ReplaceAll(str.CustomNamespace, "${SOURCE_NAMESPACE}", str.Namespace)
+	case "destination":
+		namespace = ""
+	default:
+		// legacy
+		namespace = ""
+		tableName = utils.NvlString(str.TableName, tableNamePrefix+streamName)
+	}
 	jobId := fmt.Sprintf("%s_%s_%s", s.syncId, s.taskId, tableName)
 
 	var streamOptions []bulker.StreamOption
@@ -416,6 +431,9 @@ func (s *ReadSideCar) openStream(streamName string) (*ActiveStream, error) {
 	} else if len(str.DefaultCursorField) > 0 {
 		streamOptions = append(streamOptions, bulker.WithDiscriminatorField(str.DefaultCursorField))
 	}
+	if namespace != "" {
+		streamOptions = append(streamOptions, bulker.WithNamespace(namespace))
+	}
 	bulkerStream, err := s.blk.CreateStream(jobId, tableName, mode, streamOptions...)
 	if err != nil {
 		return stream, fmt.Errorf("error creating bulker stream: %v", err)
diff --git a/sync-sidecar/types.go b/sync-sidecar/types.go
index b4c0293..6383c72 100644
--- a/sync-sidecar/types.go
+++ b/sync-sidecar/types.go
@@ -119,6 +119,8 @@ type Catalog struct {
 type StreamMeta struct {
 	Name            string           `json:"name"`
 	Namespace       string           `json:"namespace"`
+	NamespaceMode   string           `json:"namespace_mode"`
+	CustomNamespace string           `json:"custom_namespace"`
 	TableName       string           `json:"table_name,omitempty"`
 	JSONSchema      StreamJsonSchema `json:"json_schema"`
 	PrimaryKeys     [][]string       `json:"source_defined_primary_key"`
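Editor's note: the new `NamespaceMode` and `CustomNamespace` fields drive the switch added in read.go above. The sketch below restates that mapping as a standalone function so the three modes are easy to compare; `resolveNamespace` is an illustrative name, not part of the patch, and the legacy table-name fallback is intentionally left out.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveNamespace mirrors the namespace half of the switch in openStream:
// it maps a stream's namespace_mode to the value passed to
// bulker.WithNamespace. An empty result means "use the destination's
// default namespace" (WithNamespace is then not applied at all).
func resolveNamespace(mode, sourceNamespace, customNamespace string) string {
	switch mode {
	case "source":
		return sourceNamespace
	case "custom":
		return strings.ReplaceAll(customNamespace, "${SOURCE_NAMESPACE}", sourceNamespace)
	default: // "destination" and legacy catalogs without namespace_mode
		return ""
	}
}

func main() {
	fmt.Println(resolveNamespace("source", "crm", ""))                        // crm
	fmt.Println(resolveNamespace("custom", "crm", "raw_${SOURCE_NAMESPACE}")) // raw_crm
	fmt.Println(resolveNamespace("destination", "crm", ""))                   // "" -> destination default
}
```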