Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support offset frame type #7514

Merged
merged 29 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
06fe3d6
start
xzhangxian1008 May 6, 2023
5fafd74
format
xzhangxian1008 May 6, 2023
315eaa7
address comments
xzhangxian1008 May 6, 2023
4d24724
tweaking
xzhangxian1008 May 6, 2023
5c3d0b0
add ft
xzhangxian1008 May 8, 2023
b8b4338
fix ft
xzhangxian1008 May 11, 2023
6b3b730
Merge branch 'master' into first_val
ti-chi-bot[bot] May 15, 2023
9a5d54a
fix ft
xzhangxian1008 May 15, 2023
96b5e75
Merge branch 'first_val' of github.com:xzhangxian1008/tiflash into fi…
xzhangxian1008 May 15, 2023
caf46f1
init
xzhangxian1008 May 16, 2023
8cc51f6
merge first_val
xzhangxian1008 May 16, 2023
2252564
merge master
xzhangxian1008 May 16, 2023
b34c4f2
add frame type test for first_value
xzhangxian1008 May 16, 2023
283ad3f
Merge branch 'master' of https://github.com/pingcap/tiflash into bd_t…
xzhangxian1008 May 17, 2023
0696d20
Merge branch 'master' of https://github.com/pingcap/tiflash into bd_t…
xzhangxian1008 May 18, 2023
b9fca2f
need fix
xzhangxian1008 May 18, 2023
9ec608b
fix
xzhangxian1008 May 19, 2023
aeb33d9
remove useless code
xzhangxian1008 May 19, 2023
c6672f1
Merge branch 'master' of github.com:pingcap/tiflash into bd_tp_offset
xzhangxian1008 May 25, 2023
8d42fd3
init
xzhangxian1008 May 25, 2023
bb521cd
Update dbms/src/DataStreams/WindowBlockInputStream.cpp
xzhangxian1008 May 25, 2023
017ff6c
resolve comments
xzhangxian1008 May 25, 2023
a4c1d4f
address comments
xzhangxian1008 May 26, 2023
66f4210
1
xzhangxian1008 May 29, 2023
b19a22a
1
xzhangxian1008 May 30, 2023
6183d9f
Merge branch 'master' of github.com:pingcap/tiflash into bd_tp_offset
xzhangxian1008 Jun 2, 2023
02a08d4
1
xzhangxian1008 Jun 2, 2023
c4dbd55
rc
xzhangxian1008 Jun 2, 2023
08088cd
Merge branch 'master' into bd_tp_offset
ti-chi-bot[bot] Jun 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 123 additions & 4 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Interpreters/WindowDescription.h>

#include <magic_enum.hpp>
#include <tuple>
#include <type_traits>

namespace DB
{
Expand Down Expand Up @@ -272,12 +274,123 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows)
return left;
}

RowNumber WindowTransformAction::stepForward(const RowNumber & current_row, UInt64 n)
{
auto dist = distance(current_row, partition_start);
assert(dist >= 0);
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
if (dist <= static_cast<Int64>(n))
return partition_start;

RowNumber result_row = current_row;

// The step happens only in a block
if (result_row.row >= n)
{
result_row.row -= n;
return result_row;
}

// The step happens between blocks
n -= (result_row.row + 1);
--result_row.block;
result_row.row = blockAt(result_row).rows - 1;
while (n > 0)
{
auto & block = blockAt(result_row);
if (block.rows > n)
{
result_row.row = block.rows - n - 1; // index, so we need to -1
break;
}
n -= block.rows;
--result_row.block;
result_row.row = blockAt(result_row).rows - 1;
}
return result_row;
}

std::tuple<RowNumber, bool> WindowTransformAction::stepBackward(const RowNumber & current_row, UInt64 n)
{
if (!partition_ended)
return std::make_tuple(RowNumber(), false);

// Range of rows is [frame_start, frame_end),
// and frame_end position is behind the position of the last frame row.
// So we need to ++n.
++n;
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

auto dist = distance(partition_end, current_row);
assert(dist >= 1);
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

// Offset is too large and the partition_end is the longest position we can reach
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
if (dist <= static_cast<Int64>(n))
return std::make_tuple(partition_end, partition_ended);
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

// Now, frame_end is impossible to reach to partition_end.
RowNumber frame_end_row = current_row;
auto & block = blockAt(frame_end_row);

// The step happens only in a block
if ((block.rows - frame_end_row.row - 1) >= n)
{
frame_end_row.row += n;
return std::make_tuple(frame_end_row, partition_ended);
}

// The step happens between blocks
n -= block.rows - frame_end_row.row;
++frame_end_row.block;
frame_end_row.row = 0;
while (n > 0)
{
auto block_rows = blockAt(frame_end_row).rows;
if (n >= block_rows)
{
frame_end_row.row = 0;
++frame_end_row.block;
n -= block_rows;
continue;
}

frame_end_row.row += n;
n = 0;
}

return std::make_tuple(frame_end_row, partition_ended);
}

Int64 WindowTransformAction::distance(RowNumber left, RowNumber right)
{
if (left.block == right.block)
return left.row - right.row;
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

Int64 negative_sign = 1;

// Ensure that left is larger than right
if (left.block < right.block)
{
negative_sign = -1;
std::swap(left, right);
}
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

Int64 dist = left.row;
RowNumber tmp = left;
--tmp.block;
while (tmp.block > right.block)
{
dist += blockAt(tmp).rows;
--tmp.block;
}

dist += blockAt(right).rows - right.row;

return dist * negative_sign;
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}

void WindowTransformAction::advanceFrameStart()
{
if (frame_started)
{
return;
}

switch (window_description.frame.begin_type)
{
Expand All @@ -296,6 +409,9 @@ void WindowTransformAction::advanceFrameStart()
break;
}
case WindowFrame::BoundaryType::Offset:
frame_start = stepForward(current_row, window_description.frame.begin_offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stepForward only works for rows frame type, need explicitly throw error if the frame type is range or groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stepForward only works for rows frame type, need explicitly throw error if the frame type is range or groups.

Both rows and range will reach to here, so I think we should use the correct function according to frame type, instead of throwing exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this means you will support range type in this pr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this means you will support range type in this pr?

No, but I will throw exception for range type here.

frame_started = true;
break;
default:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
Expand Down Expand Up @@ -396,6 +512,10 @@ void WindowTransformAction::advanceFrameEnd()
break;
}
case WindowFrame::BoundaryType::Offset:
{
std::tie(frame_end, frame_ended) = stepBackward(current_row, window_description.frame.end_offset);
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

break;
}
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The frame end type '{}' is not implemented",
Expand Down Expand Up @@ -612,7 +732,6 @@ void WindowTransformAction::tryCalculate()
partition_start = partition_end;
advanceRowNumber(partition_end);
partition_ended = false;

// We have to reset the frame and other pointers when the new partition starts.
frame_start = partition_start;
frame_end = partition_start;
Expand Down Expand Up @@ -678,7 +797,7 @@ RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num)
}

--prev_row_num.block;
assert(prev_row_num.block - first_block_number < window_blocks.size());
assert(prev_row_num.block < window_blocks.size() + first_block_number);
const auto new_block_rows = blockAt(prev_row_num).rows;
prev_row_num.row = new_block_rows - 1;
return prev_row_num;
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ struct RowNumber
/* Implementation details.*/
struct WindowTransformAction
{
private:
// Used for calculating the frame start
RowNumber stepForward(const RowNumber & current_row, UInt64 n);
// Used for calculating the frame end
std::tuple<RowNumber, bool> stepBackward(const RowNumber & current_row, UInt64 n);

// distance is left - right.
Int64 distance(RowNumber left, RowNumber right);

public:
WindowTransformAction(const Block & input_header, const WindowDescription & window_description_, const String & req_id);

void cleanUp();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/MockExecutor/WindowBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace DB::mock
{
// true: unbounded, false: not unbounded
using MockWindowFrameBound = std::tuple<tipb::WindowBoundType, bool, UInt64>;
struct MockWindowFrame
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/tests/gtest_window_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ try

/*
select count(1) from (
SELECT
SELECT
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`),
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order` DESC)
FROM `test_db`.`test_table`
Expand All @@ -347,7 +347,7 @@ try

/*
select count(1) from (
SELECT
SELECT
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`),
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`)
FROM `test_db`.`test_table`
Expand All @@ -370,7 +370,7 @@ try

/*
select count(1) from (
SELECT
SELECT
Rank() OVER (PARTITION BY `partition` ORDER BY `order`),
DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`)
FROM `test_db`.`test_table`
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/WindowDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ struct WindowFrame
FrameType type = FrameType::Ranges;

BoundaryType begin_type = BoundaryType::Unbounded;
Field begin_offset = Field(UInt64(0));
UInt64 begin_offset = 0;
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
bool begin_preceding = true;

BoundaryType end_type = BoundaryType::Unbounded;
Field end_offset = Field(UInt64(0));
UInt64 end_offset = 0;
bool end_preceding = false;

bool operator==(const WindowFrame & other) const
Expand Down
81 changes: 63 additions & 18 deletions dbms/src/WindowFunctions/tests/gtest_first_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

#include <Interpreters/Context.h>
#include <TestUtils/ExecutorTestUtils.h>
#include <TestUtils/mockExecutor.h>
#include <tipb/executor.pb.h>

#include <optional>

namespace DB::tests
{
// TODO Tests with frame should be added
class FirstValue : public DB::tests::ExecutorTest
{
static const size_t max_concurrency_level = 10;
Expand Down Expand Up @@ -46,7 +49,8 @@ class FirstValue : public DB::tests::ExecutorTest
void executeFunctionAndAssert(
const ColumnWithTypeAndName & result,
const ASTPtr & function,
const ColumnsWithTypeAndName & input)
const ColumnsWithTypeAndName & input,
MockWindowFrame mock_frame = MockWindowFrame())
{
ColumnsWithTypeAndName actual_input = input;
assert(actual_input.size() == 3);
Expand All @@ -65,7 +69,7 @@ class FirstValue : public DB::tests::ExecutorTest
auto request = context
.scan("test_db", "test_table_for_first_value")
.sort({{"partition", false}, {"order", false}}, true)
.window(function, {"order", false}, {"partition", false}, MockWindowFrame{})
.window(function, {"order", false}, {"partition", false}, mock_frame)
.build(context);

ColumnsWithTypeAndName expect = input;
Expand Down Expand Up @@ -113,24 +117,65 @@ class FirstValue : public DB::tests::ExecutorTest
TEST_F(FirstValue, firstValue)
try
{
executeFunctionAndAssert(
toVec<String>({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}),
FirstValue(value_col),
{toVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toVec<String>(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})});

executeFunctionAndAssert(
toNullableVec<String>({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}),
FirstValue(value_col),
{toNullableVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toNullableVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toNullableVec<String>(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})});
{
// frame type: unbounded
executeFunctionAndAssert(
toVec<String>({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}),
FirstValue(value_col),
{toVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toVec<String>(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})});

executeFunctionAndAssert(
toNullableVec<String>({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}),
FirstValue(value_col),
{toNullableVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toNullableVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toNullableVec<String>(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})});
}

{
// frame type: offset
MockWindowFrame frame;
frame.type = tipb::WindowFrameType::Rows;
frame.start = std::make_tuple(tipb::WindowBoundType::Following, false, 0);

std::vector<Int64> frame_start_offset{0, 1, 3, 10};
std::vector<std::vector<String>> res_not_null{
{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"},
{"1", "2", "2", "3", "4", "6", "6", "7", "8", "9", "11", "11", "12"},
{"1", "2", "2", "2", "2", "6", "6", "6", "6", "7", "11", "11", "11"},
{"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}};
std::vector<std::vector<std::optional<String>>> res_null{
{{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"},
{{}, {}, {}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12"},
{{}, {}, {}, {}, {}, {}, {}, {}, {}, "7", {}, {}, {}},
{{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}};

for (size_t i = 0; i < frame_start_offset.size(); ++i)
{
frame.start = std::make_tuple(tipb::WindowBoundType::Preceding, false, frame_start_offset[i]);

executeFunctionAndAssert(
toVec<String>(res_not_null[i]),
FirstValue(value_col),
{toVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toVec<String>(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})},
frame);

executeFunctionAndAssert(
toNullableVec<String>(res_null[i]),
FirstValue(value_col),
{toNullableVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toNullableVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toNullableVec<String>(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})},
frame);
}
}

// TODO support unsigned int.
testInt<Int8>();
testInt<Int16>();
testInt<Int32>();
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
testInt<Int64>();

testFloat<Float32>();
Expand Down
Loading