Skip to content

Commit

Permalink
fix: migration updated to support default as well
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Dec 13, 2023
1 parent cbda8b3 commit 43dbd9b
Showing 1 changed file with 31 additions and 51 deletions.
82 changes: 31 additions & 51 deletions migration-0.35/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,22 @@ func TruncateTagAttributes(conn clickhouse.Conn) error {
}

func hasMaterializedColumn(tableStatement, field, dataType string) bool {
// check the type as well
// fmt.Println(tableStatement, field, dataType)
if field == "telemetry_sdk_language" {

}
regex := fmt.Sprintf("`%s` (?i)(%s) MATERIALIZED", field, dataType)
res, err := regexp.MatchString(regex, tableStatement)
if err != nil {
zap.S().Error(fmt.Errorf("error while matching regex. Err=%v", err))
return false
}

if !res {
// try checking for default as well
regex := fmt.Sprintf("`%s` (?i)(%s) DEFAULT", field, dataType)
res, err = regexp.MatchString(regex, tableStatement)
if err != nil {
zap.S().Error(fmt.Errorf("error while matching regex. Err=%v", err))
return false
}
}
return res
}

Expand All @@ -165,55 +170,31 @@ func addMaterializedColumnsAndAddIndex(conn clickhouse.Conn, fields []LogField)
valueColName := fmt.Sprintf("%s_%s_value", field.Type+"s", strings.ToLower(field.DataType))

// create column in logs table
zap.S().Info(fmt.Sprintf("creating materialized for: %s i.e %s", field.Name, colname))
query := fmt.Sprintf("ALTER TABLE signoz_logs.logs on cluster cluster ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s[indexOf(%s, '%s')] CODEC(ZSTD(1))",
colname, field.DataType, valueColName, keyColName, field.Name)
err := conn.Exec(context.Background(), query)
if err != nil {
zap.S().Error(fmt.Errorf("error while creating materialized column on logs table. Err=%v", err))
}

// create column in distributed logs table
defaultValueDistributed := "-1"
if strings.ToLower(field.DataType) == "bool" {
defaultValueDistributed = "false"
field.IndexType = "set(2)"
}
query = fmt.Sprintf("ALTER TABLE signoz_logs.distributed_logs ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s",
colname, field.DataType,
defaultValueDistributed,
)
err = conn.Exec(context.Background(), query)
if err != nil {
zap.S().Error(fmt.Errorf("error while renaming materialized column on distributed logs table. Err=%v", err))
return err
}

// create exists column in logs table
query = fmt.Sprintf("ALTER TABLE signoz_logs.logs ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
colname,
keyColName,
field.Name,
)
err = conn.Exec(ctx, query)
if err != nil {
zap.S().Error(fmt.Errorf("error while creating exists column on logs table. Err=%v", err))
return err
}
for _, table := range []string{"logs", "distributed_logs"} {
zap.S().Info(fmt.Sprintf("creating materialized for: %s i.e %s", field.Name, colname))
query := fmt.Sprintf("ALTER TABLE signoz_logs.%s on cluster cluster ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s[indexOf(%s, '%s')] CODEC(ZSTD(1))",
table, colname, field.DataType, valueColName, keyColName, field.Name)
err := conn.Exec(context.Background(), query)
if err != nil {
zap.S().Error(fmt.Errorf("error while creating materialized column on logs table. Err=%v", err))
}

// create exists column in distributed logs table
query = fmt.Sprintf("ALTER TABLE signoz_logs.distributed_logs ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED false",
colname,
)
err = conn.Exec(ctx, query)
if err != nil {
zap.S().Error(fmt.Errorf("error while creating exists column on distributed logs table. Err=%v", err))
return err
query = fmt.Sprintf("ALTER TABLE signoz_logs.%s ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s_exists bool DEFAULT if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
table,
colname,
keyColName,
field.Name,
)
err = conn.Exec(ctx, query)
if err != nil {
zap.S().Error(fmt.Errorf("error while creating exists column on logs table. Err=%v", err))
return err
}
}

zap.S().Info(fmt.Sprintf("Create index: %s_idx", colname))
query = fmt.Sprintf("ALTER TABLE signoz_logs.logs on cluster cluster ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE bloom_filter(0.01) GRANULARITY 64", colname, colname)
err = conn.Exec(context.Background(), query)
query := fmt.Sprintf("ALTER TABLE signoz_logs.logs on cluster cluster ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE bloom_filter(0.01) GRANULARITY 64", colname, colname)
err := conn.Exec(context.Background(), query)
if err != nil {
zap.S().Error(fmt.Errorf("error while renaming index. Err=%v", err))
return err
Expand Down Expand Up @@ -242,7 +223,6 @@ func main() {
zap.S().Fatal("Error while connecting to clickhouse", zap.Error(err))
}

// Add default indexes for the data in otel collector
fields, err := GetFields(conn)
if err != nil {
zap.S().Fatal("Error while getting fields", zap.Error(err))
Expand Down

0 comments on commit 43dbd9b

Please sign in to comment.