Skip to content

Commit

Permalink
bulkerlib: support for namespaces
Browse files Browse the repository at this point in the history
sync-sidecar: support for namespaces
  • Loading branch information
absorbb committed Aug 12, 2024
1 parent 0c7ac64 commit b54ceac
Show file tree
Hide file tree
Showing 23 changed files with 904 additions and 348 deletions.
7 changes: 6 additions & 1 deletion bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type AbstractSQLStream struct {
mode bulker.BulkMode
options bulker.StreamOptions
tableName string
namespace string
merge bool
mergeWindow int
omitNils bool
Expand Down Expand Up @@ -63,6 +64,10 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
if ps.timestampColumn != "" {
ps.timestampColumn = p.ColumnName(ps.timestampColumn)
}
ps.namespace = bulker.NamespaceOption.Get(&ps.options)
if ps.namespace != "" {
ps.namespace = p.TableName(ps.namespace)
}
ps.omitNils = OmitNilsOption.Get(&ps.options)
ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options)

Expand Down Expand Up @@ -98,7 +103,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje
if err != nil {
return nil, nil, err
}
table, processedObject := ps.sqlAdapter.TableHelper().MapTableSchema(ps.sqlAdapter, batchHeader, processedObject, ps.pkColumns, ps.timestampColumn)
table, processedObject := ps.sqlAdapter.TableHelper().MapTableSchema(ps.sqlAdapter, batchHeader, processedObject, ps.pkColumns, ps.timestampColumn, ps.namespace)
ps.state.ProcessedRows++
return table, processedObject, nil
}
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/autocommit_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
if err != nil {
return
}
existingTable, err := ps.sqlAdapter.TableHelper().Get(ctx, ps.sqlAdapter, table.Name, true)
existingTable, err := ps.sqlAdapter.TableHelper().Get(ctx, ps.sqlAdapter, table.Namespace, table.Name, true)
if err != nil {
err = errorj.Decorate(err, "failed to get current table table")
return
Expand Down
8 changes: 4 additions & 4 deletions bulkerlib/implementations/sql/bigdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func testLotOfEvents(t *testing.T, testConfig bulkerTestConfig, mode bulker.Bulk
PostStep("init_database", testConfig, mode, reqr, err)
//clean up in case of previous test failure
if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
err = sqlAdapter.DropTable(ctx, tableName, true)
err = sqlAdapter.DropTable(ctx, "", tableName, true)
PostStep("pre_cleanup", testConfig, mode, reqr, err)
}
//clean up after test run
if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
defer func() {
_ = sqlAdapter.DropTable(ctx, tableName, true)
_ = sqlAdapter.DropTable(ctx, "", tableName, true)
}()
}
stream, err := blk.CreateStream(t.Name(), tableName, mode, testConfig.streamOptions...)
Expand Down Expand Up @@ -161,14 +161,14 @@ func testLotOfEvents(t *testing.T, testConfig bulkerTestConfig, mode bulker.Bulk

if testConfig.expectedTable.Columns != nil && testConfig.expectedTable.Columns.Len() > 0 {
//Check table schema
table, err := sqlAdapter.GetTableSchema(ctx, tableName)
table, err := sqlAdapter.GetTableSchema(ctx, "", tableName)
PostStep("get_table", testConfig, mode, reqr, err)
reqr.Equal(testConfig.expectedTable, table)
}
if testConfig.expectedRowsCount != nil {
time.Sleep(1 * time.Second)
//Check rows count and rows data when provided
count, err := sqlAdapter.Count(ctx, tableName, nil)
count, err := sqlAdapter.Count(ctx, "", tableName, nil)
PostStep("select_count", testConfig, mode, reqr, err)
reqr.Equal(testConfig.expectedRowsCount, count)
}
Expand Down
141 changes: 89 additions & 52 deletions bulkerlib/implementations/sql/bigquery.go

Large diffs are not rendered by default.

42 changes: 34 additions & 8 deletions bulkerlib/implementations/sql/bulker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ type StepFunction func(testConfig bulkerTestConfig, mode bulker.BulkMode) error
var configRegistry = map[string]any{}

type ExpectedTable struct {
Name string
PKFields []string
Columns Columns
Name string
Namespace string
PKFields []string
Columns Columns
}

var postgresContainer *testcontainers2.PostgresContainer
Expand Down Expand Up @@ -212,6 +213,7 @@ type bulkerTestConfig struct {
name string
//tableName name of the destination table. Leave empty generate automatically
tableName string
namespace string
//bulker config
config *bulker.Config
//for which bulker predefined configurations to run test
Expand Down Expand Up @@ -412,6 +414,28 @@ func TestBasics(t *testing.T) {
},
})},
},
{
name: "namespace",
modes: []bulker.BulkMode{bulker.Batch, bulker.Stream, bulker.ReplaceTable, bulker.ReplacePartition},
expectPartitionId: true,
namespace: "bnsp",
dataFile: "test_data/columns_added.ndjson",
expectedTable: ExpectedTable{
Columns: justColumns("_timestamp", "id", "name", "column1", "column2", "column3"),
},
expectedRowsCount: 6,
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "column2": nil, "column3": nil},
{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "column2": nil, "column3": nil},
{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "column2": "data", "column3": nil},
{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "column2": nil, "column3": nil},
{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "column2": nil, "column3": nil},
{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "column2": "data", "column3": "data"},
},
streamOptions: []bulker.StreamOption{bulker.WithNamespace("bnsp")},
expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported},
configIds: allBulkerConfigs,
},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -471,17 +495,18 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
reqr.True(ok)
ctx := context.Background()
id, tableName := testConfig.getIdAndTableName(mode)
namespace := utils.DefaultString(testConfig.namespace, sqlAdapter.DefaultNamespace())
err = sqlAdapter.InitDatabase(ctx)
PostStep("init_database", testConfig, mode, reqr, err)
//clean up in case of previous test failure
if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
err = sqlAdapter.DropTable(ctx, tableName, true)
PostStep("pre_cleanup", testConfig, mode, reqr, err)
_ = sqlAdapter.DropTable(ctx, namespace, tableName, true)
//PostStep("pre_cleanup", testConfig, mode, reqr, err)
}
//clean up after test run
if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
defer func() {
_ = sqlAdapter.DropTable(ctx, tableName, true)
_ = sqlAdapter.DropTable(ctx, namespace, tableName, true)
}()
}
stream, err := blk.CreateStream(id, tableName, mode, testConfig.streamOptions...)
Expand Down Expand Up @@ -540,7 +565,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
//PostStep("state_lasterror", testConfig, mode, reqr, state.LastError)
if testConfig.expectedTable.Columns.Len() > 0 {
//Check table schema
table, err := sqlAdapter.GetTableSchema(ctx, tableName)
table, err := sqlAdapter.GetTableSchema(ctx, namespace, tableName)
PostStep("get_table", testConfig, mode, reqr, err)
switch testConfig.expectedTableTypeChecking {
case TypeCheckingDisabled:
Expand Down Expand Up @@ -604,6 +629,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
table.PrimaryKeyName = ""
expectedTable := &Table{
Name: testConfig.expectedTable.Name,
Namespace: utils.Nvl(testConfig.expectedTable.Namespace, namespace),
PrimaryKeyName: "",
PKFields: expectedPKFields,
Columns: testConfig.expectedTable.Columns,
Expand All @@ -623,7 +649,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
if testConfig.expectedRowsCount != nil || testConfig.expectedRows != nil {
time.Sleep(1 * time.Second)
//Check rows count and rows data when provided
rows, err := sqlAdapter.Select(ctx, tableName, nil, testConfig.orderBy)
rows, err := sqlAdapter.Select(ctx, namespace, tableName, nil, testConfig.orderBy)
PostStep("select_result", testConfig, mode, reqr, err)
if testConfig.expectedRows == nil {
reqr.Equal(testConfig.expectedRowsCount, len(rows))
Expand Down
Loading

0 comments on commit b54ceac

Please sign in to comment.