diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index e62302228a3..5e177ea3f9e 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -45,9 +45,6 @@ extern const int LOGICAL_ERROR; extern const int NO_SUCH_COLUMN_IN_TABLE; } // namespace ErrorCodes -using TiDB::DatumFlat; -using TiDB::TableInfo; - using DAGColumnInfo = std::pair; using DAGSchema = std::vector; static const String ENCODE_TYPE_NAME = "encode_type"; @@ -129,6 +126,16 @@ std::unordered_map func_name_to_sig({ }); +std::unordered_map agg_func_name_to_sig({ + {"min", tipb::ExprType::Min}, + {"max", tipb::ExprType::Max}, + {"count", tipb::ExprType::Count}, + {"sum", tipb::ExprType::Sum}, + {"first_row", tipb::ExprType::First}, + {"uniqRawRes", tipb::ExprType::ApproxCountDistinct}, + {"group_concat", tipb::ExprType::GroupConcat}, +}); + std::pair splitQualifiedName(String s) { std::pair ret; @@ -277,11 +284,11 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA { /// contains a table scan auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id); - if (regions.size() < (size_t)properties.mpp_partition_num) + if (regions.size() < static_cast(properties.mpp_partition_num)) throw Exception("Not supported: table region num less than mpp partition num"); for (size_t i = 0; i < regions.size(); i++) { - if (i % properties.mpp_partition_num != (size_t)task.partition_id) + if (i % properties.mpp_partition_num != static_cast(task.partition_id)) continue; auto * region = req->add_regions(); region->set_region_id(regions[i].first); @@ -300,12 +307,12 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA throw Exception("Meet error while dispatch mpp task: " + call.getResp()->error().msg()); } tipb::ExchangeReceiver tipb_exchange_receiver; - for (size_t i = 0; i < root_task_ids.size(); i++) + for (const auto root_task_id : root_task_ids) { mpp::TaskMeta tm; tm.set_start_ts(properties.start_ts); tm.set_address(LOCAL_HOST); - tm.set_task_id(root_task_ids[i]); + tm.set_task_id(root_task_id); tm.set_partition_id(-1); auto * tm_string = tipb_exchange_receiver.add_encoded_task_meta(); tm.AppendToString(tm_string); @@ -373,7 +380,7 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA BlockInputStreamPtr dbgFuncTiDBQuery(Context & context, const ASTs & args) { - if (args.size() < 1 || args.size() > 3) + if (args.empty() || args.size() > 3) throw Exception("Args not matched, should be: query[, region-id, dag_prop_string]", ErrorCodes::BAD_ARGUMENTS); String query = safeGet(typeid_cast(*args[0]).value); @@ -381,7 +388,7 @@ BlockInputStreamPtr dbgFuncTiDBQuery(Context & context, const ASTs & args) if (args.size() >= 2) region_id = safeGet(typeid_cast(*args[1]).value); - String prop_string = ""; + String prop_string; if (args.size() == 3) prop_string = safeGet(typeid_cast(*args[2]).value); DAGProperties properties = getDAGProperties(prop_string); @@ -417,7 +424,7 @@ BlockInputStreamPtr dbgFuncMockTiDBQuery(Context & context, const ASTs & args) if (start_ts == 0) start_ts = context.getTMTContext().getPDClient()->getTS(); - String prop_string = ""; + String prop_string; if (args.size() == 4) prop_string = safeGet(typeid_cast(*args[3]).value); DAGProperties properties = getDAGProperties(prop_string); @@ -525,7 +532,7 @@ void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & conte if (expr->tp() == tipb::ScalarFunc) { bool all_const = true; - for (auto c : expr->children()) + for (const auto & c : expr->children()) { if (!isLiteralExpr(c)) { @@ -537,7 +544,7 @@ void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & conte return; DataTypes arguments_types; ColumnsWithTypeAndName argument_columns; - for (auto & c : expr->children()) + for (const auto & c : expr->children()) { Field value = decodeLiteral(c); DataTypePtr flash_type = applyVisitor(FieldToDataType(), value); @@ -821,7 +828,7 @@ void collectUsedColumnsFromExpr(const DAGSchema & input, ASTPtr ast, std::unorde else { bool found = false; - for (auto & field : input) + for (const auto & field : input) { auto field_name = splitQualifiedName(field.first); if (field_name.second == column_name.second) @@ -867,14 +874,10 @@ void collectUsedColumnsFromExpr(const DAGSchema & input, ASTPtr ast, std::unorde struct MPPCtx { Timestamp start_ts; - Int64 partition_num; Int64 next_task_id; std::vector sender_target_task_ids; - std::vector current_task_ids; - std::vector partition_keys; - MPPCtx(Timestamp start_ts_, size_t partition_num_) + explicit MPPCtx(Timestamp start_ts_) : start_ts(start_ts_) - , partition_num(partition_num_) , next_task_id(1) {} }; @@ -930,7 +933,7 @@ struct Executor { children[0]->toMPPSubPlan(executor_index, properties, exchange_map); } - virtual ~Executor() {} + virtual ~Executor() = default; }; struct ExchangeSender : Executor @@ -1047,7 +1050,7 @@ struct TableScan : public Executor ci->set_decimal(info.second.decimal); if (!info.second.elems.empty()) { - for (auto & pair : info.second.elems) + for (const auto & pair : info.second.elems) { ci->add_elems(pair.first); } @@ -1186,61 +1189,43 @@ struct Aggregation : public Executor tipb::Expr * arg_expr = agg_func->add_children(); astToPB(input_schema, arg, arg_expr, collator_id, context); } + auto agg_sig_it = agg_func_name_to_sig.find(func->name); + if (agg_sig_it == agg_func_name_to_sig.end()) + throw Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR); + auto agg_sig = agg_sig_it->second; + agg_func->set_tp(agg_sig); - if (func->name == "count") - { - agg_func->set_tp(tipb::Count); - auto ft = agg_func->mutable_field_type(); - ft->set_tp(TiDB::TypeLongLong); - ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); - } - else if (func->name == "sum") + if (agg_sig == tipb::ExprType::Count || agg_sig == tipb::ExprType::Sum) { - agg_func->set_tp(tipb::Sum); - auto ft = agg_func->mutable_field_type(); + auto * ft = agg_func->mutable_field_type(); ft->set_tp(TiDB::TypeLongLong); ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); } - else if (func->name == "max") - { - agg_func->set_tp(tipb::Max); - if (agg_func->children_size() != 1) - throw Exception("udaf max only accept 1 argument"); - auto ft = agg_func->mutable_field_type(); - ft->set_tp(agg_func->children(0).field_type().tp()); - ft->set_decimal(agg_func->children(0).field_type().decimal()); - ft->set_flag(agg_func->children(0).field_type().flag()); - ft->set_collate(collator_id); - } - else if (func->name == "min") + else if (agg_sig == tipb::ExprType::Min || agg_sig == tipb::ExprType::Max || agg_sig == tipb::ExprType::First) { - agg_func->set_tp(tipb::Min); if (agg_func->children_size() != 1) - throw Exception("udaf min only accept 1 argument"); - auto ft = agg_func->mutable_field_type(); + throw Exception("udaf " + func->name + " only accept 1 argument"); + auto * ft = agg_func->mutable_field_type(); ft->set_tp(agg_func->children(0).field_type().tp()); ft->set_decimal(agg_func->children(0).field_type().decimal()); - ft->set_flag(agg_func->children(0).field_type().flag()); + ft->set_flag(agg_func->children(0).field_type().flag() & (~TiDB::ColumnFlagNotNull)); ft->set_collate(collator_id); } - else if (func->name == uniq_raw_res_name) + else if (agg_sig == tipb::ExprType::ApproxCountDistinct) { - agg_func->set_tp(tipb::ApproxCountDistinct); - auto ft = agg_func->mutable_field_type(); + auto * ft = agg_func->mutable_field_type(); ft->set_tp(TiDB::TypeString); ft->set_flag(1); } - else if (func->name == "group_concat") + else if (agg_sig == tipb::ExprType::GroupConcat) { - agg_func->set_tp(tipb::GroupConcat); - auto ft = agg_func->mutable_field_type(); + auto * ft = agg_func->mutable_field_type(); ft->set_tp(TiDB::TypeString); } - // TODO: Other agg func. + if (is_final_mode) + agg_func->set_aggfuncmode(tipb::AggFunctionMode::FinalMode); else - { - throw Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR); - } + agg_func->set_aggfuncmode(tipb::AggFunctionMode::Partial1Mode); } for (const auto & child : gby_exprs) @@ -1289,8 +1274,6 @@ struct Aggregation : public Executor // todo support avg if (has_uniq_raw_res) throw Exception("uniq raw res not supported in mpp query"); - if (gby_exprs.size() == 0) - throw Exception("agg without group by columns not supported in mpp query"); std::shared_ptr partial_agg = std::make_shared( executor_index, output_schema_for_partial_agg, @@ -1307,7 +1290,7 @@ struct Aggregation : public Executor partition_keys.push_back(i + agg_func_num); } std::shared_ptr exchange_sender - = std::make_shared(executor_index, output_schema_for_partial_agg, tipb::Hash, partition_keys); + = std::make_shared(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys); exchange_sender->children.push_back(partial_agg); std::shared_ptr exchange_receiver @@ -1421,14 +1404,14 @@ struct Join : Executor std::unordered_set left_used_columns; std::unordered_set right_used_columns; - for (auto & s : used_columns) + for (const auto & s : used_columns) { if (left_columns.find(s) != left_columns.end()) left_used_columns.emplace(s); else right_used_columns.emplace(s); } - for (auto child : join_params.using_expression_list->children) + for (const auto & child : join_params.using_expression_list->children) { if (auto * identifier = typeid_cast(child.get())) { @@ -1475,7 +1458,7 @@ struct Join : Executor } } - void fillJoinKeyAndFieldType( + static void fillJoinKeyAndFieldType( ASTPtr key, const DAGSchema & schema, tipb::Expr * tipb_key, @@ -1485,7 +1468,7 @@ struct Join : Executor auto * identifier = typeid_cast(key.get()); for (size_t index = 0; index < schema.size(); index++) { - auto & field = schema[index]; + const auto & field = schema[index]; if (splitQualifiedName(field.first).second == identifier->getColumnName()) { auto tipb_type = TiDB::columnInfoToFieldType(field.second); @@ -1834,9 +1817,9 @@ ExecutorPtr compileTopN(ExecutorPtr input, size_t & executor_index, ASTPtr order compileExpr(input->output_schema, elem->children[0]); } auto limit = safeGet(typeid_cast(*limit_expr).value); - auto topN = std::make_shared(executor_index, input->output_schema, std::move(order_columns), limit); - topN->children.push_back(input); - return topN; + auto top_n = std::make_shared(executor_index, input->output_schema, std::move(order_columns), limit); + top_n->children.push_back(input); + return top_n; } ExecutorPtr compileLimit(ExecutorPtr input, size_t & executor_index, ASTPtr limit_expr) @@ -1879,9 +1862,10 @@ ExecutorPtr compileAggregation(ExecutorPtr input, size_t & executor_index, ASTPt ci.tp = TiDB::TypeLongLong; ci.flag = TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull; } - else if (func->name == "max" || func->name == "min") + else if (func->name == "max" || func->name == "min" || func->name == "first_row") { ci = children_ci[0]; + ci.flag &= ~TiDB::ColumnFlagNotNull; } else if (func->name == uniq_raw_res_name) { @@ -1971,7 +1955,7 @@ ExecutorPtr compileProject(ExecutorPtr input, size_t & executor_index, ASTPtr se ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params) { DAGSchema output_schema; - auto & join_params = (static_cast(*params)); + const auto & join_params = (static_cast(*params)); for (auto & field : left->output_schema) { if (join_params.kind == ASTTableJoin::Kind::Right && field.second.hasNotNullFlag()) @@ -2035,7 +2019,7 @@ struct QueryFragment dag_request.add_output_offsets(i); auto * root_tipb_executor = dag_request.mutable_root_executor(); root_executor->toTiPBExecutor(root_tipb_executor, properties.collator, mpp_info, context); - return QueryTask(dag_request_ptr, table_id, root_executor->output_schema, mpp_info.sender_target_task_ids.size() == 0 ? DAG : MPP_DISPATCH, mpp_info.task_id, mpp_info.partition_id, is_top_fragment); + return QueryTask(dag_request_ptr, table_id, root_executor->output_schema, mpp_info.sender_target_task_ids.empty() ? DAG : MPP_DISPATCH, mpp_info.task_id, mpp_info.partition_id, is_top_fragment); } QueryTasks toQueryTasks(const DAGProperties & properties, const Context & context) @@ -2071,7 +2055,7 @@ TableID findTableIdForQueryFragment(ExecutorPtr root_executor, bool must_have_ta while (!current_executor->children.empty()) { ExecutorPtr non_exchange_child; - for (auto c : current_executor->children) + for (const auto & c : current_executor->children) { if (dynamic_cast(c.get())) continue; @@ -2109,17 +2093,24 @@ QueryFragments mppQueryToQueryFragments( root_executor->toMPPSubPlan(executor_index, properties, exchange_map); TableID table_id = findTableIdForQueryFragment(root_executor, exchange_map.empty()); std::vector sender_target_task_ids = mpp_ctx->sender_target_task_ids; - std::vector current_task_ids = mpp_ctx->current_task_ids; std::unordered_map> receiver_source_task_ids_map; + size_t current_task_num = properties.mpp_partition_num; + for (auto & exchange : exchange_map) + { + if (exchange.second.second->type == tipb::ExchangeType::PassThrough) + { + current_task_num = 1; + break; + } + } + std::vector current_task_ids; + for (size_t i = 0; i < current_task_num; i++) + current_task_ids.push_back(mpp_ctx->next_task_id++); for (auto & exchange : exchange_map) { - std::vector task_ids; - for (size_t i = 0; i < (size_t)mpp_ctx->partition_num; i++) - task_ids.push_back(mpp_ctx->next_task_id++); mpp_ctx->sender_target_task_ids = current_task_ids; - mpp_ctx->current_task_ids = task_ids; - receiver_source_task_ids_map[exchange.first] = task_ids; auto sub_fragments = mppQueryToQueryFragments(exchange.second.second, executor_index, properties, false, mpp_ctx); + receiver_source_task_ids_map[exchange.first] = sub_fragments.cbegin()->task_ids; fragments.insert(fragments.end(), sub_fragments.begin(), sub_fragments.end()); } fragments.emplace_back(root_executor, table_id, for_root_fragment, std::move(sender_target_task_ids), std::move(receiver_source_task_ids_map), std::move(current_task_ids)); @@ -2134,10 +2125,8 @@ QueryFragments queryPlanToQueryFragments(const DAGProperties & properties, Execu = std::make_shared(executor_index, root_executor->output_schema, tipb::PassThrough); root_exchange_sender->children.push_back(root_executor); root_executor = root_exchange_sender; - MPPCtxPtr mpp_ctx = std::make_shared(properties.start_ts, properties.mpp_partition_num); + MPPCtxPtr mpp_ctx = std::make_shared(properties.start_ts); mpp_ctx->sender_target_task_ids.emplace_back(-1); - for (size_t i = 0; i < (size_t)properties.mpp_partition_num; i++) - mpp_ctx->current_task_ids.push_back(mpp_ctx->next_task_id++); return mppQueryToQueryFragments(root_executor, executor_index, properties, true, mpp_ctx); } else @@ -2195,7 +2184,7 @@ std::pair compileQueryBlock( const DAGProperties & properties, ASTSelectQuery & ast_query) { - auto joined_table = getJoin(ast_query); + const auto * joined_table = getJoin(ast_query); /// uniq_raw is used to test `ApproxCountDistinct`, when testing `ApproxCountDistinct` in mock coprocessor /// the return value of `ApproxCountDistinct` is just the raw result, we need to convert it to a readable /// value when decoding the result(using `UniqRawResReformatBlockOutputStream`) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2395bc50fef..5f6da4cd435 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -84,6 +84,7 @@ struct AnalysisResult Names aggregation_keys; TiDB::TiDBCollators aggregation_collators; AggregateDescriptions aggregate_descriptions; + bool is_final_agg; }; // add timezone cast for timestamp type, this is used to support session level timezone @@ -103,6 +104,15 @@ bool addExtraCastsAfterTs( return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, query_block); } +bool isFinalAgg(const tipb::Expr & expr) +{ + if (!expr.has_aggfuncmode()) + /// set default value to true to make it compatible with old version of TiDB since before this + /// change, all the aggregation in TiFlash is treated as final aggregation + return true; + return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode; +} + AnalysisResult analyzeExpressions( Context & context, DAGExpressionAnalyzer & analyzer, @@ -149,12 +159,22 @@ AnalysisResult analyzeExpressions( // There will be either Agg... if (query_block.aggregation) { + /// set default value to true to make it compatible with old version of TiDB since before this + /// change, all the aggregation in TiFlash is treated as final aggregation + res.is_final_agg = true; + const auto & aggregation = query_block.aggregation->aggregation(); + if (aggregation.agg_func_size() > 0 && !isFinalAgg(aggregation.agg_func(0))) + res.is_final_agg = false; + for (int i = 1; i < aggregation.agg_func_size(); i++) + { + if (res.is_final_agg != isFinalAgg(aggregation.agg_func(i))) + throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest); + } + // todo now we can tell if the aggregation is final stage or partial stage, maybe we can do collation insensitive + // aggregation if the stage is partial bool group_by_collation_sensitive = - /// collation sensitive group by is slower then normal group by, use normal group by by default - context.getSettingsRef().group_by_collation_sensitive || - /// in mpp task, here is no way to tell whether this aggregation is first stage aggregation or - /// final stage aggregation, to make sure the result is right, always do collation sensitive aggregation - context.getDAGContext()->isMPPTask(); + /// collation sensitive group by is slower than normal group by, use normal group by by default + context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask(); std::tie(res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions) = analyzer.appendAggregation( chain, @@ -642,7 +662,8 @@ void DAGQueryBlockInterpreter::executeAggregation( const ExpressionActionsPtr & expression_actions_ptr, Names & key_names, TiDB::TiDBCollators & collators, - AggregateDescriptions & aggregate_descriptions) + AggregateDescriptions & aggregate_descriptions, + bool is_final_agg) { pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expression_actions_ptr, taskLogger()); }); @@ -682,7 +703,7 @@ void DAGQueryBlockInterpreter::executeAggregation( allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, + !is_final_agg, context.getTemporaryPath(), has_collator ? collators : TiDB::dummy_collators); @@ -1113,7 +1134,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) if (res.before_aggregation) { // execute aggregation - executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions); + executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.is_final_agg); } if (res.before_having) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 569cec69834..dceadb9c30a 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -79,7 +79,8 @@ class DAGQueryBlockInterpreter const ExpressionActionsPtr & expression_actions_ptr, Names & key_names, TiDB::TiDBCollators & collators, - AggregateDescriptions & aggregate_descriptions); + AggregateDescriptions & aggregate_descriptions, + bool is_final_agg); void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols); void executeExchangeSender(DAGPipeline & pipeline); diff --git a/tests/delta-merge-test/query/mpp/aggregation_empty_input.test b/tests/delta-merge-test/query/mpp/aggregation_empty_input.test new file mode 100644 index 00000000000..b91027b29c4 --- /dev/null +++ b/tests/delta-merge-test/query/mpp/aggregation_empty_input.test @@ -0,0 +1,43 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __put_region(5, 100, 200, default, test) +=> DBGInvoke __put_region(6, 200, 300, default, test) + +# shuffle agg with empty table +=> DBGInvoke tidb_query('select count(col_1) from default.test', 4,'mpp_query:true,mpp_partition_num:3') +┌─exchange_receiver_0─┐ +│ 0 │ +└─────────────────────┘ + +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test3', 777) +=> DBGInvoke __raft_insert_row(default, test, 4, 53, 'test4', 888) +=> DBGInvoke __raft_insert_row(default, test, 5, 150, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 5, 151, 'test2', 666) +=> DBGInvoke __raft_insert_row(default, test, 5, 152, 'test3', 777) +=> DBGInvoke __raft_insert_row(default, test, 5, 153, 'test4', 888) +=> DBGInvoke __raft_insert_row(default, test, 6, 250, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 6, 251, 'test2', 666) +=> DBGInvoke __raft_insert_row(default, test, 6, 252, 'test3', 777) +=> DBGInvoke __raft_insert_row(default, test, 6, 253, 'test4', 999) + +# shuffle agg +=> DBGInvoke tidb_query('select count(col_1), first_row(col_2) from default.test where col_2 = 999', 4,'mpp_query:true,mpp_partition_num:3') +┌─exchange_receiver_0─┬─exchange_receiver_1─┐ +│ 1 │ 999 │ +└─────────────────────┴─────────────────────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/fullstack-test/expr/empty_input_for_udaf.test b/tests/fullstack-test/expr/empty_input_for_udaf.test index 7327e398f2d..36d60b07944 100644 --- a/tests/fullstack-test/expr/empty_input_for_udaf.test +++ b/tests/fullstack-test/expr/empty_input_for_udaf.test @@ -61,3 +61,79 @@ mysql> select /*+ read_from_storage(tiflash[t]) */ max(1),max(a),max(b),max(d),m +--------+--------+--------+--------+-----------+ | 1 | 1 | 2 | 1 | NULL | +--------+--------+--------+--------+-----------+ + +mysql> drop table if exists test.t +mysql> create table test.t(a int, b int) +mysql> insert into test.t values(1,1) +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> split table test.t between (0) and (30000) regions 20; ++--------------------+----------------------+ +| TOTAL_SPLIT_REGION | SCATTER_FINISH_RATIO | ++--------------------+----------------------+ +| 19 | 1 | ++--------------------+----------------------+ +mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc' +func> wait_table test t +mysql> set tidb_allow_mpp=0; set tidb_allow_batch_cop=2; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| NULL | NULL | 0 | ++--------------+--------+----------+ +mysql> set tidb_allow_mpp=0; set tidb_allow_batch_cop=1; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| NULL | NULL | 0 | ++--------------+--------+----------+ +mysql> set tidb_allow_mpp=0; set tidb_allow_batch_cop=0; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| NULL | NULL | 0 | ++--------------+--------+----------+ +mysql> set tidb_enforce_mpp=1; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| NULL | NULL | 0 | ++--------------+--------+----------+ +mysql> insert into test.t values(2,1) +mysql> set tidb_allow_mpp=0; set tidb_allow_batch_cop=2; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| 2 | 1 | 1 | ++--------------+--------+----------+ +mysql> set tidb_allow_mpp=0; set tidb_allow_batch_cop=1; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| 2 | 1 | 1 | ++--------------+--------+----------+ +mysql> set tidb_allow_mpp=0; set tidb_allow_batch_cop=0; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| 2 | 1 | 1 | ++--------------+--------+----------+ +mysql> set tidb_enforce_mpp=1; select any_value(a), sum(b), count(*) from test.t where a = 2; ++--------------+--------+----------+ +| any_value(a) | sum(b) | count(*) | ++--------------+--------+----------+ +| 2 | 1 | 1 | ++--------------+--------+----------+