From 06fe3d664e81712837c412c1f1318177dcbd25c4 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Sat, 6 May 2023 10:18:14 +0800 Subject: [PATCH 01/20] start --- .../DataStreams/WindowBlockInputStream.cpp | 21 ++- dbms/src/DataStreams/WindowBlockInputStream.h | 3 +- dbms/src/Debug/MockExecutor/FuncSigMap.cpp | 1 + dbms/src/Debug/MockExecutor/WindowBinder.cpp | 18 +++ dbms/src/Flash/Coprocessor/DAGUtils.cpp | 3 +- dbms/src/TestUtils/mockExecutor.h | 1 + dbms/src/WindowFunctions/IWindowFunction.cpp | 39 +++++ .../tests/gtest_first_value.cpp | 140 ++++++++++++++++++ .../WindowFunctions/tests/gtest_lead_lag.cpp | 7 +- 9 files changed, 220 insertions(+), 13 deletions(-) create mode 100644 dbms/src/WindowFunctions/tests/gtest_first_value.cpp diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index ce2acfd9163..95a2955a507 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -122,9 +122,14 @@ Block WindowBlockInputStream::readImpl() } // Judge whether current_partition_row is end row of partition in current block +// How to judge? +// Compare data in previous partition with the new scanned data. bool WindowTransformAction::isDifferentFromPrevPartition(UInt64 current_partition_row) { + // prev_frame_start refers to the data in previous partition const Columns & reference_columns = inputAt(prev_frame_start); + + // partition_end refers to the new scanned data const Columns & compared_columns = inputAt(partition_end); for (size_t i = 0; i < partition_column_indices.size(); ++i) @@ -299,9 +304,9 @@ void WindowTransformAction::advanceFrameStart() } } -bool WindowTransformAction::arePeers(const RowNumber & x, const RowNumber & y) const +bool WindowTransformAction::arePeers(const RowNumber & peer_group_last_row, const RowNumber & current_row) const { - if (x == y) + if (peer_group_last_row == current_row) { // For convenience, a row is always its own peer. return true; @@ -324,18 +329,18 @@ bool WindowTransformAction::arePeers(const RowNumber & x, const RowNumber & y) c for (size_t i = 0; i < n; ++i) { - const auto * column_x = inputAt(x)[order_column_indices[i]].get(); - const auto * column_y = inputAt(y)[order_column_indices[i]].get(); + const auto * column_peer_last = inputAt(peer_group_last_row)[order_column_indices[i]].get(); + const auto * column_current = inputAt(current_row)[order_column_indices[i]].get(); if (window_description.order_by[i].collator) { - if (column_x->compareAt(x.row, y.row, *column_y, 1 /* nan_direction_hint */, *window_description.order_by[i].collator) != 0) + if (column_peer_last->compareAt(peer_group_last_row.row, current_row.row, *column_current, 1 /* nan_direction_hint */, *window_description.order_by[i].collator) != 0) { return false; } } else { - if (column_x->compareAt(x.row, y.row, *column_y, 1 /* nan_direction_hint */) != 0) + if (column_peer_last->compareAt(peer_group_last_row.row, current_row.row, *column_current, 1 /* nan_direction_hint */) != 0) { return false; } @@ -607,8 +612,8 @@ void WindowTransformAction::tryCalculate() partition_start = partition_end; advanceRowNumber(partition_end); partition_ended = false; - // We have to reset the frame and other pointers when the new partition - // starts. + + // We have to reset the frame and other pointers when the new partition starts. frame_start = partition_start; frame_end = partition_start; prev_frame_start = partition_start; diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 36ffccbe57e..abfd2a1fdcc 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -78,7 +78,7 @@ struct WindowTransformAction void advancePartitionEnd(); bool isDifferentFromPrevPartition(UInt64 current_partition_row); - bool arePeers(const RowNumber & x, const RowNumber & y) const; + bool arePeers(const RowNumber & peer_group_last_row, const RowNumber & current_row) const; void advanceFrameStart(); void advanceFrameEndCurrentRow(); @@ -202,6 +202,7 @@ struct WindowTransformAction // The row for which we are now computing the window functions. RowNumber current_row; + // The start of current peer group, needed for CURRENT ROW frame start. // For ROWS frame, always equal to the current row, and for RANGE and GROUP // frames may be earlier. diff --git a/dbms/src/Debug/MockExecutor/FuncSigMap.cpp b/dbms/src/Debug/MockExecutor/FuncSigMap.cpp index 72ded892845..1bfb62e483e 100644 --- a/dbms/src/Debug/MockExecutor/FuncSigMap.cpp +++ b/dbms/src/Debug/MockExecutor/FuncSigMap.cpp @@ -96,5 +96,6 @@ std::unordered_map window_func_name_to_sig({ {"DenseRank", tipb::ExprType::DenseRank}, {"Lead", tipb::ExprType::Lead}, {"Lag", tipb::ExprType::Lag}, + {"FirstValue", tipb::ExprType::FirstValue}, }); } // namespace DB::tests diff --git a/dbms/src/Debug/MockExecutor/WindowBinder.cpp b/dbms/src/Debug/MockExecutor/WindowBinder.cpp index 0642300cecb..42f4e94f6bd 100644 --- a/dbms/src/Debug/MockExecutor/WindowBinder.cpp +++ b/dbms/src/Debug/MockExecutor/WindowBinder.cpp @@ -17,6 +17,8 @@ #include #include #include +#include + namespace DB::mock { @@ -73,6 +75,17 @@ bool WindowBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collat ft->set_decimal(first_arg_type.decimal()); break; } + case tipb::ExprType::FirstValue: + { + assert(window_expr->children_size() == 1); + const auto arg_type = window_expr->children(0).field_type(); + ft->set_tp(arg_type.tp()); + ft->set_flag(arg_type.flag()); + ft->set_collate(arg_type.collate()); + ft->set_flen(arg_type.flen()); + ft->set_decimal(arg_type.decimal()); + break; + } default: ft->set_tp(TiDB::TypeLongLong); ft->set_flag(TiDB::ColumnFlagBinary); @@ -202,6 +215,11 @@ ExecutorBinderPtr compileWindow(ExecutorBinderPtr input, size_t & executor_index } break; } + case tipb::ExprType::FirstValue: + { + ci = children_ci[0]; + break; + } default: throw Exception(fmt::format("Unsupported window function {}", func->name), ErrorCodes::LOGICAL_ERROR); } diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 8c0930c8ac1..1512f31846e 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -39,6 +39,7 @@ const std::unordered_map window_func_map({ {tipb::ExprType::RowNumber, "row_number"}, {tipb::ExprType::Lead, "lead"}, {tipb::ExprType::Lag, "lag"}, + {tipb::ExprType::FirstValue, "first_value"}, }); const std::unordered_map agg_func_map({ @@ -1030,10 +1031,10 @@ bool isWindowFunctionExpr(const tipb::Expr & expr) case tipb::ExprType::DenseRank: case tipb::ExprType::Lead: case tipb::ExprType::Lag: + case tipb::ExprType::FirstValue: // case tipb::ExprType::CumeDist: // case tipb::ExprType::PercentRank: // case tipb::ExprType::Ntile: - // case tipb::ExprType::FirstValue: // case tipb::ExprType::LastValue: // case tipb::ExprType::NthValue: return true; diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 9d4c8afbda1..db8dc3aac45 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -295,5 +295,6 @@ MockWindowFrame buildDefaultRowsFrame(); #define Lag1(expr) makeASTFunction("Lag", (expr)) #define Lag2(expr1, expr2) makeASTFunction("Lag", (expr1), (expr2)) #define Lag3(expr1, expr2, expr3) makeASTFunction("Lag", (expr1), (expr2), (expr3)) +#define FirstValue(expr) makeASTFunction("FirstValue", (expr)) } // namespace tests } // namespace DB diff --git a/dbms/src/WindowFunctions/IWindowFunction.cpp b/dbms/src/WindowFunctions/IWindowFunction.cpp index 0a0e48b30a6..97d6f536b5b 100644 --- a/dbms/src/WindowFunctions/IWindowFunction.cpp +++ b/dbms/src/WindowFunctions/IWindowFunction.cpp @@ -133,6 +133,44 @@ struct WindowFunctionRowNumber final : public IWindowFunction } }; +struct WindowFunctionFirstValue final : public IWindowFunction +{ +public: + static constexpr auto name = "first_value"; + + explicit WindowFunctionFirstValue(const DataTypes & argument_types_) + : IWindowFunction(argument_types_) + { + assert(argument_types_.size() == 1); + return_type = argument_types_[0]; + } + + String getName() const override + { + return name; + } + + DataTypePtr getReturnType() const override + { + return return_type; + } + + void windowInsertResultInto( + WindowTransformAction & action, + size_t function_index, + const ColumnNumbers & arguments) override + { + assert(action.frame_started); + IColumn & to = *action.blockAt(action.current_row).output_columns[function_index]; + const auto & value_column = *action.inputAt(action.frame_start)[arguments[0]]; + const auto & value_field = value_column[action.frame_start.row]; + to.insert(value_field); + } + +private: + DataTypePtr return_type; +}; + /** LEAD/LAG([,offset[, default_value]]) OVER ( PARTITION BY (expr) @@ -319,5 +357,6 @@ void registerWindowFunctions(WindowFunctionFactory & factory) factory.registerFunction(); factory.registerFunction>(); factory.registerFunction>(); + factory.registerFunction(); } } // namespace DB diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp new file mode 100644 index 00000000000..637194f5e23 --- /dev/null +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -0,0 +1,140 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB::tests +{ +// TODO Tests with frame should be added +class FirstValue : public DB::tests::ExecutorTest +{ + static const size_t max_concurrency_level = 10; +public: + static constexpr auto value_col_name = "first_value"; + const ASTPtr value_col = col(value_col_name); + + void initializeContext() override + { + ExecutorTest::initializeContext(); + } + + void executeWithConcurrencyAndBlockSize(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns) + { + std::vector block_sizes{1, 2, 3, 4, DEFAULT_BLOCK_SIZE}; + for (auto block_size : block_sizes) + { + context.context->setSetting("max_block_size", Field(static_cast(block_size))); + ASSERT_COLUMNS_EQ_R(expect_columns, executeStreams(request)); + ASSERT_COLUMNS_EQ_UR(expect_columns, executeStreams(request, 2)); + ASSERT_COLUMNS_EQ_UR(expect_columns, executeStreams(request, max_concurrency_level)); + } + } + + void executeFunctionAndAssert( + const ColumnWithTypeAndName & result, + const ASTPtr & function, + const ColumnsWithTypeAndName & input) + { + ColumnsWithTypeAndName actual_input = input; + assert(actual_input.size() == 3); + TiDB::TP value_tp = dataTypeToTP(actual_input[2].type); + + actual_input[0].name = "partition"; + actual_input[1].name = "order"; + actual_input[2].name = value_col_name; + context.addMockTable( + {"test_db", "test_table_for_lead_lag"}, + {{"partition", TiDB::TP::TypeLongLong, actual_input[0].type->isNullable()}, + {"order", TiDB::TP::TypeLongLong, actual_input[1].type->isNullable()}, + {value_col_name, value_tp, actual_input[2].type->isNullable()}}, + actual_input); + + auto request = context + .scan("test_db", "test_table_for_lead_lag") + .sort({{"partition", false}, {"order", false}}, true) + .window(function, {"order", false}, {"partition", false}, MockWindowFrame{}) + .build(context); + + ColumnsWithTypeAndName expect = input; + expect.push_back(result); + executeWithConcurrencyAndBlockSize(request, expect); + } + + template + void testInt() + { + executeFunctionAndAssert( + toVec({1, 2, 2, 2, 2, 6, 6, 6, 6, 6, 11, 11, 11}), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, 3, 4, 5, {}, 7, 8, 9, 10, {}, 12, 13})}); + } + + template + void testFloat() + { + executeFunctionAndAssert( + toVec({1, 2, 2, 2, 2, 6, 6, 6, 6, 6, 11, 11, 11}), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, 3, 4, 5, {}, 7, 8, 9, 10, {}, 12, 13})}); + } +}; + +TEST_F(FirstValue, firstValue) +try +{ + executeFunctionAndAssert( + toVec({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); + + // TODO support unsigned int. + testInt(); + testInt(); + testInt(); + testInt(); + + testFloat(); + testFloat(); +} +CATCH + +} // namespace DB::tests diff --git a/dbms/src/WindowFunctions/tests/gtest_lead_lag.cpp b/dbms/src/WindowFunctions/tests/gtest_lead_lag.cpp index 43ac9ac9d48..4951e78e32d 100644 --- a/dbms/src/WindowFunctions/tests/gtest_lead_lag.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_lead_lag.cpp @@ -21,6 +21,7 @@ template using Limits = std::numeric_limits; // TODO Support more convenient testing framework for Window Function. +// TODO Tests with frame should be added class LeadLag : public DB::tests::ExecutorTest { static const size_t max_concurrency_level = 10; @@ -60,9 +61,9 @@ class LeadLag : public DB::tests::ExecutorTest actual_input[2].name = value_col_name; context.addMockTable( {"test_db", "test_table_for_lead_lag"}, - {{"partition", TiDB::TP::TypeLongLong}, - {"order", TiDB::TP::TypeLongLong}, - {value_col_name, value_tp}}, + {{"partition", TiDB::TP::TypeLongLong, actual_input[0].type->isNullable()}, + {"order", TiDB::TP::TypeLongLong, actual_input[1].type->isNullable()}, + {value_col_name, value_tp, actual_input[2].type->isNullable()}}, actual_input); auto request = context From 5fafd74f87b7643d9947c99079acd11332e882fe Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Sat, 6 May 2023 10:50:29 +0800 Subject: [PATCH 02/20] format --- dbms/src/WindowFunctions/tests/gtest_first_value.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp index 637194f5e23..ca3afe950f1 100644 --- a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -21,6 +21,7 @@ namespace DB::tests class FirstValue : public DB::tests::ExecutorTest { static const size_t max_concurrency_level = 10; + public: static constexpr auto value_col_name = "first_value"; const ASTPtr value_col = col(value_col_name); @@ -125,7 +126,7 @@ try {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); - + // TODO support unsigned int. testInt(); testInt(); From 315eaa704af50e9ac88a9992317d729a58741cb8 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Sat, 6 May 2023 12:06:30 +0800 Subject: [PATCH 03/20] address comments --- dbms/src/Debug/MockExecutor/WindowBinder.cpp | 6 +----- dbms/src/WindowFunctions/IWindowFunction.cpp | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/dbms/src/Debug/MockExecutor/WindowBinder.cpp b/dbms/src/Debug/MockExecutor/WindowBinder.cpp index 42f4e94f6bd..b8a027d8112 100644 --- a/dbms/src/Debug/MockExecutor/WindowBinder.cpp +++ b/dbms/src/Debug/MockExecutor/WindowBinder.cpp @@ -79,11 +79,7 @@ bool WindowBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collat { assert(window_expr->children_size() == 1); const auto arg_type = window_expr->children(0).field_type(); - ft->set_tp(arg_type.tp()); - ft->set_flag(arg_type.flag()); - ft->set_collate(arg_type.collate()); - ft->set_flen(arg_type.flen()); - ft->set_decimal(arg_type.decimal()); + (*ft) = arg_type; break; } default: diff --git a/dbms/src/WindowFunctions/IWindowFunction.cpp b/dbms/src/WindowFunctions/IWindowFunction.cpp index 97d6f536b5b..2c0c1885825 100644 --- a/dbms/src/WindowFunctions/IWindowFunction.cpp +++ b/dbms/src/WindowFunctions/IWindowFunction.cpp @@ -141,7 +141,7 @@ struct WindowFunctionFirstValue final : public IWindowFunction explicit WindowFunctionFirstValue(const DataTypes & argument_types_) : IWindowFunction(argument_types_) { - assert(argument_types_.size() == 1); + RUNTIME_CHECK(argument_types_.size() == 1); return_type = argument_types_[0]; } From 4d247248dee4c6680de2a4a670c44dac3ef17b4d Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Sat, 6 May 2023 13:42:12 +0800 Subject: [PATCH 04/20] tweaking --- dbms/src/WindowFunctions/tests/gtest_first_value.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp index ca3afe950f1..2c33a4a6830 100644 --- a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -56,14 +56,14 @@ class FirstValue : public DB::tests::ExecutorTest actual_input[1].name = "order"; actual_input[2].name = value_col_name; context.addMockTable( - {"test_db", "test_table_for_lead_lag"}, + {"test_db", "test_table_for_first_value"}, {{"partition", TiDB::TP::TypeLongLong, actual_input[0].type->isNullable()}, {"order", TiDB::TP::TypeLongLong, actual_input[1].type->isNullable()}, {value_col_name, value_tp, actual_input[2].type->isNullable()}}, actual_input); auto request = context - .scan("test_db", "test_table_for_lead_lag") + .scan("test_db", "test_table_for_first_value") .sort({{"partition", false}, {"order", false}}, true) .window(function, {"order", false}, {"partition", false}, MockWindowFrame{}) .build(context); From 5c3d0b01cc6c034952b5ce807a1fec3561ec50e5 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 8 May 2023 11:46:36 +0800 Subject: [PATCH 05/20] add ft --- tests/fullstack-test/mpp/window.test | 54 +++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test index f3e04a15c36..e83cb271a21 100644 --- a/tests/fullstack-test/mpp/window.test +++ b/tests/fullstack-test/mpp/window.test @@ -1,4 +1,4 @@ -# Copyright 2022 PingCAP, Ltd. +# Copyright 2023 PingCAP, Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -55,3 +55,55 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; select count(*) fr +----------+ mysql> drop table if exists test.t1; mysql> drop table if exists test.t2; + +# Test first_value +mysql> drop table if exists test.first; +mysql> create table test.first(p int not null, o int not null, v varchar(30) null); +mysql> insert into first values (0, 0, "1"), (1, 1, "2"), (1, 2, "3"), (1, 3, "4"), (1, 4, "5"), (2, 5, "6"), (2, 6, "7"), (2, 7, "8"), (2, 8, "9"), (2, 9, "10"), (3, 10, "11"), (3, 11, "12"), (3, 12, "13"); +mysql> alter table test.first set tiflash replica 1; + +mysql> drop table if exists test.first1; +mysql> create table test.first1(p int not null, o int not null, v varchar(30) null); +mysql> insert into first1 values (0, 0, null), (1, 1, null), (1, 2, "3"), (1, 3, "4"), (1, 4, "5"), (2, 5, null), (2, 6, "7"), (2, 7, "8"), (2, 8, "9"), (2, 9, "10"), (3, 10, null), (3, 11, "12"), (3, 12, "13"); +mysql> alter table test.first1 set tiflash replica 1; + +func> wait_table test first +func> wait_table test first1 + +mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from first; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 2 | +| 1 | 3 | 4 | 2 | +| 1 | 4 | 5 | 2 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 11 | +| 3 | 12 | 13 | 11 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 6 | +| 2 | 7 | 8 | 6 | +| 2 | 8 | 9 | 6 | +| 2 | 9 | 10 | 6 | ++---+----+------+------+ + +mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from first; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | NULL | NULL | +| 1 | 1 | NULL | NULL | +| 1 | 2 | 3 | NULL | +| 1 | 3 | 4 | NULL | +| 1 | 4 | 5 | NULL | +| 3 | 10 | NULL | NULL | +| 3 | 11 | 12 | NULL | +| 3 | 12 | 13 | NULL | +| 2 | 5 | NULL | NULL | +| 2 | 6 | 7 | NULL | +| 2 | 7 | 8 | NULL | +| 2 | 8 | 9 | NULL | +| 2 | 9 | 10 | NULL | ++---+----+------+------+ From b8b433856ec3914d4fa04f44ec5d49184d34ddb2 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 11 May 2023 13:21:36 +0800 Subject: [PATCH 06/20] fix ft --- tests/fullstack-test/mpp/window.test | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test index e83cb271a21..c65ade11b39 100644 --- a/tests/fullstack-test/mpp/window.test +++ b/tests/fullstack-test/mpp/window.test @@ -59,18 +59,18 @@ mysql> drop table if exists test.t2; # Test first_value mysql> drop table if exists test.first; mysql> create table test.first(p int not null, o int not null, v varchar(30) null); -mysql> insert into first values (0, 0, "1"), (1, 1, "2"), (1, 2, "3"), (1, 3, "4"), (1, 4, "5"), (2, 5, "6"), (2, 6, "7"), (2, 7, "8"), (2, 8, "9"), (2, 9, "10"), (3, 10, "11"), (3, 11, "12"), (3, 12, "13"); +mysql> insert into test.first values (0, 0, "1"), (1, 1, "2"), (1, 2, "3"), (1, 3, "4"), (1, 4, "5"), (2, 5, "6"), (2, 6, "7"), (2, 7, "8"), (2, 8, "9"), (2, 9, "10"), (3, 10, "11"), (3, 11, "12"), (3, 12, "13"); mysql> alter table test.first set tiflash replica 1; mysql> drop table if exists test.first1; mysql> create table test.first1(p int not null, o int not null, v varchar(30) null); -mysql> insert into first1 values (0, 0, null), (1, 1, null), (1, 2, "3"), (1, 3, "4"), (1, 4, "5"), (2, 5, null), (2, 6, "7"), (2, 7, "8"), (2, 8, "9"), (2, 9, "10"), (3, 10, null), (3, 11, "12"), (3, 12, "13"); +mysql> insert into test.first1 values (0, 0, null), (1, 1, null), (1, 2, "3"), (1, 3, "4"), (1, 4, "5"), (2, 5, null), (2, 6, "7"), (2, 7, "8"), (2, 8, "9"), (2, 9, "10"), (3, 10, null), (3, 11, "12"), (3, 12, "13"); mysql> alter table test.first1 set tiflash replica 1; func> wait_table test first func> wait_table test first1 -mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from first; +mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first; +---+----+------+------+ | p | o | v | a | +---+----+------+------+ @@ -89,7 +89,7 @@ mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partitio | 2 | 9 | 10 | 6 | +---+----+------+------+ -mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from first; +mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first1; +---+----+------+------+ | p | o | v | a | +---+----+------+------+ From 9a5d54ad8391cd03815ebaf24a3a15f2b80fa5e5 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 15 May 2023 17:07:53 +0800 Subject: [PATCH 07/20] fix ft --- tests/fullstack-test/mpp/window.test | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test index c65ade11b39..aa5bbe24e18 100644 --- a/tests/fullstack-test/mpp/window.test +++ b/tests/fullstack-test/mpp/window.test @@ -70,7 +70,7 @@ mysql> alter table test.first1 set tiflash replica 1; func> wait_table test first func> wait_table test first1 -mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first; +mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first; +---+----+------+------+ | p | o | v | a | +---+----+------+------+ @@ -89,7 +89,7 @@ mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partitio | 2 | 9 | 10 | 6 | +---+----+------+------+ -mysql> use test; set enforce_tidb_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first1; +mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first1; +---+----+------+------+ | p | o | v | a | +---+----+------+------+ From caf46f12bb284245b1f258b5fe3547ea3086cd16 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 16 May 2023 11:01:26 +0800 Subject: [PATCH 08/20] init --- .../DataStreams/WindowBlockInputStream.cpp | 119 +++++++++++++++++- dbms/src/DataStreams/WindowBlockInputStream.h | 9 ++ dbms/src/Interpreters/WindowDescription.h | 4 +- 3 files changed, 128 insertions(+), 4 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index ce2acfd9163..22f121de58b 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include namespace DB { @@ -267,6 +269,108 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows) return left; } +RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int64 n) +{ + auto dist = distance(current_row, partition_start); + assert(dist >= 0); + if (dist <= n) + return partition_start; + + RowNumber result_row = current_row; + + // The step happens only in a block + if (static_cast(result_row.row) >= n) + { + result_row.row -= n; + return result_row; + } + + // The step happens between blocks + n -= result_row.row + 1; + --result_row.block; + while (n > 0) + { + auto block = blockAt(result_row); + if (static_cast(block.rows) > n) + { + result_row.row = block.rows - n - 1; // index, so we need to -1 + break; + } + n -= block.rows; + --result_row.block; + } + return result_row; +} + +std::tuple WindowTransformAction::stepBackward(const RowNumber & current_row, Int64 n) +{ + // Distance is too long and partition_end is the longest distance. + auto dist = distance(partition_end, current_row); + assert(dist - 1 >= 0); + if (dist - 1 <= n) + return std::make_tuple(partition_end, partition_ended); + + // Now, frame_end is impossible to reach to partition_end. + RowNumber result_row = current_row; + auto block = blockAt(result_row); + + // The step happens only in a block + if (static_cast(block.rows - result_row.row - 1) >= n) + { + result_row.row += n; + return std::make_tuple(result_row, partition_ended); + } + + // The step happens between blocks + ++result_row.block; + result_row.row = 0; + n -= block.rows - result_row.row; + while (n > 0) + { + auto block_rows = static_cast(blockAt(result_row).rows); + if (n >= block_rows) + { + result_row.row = 0; + ++result_row.block; + n -= block_rows; + continue; + } + + result_row.row += n; + n = 0; + } + + return std::make_tuple(result_row, partition_ended); +} + +Int64 WindowTransformAction::distance(RowNumber left, RowNumber right) +{ + if (left.block == right.block) + return left.row - right.row; + + Int64 negative_sign = 1; + + // Ensure that left is larger than right + if (left.block < right.block) + { + negative_sign = -1; + std::swap(left, right); + } + + Int64 dist = left.row; + RowNumber tmp = left; + --tmp.block; + while (tmp.block > right.row) + { + dist += blockAt(tmp).rows; + --tmp.block; + } + + dist += blockAt(right).rows - right.row; + + return dist * negative_sign; +} + void WindowTransformAction::advanceFrameStart() { if (frame_started) @@ -291,6 +395,9 @@ void WindowTransformAction::advanceFrameStart() break; } case WindowFrame::BoundaryType::Offset: + frame_start = stepForward(current_row, window_description.frame.begin_offset); + frame_started = true; + break; default: throw Exception( ErrorCodes::NOT_IMPLEMENTED, @@ -391,6 +498,15 @@ void WindowTransformAction::advanceFrameEnd() break; } case WindowFrame::BoundaryType::Offset: + { + constexpr size_t frame_end_pos = 0; + constexpr size_t is_frame_end_pos = 1; + auto res = stepBackward(current_row, window_description.frame.end_offset); + frame_ended = std::get(res); + if (frame_ended) + frame_end = std::get(res); + break; + } default: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The frame end type '{}' is not implemented", @@ -607,8 +723,7 @@ void WindowTransformAction::tryCalculate() partition_start = partition_end; advanceRowNumber(partition_end); partition_ended = false; - // We have to reset the frame and other pointers when the new partition - // starts. + // We have to reset the frame and other pointers when the new partition starts. frame_start = partition_start; frame_end = partition_start; prev_frame_start = partition_start; diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 36ffccbe57e..b6ec5babd8e 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -71,6 +71,15 @@ struct RowNumber /* Implementation details.*/ struct WindowTransformAction { +private: + // Used for calculating the frame start + RowNumber stepForward(const RowNumber & current_row, Int64 n); + // Used for calculating the frame end + std::tuple stepBackward(const RowNumber & current_row, Int64 n); + + // distance is left - right. + Int64 distance(RowNumber left, RowNumber right); +public: WindowTransformAction(const Block & input_header, const WindowDescription & window_description_, const String & req_id); void cleanUp(); diff --git a/dbms/src/Interpreters/WindowDescription.h b/dbms/src/Interpreters/WindowDescription.h index c51be3cd224..e986f920f65 100644 --- a/dbms/src/Interpreters/WindowDescription.h +++ b/dbms/src/Interpreters/WindowDescription.h @@ -61,11 +61,11 @@ struct WindowFrame FrameType type = FrameType::Ranges; BoundaryType begin_type = BoundaryType::Unbounded; - Field begin_offset = Field(UInt64(0)); + UInt64 begin_offset = 0; bool begin_preceding = true; BoundaryType end_type = BoundaryType::Unbounded; - Field end_offset = Field(UInt64(0)); + UInt64 end_offset = 0; bool end_preceding = false; bool operator==(const WindowFrame & other) const From b34c4f299fdc5f1b093704be8f4d300003f6ed1e Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 16 May 2023 18:22:23 +0800 Subject: [PATCH 09/20] add frame type test for first_value --- .../DataStreams/WindowBlockInputStream.cpp | 16 ++-- dbms/src/Debug/MockExecutor/WindowBinder.h | 1 + .../src/Flash/tests/gtest_window_executor.cpp | 6 +- .../tests/gtest_first_value.cpp | 82 +++++++++++++++---- 4 files changed, 76 insertions(+), 29 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 796afe4e23a..88ddca1fd61 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -293,9 +293,10 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int6 // The step happens between blocks n -= result_row.row + 1; --result_row.block; + result_row.row = blockAt(result_row).rows - 1; while (n > 0) { - auto block = blockAt(result_row); + auto & block = blockAt(result_row); if (static_cast(block.rows) > n) { result_row.row = block.rows - n - 1; // index, so we need to -1 @@ -303,6 +304,7 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int6 } n -= block.rows; --result_row.block; + result_row.row = blockAt(result_row).rows - 1; } return result_row; } @@ -317,7 +319,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber // Now, frame_end is impossible to reach to partition_end. RowNumber result_row = current_row; - auto block = blockAt(result_row); + auto & block = blockAt(result_row); // The step happens only in a block if (static_cast(block.rows - result_row.row - 1) >= n) @@ -365,7 +367,7 @@ Int64 WindowTransformAction::distance(RowNumber left, RowNumber right) Int64 dist = left.row; RowNumber tmp = left; --tmp.block; - while (tmp.block > right.row) + while (tmp.block > right.block) { dist += blockAt(tmp).rows; --tmp.block; @@ -378,11 +380,6 @@ Int64 WindowTransformAction::distance(RowNumber left, RowNumber right) void WindowTransformAction::advanceFrameStart() { - if (frame_started) - { - return; - } - switch (window_description.frame.begin_type) { case WindowFrame::BoundaryType::Unbounded: @@ -392,6 +389,9 @@ void WindowTransformAction::advanceFrameStart() break; case WindowFrame::BoundaryType::Current: { + if (frame_started) + return; + RUNTIME_CHECK_MSG( only_have_pure_window, "window function only support pure window function in WindowFrame::BoundaryType::Current now."); diff --git a/dbms/src/Debug/MockExecutor/WindowBinder.h b/dbms/src/Debug/MockExecutor/WindowBinder.h index b9745d3358b..97e272c0ebe 100644 --- a/dbms/src/Debug/MockExecutor/WindowBinder.h +++ b/dbms/src/Debug/MockExecutor/WindowBinder.h @@ -19,6 +19,7 @@ namespace DB::mock { +// true: unbounded, false: not unbounded using MockWindowFrameBound = std::tuple; struct MockWindowFrame { diff --git a/dbms/src/Flash/tests/gtest_window_executor.cpp b/dbms/src/Flash/tests/gtest_window_executor.cpp index 5fdcc802cff..3976ee98270 100644 --- a/dbms/src/Flash/tests/gtest_window_executor.cpp +++ b/dbms/src/Flash/tests/gtest_window_executor.cpp @@ -330,7 +330,7 @@ try /* select count(1) from ( - SELECT + SELECT ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`), ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order` DESC) FROM `test_db`.`test_table` @@ -347,7 +347,7 @@ try /* select count(1) from ( - SELECT + SELECT ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`), ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`) FROM `test_db`.`test_table` @@ -370,7 +370,7 @@ try /* select count(1) from ( - SELECT + SELECT Rank() OVER (PARTITION BY `partition` ORDER BY `order`), DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`) FROM `test_db`.`test_table` diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp index 2c33a4a6830..e43be15988b 100644 --- a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -14,10 +14,12 @@ #include #include +#include +#include +#include namespace DB::tests { -// TODO Tests with frame should be added class FirstValue : public DB::tests::ExecutorTest { static const size_t max_concurrency_level = 10; @@ -46,7 +48,8 @@ class FirstValue : public DB::tests::ExecutorTest void executeFunctionAndAssert( const ColumnWithTypeAndName & result, const ASTPtr & function, - const ColumnsWithTypeAndName & input) + const ColumnsWithTypeAndName & input, + MockWindowFrame mock_frame = MockWindowFrame()) { ColumnsWithTypeAndName actual_input = input; assert(actual_input.size() == 3); @@ -65,7 +68,7 @@ class FirstValue : public DB::tests::ExecutorTest auto request = context .scan("test_db", "test_table_for_first_value") .sort({{"partition", false}, {"order", false}}, true) - .window(function, {"order", false}, {"partition", false}, MockWindowFrame{}) + .window(function, {"order", false}, {"partition", false}, mock_frame) .build(context); ColumnsWithTypeAndName expect = input; @@ -113,24 +116,67 @@ class FirstValue : public DB::tests::ExecutorTest TEST_F(FirstValue, firstValue) try { - executeFunctionAndAssert( - toVec({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}), - FirstValue(value_col), - {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); - - executeFunctionAndAssert( - toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), - FirstValue(value_col), - {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); + { + // frame type: unbounded + executeFunctionAndAssert( + toVec({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); + } + + { + // frame type: offset + MockWindowFrame frame; + frame.type = tipb::WindowFrameType::Rows; + frame.start = std::make_tuple(tipb::WindowBoundType::Following, false, 0); + + std::vector frame_start_offset{0, 1, 3, 10}; + std::vector> res_not_null{ + {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"}, + {"1", "2", "2", "3", "4", "6", "6", "7", "8", "9", "11", "11", "12"}, + {"1", "2", "2", "2", "2", "6", "6", "6", "6", "7", "11", "11", "11"}, + {"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"} + }; + std::vector>> res_null{ + {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"}, + {{}, {}, {}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12"}, + {{}, {}, {}, {}, {}, {}, {}, {}, {}, "7", {}, {}, {}}, + {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}} + }; + + for (size_t i = 0; i < frame_start_offset.size(); ++i) + { + frame.start = std::make_tuple(tipb::WindowBoundType::Preceding, false, frame_start_offset[i]); + + executeFunctionAndAssert( + toVec(res_not_null[i]), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, + frame); + + executeFunctionAndAssert( + toNullableVec(res_null[i]), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}, + frame); + } + } // TODO support unsigned int. testInt(); - testInt(); - testInt(); testInt(); testFloat(); From b9fca2f64a2286ecd81ce5a02454f7b06b9d7508 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 18 May 2023 18:24:33 +0800 Subject: [PATCH 10/20] need fix --- .../DataStreams/WindowBlockInputStream.cpp | 7 +- dbms/src/DataStreams/WindowBlockInputStream.h | 1 + .../tests/gtest_first_value.cpp | 25 +++--- .../tests/gtest_last_value.cpp | 85 +++++++++++++------ 4 files changed, 78 insertions(+), 40 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 1c088342eec..8b6d6ff9c8d 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -311,6 +311,11 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int6 std::tuple WindowTransformAction::stepBackward(const RowNumber & current_row, Int64 n) { + // Range of rows is [frame_start, frame_end), + // and frame_end position is behind the position of the last frame row. + // So we need the ++n. + ++n; + // Distance is too long and partition_end is the longest distance. auto dist = distance(partition_end, current_row); assert(dist - 1 >= 0); @@ -793,7 +798,7 @@ RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num) } --prev_row_num.block; - assert(prev_row_num.block - first_block_number < window_blocks.size()); + assert(static_cast(prev_row_num.block - first_block_number) < static_cast(window_blocks.size())); const auto new_block_rows = blockAt(prev_row_num).rows; prev_row_num.row = new_block_rows - 1; return prev_row_num; diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 7e01b720ac2..954126fa0f8 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -79,6 +79,7 @@ struct WindowTransformAction // distance is left - right. Int64 distance(RowNumber left, RowNumber right); + public: WindowTransformAction(const Block & input_header, const WindowDescription & window_description_, const String & req_id); diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp index e43be15988b..f0aa236e44c 100644 --- a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -14,10 +14,11 @@ #include #include -#include #include #include +#include + namespace DB::tests { class FirstValue : public DB::tests::ExecutorTest @@ -122,15 +123,15 @@ try toVec({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}), FirstValue(value_col), {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); executeFunctionAndAssert( toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), FirstValue(value_col), {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); } { @@ -144,14 +145,12 @@ try {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"}, {"1", "2", "2", "3", "4", "6", "6", "7", "8", "9", "11", "11", "12"}, {"1", "2", "2", "2", "2", "6", "6", "6", "6", "7", "11", "11", "11"}, - {"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"} - }; + {"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}}; std::vector>> res_null{ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"}, {{}, {}, {}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12"}, {{}, {}, {}, {}, {}, {}, {}, {}, {}, "7", {}, {}, {}}, - {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}} - }; + {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}}; for (size_t i = 0; i < frame_start_offset.size(); ++i) { @@ -161,16 +160,16 @@ try toVec(res_not_null[i]), FirstValue(value_col), {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, frame); executeFunctionAndAssert( toNullableVec(res_null[i]), FirstValue(value_col), {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}, + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}, frame); } } diff --git a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp index 58d25def848..ae3a462ebc9 100644 --- a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp @@ -51,7 +51,7 @@ class LastValue : public DB::tests::ExecutorTest const ColumnWithTypeAndName & result, const ASTPtr & function, const ColumnsWithTypeAndName & input, - const MockWindowFrame & frame) + MockWindowFrame frame = MockWindowFrame()) { ColumnsWithTypeAndName actual_input = input; assert(actual_input.size() == 3); @@ -81,7 +81,6 @@ class LastValue : public DB::tests::ExecutorTest template void testInt() { - // TODO test with bounded_type_frame MockWindowFrame unbounded_type_frame{ tipb::WindowFrameType::Rows, std::make_tuple(tipb::WindowBoundType::Preceding, true, 0), @@ -107,7 +106,6 @@ class LastValue : public DB::tests::ExecutorTest template void testFloat() { - // TODO test with bounded_type_frame MockWindowFrame unbounded_type_frame{ tipb::WindowFrameType::Rows, std::make_tuple(tipb::WindowBoundType::Preceding, true, 0), @@ -134,32 +132,67 @@ class LastValue : public DB::tests::ExecutorTest TEST_F(LastValue, lastValue) try { - // TODO test with bounded_type_frame - MockWindowFrame unbounded_type_frame{ - tipb::WindowFrameType::Rows, - std::make_tuple(tipb::WindowBoundType::Preceding, true, 0), - std::make_tuple(tipb::WindowBoundType::Following, true, 0)}; - - executeFunctionAndAssert( - toVec({"1", "5", "5", "5", "5", "10", "10", "10", "10", "10", "13", "13", "13"}), - LastValue(value_col), - {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, - unbounded_type_frame); - - executeFunctionAndAssert( - toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), - LastValue(value_col), - {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toNullableVec(/*value*/ {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}})}, - unbounded_type_frame); + { + // frame type: unbounded + executeFunctionAndAssert( + toVec({"1", "5", "5", "5", "5", "10", "10", "10", "10", "10", "13", "13", "13"}), + LastValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + LastValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}})}); + } + + { + // frame type: offset + MockWindowFrame frame; + frame.type = tipb::WindowFrameType::Rows; + frame.end = std::make_tuple(tipb::WindowBoundType::Following, false, 0); + + std::vector frame_start_offset{0, 1, 3, 10}; + std::vector> res_not_null{ + {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"}, + {"1", "3", "4", "5", "5", "7", "8", "9", "10", "10", "12", "13", "13"}, + {"1", "5", "5", "5", "5", "9", "10", "10", "10", "10", "13", "13", "13"}, + {"1", "5", "5", "5", "5", "10", "10", "10", "10", "10", "13", "13", "13"}, + }; + std::vector>> res_null{ + {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}}, + {{}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12", {}, {}}, + {{}, {}, {}, {}, {}, "9", {}, {}, {}, {}, {}, {}, {}}, + {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}, + }; + + for (size_t i = 0; i < frame_start_offset.size(); ++i) + { + std::cout << "iiiiiii: " << i << std::endl; + frame.end = std::make_tuple(tipb::WindowBoundType::Following, false, frame_start_offset[i]); + executeFunctionAndAssert( + toVec(res_not_null[i]), + LastValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, + frame); + + executeFunctionAndAssert( + toNullableVec(res_null[i]), + LastValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}})}, + frame); + } + } // TODO support unsigned int. testInt(); - testInt(); - testInt(); testInt(); testFloat(); From 9ec608bb526afc5a851041c0adb5897e069800c9 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 19 May 2023 11:33:57 +0800 Subject: [PATCH 11/20] fix --- .../DataStreams/WindowBlockInputStream.cpp | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 8b6d6ff9c8d..eb6e83e1e17 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -313,46 +313,47 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber { // Range of rows is [frame_start, frame_end), // and frame_end position is behind the position of the last frame row. - // So we need the ++n. + // So we need to ++n. ++n; - // Distance is too long and partition_end is the longest distance. auto dist = distance(partition_end, current_row); - assert(dist - 1 >= 0); - if (dist - 1 <= n) + assert(dist >= 1); + + // Distance is too long and partition_end is the longest distance. + if (dist <= n) return std::make_tuple(partition_end, partition_ended); // Now, frame_end is impossible to reach to partition_end. - RowNumber result_row = current_row; - auto & block = blockAt(result_row); + RowNumber frame_end_row = current_row; + auto & block = blockAt(frame_end_row); // The step happens only in a block - if (static_cast(block.rows - result_row.row - 1) >= n) + if (static_cast(block.rows - frame_end_row.row - 1) >= n) { - result_row.row += n; - return std::make_tuple(result_row, partition_ended); + frame_end_row.row += n; + return std::make_tuple(frame_end_row, partition_ended); } // The step happens between blocks - ++result_row.block; - result_row.row = 0; - n -= block.rows - result_row.row; + n -= block.rows - frame_end_row.row; + ++frame_end_row.block; + frame_end_row.row = 0; while (n > 0) { - auto block_rows = static_cast(blockAt(result_row).rows); + auto block_rows = static_cast(blockAt(frame_end_row).rows); if (n >= block_rows) { - result_row.row = 0; - ++result_row.block; + frame_end_row.row = 0; + ++frame_end_row.block; n -= block_rows; continue; } - result_row.row += n; + frame_end_row.row += n; n = 0; } - return std::make_tuple(result_row, partition_ended); + return std::make_tuple(frame_end_row, partition_ended); } Int64 WindowTransformAction::distance(RowNumber left, RowNumber right) From aeb33d9bdf33e55209454672a3e32f195bcba639 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 19 May 2023 15:34:21 +0800 Subject: [PATCH 12/20] remove useless code --- dbms/src/WindowFunctions/tests/gtest_last_value.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp index ae3a462ebc9..a41147f01be 100644 --- a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp @@ -171,7 +171,6 @@ try for (size_t i = 0; i < frame_start_offset.size(); ++i) { - std::cout << "iiiiiii: " << i << std::endl; frame.end = std::make_tuple(tipb::WindowBoundType::Following, false, frame_start_offset[i]); executeFunctionAndAssert( toVec(res_not_null[i]), From 8d42fd3eb65cf5213878083f8c2bec7935e24e9f Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 25 May 2023 11:19:15 +0800 Subject: [PATCH 13/20] init --- .../DataStreams/WindowBlockInputStream.cpp | 17 ++--- tests/fullstack-test/mpp/window.test | 76 +++++++++++++++++++ 2 files changed, 82 insertions(+), 11 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index eb6e83e1e17..835432cd5f7 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -319,7 +319,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber auto dist = distance(partition_end, current_row); assert(dist >= 1); - // Distance is too long and partition_end is the longest distance. + // Offset is too large and the partition_end is the longest position we can reach if (dist <= n) return std::make_tuple(partition_end, partition_ended); @@ -386,6 +386,9 @@ Int64 WindowTransformAction::distance(RowNumber left, RowNumber right) void WindowTransformAction::advanceFrameStart() { + if (frame_started) + return; + switch (window_description.frame.begin_type) { case WindowFrame::BoundaryType::Unbounded: @@ -395,9 +398,6 @@ void WindowTransformAction::advanceFrameStart() break; case WindowFrame::BoundaryType::Current: { - if (frame_started) - return; - RUNTIME_CHECK_MSG( only_have_pure_window, "window function only support pure window function in WindowFrame::BoundaryType::Current now."); @@ -510,12 +510,7 @@ void WindowTransformAction::advanceFrameEnd() } case WindowFrame::BoundaryType::Offset: { - constexpr size_t frame_end_pos = 0; - constexpr size_t is_frame_end_pos = 1; - auto res = stepBackward(current_row, window_description.frame.end_offset); - frame_ended = std::get(res); - if (frame_ended) - frame_end = std::get(res); + std::tie(frame_end, frame_ended) = stepBackward(current_row, window_description.frame.end_offset); break; } default: @@ -799,7 +794,7 @@ RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num) } --prev_row_num.block; - assert(static_cast(prev_row_num.block - first_block_number) < static_cast(window_blocks.size())); + assert(static_cast(prev_row_num.block) - static_cast(first_block_number) < static_cast(window_blocks.size())); const auto new_block_rows = blockAt(prev_row_num).rows; prev_row_num.row = new_block_rows - 1; return prev_row_num; diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test index ed636a54c06..f417d50a69a 100644 --- a/tests/fullstack-test/mpp/window.test +++ b/tests/fullstack-test/mpp/window.test @@ -89,6 +89,44 @@ mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partitio | 2 | 9 | 10 | 6 | +---+----+------+------+ +mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc rows between 2 preceding and 2 following) as a from test.first; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 2 | +| 1 | 3 | 4 | 2 | +| 1 | 4 | 5 | 3 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 6 | +| 2 | 7 | 8 | 6 | +| 2 | 8 | 9 | 7 | +| 2 | 9 | 10 | 8 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 11 | +| 3 | 12 | 13 | 11 | ++---+----+------+------+ + +mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc rows between 0 preceding and 0 following) as a from test.first; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 3 | +| 1 | 3 | 4 | 4 | +| 1 | 4 | 5 | 5 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 7 | +| 2 | 7 | 8 | 8 | +| 2 | 8 | 9 | 9 | +| 2 | 9 | 10 | 10 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 12 | +| 3 | 12 | 13 | 13 | ++---+----+------+------+ + mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first1; +---+----+------+------+ | p | o | v | a | @@ -179,6 +217,44 @@ mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition | 3 | 12 | 13 | 13 | +---+----+------+------+ +mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition by p order by o asc rows between 2 preceding and 2 following) as a from test.last; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 4 | +| 1 | 2 | 3 | 5 | +| 1 | 3 | 4 | 5 | +| 1 | 4 | 5 | 5 | +| 2 | 5 | 6 | 8 | +| 2 | 6 | 7 | 9 | +| 2 | 7 | 8 | 10 | +| 2 | 8 | 9 | 10 | +| 2 | 9 | 10 | 10 | +| 3 | 10 | 11 | 13 | +| 3 | 11 | 12 | 13 | +| 3 | 12 | 13 | 13 | ++---+----+------+------+ + +mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition by p order by o asc rows between 0 preceding and 0 following) as a from test.last; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 3 | +| 1 | 3 | 4 | 4 | +| 1 | 4 | 5 | 5 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 7 | +| 2 | 7 | 8 | 8 | +| 2 | 8 | 9 | 9 | +| 2 | 9 | 10 | 10 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 12 | +| 3 | 12 | 13 | 13 | ++---+----+------+------+ + mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition by p order by o asc) as a from test.last1; +---+----+------+------+ | p | o | v | a | From bb521cd4cba8cd50bfe077297ef497fe1ae4a790 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 25 May 2023 13:56:14 +0800 Subject: [PATCH 14/20] Update dbms/src/DataStreams/WindowBlockInputStream.cpp Co-authored-by: SeaRise --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 835432cd5f7..eb40f50f0d7 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -291,7 +291,7 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int6 } // The step happens between blocks - n -= result_row.row + 1; + n -= (result_row.row + 1); --result_row.block; result_row.row = blockAt(result_row).rows - 1; while (n > 0) From 017ff6c81e464a077ebb738cfb6f8fc98b4d5f2f Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 25 May 2023 14:09:24 +0800 Subject: [PATCH 15/20] resolve comments --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 16 ++++++++-------- dbms/src/DataStreams/WindowBlockInputStream.h | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index eb40f50f0d7..861f98fdf9c 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -274,17 +274,17 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows) return left; } -RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int64 n) +RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, UInt64 n) { auto dist = distance(current_row, partition_start); assert(dist >= 0); - if (dist <= n) + if (dist <= static_cast(n)) return partition_start; RowNumber result_row = current_row; // The step happens only in a block - if (static_cast(result_row.row) >= n) + if (result_row.row >= n) { result_row.row -= n; return result_row; @@ -297,7 +297,7 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int6 while (n > 0) { auto & block = blockAt(result_row); - if (static_cast(block.rows) > n) + if (block.rows > n) { result_row.row = block.rows - n - 1; // index, so we need to -1 break; @@ -309,7 +309,7 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, Int6 return result_row; } -std::tuple WindowTransformAction::stepBackward(const RowNumber & current_row, Int64 n) +std::tuple WindowTransformAction::stepBackward(const RowNumber & current_row, UInt64 n) { // Range of rows is [frame_start, frame_end), // and frame_end position is behind the position of the last frame row. @@ -320,7 +320,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber assert(dist >= 1); // Offset is too large and the partition_end is the longest position we can reach - if (dist <= n) + if (dist <= static_cast(n)) return std::make_tuple(partition_end, partition_ended); // Now, frame_end is impossible to reach to partition_end. @@ -328,7 +328,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber auto & block = blockAt(frame_end_row); // The step happens only in a block - if (static_cast(block.rows - frame_end_row.row - 1) >= n) + if ((block.rows - frame_end_row.row - 1) >= n) { frame_end_row.row += n; return std::make_tuple(frame_end_row, partition_ended); @@ -341,7 +341,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber while (n > 0) { auto block_rows = static_cast(blockAt(frame_end_row).rows); - if (n >= block_rows) + if (static_cast(n) >= block_rows) { frame_end_row.row = 0; ++frame_end_row.block; diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 954126fa0f8..711c9ace09b 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -73,9 +73,9 @@ struct WindowTransformAction { private: // Used for calculating the frame start - RowNumber stepForward(const RowNumber & current_row, Int64 n); + RowNumber stepForward(const RowNumber & current_row, UInt64 n); // Used for calculating the frame end - std::tuple stepBackward(const RowNumber & current_row, Int64 n); + std::tuple stepBackward(const RowNumber & current_row, UInt64 n); // distance is left - right. Int64 distance(RowNumber left, RowNumber right); From a4c1d4f35f9c495aad89ced619985240cedc64e3 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 26 May 2023 17:38:31 +0800 Subject: [PATCH 16/20] address comments --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 861f98fdf9c..83f3b6c46e4 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -311,6 +311,9 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, UInt std::tuple WindowTransformAction::stepBackward(const RowNumber & current_row, UInt64 n) { + if (!partition_ended) + return std::make_tuple(RowNumber(), false); + // Range of rows is [frame_start, frame_end), // and frame_end position is behind the position of the last frame row. // So we need to ++n. @@ -340,8 +343,8 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber frame_end_row.row = 0; while (n > 0) { - auto block_rows = static_cast(blockAt(frame_end_row).rows); - if (static_cast(n) >= block_rows) + auto block_rows = blockAt(frame_end_row).rows; + if (n >= block_rows) { frame_end_row.row = 0; ++frame_end_row.block; @@ -794,7 +797,7 @@ RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num) } --prev_row_num.block; - assert(static_cast(prev_row_num.block) - static_cast(first_block_number) < static_cast(window_blocks.size())); + assert(prev_row_num.block < window_blocks.size() + first_block_number); const auto new_block_rows = blockAt(prev_row_num).rows; prev_row_num.row = new_block_rows - 1; return prev_row_num; From 66f4210f3b9ae552310866b313adb97ffcfa6655 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 29 May 2023 16:27:02 +0800 Subject: [PATCH 17/20] 1 --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 10 +++++----- dbms/src/DataStreams/WindowBlockInputStream.h | 2 +- dbms/src/WindowFunctions/tests/gtest_first_value.cpp | 2 ++ dbms/src/WindowFunctions/tests/gtest_last_value.cpp | 2 ++ 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 83f3b6c46e4..a0a4aff233c 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -277,8 +277,8 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows) RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, UInt64 n) { auto dist = distance(current_row, partition_start); - assert(dist >= 0); - if (dist <= static_cast(n)) + + if (dist <= n) return partition_start; RowNumber result_row = current_row; @@ -320,10 +320,10 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber ++n; auto dist = distance(partition_end, current_row); - assert(dist >= 1); + RUNTIME_CHECK(dist >= 1); // Offset is too large and the partition_end is the longest position we can reach - if (dist <= static_cast(n)) + if (dist <= n) return std::make_tuple(partition_end, partition_ended); // Now, frame_end is impossible to reach to partition_end. @@ -359,7 +359,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber return std::make_tuple(frame_end_row, partition_ended); } -Int64 WindowTransformAction::distance(RowNumber left, RowNumber right) +UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right) { if (left.block == right.block) return left.row - right.row; diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 711c9ace09b..19e7c9e50b1 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -78,7 +78,7 @@ struct WindowTransformAction std::tuple stepBackward(const RowNumber & current_row, UInt64 n); // distance is left - right. - Int64 distance(RowNumber left, RowNumber right); + UInt64 distance(RowNumber left, RowNumber right); public: WindowTransformAction(const Block & input_header, const WindowDescription & window_description_, const String & req_id); diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp index f0aa236e44c..402af1c8e0c 100644 --- a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -176,6 +176,8 @@ try // TODO support unsigned int. testInt(); + testInt(); + testInt(); testInt(); testFloat(); diff --git a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp index a41147f01be..585efa70324 100644 --- a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp @@ -192,6 +192,8 @@ try // TODO support unsigned int. testInt(); + testInt(); + testInt(); testInt(); testFloat(); From b19a22a38db8da45b17478cf15424b9a966f39d7 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 30 May 2023 14:29:47 +0800 Subject: [PATCH 18/20] 1 --- .../DataStreams/WindowBlockInputStream.cpp | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index a0a4aff233c..b7b2831fbe6 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -324,7 +324,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber // Offset is too large and the partition_end is the longest position we can reach if (dist <= n) - return std::make_tuple(partition_end, partition_ended); + return std::make_tuple(partition_end, true); // Now, frame_end is impossible to reach to partition_end. RowNumber frame_end_row = current_row; @@ -334,7 +334,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber if ((block.rows - frame_end_row.row - 1) >= n) { frame_end_row.row += n; - return std::make_tuple(frame_end_row, partition_ended); + return std::make_tuple(frame_end_row, true); } // The step happens between blocks @@ -356,7 +356,7 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber n = 0; } - return std::make_tuple(frame_end_row, partition_ended); + return std::make_tuple(frame_end_row, true); } UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right) @@ -364,14 +364,7 @@ UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right) if (left.block == right.block) return left.row - right.row; - Int64 negative_sign = 1; - - // Ensure that left is larger than right - if (left.block < right.block) - { - negative_sign = -1; - std::swap(left, right); - } + RUNTIME_CHECK_MSG(left.block > right.block, "left should always be bigger than right"); Int64 dist = left.row; RowNumber tmp = left; @@ -384,7 +377,7 @@ UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right) dist += blockAt(right).rows - right.row; - return dist * negative_sign; + return dist; } void WindowTransformAction::advanceFrameStart() @@ -409,7 +402,13 @@ void WindowTransformAction::advanceFrameStart() break; } case WindowFrame::BoundaryType::Offset: - frame_start = stepForward(current_row, window_description.frame.begin_offset); + if (window_description.frame.type == WindowFrame::FrameType::Rows) + frame_start = stepForward(current_row, window_description.frame.begin_offset); + else + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + fmt::format("Frame type {}'s Offset BoundaryType is not implemented", + magic_enum::enum_name(window_description.frame.type))); frame_started = true; break; default: @@ -513,7 +512,13 @@ void WindowTransformAction::advanceFrameEnd() } case WindowFrame::BoundaryType::Offset: { - std::tie(frame_end, frame_ended) = stepBackward(current_row, window_description.frame.end_offset); + if (window_description.frame.type == WindowFrame::FrameType::Rows) + std::tie(frame_end, frame_ended) = stepBackward(current_row, window_description.frame.end_offset); + else + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + fmt::format("Frame type {}'s Offset BoundaryType is not implemented", + magic_enum::enum_name(window_description.frame.type))); break; } default: From 02a08d441c462ab8bf5468d81bc56a033583aa60 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 2 Jun 2023 09:37:49 +0800 Subject: [PATCH 19/20] 1 --- dbms/src/DataStreams/WindowBlockInputStream.cpp | 11 +++++++---- dbms/src/DataStreams/WindowBlockInputStream.h | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index 2acf5a93d30..f10a1f871de 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -273,7 +273,7 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows) return left; } -RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, UInt64 n) +RowNumber WindowTransformAction::stepToFrameStart(const RowNumber & current_row, UInt64 n) { auto dist = distance(current_row, partition_start); @@ -308,7 +308,7 @@ RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, UInt return result_row; } -std::tuple WindowTransformAction::stepBackward(const RowNumber & current_row, UInt64 n) +std::tuple WindowTransformAction::stepToFrameEnd(const RowNumber & current_row, UInt64 n) { if (!partition_ended) return std::make_tuple(RowNumber(), false); @@ -361,7 +361,10 @@ std::tuple WindowTransformAction::stepBackward(const RowNumber UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right) { if (left.block == right.block) + { + RUNTIME_CHECK_MSG(left.row >= right.row, "left should always be bigger than right"); return left.row - right.row; + } RUNTIME_CHECK_MSG(left.block > right.block, "left should always be bigger than right"); @@ -399,7 +402,7 @@ void WindowTransformAction::advanceFrameStart() } case WindowFrame::BoundaryType::Offset: if (window_description.frame.type == WindowFrame::FrameType::Rows) - frame_start = stepForward(current_row, window_description.frame.begin_offset); + frame_start = stepToFrameStart(current_row, window_description.frame.begin_offset); else throw Exception( ErrorCodes::NOT_IMPLEMENTED, @@ -505,7 +508,7 @@ void WindowTransformAction::advanceFrameEnd() case WindowFrame::BoundaryType::Offset: { if (window_description.frame.type == WindowFrame::FrameType::Rows) - std::tie(frame_end, frame_ended) = stepBackward(current_row, window_description.frame.end_offset); + std::tie(frame_end, frame_ended) = stepToFrameEnd(current_row, window_description.frame.end_offset); else throw Exception( ErrorCodes::NOT_IMPLEMENTED, diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 586dc84b6d2..53804efa965 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -73,9 +73,9 @@ struct WindowTransformAction { private: // Used for calculating the frame start - RowNumber stepForward(const RowNumber & current_row, UInt64 n); + RowNumber stepToFrameStart(const RowNumber & current_row, UInt64 n); // Used for calculating the frame end - std::tuple stepBackward(const RowNumber & current_row, UInt64 n); + std::tuple stepToFrameEnd(const RowNumber & current_row, UInt64 n); // distance is left - right. UInt64 distance(RowNumber left, RowNumber right); From c4dbd556fa35e95a0b91cb32afe9c3c3c2bc2ab3 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 2 Jun 2023 12:02:36 +0800 Subject: [PATCH 20/20] rc --- .../DataStreams/WindowBlockInputStream.cpp | 46 ++++++++++--------- dbms/src/DataStreams/WindowBlockInputStream.h | 4 +- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index f10a1f871de..a3d773d3cc4 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -273,56 +273,58 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows) return left; } -RowNumber WindowTransformAction::stepToFrameStart(const RowNumber & current_row, UInt64 n) +RowNumber WindowTransformAction::stepToFrameStart(const RowNumber & current_row, const WindowFrame & frame) { + auto step_num = frame.begin_offset; auto dist = distance(current_row, partition_start); - if (dist <= n) + if (dist <= step_num) return partition_start; RowNumber result_row = current_row; // The step happens only in a block - if (result_row.row >= n) + if (result_row.row >= step_num) { - result_row.row -= n; + result_row.row -= step_num; return result_row; } // The step happens between blocks - n -= (result_row.row + 1); + step_num -= (result_row.row + 1); --result_row.block; result_row.row = blockAt(result_row).rows - 1; - while (n > 0) + while (step_num > 0) { auto & block = blockAt(result_row); - if (block.rows > n) + if (block.rows > step_num) { - result_row.row = block.rows - n - 1; // index, so we need to -1 + result_row.row = block.rows - step_num - 1; // index, so we need to -1 break; } - n -= block.rows; + step_num -= block.rows; --result_row.block; result_row.row = blockAt(result_row).rows - 1; } return result_row; } -std::tuple WindowTransformAction::stepToFrameEnd(const RowNumber & current_row, UInt64 n) +std::tuple WindowTransformAction::stepToFrameEnd(const RowNumber & current_row, const WindowFrame & frame) { + auto step_num = frame.end_offset; if (!partition_ended) return std::make_tuple(RowNumber(), false); // Range of rows is [frame_start, frame_end), // and frame_end position is behind the position of the last frame row. // So we need to ++n. - ++n; + ++step_num; auto dist = distance(partition_end, current_row); RUNTIME_CHECK(dist >= 1); // Offset is too large and the partition_end is the longest position we can reach - if (dist <= n) + if (dist <= step_num) return std::make_tuple(partition_end, true); // Now, frame_end is impossible to reach to partition_end. @@ -330,29 +332,29 @@ std::tuple WindowTransformAction::stepToFrameEnd(const RowNumbe auto & block = blockAt(frame_end_row); // The step happens only in a block - if ((block.rows - frame_end_row.row - 1) >= n) + if ((block.rows - frame_end_row.row - 1) >= step_num) { - frame_end_row.row += n; + frame_end_row.row += step_num; return std::make_tuple(frame_end_row, true); } // The step happens between blocks - n -= block.rows - frame_end_row.row; + step_num -= block.rows - frame_end_row.row; ++frame_end_row.block; frame_end_row.row = 0; - while (n > 0) + while (step_num > 0) { auto block_rows = blockAt(frame_end_row).rows; - if (n >= block_rows) + if (step_num >= block_rows) { frame_end_row.row = 0; ++frame_end_row.block; - n -= block_rows; + step_num -= block_rows; continue; } - frame_end_row.row += n; - n = 0; + frame_end_row.row += step_num; + step_num = 0; } return std::make_tuple(frame_end_row, true); @@ -402,7 +404,7 @@ void WindowTransformAction::advanceFrameStart() } case WindowFrame::BoundaryType::Offset: if (window_description.frame.type == WindowFrame::FrameType::Rows) - frame_start = stepToFrameStart(current_row, window_description.frame.begin_offset); + frame_start = stepToFrameStart(current_row, window_description.frame); else throw Exception( ErrorCodes::NOT_IMPLEMENTED, @@ -508,7 +510,7 @@ void WindowTransformAction::advanceFrameEnd() case WindowFrame::BoundaryType::Offset: { if (window_description.frame.type == WindowFrame::FrameType::Rows) - std::tie(frame_end, frame_ended) = stepToFrameEnd(current_row, window_description.frame.end_offset); + std::tie(frame_end, frame_ended) = stepToFrameEnd(current_row, window_description.frame); else throw Exception( ErrorCodes::NOT_IMPLEMENTED, diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index 53804efa965..fca4fa7ea0e 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -73,9 +73,9 @@ struct WindowTransformAction { private: // Used for calculating the frame start - RowNumber stepToFrameStart(const RowNumber & current_row, UInt64 n); + RowNumber stepToFrameStart(const RowNumber & current_row, const WindowFrame & frame); // Used for calculating the frame end - std::tuple stepToFrameEnd(const RowNumber & current_row, UInt64 n); + std::tuple stepToFrameEnd(const RowNumber & current_row, const WindowFrame & frame); // distance is left - right. UInt64 distance(RowNumber left, RowNumber right);