Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support fine grained shuffle for window function #5048

Merged
merged 52 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
21ea8e1
support fine grained shuffle
guo-shaoge May 24, 2022
1524bef
add unit test for StreamingDAGResponseWriter
guo-shaoge Jun 6, 2022
d860c45
add perf test for window function
guo-shaoge Jun 6, 2022
119b462
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 8, 2022
1e86122
fix conflict
guo-shaoge Jun 8, 2022
6dc4eab
fix contrib change
guo-shaoge Jun 8, 2022
2260514
fix some comment
guo-shaoge Jun 9, 2022
c1585b1
fix
guo-shaoge Jun 10, 2022
8211475
fix executeOrder()
guo-shaoge Jun 10, 2022
bcdf69f
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 15, 2022
162b1e3
add interpreter unittest
guo-shaoge Jun 15, 2022
df8aef7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 16, 2022
d03f0eb
refine microbenchmark
guo-shaoge Jun 16, 2022
e5a01cf
rm exchange_perftest.cpp
guo-shaoge Jun 16, 2022
278240c
update kvproto dep
guo-shaoge Jun 20, 2022
0d4bf2e
fix fmt
guo-shaoge Jun 20, 2022
a4ee8a8
fix lint
guo-shaoge Jun 20, 2022
73e314d
Merge branch 'master' into fine_grained_shuffle
SeaRise Jun 21, 2022
eb605ce
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jun 23, 2022
e3d3ff3
fix conflict
guo-shaoge Jun 23, 2022
e8b9747
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jun 23, 2022
db50925
enable fine_grained_shuffle in fragment level
guo-shaoge Jun 28, 2022
8624a85
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jun 29, 2022
c20c74c
stream_count from uint32_t to uint64_t
guo-shaoge Jun 30, 2022
b5df200
fix testcase
guo-shaoge Jul 1, 2022
4df7c9a
fix minor type
guo-shaoge Jul 1, 2022
d78a055
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 1, 2022
9210111
fix comment
guo-shaoge Jul 1, 2022
e52fc32
add extra_info for stream
guo-shaoge Jul 2, 2022
6fbeb02
make enable_fine_grained_shuffle as template argument
guo-shaoge Jul 3, 2022
709ac80
change uint64_t to UInt64
guo-shaoge Jul 3, 2022
2c7ab24
fix some comment, add fullstack test
guo-shaoge Jul 4, 2022
505f49a
update tipb
guo-shaoge Jul 4, 2022
b1de8cd
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 4, 2022
383b89e
fix fmt
guo-shaoge Jul 4, 2022
db8497f
update window.test
guo-shaoge Jul 4, 2022
eda9596
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 4, 2022
fb13e0e
fix some comments
guo-shaoge Jul 5, 2022
c23487a
update kvproto
guo-shaoge Jul 5, 2022
6e9f7b9
update kvproto
guo-shaoge Jul 5, 2022
030b874
fix bunch of comments
guo-shaoge Jul 7, 2022
59e3cb8
update
guo-shaoge Jul 7, 2022
c64d8ef
move send_exec_summary_at_last to top; Add RUNTIME_CHECK in DAGQueryB…
guo-shaoge Jul 7, 2022
c9ac908
using unique_ptr for msg_channels; use min(stream_count, max_stream)
guo-shaoge Jul 7, 2022
5893b37
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 7, 2022
0f035ec
fix
guo-shaoge Jul 7, 2022
031d1ed
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tics int…
guo-shaoge Jul 7, 2022
4f05ba7
Merge branch 'master' of github.com:pingcap/tiflash into fine_grained…
guo-shaoge Jul 8, 2022
8c34265
update tiflash-proxy
guo-shaoge Jul 8, 2022
ec3fe0d
fix conflict(mockExecutor.cpp)
guo-shaoge Jul 8, 2022
7b35d25
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 8, 2022
920673c
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

uint64_t total_rows;

// For fine grained shuffle, the sender will partition data into multiple streams by hashing.
// ExchangeReceiverBlockInputStream only needs to read its own stream, i.e., streams[stream_id].
// CoprocessorBlockInputStream doesn't take care of this.
size_t stream_id;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
Expand Down Expand Up @@ -120,7 +125,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

bool fetchRemoteResult()
{
auto result = remote_reader->nextResult(block_queue, sample_block);
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
if (result.meet_error)
{
LOG_FMT_WARNING(log, "remote reader meets error: {}", result.error_msg);
Expand Down Expand Up @@ -168,13 +173,14 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

public:
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id)
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id, size_t stream_id_)
: remote_reader(remote_reader_)
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemoteBlockInputStream({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
, stream_id(stream_id_)
{
// generate sample block
ColumnsWithTypeAndName columns;
Expand Down
18 changes: 11 additions & 7 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t c
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::ExchangeReceiver * exchange_receiver = tipb_executor->mutable_exchange_receiver();
for (auto & field : output_schema)
{
Expand Down Expand Up @@ -1354,6 +1355,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::Window * window = tipb_executor->mutable_window();
auto & input_schema = children[0]->output_schema;
for (const auto & expr : func_descs)
Expand Down Expand Up @@ -1430,6 +1432,7 @@ bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id,
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::Sort * sort = tipb_executor->mutable_sort();
sort->set_ispartialsort(is_partial_sort);

Expand Down Expand Up @@ -1665,13 +1668,13 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti
return exchange_sender;
}

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema)
ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count)
{
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema);
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema, fine_grained_shuffle_stream_count);
return exchange_receiver;
}

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame)
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> partition_columns;
if (partition_by_expr_list != nullptr)
Expand Down Expand Up @@ -1739,12 +1742,13 @@ ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr fun
window_exprs,
std::move(partition_columns),
std::move(order_columns),
frame);
frame,
fine_grained_shuffle_stream_count);
window->children.push_back(input);
return window;
}

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort)
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
Expand All @@ -1758,8 +1762,8 @@ ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order
compileExpr(input->output_schema, elem->children[0]);
}
}
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort);
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort, fine_grained_shuffle_stream_count);
sort->children.push_back(input);
return sort;
}
} // namespace DB
} // namespace DB
19 changes: 13 additions & 6 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,11 @@ struct ExchangeSender : Executor
struct ExchangeReceiver : Executor
{
TaskMetas task_metas;
ExchangeReceiver(size_t & index, const DAGSchema & output)
uint64_t fine_grained_shuffle_stream_count;

ExchangeReceiver(size_t & index, const DAGSchema & output, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index, "exchange_receiver_" + std::to_string(index), output)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
Expand Down Expand Up @@ -292,13 +295,15 @@ struct Window : Executor
std::vector<ASTPtr> partition_by_exprs;
std::vector<ASTPtr> order_by_exprs;
MockWindowFrame frame;
uint64_t fine_grained_shuffle_stream_count;

Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_)
Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index_, "window_" + std::to_string(index_), output_schema_)
, func_descs(std::move(func_descs_))
, partition_by_exprs(std::move(partition_by_exprs_))
, order_by_exprs(order_by_exprs_)
, frame(frame_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
}
// Currently the Window executor is only used in unit tests, which don't call columnPrune.
Expand All @@ -311,11 +316,13 @@ struct Sort : Executor
{
std::vector<ASTPtr> by_exprs;
bool is_partial_sort;
uint64_t fine_grained_shuffle_stream_count;

Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_)
Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index_, "sort_" + std::to_string(index_), output_schema_)
, by_exprs(by_exprs_)
, is_partial_sort(is_partial_sort_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
}
// Currently the Sort executor is only used in unit tests, which don't call columnPrune.
Expand Down Expand Up @@ -343,11 +350,11 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema);
ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count = 0);

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame);
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0);

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort);
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0);

void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id);
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,9 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
tipb_exchange_receiver.encoded_task_meta_size(),
10,
/*req_id=*/"",
/*executor_id=*/"");
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"");
/*executor_id=*/"",
/*fine_grained_shuffle_stream_count=*/0);
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0);
return ret;
}
else
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class CoprocessorReader
return detail;
}

CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header)
// stream_id is only meaningful for ExchangeReceiver.
CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header, size_t /*stream_id*/)
{
auto && [result, has_next] = resp_iter.next();
if (!result.error.empty())
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

/// Returns true when `stream_count` is non-zero and false when it is zero.
inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
    // stream_count is unsigned, so "greater than zero" and "not equal to zero" coincide.
    return stream_count != 0;
}

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ DAGDriver<true>::DAGDriver(
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
bool internal_)
: context(context_)
, dag_response(nullptr)
, writer(writer_)
, internal(internal_)
, log(&Poco::Logger::get("DAGDriver"))
Expand Down Expand Up @@ -129,15 +130,17 @@ try
auto streaming_writer = std::make_shared<StreamWriter>(writer);
TiDB::TiDBCollators collators;

std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr, false>>(
streaming_writer,
std::vector<Int64>(),
collators,
tipb::ExchangeType::PassThrough,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context);
dag_context,
/*fine_grained_shuffle_stream_count=*/0,
/*fine_grained_shuffle_batch_size=*/0);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
}
Expand Down
Loading