diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index d69efa181e886f..45a56b8e565c1a 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -204,7 +204,8 @@ Status BlockReader::init(const ReaderParams& read_params) { auto cid = read_params.origin_return_columns->at(i); for (int j = 0; j < read_params.return_columns.size(); ++j) { if (read_params.return_columns[j] == cid) { - if (j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) { + if (j < _tablet_schema->num_key_columns() || + _tablet_schema->keys_type() != AGG_KEYS) { _normal_columns_idx.emplace_back(j); } else { _agg_columns_idx.emplace_back(j); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index f60352f6e6bcf7..1e7b863c027479 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -183,8 +183,11 @@ private boolean processAddColumn(AddColumnClause alterClause, OlapTable olapTabl Set newColNameSet = Sets.newHashSet(column.getName()); - return addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap, + boolean lightSchemaChange = true; + addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap, newColNameSet, false, colUniqueIdSupplierMap); + lightSchemaChange = checkLightSchemaChange(olapTable, column, indexSchemaMap); + return lightSchemaChange; } private void processAddColumn(AddColumnClause alterClause, Table externalTable, List newSchema) @@ -241,8 +244,9 @@ public boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTab boolean lightSchemaChange = true; for (Column column : columns) { - boolean result = addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap, + addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap, newColNameSet, ignoreSameColumn, colUniqueIdSupplierMap); + boolean result = checkLightSchemaChange(olapTable, column, indexSchemaMap); if (!result) { lightSchemaChange = false; } @@ -887,18 +891,16 @@ private void addColumnInternal(Column newColumn, ColumnPosition columnPos, List< * @param newColNameSet * @param ignoreSameColumn * @param colUniqueIdSupplierMap - * @return true: can light schema change, false: cannot + * @return void * @throws DdlException */ - private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos, + private void addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos, long targetIndexId, long baseIndexId, Map> indexSchemaMap, Set newColNameSet, boolean ignoreSameColumn, Map colUniqueIdSupplierMap) throws DdlException { - //only new table generate ColUniqueId, exist table do not. - boolean lightSchemaChange = olapTable.getEnableLightSchemaChange(); String newColName = newColumn.getName(); // check the validation of aggregation method on column. @@ -963,12 +965,6 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP throw new DdlException("BITMAP_UNION must be used in AGG_KEYS"); } - //type key column do not allow light schema change. - if (newColumn.isKey()) { - LOG.debug("newColumn: {}, isKey()==true", newColumn); - lightSchemaChange = false; - } - // check if the new column already exist in base schema. // do not support adding new column which already exist in base schema. List baseSchema = olapTable.getBaseSchema(true); @@ -1033,7 +1029,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true, baseIndexNewColumnUniqueId); if (targetIndexId == -1L) { - return lightSchemaChange; + return; } // 2. add to rollup modIndexSchema = indexSchemaMap.get(targetIndexId); @@ -1054,7 +1050,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true, baseIndexNewColumnUniqueId); // no specified target index. return - return lightSchemaChange; + return; } else { // add to rollup index List modIndexSchema = indexSchemaMap.get(targetIndexId); @@ -1088,7 +1084,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP if (targetIndexId == -1L) { // no specified target index. return - return lightSchemaChange; + return; } // 2. add to rollup index @@ -1098,6 +1094,64 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP modIndexSchema = indexSchemaMap.get(targetIndexId); checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false, rollUpNewColumnUniqueId); } + return; + } + + /** + * @param olapTable + * @param newColumn + * @param indexSchemaMap + * @return true: can light schema change, false: cannot + * @throws DdlException + */ + private boolean checkLightSchemaChange(OlapTable olapTable, Column newColumn, + Map> indexSchemaMap) { + // only new table generate ColUniqueId, exist table do not. + boolean lightSchemaChange = olapTable.getEnableLightSchemaChange(); + if (!lightSchemaChange || !newColumn.isKey()) { + return lightSchemaChange; + } + + long baseIndexId = olapTable.getBaseIndexId(); + Set baseColumnNames = Sets.newHashSet(); + + // check light schema change with add key column + for (Long alterIndexId : indexSchemaMap.keySet()) { + List alterSchema = indexSchemaMap.get(alterIndexId); + int newColumnPos = -1; + for (int i = 0; i < alterSchema.size(); ++i) { + if (alterSchema.get(i).getName() == newColumn.getName()) { + newColumnPos = i; + } + } + + if (newColumnPos >= 0) { + // add key column in short key columns + MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(alterIndexId); + if (newColumnPos < currentIndexMeta.getShortKeyColumnCount()) { + return false; + } + + // not support add key column for mv index + if (alterIndexId != baseIndexId) { + if (baseColumnNames.isEmpty()) { + for (Column col : olapTable.getBaseSchemaKeyColumns()) { + baseColumnNames.add(col.getName()); + } + } + for (Column col : olapTable.getKeyColumnsByIndexId(alterIndexId)) { + if (null != col.getDefineExpr() || !baseColumnNames.contains(col.getName())) { + return false; + } + } + } + + // unique key merge on write + if (olapTable.getEnableUniqueKeyMergeOnWrite()) { + return false; + } + } + } return lightSchemaChange; } diff --git a/regression-test/data/schema_change_p0/test_add_keys_light_schema_change.out b/regression-test/data/schema_change_p0/test_add_keys_light_schema_change.out new file mode 100644 index 00000000000000..78e8f051a29bda --- /dev/null +++ b/regression-test/data/schema_change_p0/test_add_keys_light_schema_change.out @@ -0,0 +1,119 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sc -- +1 2 2017-10-01 Beijing 10 1 1 20 \N \N + +-- !sc -- +1 2017-10-01 Beijing 10 1 20 + +-- !21_agg_multi_rowset -- +1 3 40 3 3 + +-- !21_agg_multi_rowset -- +1 2 30 2 2 + +-- !21_agg_multi_rowset -- +1 1 40 1 1 + +-- !21_agg_compaction -- +1 3 40 3 3 + +-- !21_agg_compaction -- +1 2 30 2 2 + +-- !21_agg_compaction -- +1 1 40 1 1 + +-- !22_agg_drop_multi_rowset -- +1 3 40 3 3 + +-- !22_agg_drop_multi_rowset -- +1 2 30 2 2 + +-- !22_agg_drop_multi_rowset -- +1 1 40 1 1 + +-- !22_agg_drop_compaction -- +1 3 40 3 3 + +-- !22_agg_drop_compaction -- +1 2 30 2 2 + +-- !22_agg_drop_compaction -- +1 1 40 1 1 + +-- !23_base_table_multi_rowset -- +1 3 40 3 3 + +-- !23_base_table_multi_rowset -- +1 2 30 2 2 + +-- !23_base_table_multi_rowset -- +1 1 40 1 1 + +-- !23_base_table_compaction -- +1 3 40 3 3 + +-- !23_base_table_compaction -- +1 2 30 2 2 + +-- !23_base_table_compaction -- +1 1 40 1 1 + +-- !23_rollup_multi_rowset -- +1 3 + +-- !23_rollup_multi_rowset -- +1 2 + +-- !23_rollup_multi_rowset -- +1 1 + +-- !23_agg_rollup_compaction -- +1 3 + +-- !23_agg_rollup_compaction -- +1 2 + +-- !23_agg_rollup_compaction -- +1 1 + +-- !31_duplicate_multi_rowset -- +2017-10-01T10:00 1 0 2 10 2017-10-01T12:00 +2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00 +2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00 + +-- !31_duplicate_multi_rowset -- +2017-10-01T10:00 1 0 2 10 2017-10-01T12:00 +2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00 + +-- !31_duplicate_multi_rowset -- +2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00 + +-- !31_duplicate_compaction -- +2017-10-01T10:00 1 0 2 10 2017-10-01T12:00 +2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00 +2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00 + +-- !31_duplicate_compaction -- +2017-10-01T10:00 1 0 2 10 2017-10-01T12:00 +2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00 + +-- !31_duplicate_compaction -- +2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00 + +-- !41_unique_multi_rowset -- +1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00 + +-- !41_unique_multi_rowset -- + +-- !41_unique_multi_rowset -- +1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00 + +-- !41_unique_compaction -- +1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00 + +-- !41_unique_compaction -- + +-- !41_unique_compaction -- +1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00 + diff --git a/regression-test/suites/schema_change_p0/test_add_keys_light_schema_change.groovy b/regression-test/suites/schema_change_p0/test_add_keys_light_schema_change.groovy new file mode 100644 index 00000000000000..2dcdefc85ea220 --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_add_keys_light_schema_change.groovy @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.json.JsonOutput +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite ("test_add_keys_light_schema_change") { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def getJobTxnId = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][8] + } + + def getCreateViewState = { tableName -> + def createViewStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return createViewStateResult[0][8] + } + + def waitJobFinish = { strSql, statePos -> + Object jobStateResult = null + int max_try_time = 2000 + while (max_try_time--){ + jobStateResult = sql """${strSql}""" + def jsonRes = JsonOutput.toJson(jobStateResult) + String result = jobStateResult[0][statePos] + if (result == "FINISHED") { + log.info(jsonRes) + sleep(500) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + return jobStateResult + } + + def tableName = "add_keys_light_schema_change" + try { + + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + boolean disableAutoCompaction = true + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List) ele)[2]) + } + } + int max_try_time = 3000 + List> table_tablets = null; + List> rowset_cnt = null; + + // case 1.1: light schema change type check: add key column in short key column num + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `user_id` LARGEINT NOT NULL COMMENT "用户id", + `date` DATEV2 NOT NULL COMMENT "数据灌入日期时间", + `city` VARCHAR(20) COMMENT "用户所在城市", + `age` SMALLINT COMMENT "用户年龄", + `sex` TINYINT COMMENT "用户性别", + + `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", + `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", + `hll_col` HLL HLL_UNION NOT NULL COMMENT "HLL列", + `bitmap_col` Bitmap BITMAP_UNION NOT NULL COMMENT "bitmap列") + AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) + BUCKETS 4 + PROPERTIES ( "replication_num" = "1", "light_schema_change" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 20, hll_hash(1), to_bitmap(1)) + """ + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_key_column INT default "2" AFTER user_id PROPERTIES ("timeout"="604800") + """ + + def jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertNotEquals(jobStateResult[0][8], "-1") + + qt_sc """ select * from add_keys_light_schema_change order by user_id """ + + // case 1.2: light schema change type check: add key column in mv index + sql """ + CREATE MATERIALIZED VIEW first_view AS + SELECT user_id, date, age, sum(cost) + FROM add_keys_light_schema_change + GROUP BY user_id, date, age + """ + waitJobFinish(""" SHOW ALTER TABLE MATERIALIZED VIEW WHERE IndexName = "add_keys_light_schema_change" ORDER BY CreateTime DESC LIMIT 1 """, 8) + + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_mv_key1 INT default "2" TO first_view + """ + + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertNotEquals(jobStateResult[0][8], "-1") + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='first_view' ORDER BY createtime DESC LIMIT 1 """, 9) + assertNotEquals(jobStateResult[0][8], "-1") + + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_mv_key2 INT default "2" + """ + + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + + // case 1.3: light schema change type check: add key column with enable_unique_key_merge_on_write + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `user_id` LARGEINT NOT NULL COMMENT "用户id", + `date` DATEV2 NOT NULL COMMENT "数据灌入日期时间", + `city` VARCHAR(20) COMMENT "用户所在城市", + `age` SMALLINT COMMENT "用户年龄", + `sex` TINYINT COMMENT "用户性别", + + `cost` BIGINT DEFAULT "0" COMMENT "用户总消费") + UNIQUE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) + BUCKETS 4 + PROPERTIES ( + "replication_num" = "1", + "light_schema_change" = "true", + "enable_unique_key_merge_on_write" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 20) + """ + qt_sc """ select * from add_keys_light_schema_change order by user_id """ + + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_mv_key1 INT KEY default "2" + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertNotEquals(jobStateResult[0][8], "-1") + + // case 2.1: light schema change aggreage : with multi version rowset and compaction + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `user_id` LARGEINT NOT NULL COMMENT "用户id", + `date` DATEV2 NOT NULL COMMENT "数据灌入日期时间", + `city` VARCHAR(20) COMMENT "用户所在城市", + `age` SMALLINT COMMENT "用户年龄", + `sex` TINYINT COMMENT "用户性别", + + `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", + `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", + `hll_col` HLL HLL_UNION NOT NULL COMMENT "HLL列", + `bitmap_col` Bitmap BITMAP_UNION NOT NULL COMMENT "bitmap列") + AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) + BUCKETS 4 + PROPERTIES ( "replication_num" = "1", "light_schema_change" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 20, hll_hash(1), to_bitmap(1)) + """ + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_key_column INT default "2" AFTER sex PROPERTIES ("timeout"="604800"); + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + sql """ + INSERT INTO add_keys_light_schema_change (user_id,date,city,age,sex,cost,max_dwell_time,hll_col,bitmap_col) VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 30, hll_hash(2), to_bitmap(2)) + """ + sql """ + INSERT INTO add_keys_light_schema_change (user_id,date,city,age,sex,new_key_column,cost,max_dwell_time,hll_col,bitmap_col) VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 1, 40, hll_hash(3), to_bitmap(3)) + """ + table_tablets = sql """ SHOW TABLETS FROM add_keys_light_schema_change ORDER BY RowCount DESC LIMIT 1 """ + rowset_cnt = sql """ SELECT count(*) FROM information_schema.rowsets WHERE TABLET_ID = ${table_tablets[0][0]} """ + log.info(JsonOutput.toJson(rowset_cnt)) + assertEquals(rowset_cnt[0][0], 4) + + qt_21_agg_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_21_agg_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_21_agg_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_tablets[0][0]) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + + qt_21_agg_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_21_agg_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_21_agg_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + // case 2.2: light schema change aggreage : with drop + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `user_id` LARGEINT NOT NULL COMMENT "用户id", + `date` DATEV2 NOT NULL COMMENT "数据灌入日期时间", + `city` VARCHAR(20) COMMENT "用户所在城市", + `age` SMALLINT COMMENT "用户年龄", + `sex` TINYINT COMMENT "用户性别", + + `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", + `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", + `hll_col` HLL HLL_UNION NOT NULL COMMENT "HLL列", + `bitmap_col` Bitmap BITMAP_UNION NOT NULL COMMENT "bitmap列") + AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) + BUCKETS 4 + PROPERTIES ( "replication_num" = "1", "light_schema_change" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 20, hll_hash(1), to_bitmap(1)) + """ + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_key_column INT default "2" AFTER sex PROPERTIES ("timeout"="604800"); + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + sql """ + INSERT INTO add_keys_light_schema_change (user_id,date,city,age,sex,cost,max_dwell_time,hll_col,bitmap_col) VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 30, hll_hash(2), to_bitmap(2)) + """ + sql """ + INSERT INTO add_keys_light_schema_change (user_id,date,city,age,sex,new_key_column,cost,max_dwell_time,hll_col,bitmap_col) VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 1, 40, hll_hash(3), to_bitmap(3)) + """ + sql """ + ALTER TABLE add_keys_light_schema_change DROP COLUMN sex; + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertNotEquals(jobStateResult[0][8], "-1") + + qt_22_agg_drop_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_22_agg_drop_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_22_agg_drop_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_tablets[0][0]) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + + qt_22_agg_drop_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_22_agg_drop_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_22_agg_drop_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + // case 2.3: light schema change aggreage : with rollup and multi version rowset and compaction + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `user_id` LARGEINT NOT NULL COMMENT "用户id", + `date` DATEV2 NOT NULL COMMENT "数据灌入日期时间", + `city` VARCHAR(20) COMMENT "用户所在城市", + `age` SMALLINT COMMENT "用户年龄", + `sex` TINYINT COMMENT "用户性别", + + `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", + `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", + `hll_col` HLL HLL_UNION NOT NULL COMMENT "HLL列", + `bitmap_col` Bitmap BITMAP_UNION NOT NULL COMMENT "bitmap列") + AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) + BUCKETS 4 + PROPERTIES ( "replication_num" = "1", "light_schema_change" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 20, hll_hash(1), to_bitmap(1)) + """ + sql """ + ALTER TABLE add_keys_light_schema_change ADD ROLLUP first_idx (user_id, date, city, age, cost); + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE ROLLUP WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 8) + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_key_column INT default "2" AFTER age TO first_idx PROPERTIES ("timeout"="604800"); + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='first_idx' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + sql """ + INSERT INTO add_keys_light_schema_change (user_id,date,city,age,sex,cost,max_dwell_time,hll_col,bitmap_col) VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 30, hll_hash(2), to_bitmap(2)) + """ + sql """ + INSERT INTO add_keys_light_schema_change (user_id,date,city,age,sex,new_key_column,cost,max_dwell_time,hll_col,bitmap_col) VALUES + (1, '2017-10-01', 'Beijing', 10, 1, 1, 1, 40, hll_hash(3), to_bitmap(3)) + """ + table_tablets = sql """ SHOW TABLETS FROM add_keys_light_schema_change WHERE IndexName = "add_keys_light_schema_change" ORDER BY RowCount DESC LIMIT 1 """ + rowset_cnt = sql """ SELECT count(*) FROM information_schema.rowsets WHERE TABLET_ID = ${table_tablets[0][0]} """ + log.info(JsonOutput.toJson(rowset_cnt)) + assertEquals(rowset_cnt[0][0], 4) + + qt_23_base_table_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_23_base_table_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_23_base_table_multi_rowset """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_tablets[0][0]) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + + qt_23_base_table_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_23_base_table_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_23_base_table_compaction """ SELECT user_id,sum(cost),max(max_dwell_time),hll_union_agg(hll_col),bitmap_union_count(bitmap_col) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + + table_tablets = sql """ SHOW TABLETS FROM add_keys_light_schema_change WHERE IndexName = "first_idx" ORDER BY RowCount DESC LIMIT 1 """ + rowset_cnt = sql """ SELECT count(*) FROM information_schema.rowsets WHERE TABLET_ID = ${table_tablets[0][0]} """ + log.info(JsonOutput.toJson(rowset_cnt)) + + assertEquals(rowset_cnt[0][0], 4) + qt_23_rollup_multi_rowset """ SELECT user_id,sum(cost) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_23_rollup_multi_rowset """ SELECT user_id,sum(cost) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_23_rollup_multi_rowset """ SELECT user_id,sum(cost) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_tablets[0][0]) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + + qt_23_agg_rollup_compaction """ SELECT user_id,sum(cost) FROM add_keys_light_schema_change GROUP BY user_id; """ + qt_23_agg_rollup_compaction """ SELECT user_id,sum(cost) FROM add_keys_light_schema_change WHERE new_key_column = 2 GROUP BY user_id; """ + qt_23_agg_rollup_compaction """ SELECT user_id,sum(cost) FROM add_keys_light_schema_change WHERE new_key_column = 1 GROUP BY user_id; """ + + // case 3.1: light schema change duplicate : with multi version rowset and compaction + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `timestamp` DATETIME NOT NULL COMMENT "日志时间", + `type` INT NOT NULL COMMENT "日志类型", + `error_code` INT COMMENT "错误码", + `error_msg` VARCHAR(1024) COMMENT "错误详细信息", + `op_id` BIGINT COMMENT "负责人id", + `op_time` DATETIME COMMENT "处理时间") + DUPLICATE KEY(`timestamp`, `type`, `error_code`) DISTRIBUTED BY HASH(`type`) + BUCKETS 4 + PROPERTIES ( "replication_num" = "1", "light_schema_change" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + ('2017-10-01 10:00:00', '1', 0, '', 10, '2017-10-01 12:00:00') + """ + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_key_column INT default "2" AFTER error_code PROPERTIES ("timeout"="604800"); + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + sql """ + INSERT INTO add_keys_light_schema_change (timestamp,type,error_code,error_msg,op_id,op_time) VALUES + ('2017-10-01 10:00:00', '1', 0, 'none', 10, '2017-10-01 12:00:00') + """ + sql """ + INSERT INTO add_keys_light_schema_change (timestamp,type,error_code,new_key_column,error_msg,op_id,op_time) VALUES + ('2017-10-01 10:00:00', '1', 0, 1, 'none', 10, '2017-10-01 12:00:00') + """ + table_tablets = sql """ SHOW TABLETS FROM add_keys_light_schema_change ORDER BY RowCount DESC LIMIT 1 """ + rowset_cnt = sql """ SELECT count(*) FROM information_schema.rowsets WHERE TABLET_ID = ${table_tablets[0][0]} """ + log.info(JsonOutput.toJson(rowset_cnt)) + assertEquals(rowset_cnt[0][0], 4) + + qt_31_duplicate_multi_rowset """ SELECT * FROM add_keys_light_schema_change; """ + qt_31_duplicate_multi_rowset """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 2; """ + qt_31_duplicate_multi_rowset """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 1; """ + + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_tablets[0][0]) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + + qt_31_duplicate_compaction """ SELECT * FROM add_keys_light_schema_change; """ + qt_31_duplicate_compaction """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 2; """ + qt_31_duplicate_compaction """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 1; """ + + // case 4.1: light schema change unique : with multi version rowset and compaction + sql """ DROP TABLE IF EXISTS add_keys_light_schema_change """ + sql """ + CREATE TABLE IF NOT EXISTS add_keys_light_schema_change ( + `user_id` LARGEINT NOT NULL COMMENT "用户id", + `username` VARCHAR(50) NOT NULL COMMENT "用户昵称", + `city` VARCHAR(20) COMMENT "用户所在城市", + `age` SMALLINT COMMENT "用户年龄", + `sex` TINYINT COMMENT "用户性别", + `phone` LARGEINT COMMENT "用户电话", + `address` VARCHAR(500) COMMENT "用户地址", + `register_time` DATETIME COMMENT "用户注册时间") + UNIQUE KEY(`user_id`, `username`, `city`) + DISTRIBUTED BY HASH(`user_id`) + BUCKETS 4 + PROPERTIES ( "replication_num" = "1", "light_schema_change" = "true"); + """ + sql """ + INSERT INTO add_keys_light_schema_change VALUES + (1, 'Jone', 'Beijing', 10, 1, 10010, 'Haidian', '2017-10-01 12:00:00') + """ + sql """ + ALTER TABLE add_keys_light_schema_change ADD COLUMN new_key_column INT default "2" AFTER city PROPERTIES ("timeout"="604800"); + """ + jobStateResult = waitJobFinish(""" SHOW ALTER TABLE COLUMN WHERE IndexName='add_keys_light_schema_change' ORDER BY createtime DESC LIMIT 1 """, 9) + assertEquals(jobStateResult[0][8], "-1") + sql """ + INSERT INTO add_keys_light_schema_change (user_id,username,city,age,sex,phone,address,register_time) VALUES + (1, 'Jone', 'Beijing', 10, 1, 10010, 'Haidian', '2017-10-01 13:00:00') + """ + sql """ + INSERT INTO add_keys_light_schema_change (user_id,username,city,new_key_column,age,sex,phone,address,register_time) VALUES + (1, 'Jone', 'Beijing', 1, 10, 1, 10010, 'Haidian', '2017-10-01 14:00:00') + """ + table_tablets = sql """ SHOW TABLETS FROM add_keys_light_schema_change ORDER BY RowCount DESC LIMIT 1 """ + rowset_cnt = sql """ SELECT count(*) FROM information_schema.rowsets WHERE TABLET_ID = ${table_tablets[0][0]} """ + log.info(JsonOutput.toJson(rowset_cnt)) + assertEquals(rowset_cnt[0][0], 4) + + qt_41_unique_multi_rowset """ SELECT * FROM add_keys_light_schema_change; """ + qt_41_unique_multi_rowset """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 2; """ + qt_41_unique_multi_rowset """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 1; """ + + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_tablets[0][0]) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + + qt_41_unique_compaction """ SELECT * FROM add_keys_light_schema_change; """ + qt_41_unique_compaction """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 2; """ + qt_41_unique_compaction """ SELECT * FROM add_keys_light_schema_change WHERE new_key_column = 1; """ + + } finally { + //try_sql("DROP TABLE IF EXISTS ${tableName}") + } +}