Skip to content

Commit

Permalink
sync-ctl & sync-sidecar: toSameCase option
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Aug 15, 2024
1 parent 260d183 commit 073411e
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 6 deletions.
4 changes: 3 additions & 1 deletion bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
ps.nameTransformer = strings.ToLower
}
ps.tableName = ps.nameTransformer(tableName)
} else {
ps.nameTransformer = func(s string) string { return s }
}
ps.merge = bulker.DeduplicateOption.Get(&ps.options)
pkColumns := bulker.PrimaryKeyOption.Get(&ps.options)
Expand All @@ -84,7 +86,7 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
schema := bulker.SchemaOption.Get(&ps.options)
if !schema.IsEmpty() {
ps.schemaOptions = schema
ps.schemaFromOptions = ps.sqlAdapter.TableHelper().MapSchema(ps.sqlAdapter, schema)
ps.schemaFromOptions = ps.sqlAdapter.TableHelper().MapSchema(ps.sqlAdapter, schema, ps.nameTransformer)
}

ps.unmappedDataColumn = p.ColumnName(unmappedDataColumn)
Expand Down
6 changes: 3 additions & 3 deletions bulkerlib/implementations/sql/table_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesH
}

// MapSchema maps types.Schema into types.Table (structure with SQL types)
func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema) *Table {
func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema, nameTransformer func(string) string) *Table {
table := &Table{
Name: sqlAdapter.TableName(schema.Name),
Name: th.TableName(nameTransformer(schema.Name)),
Columns: NewColumns(),
}

for _, field := range schema.Fields {
colName := th.ColumnName(field.Name)
colName := th.ColumnName(nameTransformer(field.Name))
//map Jitsu type -> SQL type
sqlType, ok := sqlAdapter.GetSQLType(field.Type)
if ok {
Expand Down
1 change: 1 addition & 0 deletions sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration
"PACKAGE_VERSION": task.PackageVersion,
"COMMAND": task.TaskType,
"NAMESPACE": task.Namespace,
"TO_SAME_CASE": task.ToSameCase,
"TABLE_NAME_PREFIX": task.TableNamePrefix,
"FULL_SYNC": task.FullSync,
"DATABASE_URL": databaseURL,
Expand Down
1 change: 1 addition & 0 deletions sync-controller/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type TaskDescriptor struct {
Package string `json:"package"`
PackageVersion string `json:"packageVersion"`
Namespace string `json:"namespace"`
ToSameCase string `json:"sameCase"`
TableNamePrefix string `json:"tableNamePrefix"`
FullSync string `json:"fullSync"`
StartedBy string `json:"startedBy"`
Expand Down
1 change: 1 addition & 0 deletions sync-controller/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (t *TaskManager) ReadHandler(c *gin.Context) {
TaskID: c.Query("taskId"),
Namespace: c.Query("namespace"),
TableNamePrefix: c.Query("tableNamePrefix"),
ToSameCase: c.Query("toSameCase"),
FullSync: c.Query("fullSync"),
StartedBy: c.Query("startedBy"),
StartedAt: time.Now().Format(time.RFC3339),
Expand Down
6 changes: 5 additions & 1 deletion sync-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func main() {
startedAt: startedAt,
}
if command == "read" {
sidecar = &ReadSideCar{AbstractSideCar: abstract, namespace: os.Getenv("NAMESPACE"), tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX")}
sidecar = &ReadSideCar{AbstractSideCar: abstract,
namespace: os.Getenv("NAMESPACE"),
tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX"),
toSameCase: os.Getenv("TO_SAME_CASE") == "true",
}
sidecar.(*ReadSideCar).eventsLogService = &eventslog.DummyEventsLogService{}
clickhouseHost := os.Getenv("CLICKHOUSE_HOST")
if clickhouseHost != "" {
Expand Down
7 changes: 6 additions & 1 deletion sync-sidecar/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ReadSideCar struct {
*AbstractSideCar
namespace string
tableNamePrefix string
toSameCase bool

eventsLogService eventslog.EventsLogService

Expand Down Expand Up @@ -416,7 +417,6 @@ func (s *ReadSideCar) openStream(streamName string) (*ActiveStream, error) {
if len(str.GetPrimaryKeys()) > 0 {
streamOptions = append(streamOptions, bulker.WithPrimaryKey(str.GetPrimaryKeys()...), bulker.WithDeduplicate())
}
s.log("Stream '%s' created bulker. table: %s mode: %s primary keys: %s", streamName, tableName, mode, str.GetPrimaryKeys())
schema := str.ToSchema()
//s.log("Schema: %+v", schema)
if len(schema.Fields) > 0 {
Expand All @@ -430,10 +430,15 @@ func (s *ReadSideCar) openStream(streamName string) (*ActiveStream, error) {
if namespace != "" {
streamOptions = append(streamOptions, bulker.WithNamespace(namespace))
}
if s.toSameCase {
streamOptions = append(streamOptions, bulker.WithToSameCase())
}
bulkerStream, err := s.blk.CreateStream(jobId, tableName, mode, streamOptions...)
if err != nil {
return stream, fmt.Errorf("error creating bulker stream: %v", err)
}
s.log("Stream '%s' created bulker. table: %s mode: %s primary keys: %s", streamName, tableName, mode, str.GetPrimaryKeys())

err = stream.Begin(bulkerStream)
if err != nil {
return stream, fmt.Errorf("error starting bulker stream: %v", err)
Expand Down

0 comments on commit 073411e

Please sign in to comment.