From 43dbd9b4ccafeba46906a62bf43cc4d2777cb17a Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Wed, 13 Dec 2023 16:18:33 +0530 Subject: [PATCH] fix: migration updated to support default as well --- migration-0.35/main.go | 82 ++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 51 deletions(-) diff --git a/migration-0.35/main.go b/migration-0.35/main.go index a09db4c..3ba16d4 100644 --- a/migration-0.35/main.go +++ b/migration-0.35/main.go @@ -142,17 +142,22 @@ func TruncateTagAttributes(conn clickhouse.Conn) error { } func hasMaterializedColumn(tableStatement, field, dataType string) bool { - // check the type as well - // fmt.Println(tableStatement, field, dataType) - if field == "telemetry_sdk_language" { - - } regex := fmt.Sprintf("`%s` (?i)(%s) MATERIALIZED", field, dataType) res, err := regexp.MatchString(regex, tableStatement) if err != nil { zap.S().Error(fmt.Errorf("error while matching regex. Err=%v", err)) return false } + + if !res { + // try checking for default as well + regex := fmt.Sprintf("`%s` (?i)(%s) DEFAULT", field, dataType) + res, err = regexp.MatchString(regex, tableStatement) + if err != nil { + zap.S().Error(fmt.Errorf("error while matching regex. Err=%v", err)) + return false + } + } return res } @@ -165,55 +170,31 @@ func addMaterializedColumnsAndAddIndex(conn clickhouse.Conn, fields []LogField) valueColName := fmt.Sprintf("%s_%s_value", field.Type+"s", strings.ToLower(field.DataType)) // create column in logs table - zap.S().Info(fmt.Sprintf("creating materialized for: %s i.e %s", field.Name, colname)) - query := fmt.Sprintf("ALTER TABLE signoz_logs.logs on cluster cluster ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s[indexOf(%s, '%s')] CODEC(ZSTD(1))", - colname, field.DataType, valueColName, keyColName, field.Name) - err := conn.Exec(context.Background(), query) - if err != nil { - zap.S().Error(fmt.Errorf("error while creating materialized column on logs table. Err=%v", err)) - } - - // create column in distributed logs table - defaultValueDistributed := "-1" - if strings.ToLower(field.DataType) == "bool" { - defaultValueDistributed = "false" - field.IndexType = "set(2)" - } - query = fmt.Sprintf("ALTER TABLE signoz_logs.distributed_logs ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s", - colname, field.DataType, - defaultValueDistributed, - ) - err = conn.Exec(context.Background(), query) - if err != nil { - zap.S().Error(fmt.Errorf("error while renaming materialized column on distributed logs table. Err=%v", err)) - return err - } - - // create exists column in logs table - query = fmt.Sprintf("ALTER TABLE signoz_logs.logs ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", - colname, - keyColName, - field.Name, - ) - err = conn.Exec(ctx, query) - if err != nil { - zap.S().Error(fmt.Errorf("error while creating exists column on logs table. Err=%v", err)) - return err - } + for _, table := range []string{"logs", "distributed_logs"} { + zap.S().Info(fmt.Sprintf("creating materialized for: %s i.e %s", field.Name, colname)) + query := fmt.Sprintf("ALTER TABLE signoz_logs.%s on cluster cluster ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s[indexOf(%s, '%s')] CODEC(ZSTD(1))", + table, colname, field.DataType, valueColName, keyColName, field.Name) + err := conn.Exec(context.Background(), query) + if err != nil { + zap.S().Error(fmt.Errorf("error while creating materialized column on logs table. Err=%v", err)) + } - // create exists column in distributed logs table - query = fmt.Sprintf("ALTER TABLE signoz_logs.distributed_logs ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED false", - colname, - ) - err = conn.Exec(ctx, query) - if err != nil { - zap.S().Error(fmt.Errorf("error while creating exists column on distributed logs table. Err=%v", err)) - return err + query = fmt.Sprintf("ALTER TABLE signoz_logs.%s ON CLUSTER cluster ADD COLUMN IF NOT EXISTS %s_exists bool DEFAULT if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", + table, + colname, + keyColName, + field.Name, + ) + err = conn.Exec(ctx, query) + if err != nil { + zap.S().Error(fmt.Errorf("error while creating exists column on logs table. Err=%v", err)) + return err + } } zap.S().Info(fmt.Sprintf("Create index: %s_idx", colname)) - query = fmt.Sprintf("ALTER TABLE signoz_logs.logs on cluster cluster ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE bloom_filter(0.01) GRANULARITY 64", colname, colname) - err = conn.Exec(context.Background(), query) + query := fmt.Sprintf("ALTER TABLE signoz_logs.logs on cluster cluster ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE bloom_filter(0.01) GRANULARITY 64", colname, colname) + err := conn.Exec(context.Background(), query) if err != nil { zap.S().Error(fmt.Errorf("error while renaming index. Err=%v", err)) return err @@ -242,7 +223,6 @@ func main() { zap.S().Fatal("Error while connecting to clickhouse", zap.Error(err)) } - // Add default indexes for the data in otel collector fields, err := GetFields(conn) if err != nil { zap.S().Fatal("Error while getting fields", zap.Error(err))