Skip to content

Commit

Permalink
sync: support for table name prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Sep 5, 2023
1 parent 4a107e4 commit 10f966a
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 19 deletions.
2 changes: 1 addition & 1 deletion bulkerlib/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type StreamOption func(*StreamOptions)
var optionsRegistry = make(map[string]ParseableOption)

// Not used by bulker. Just added here to be treated as known options and don't print errors
var ignoredOptions = []string{"functions", "streams", "dataLayout", "events", "hosts", "schedule", "timezone", "storageKey", "multithreading"}
var ignoredOptions = []string{"functions", "streams", "dataLayout", "events", "hosts", "schedule", "timezone", "storageKey", "tableNamePrefix", "multithreading"}

var (
BatchSizeOption = ImplementationOption[int]{
Expand Down
4 changes: 3 additions & 1 deletion sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,10 @@ func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration
"PACKAGE": task.Package,
"PACKAGE_VERSION": task.PackageVersion,
"COMMAND": task.TaskType,
"STARTED_AT": task.StartedAt,
"TABLE_NAME_PREFIX": task.TableNamePrefix,
"DATABASE_URL": databaseURL,
"STARTED_BY": task.StartedBy,
"STARTED_AT": task.StartedAt,
}
if task.SyncID != "" {
sideCarEnv["SYNC_ID"] = task.SyncID
Expand Down
20 changes: 11 additions & 9 deletions sync-controller/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
)

type TaskDescriptor struct {
TaskID string `json:"taskId"`
TaskType string `json:"taskType"` //spec, discover, read, check
SyncID string `json:"syncId"`
SourceType string `json:"sourceType"`
StorageKey string `json:"storageKey"`
Protocol string `json:"protocol"`
Package string `json:"package"`
PackageVersion string `json:"packageVersion"`
StartedAt string `json:"startedAt"`
TaskID string `json:"taskId"`
TaskType string `json:"taskType"` //spec, discover, read, check
SyncID string `json:"syncId"`
SourceType string `json:"sourceType"`
StorageKey string `json:"storageKey"`
Protocol string `json:"protocol"`
Package string `json:"package"`
PackageVersion string `json:"packageVersion"`
TableNamePrefix string `json:"tableNamePrefix"`
StartedBy string `json:"startedBy"`
StartedAt string `json:"startedAt"`
}

func (t *TaskDescriptor) StartedAtTime() time.Time {
Expand Down
14 changes: 8 additions & 6 deletions sync-controller/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,14 @@ func (t *TaskManager) ReadHandler(c *gin.Context) {
taskConfig.State = map[string]any{}
}
taskDescriptor := TaskDescriptor{
TaskType: "read",
Package: c.Query("package"),
PackageVersion: c.Query("version"),
SyncID: c.Query("syncId"),
TaskID: c.Query("taskId"),
StartedAt: time.Now().Format(time.RFC3339),
TaskType: "read",
Package: c.Query("package"),
PackageVersion: c.Query("version"),
SyncID: c.Query("syncId"),
TaskID: c.Query("taskId"),
TableNamePrefix: c.Query("tableNamePrefix"),
StartedBy: c.Query("startedBy"),
StartedAt: time.Now().Format(time.RFC3339),
}

taskStatus := t.jobRunner.CreatePod(taskDescriptor, &taskConfig)
Expand Down
3 changes: 2 additions & 1 deletion sync-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type AbstractSideCar struct {
databaseURL string
dbpool *pgxpool.Pool

startedBy string
startedAt time.Time

//first error occurred during command
Expand Down Expand Up @@ -63,7 +64,7 @@ func main() {
startedAt: startedAt,
}
if command == "read" {
sidecar := &ReadSideCar{AbstractSideCar: abstract}
sidecar := &ReadSideCar{AbstractSideCar: abstract, tableNamePrefix: os.Getenv("TABLE_NAME_PREFIX")}
sidecar.Run()
} else {
sidecar := SpecCatalogSideCar{AbstractSideCar: abstract}
Expand Down
3 changes: 2 additions & 1 deletion sync-sidecar/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (s *ActiveStream) RegisterError(err error) {

type ReadSideCar struct {
*AbstractSideCar
tableNamePrefix string

currentStream *ActiveStream
processedStreams map[string]*StreamStat
Expand Down Expand Up @@ -299,7 +300,7 @@ func (s *ReadSideCar) openStream(streamName string) *ActiveStream {

bulkerConnFunc := func(streamReader *io.PipeReader) {
s.log("Creating bulker stream: %s mode: %s primary keys: %s", streamName, mode, str.GetPrimaryKeys())
bulkerUrl := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=%s&taskId=%s", s.bulkerURL, s.syncId, url.QueryEscape(streamName), mode, s.taskId)
bulkerUrl := fmt.Sprintf("%s/bulk/%s?tableName=%s&mode=%s&taskId=%s", s.bulkerURL, s.syncId, url.QueryEscape(s.tableNamePrefix+streamName), mode, s.taskId)
for _, v := range str.GetPrimaryKeys() {
bulkerUrl += fmt.Sprintf("&pk=%s", url.QueryEscape(v))
}
Expand Down

0 comments on commit 10f966a

Please sign in to comment.