diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index e1e52fab73b..0df79f89a84 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -316,6 +316,7 @@ if (ENABLE_TESTS) ${TiFlash_SOURCE_DIR}/dbms/src/AggregateFunctions/AggregateFunctionSum.cpp ) target_include_directories(bench_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR} ${benchmark_SOURCE_DIR}/include) + target_compile_definitions(bench_dbms PUBLIC DBMS_PUBLIC_GTEST) target_link_libraries(bench_dbms gtest dbms test_util_bench_main benchmark clickhouse_functions) if (ENABLE_TIFLASH_DTWORKLOAD) diff --git a/dbms/src/Debug/astToExecutor.h b/dbms/src/Debug/astToExecutor.h index cbd2e5ade3a..4d87c0db77e 100644 --- a/dbms/src/Debug/astToExecutor.h +++ b/dbms/src/Debug/astToExecutor.h @@ -350,4 +350,4 @@ ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr fun ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort); void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id); -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 9f201006a88..046088ab2b2 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -153,7 +153,9 @@ class DAGExpressionAnalyzer : private boost::noncopyable const tipb::Window & window, size_t window_columns_start_index); +#ifndef DBMS_PUBLIC_GTEST private: +#endif NamesAndTypes buildOrderColumns( const ExpressionActionsPtr & actions, const ::google::protobuf::RepeatedPtrField & order_by); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 0b3b2db9623..e68c4f91cee 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -54,7 +54,9 @@ class DAGQueryBlockInterpreter BlockInputStreams execute(); +#ifndef DBMS_PUBLIC_GTEST private: +#endif void executeImpl(DAGPipeline & pipeline); void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline); void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline); diff --git a/dbms/src/Flash/tests/CMakeLists.txt b/dbms/src/Flash/tests/CMakeLists.txt index a34e4b23432..944908dcb25 100644 --- a/dbms/src/Flash/tests/CMakeLists.txt +++ b/dbms/src/Flash/tests/CMakeLists.txt @@ -13,14 +13,3 @@ # limitations under the License. include_directories (${CMAKE_CURRENT_BINARY_DIR}) - -add_executable (exchange_perftest - exchange_perftest.cpp - ${TiFlash_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp - ${TiFlash_SOURCE_DIR}/dbms/src/Functions/FunctionsConversion.cpp) -target_link_libraries (exchange_perftest - gtest_main - dbms - clickhouse_functions - clickhouse_aggregate_functions - tiflash-dttool-lib) diff --git a/dbms/src/Flash/tests/WindowTestUtil.h b/dbms/src/Flash/tests/WindowTestUtil.h new file mode 100644 index 00000000000..3f4cb7d595f --- /dev/null +++ b/dbms/src/Flash/tests/WindowTestUtil.h @@ -0,0 +1,81 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +namespace tests +{ + +inline std::shared_ptr mockInterpreter(Context & context, const std::vector & source_columns, int concurrency) +{ + std::vector mock_input_streams_vec = {}; + DAGQueryBlock mock_query_block(0, static_cast>(nullptr)); + std::vector mock_subqueries_for_sets = {}; + std::shared_ptr mock_interpreter = std::make_shared(context, + mock_input_streams_vec, + mock_query_block, + concurrency); + mock_interpreter->analyzer = std::make_unique(std::move(source_columns), context); + return mock_interpreter; +} + +inline void mockExecuteProject(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, NamesWithAliases & final_project) +{ + mock_interpreter->executeProject(pipeline, final_project); +} + +inline void mockExecuteWindowOrder(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const tipb::Sort & sort) +{ + mock_interpreter->handleWindowOrder(pipeline, sort); + mock_interpreter->input_streams_vec[0] = pipeline.streams; + NamesWithAliases final_project; + for (const auto & column : (*mock_interpreter->analyzer).source_columns) + { + final_project.push_back({column.name, ""}); + } + mockExecuteProject(mock_interpreter, pipeline, final_project); +} + +inline void mockExecuteWindowOrder(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const String & sort_json) +{ + tipb::Sort sort; + ::google::protobuf::util::JsonStringToMessage(sort_json, &sort); + mockExecuteWindowOrder(mock_interpreter, pipeline, sort); +} + +inline void mockExecuteWindow(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const tipb::Window & window) +{ + mock_interpreter->handleWindow(pipeline, window); + mock_interpreter->input_streams_vec[0] = pipeline.streams; + NamesWithAliases final_project; + for (const auto & column : (*mock_interpreter->analyzer).source_columns) + { + final_project.push_back({column.name, ""}); + } + mockExecuteProject(mock_interpreter, pipeline, final_project); +} + +inline void mockExecuteWindow(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, std::string window_json_str) +{ + tipb::Window window; + google::protobuf::util::JsonStringToMessage(window_json_str, &window); + mockExecuteWindow(mock_interpreter, pipeline, window); +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/bench_exchange.cpp b/dbms/src/Flash/tests/bench_exchange.cpp new file mode 100644 index 00000000000..fbb53bfd4a4 --- /dev/null +++ b/dbms/src/Flash/tests/bench_exchange.cpp @@ -0,0 +1,407 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include // to include the implementation of StreamingDAGResponseWriter +#include // to include the implementation of ExchangeReceiver +#include // to include the implementation of MPPTunnel +#include // to include the implementation of MPPTunnelSet +#include +#include + + +namespace DB +{ +namespace tests +{ + +std::random_device rd; + +MockBlockInputStream::MockBlockInputStream(const std::vector & blocks_, StopFlag & stop_flag_) + : blocks(blocks_) + , header(blocks[0].cloneEmpty()) + , mt(rd()) + , dist(0, blocks.size() - 1) + , stop_flag(stop_flag_) +{} + +MockFixedRowsBlockInputStream::MockFixedRowsBlockInputStream(size_t total_rows_, const std::vector & blocks_) + : header(blocks_[0].cloneEmpty()) + , mt(rd()) + , dist(0, blocks_.size() - 1) + , current_rows(0) + , total_rows(total_rows_) + , blocks(blocks_) +{} + +Block makeBlock(int row_num) +{ + std::mt19937 mt(rd()); + std::uniform_int_distribution int64_dist; + std::uniform_int_distribution len_dist(10, 20); + std::uniform_int_distribution char_dist; + + InferredDataVector> int64_vec; + InferredDataVector> int64_vec2; + for (int i = 0; i < row_num; ++i) + { + int64_vec.emplace_back(int64_dist(mt)); + int64_vec2.emplace_back(int64_dist(mt)); + } + + InferredDataVector> string_vec; + for (int i = 0; i < row_num; ++i) + { + int len = len_dist(mt); + String s; + for (int j = 0; j < len; ++j) + s.push_back(char_dist(mt)); + string_vec.push_back(std::move(s)); + } + + auto int64_data_type = makeDataType>(); + ColumnWithTypeAndName int64_column(makeColumn>(int64_data_type, int64_vec), int64_data_type, "int64_1"); + ColumnWithTypeAndName int64_column2(makeColumn>(int64_data_type, int64_vec2), int64_data_type, "int64_2"); + + auto string_data_type = makeDataType>(); + ColumnWithTypeAndName string_column(makeColumn>(string_data_type, string_vec), string_data_type, "string"); + + return Block({int64_column, string_column, int64_column2}); +} + +std::vector makeBlocks(int block_num, int row_num) +{ + std::vector blocks; + for (int i = 0; i < block_num; ++i) + blocks.push_back(makeBlock(row_num)); + return blocks; +} + +mpp::MPPDataPacket makePacket(ChunkCodecStream & codec, int row_num) +{ + auto block = makeBlock(row_num); + codec.encode(block, 0, row_num); + + mpp::MPPDataPacket packet; + packet.add_chunks(codec.getString()); + codec.clear(); + + return packet; +} + +std::vector makePackets(ChunkCodecStream & codec, int packet_num, int row_num) +{ + std::vector packets; + for (int i = 0; i < packet_num; ++i) + packets.push_back(std::make_shared(makePacket(codec, row_num))); + return packets; +} + +std::vector makePacketQueues(int source_num, int queue_size) +{ + std::vector queues(source_num); + for (int i = 0; i < source_num; ++i) + queues[i] = std::make_shared(queue_size); + return queues; +} + +std::vector makeFields() +{ + std::vector fields(3); + fields[0].set_tp(TiDB::TypeLongLong); + fields[1].set_tp(TiDB::TypeString); + fields[2].set_tp(TiDB::TypeLongLong); + return fields; +} + +void printException(const Exception & e) +{ + std::string text = e.displayText(); + + auto embedded_stack_trace_pos = text.find("Stack trace"); + std::cerr << "Code: " << e.code() << ". " << text << std::endl + << std::endl; + if (std::string::npos == embedded_stack_trace_pos) + std::cerr << "Stack trace:" << std::endl + << e.getStackTrace().toString() << std::endl; +} + +void sendPacket(const std::vector & packets, const PacketQueuePtr & queue, StopFlag & stop_flag) +{ + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0, packets.size() - 1); + + while (!stop_flag.load()) + { + int i = dist(mt); + queue->tryPush(packets[i], std::chrono::milliseconds(10)); + } + queue->finish(); +} + +void receivePacket(const PacketQueuePtr & queue) +{ + while (true) + { + PacketPtr packet; + if (!queue->pop(packet)) + break; + } +} + +ReceiverHelper::ReceiverHelper(int concurrency_, int source_num_) + : concurrency(concurrency_) + , source_num(source_num_) +{ + pb_exchange_receiver.set_tp(tipb::Hash); + for (int i = 0; i < source_num; ++i) + { + mpp::TaskMeta task; + task.set_start_ts(0); + task.set_task_id(i); + task.set_partition_id(i); + task.set_address(""); + + String encoded_task; + task.SerializeToString(&encoded_task); + + pb_exchange_receiver.add_encoded_task_meta(encoded_task); + } + + fields = makeFields(); + *pb_exchange_receiver.add_field_types() = fields[0]; + *pb_exchange_receiver.add_field_types() = fields[1]; + *pb_exchange_receiver.add_field_types() = fields[2]; + + task_meta.set_task_id(100); + + queues = makePacketQueues(source_num, 10); +} + +MockExchangeReceiverPtr ReceiverHelper::buildReceiver() +{ + return std::make_shared( + std::make_shared(queues, fields), + source_num, + concurrency, + "mock_req_id", + "mock_exchange_receiver_id"); +} + +std::vector ReceiverHelper::buildExchangeReceiverStream() +{ + auto receiver = buildReceiver(); + std::vector streams(concurrency); + for (int i = 0; i < concurrency; ++i) + { + streams[i] = std::make_shared(receiver, "mock_req_id", "mock_executor_id" + std::to_string(i)); + } + return streams; +} + +BlockInputStreamPtr ReceiverHelper::buildUnionStream() +{ + auto streams = buildExchangeReceiverStream(); + return std::make_shared>(streams, nullptr, concurrency, /*req_id=*/""); +} + +void ReceiverHelper::finish() +{ + if (join_ptr) + { + join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED); + std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl; + } +} + +SenderHelper::SenderHelper( + int source_num_, + int concurrency_, + const std::vector & queues_, + const std::vector & fields) + : source_num(source_num_) + , concurrency(concurrency_) + , queues(queues_) +{ + mpp::TaskMeta task_meta; + tunnel_set = std::make_shared("mock_req_id"); + for (int i = 0; i < source_num; ++i) + { + auto writer = std::make_shared(queues[i]); + mock_writers.push_back(writer); + + auto tunnel = std::make_shared( + task_meta, + task_meta, + std::chrono::seconds(60), + concurrency, + false, + false, + "mock_req_id"); + tunnel->connect(writer.get()); + tunnels.push_back(tunnel); + MPPTaskId id(0, i); + tunnel_set->registerTunnel(id, tunnel); + } + + tipb::DAGRequest dag_request; + tipb::Executor root_executor; + root_executor.set_executor_id("ExchangeSender_100"); + *dag_request.mutable_root_executor() = root_executor; + + dag_context = std::make_unique(dag_request); + dag_context->is_mpp_task = true; + dag_context->is_root_mpp_task = false; + dag_context->encode_type = tipb::EncodeType::TypeCHBlock; + dag_context->result_field_types = fields; +} + +BlockInputStreamPtr SenderHelper::buildUnionStream( + StopFlag & stop_flag, + const std::vector & blocks) +{ + std::vector send_streams; + for (int i = 0; i < concurrency; ++i) + { + BlockInputStreamPtr stream = std::make_shared(blocks, stop_flag); + std::unique_ptr response_writer( + new StreamingDAGResponseWriter( + tunnel_set, + {0, 1, 2}, + TiDB::TiDBCollators(3), + tipb::Hash, + -1, + -1, + true, + *dag_context)); + send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + } + + return std::make_shared>(send_streams, nullptr, concurrency, /*req_id=*/""); +} + +BlockInputStreamPtr SenderHelper::buildUnionStream(size_t total_rows, const std::vector & blocks) +{ + std::vector send_streams; + for (int i = 0; i < concurrency; ++i) + { + BlockInputStreamPtr stream = std::make_shared(total_rows / concurrency, blocks); + std::unique_ptr response_writer( + new StreamingDAGResponseWriter( + tunnel_set, + {0, 1, 2}, + TiDB::TiDBCollators(3), + tipb::Hash, + -1, + -1, + true, + *dag_context)); + send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + } + + return std::make_shared>(send_streams, nullptr, concurrency, /*req_id=*/""); +} + +void SenderHelper::finish() +{ + for (size_t i = 0; i < tunnels.size(); ++i) + { + tunnels[i]->writeDone(); + tunnels[i]->waitForFinish(); + mock_writers[i]->finish(); + } +} + +void ExchangeBench::SetUp(const benchmark::State &) +{ + Poco::Logger::root().setLevel("error"); + + DynamicThreadPool::global_instance = std::make_unique( + /*fixed_thread_num=*/300, + std::chrono::milliseconds(100000)); + + input_blocks = makeBlocks(/*block_num=*/100, /*row_num=*/1024); + + try + { + DB::registerWindowFunctions(); + DB::registerFunctions(); + } + catch (DB::Exception &) + { + // Maybe another test has already registered, ignore exception here. + } +} + +void ExchangeBench::TearDown(const benchmark::State &) +{ + input_blocks.clear(); + // NOTE: Must reset here, otherwise DynamicThreadPool::fixedWork() may core because metrics already destroyed. + DynamicThreadPool::global_instance.reset(); +} + +void ExchangeBench::runAndWait(std::shared_ptr receiver_helper, + BlockInputStreamPtr receiver_stream, + std::shared_ptr & sender_helper, + BlockInputStreamPtr sender_stream) +{ + std::future sender_future = DynamicThreadPool::global_instance->schedule(/*memory_tracker=*/false, + [sender_stream, sender_helper] { + sender_stream->readPrefix(); + while (const auto & block = sender_stream->read()) {} + sender_stream->readSuffix(); + sender_helper->finish(); + }); + std::future receiver_future = DynamicThreadPool::global_instance->schedule(/*memory_tracker=*/false, + [receiver_stream, receiver_helper] { + receiver_stream->readPrefix(); + while (const auto & block = receiver_stream->read()) {} + receiver_stream->readSuffix(); + receiver_helper->finish(); + }); + sender_future.get(); + receiver_future.get(); +} + +BENCHMARK_DEFINE_F(ExchangeBench, basic_send_receive) +(benchmark::State & state) +try +{ + const int concurrency = state.range(0); + const int source_num = state.range(1); + const int total_rows = state.range(2); + Context context = TiFlashTestEnv::getContext(); + + for (auto _ : state) + { + std::shared_ptr receiver_helper = std::make_shared(concurrency, source_num); + BlockInputStreamPtr receiver_stream = receiver_helper->buildUnionStream(); + + std::shared_ptr sender_helper = std::make_shared(source_num, + concurrency, + receiver_helper->queues, + receiver_helper->fields); + BlockInputStreamPtr sender_stream = sender_helper->buildUnionStream(total_rows, input_blocks); + + runAndWait(receiver_helper, receiver_stream, sender_helper, sender_stream); + } +} +CATCH +BENCHMARK_REGISTER_F(ExchangeBench, basic_send_receive) + ->Args({8, 1, 1024 * 1000}); + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/bench_exchange.h b/dbms/src/Flash/tests/bench_exchange.h new file mode 100644 index 00000000000..6b09e319613 --- /dev/null +++ b/dbms/src/Flash/tests/bench_exchange.h @@ -0,0 +1,291 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace tests +{ + + +using Packet = mpp::MPPDataPacket; +using PacketPtr = std::shared_ptr; +using PacketQueue = MPMCQueue; +using PacketQueuePtr = std::shared_ptr; +using StopFlag = std::atomic; + +// NOLINTBEGIN(readability-convert-member-functions-to-static) +struct MockReceiverContext +{ + using Status = ::grpc::Status; + struct Request + { + String debugString() const + { + return "{Request}"; + } + + int source_index = 0; + int send_task_id = 0; + int recv_task_id = -1; + }; + + struct Reader + { + explicit Reader(const PacketQueuePtr & queue_) + : queue(queue_) + {} + + void initialize() const {} + + bool read(PacketPtr & packet [[maybe_unused]]) const + { + PacketPtr res; + if (queue->pop(res)) + { + *packet = *res; // avoid change shared packets + return true; + } + return false; + } + + Status finish() const + { + return ::grpc::Status(); + } + + PacketQueuePtr queue; + }; + + struct MockAsyncGrpcExchangePacketReader + { + // Not implement benchmark for Async GRPC for now. + void init(UnaryCallback *) { assert(0); } + void read(MPPDataPacketPtr &, UnaryCallback *) { assert(0); } + void finish(::grpc::Status &, UnaryCallback *) { assert(0); } + }; + + using AsyncReader = MockAsyncGrpcExchangePacketReader; + + MockReceiverContext( + const std::vector & queues_, + const std::vector & field_types_) + : queues(queues_) + , field_types(field_types_) + {} + + void fillSchema(DAGSchema & schema) const + { + schema.clear(); + for (size_t i = 0; i < field_types.size(); ++i) + { + String name = "exchange_receiver_" + std::to_string(i); + ColumnInfo info = TiDB::fieldTypeToColumnInfo(field_types[i]); + schema.emplace_back(std::move(name), std::move(info)); + } + } + + Request makeRequest(int index) const + { + return {index, index, -1}; + } + + std::shared_ptr makeReader(const Request & request) + { + return std::make_shared(queues[request.send_task_id]); + } + + static Status getStatusOK() + { + return ::grpc::Status(); + } + + bool supportAsync(const Request &) const { return false; } + void makeAsyncReader( + const Request &, + std::shared_ptr &, + UnaryCallback *) const {} + + std::vector queues; + std::vector field_types; +}; +// NOLINTEND(readability-convert-member-functions-to-static) + +using MockExchangeReceiver = ExchangeReceiverBase; +using MockExchangeReceiverPtr = std::shared_ptr; +using MockExchangeReceiverInputStream = TiRemoteBlockInputStream; + +struct MockWriter : public PacketWriter +{ + explicit MockWriter(PacketQueuePtr queue_) + : queue(std::move(queue_)) + {} + + bool write(const Packet & packet) override + { + queue->push(std::make_shared(packet)); + return true; + } + + void finish() + { + queue->finish(); + } + + PacketQueuePtr queue; +}; + +using MockWriterPtr = std::shared_ptr; +using MockTunnel = MPPTunnelBase; +using MockTunnelPtr = std::shared_ptr; +using MockTunnelSet = MPPTunnelSetBase; +using MockTunnelSetPtr = std::shared_ptr; + +struct MockBlockInputStream : public IProfilingBlockInputStream +{ + const std::vector & blocks; + Block header; + std::mt19937 mt; + std::uniform_int_distribution dist; + StopFlag & stop_flag; + + MockBlockInputStream(const std::vector & blocks_, StopFlag & stop_flag_); + + String getName() const override { return "MockBlockInputStream"; } + Block getHeader() const override { return header; } + + Block readImpl() override + { + if (stop_flag.load(std::memory_order_relaxed)) + return Block{}; + return blocks[dist(mt)]; + } +}; + +// Similar to MockBlockInputStream, but return fixed count of rows. +struct MockFixedRowsBlockInputStream : public IProfilingBlockInputStream +{ + Block header; + std::mt19937 mt; + std::uniform_int_distribution dist; + size_t current_rows; + size_t total_rows; + const std::vector & blocks; + + MockFixedRowsBlockInputStream(size_t total_rows_, const std::vector & blocks_); + + String getName() const override { return "MockBlockInputStream"; } + Block getHeader() const override { return header; } + + Block readImpl() override + { + if (current_rows >= total_rows) + return Block{}; + Block res = blocks[dist(mt)]; + current_rows += res.rows(); + return res; + } +}; + +Block makeBlock(int row_num); +std::vector makeBlocks(int block_num, int row_num); +mpp::MPPDataPacket makePacket(ChunkCodecStream & codec, int row_num); +std::vector makePackets(ChunkCodecStream & codec, int packet_num, int row_num); +std::vector makePacketQueues(int source_num, int queue_size); +std::vector makeFields(); +void printException(const Exception & e); +void sendPacket(const std::vector & packets, const PacketQueuePtr & queue, StopFlag & stop_flag); +void receivePacket(const PacketQueuePtr & queue); + +struct ReceiverHelper +{ + const int concurrency; + const int source_num; + tipb::ExchangeReceiver pb_exchange_receiver; + std::vector fields; + mpp::TaskMeta task_meta; + std::vector queues; + std::shared_ptr join_ptr; + + explicit ReceiverHelper(int concurrency_, int source_num_); + MockExchangeReceiverPtr buildReceiver(); + std::vector buildExchangeReceiverStream(); + BlockInputStreamPtr buildUnionStream(); + BlockInputStreamPtr buildUnionStreamWithHashJoinBuildStream(); + void finish(); +}; + +struct SenderHelper +{ + const int source_num; + const int concurrency; + + std::vector queues; + std::vector mock_writers; + std::vector tunnels; + MockTunnelSetPtr tunnel_set; + std::unique_ptr dag_context; + + SenderHelper( + int source_num_, + int concurrency_, + const std::vector & queues_, + const std::vector & fields); + + // Using MockBlockInputStream to build streams. + BlockInputStreamPtr buildUnionStream(StopFlag & stop_flag, const std::vector & blocks); + // Using MockFixedRowsBlockInputStream to build streams. + BlockInputStreamPtr buildUnionStream(size_t total_rows, const std::vector & blocks); + + void finish(); +}; + +class ExchangeBench : public benchmark::Fixture +{ +public: + void SetUp(const benchmark::State &) override; + void TearDown(const benchmark::State &) override; + void runAndWait(std::shared_ptr receiver_helper, + BlockInputStreamPtr receiver_stream, + std::shared_ptr & sender_helper, + BlockInputStreamPtr sender_stream); + + std::vector input_blocks; +}; + + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/bench_window.cpp b/dbms/src/Flash/tests/bench_window.cpp new file mode 100644 index 00000000000..da9df20fdf3 --- /dev/null +++ b/dbms/src/Flash/tests/bench_window.cpp @@ -0,0 +1,107 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +namespace tests +{ +class WindowFunctionBench : public ExchangeBench +{ +public: + void SetUp(const benchmark::State & state) override + { + // build tipb::Window and tipb::Sort. + // select row_number() over w1 from t1 window w1 as (partition by c1, c2, c3 order by c1, c2, c3); + ExchangeBench::SetUp(state); + MockColumnInfos columns{ + {"c1", TiDB::TP::TypeLongLong}, + {"c2", TiDB::TP::TypeString}, + {"c3", TiDB::TP::TypeLongLong}, + }; + size_t executor_index = 0; + DAGRequestBuilder builder(executor_index); + builder + .mockTable("test", "t1", columns) + .sort({{"c1", false}, {"c2", false}, {"c3", false}}, true) + .window(RowNumber(), + {{"c1", false}, {"c2", false}, {"c3", false}}, + {{"c1", false}, {"c2", false}, {"c3", false}}, + buildDefaultRowsFrame()); + tipb::DAGRequest req; + MPPInfo mpp_info(0, -1, -1, {}, std::unordered_map>{}); + builder.getRoot()->toTiPBExecutor(req.mutable_root_executor(), /*collator_id=*/0, mpp_info, TiFlashTestEnv::getContext()); + assert(req.root_executor().tp() == tipb::TypeWindow); + window = req.root_executor().window(); + assert(window.child().tp() == tipb::TypeSort); + sort = window.child().sort(); + } + + void prepareWindowStream(Context & context, int concurrency, int source_num, int total_rows, const std::vector & blocks, BlockInputStreamPtr & sender_stream, BlockInputStreamPtr & receiver_stream, std::shared_ptr & sender_helper, std::shared_ptr & receiver_helper) const + { + DAGPipeline pipeline; + receiver_helper = std::make_shared(concurrency, source_num); + pipeline.streams = receiver_helper->buildExchangeReceiverStream(); + + sender_helper = std::make_shared(source_num, concurrency, receiver_helper->queues, receiver_helper->fields); + sender_stream = sender_helper->buildUnionStream(total_rows, blocks); + + context.setDAGContext(sender_helper->dag_context.get()); + std::vector source_columns{ + NameAndTypePair("c1", makeNullable(std::make_shared())), + NameAndTypePair("c2", makeNullable(std::make_shared())), + NameAndTypePair("c3", makeNullable(std::make_shared()))}; + auto mock_interpreter = mockInterpreter(context, source_columns, concurrency); + mock_interpreter->input_streams_vec.push_back(pipeline.streams); + mockExecuteWindowOrder(mock_interpreter, pipeline, sort); + mockExecuteWindow(mock_interpreter, pipeline, window); + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, 8192, 0, "mock_executor_id_squashing"); + }); + receiver_stream = std::make_shared>(pipeline.streams, nullptr, concurrency, /*req_id=*/""); + } + + tipb::Window window; + tipb::Sort sort; +}; + +BENCHMARK_DEFINE_F(WindowFunctionBench, basic_row_number) +(benchmark::State & state) +try +{ + const int concurrency = state.range(0); + const int source_num = state.range(1); + const int total_rows = state.range(2); + Context context = TiFlashTestEnv::getContext(); + + for (auto _ : state) + { + std::shared_ptr sender_helper; + std::shared_ptr receiver_helper; + BlockInputStreamPtr sender_stream; + BlockInputStreamPtr receiver_stream; + + prepareWindowStream(context, concurrency, source_num, total_rows, input_blocks, sender_stream, receiver_stream, sender_helper, receiver_helper); + + runAndWait(receiver_helper, receiver_stream, sender_helper, sender_stream); + } +} +CATCH +BENCHMARK_REGISTER_F(WindowFunctionBench, basic_row_number) + ->Args({8, 1, 1024 * 1000}); + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/exchange_perftest.cpp b/dbms/src/Flash/tests/exchange_perftest.cpp deleted file mode 100644 index c2e047bec62..00000000000 --- a/dbms/src/Flash/tests/exchange_perftest.cpp +++ /dev/null @@ -1,699 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include // to include the implementation of StreamingDAGResponseWriter -#include // to include the implementation of ExchangeReceiver -#include // to include the implementation of MPPTunnel -#include // to include the implementation of MPPTunnelSet -#include -#include -#include - -namespace DB::tests -{ -namespace -{ -std::random_device rd; - -using Packet = mpp::MPPDataPacket; -using PacketPtr = std::shared_ptr; -using PacketQueue = MPMCQueue; -using PacketQueuePtr = std::shared_ptr; -using StopFlag = std::atomic; - -std::atomic received_data_size{0}; - -struct MockReceiverContext -{ - struct Status - { - int status_code = 0; - String error_msg; - - bool ok() const - { - return status_code == 0; - } - - const String & error_message() const - { - return error_msg; - } - - int error_code() const - { - return status_code; - } - }; - - struct Request - { - String debugString() const - { - return "{Request}"; - } - - int source_index = 0; - int send_task_id = 0; - int recv_task_id = -1; - }; - - struct Reader - { - explicit Reader(const PacketQueuePtr & queue_) - : queue(queue_) - {} - - void initialize() const - { - } - - bool read(PacketPtr & packet [[maybe_unused]]) const - { - PacketPtr res; - if (queue->pop(res)) - { - received_data_size.fetch_add(res->ByteSizeLong()); - *packet = *res; // avoid change shared packets - return true; - } - return false; - } - - Status finish() const - { - return {0, ""}; - } - - PacketQueuePtr queue; - }; - - MockReceiverContext( - const std::vector & queues_, - const std::vector & field_types_) - : queues(queues_) - , field_types(field_types_) - { - } - - void fillSchema(DAGSchema & schema) const - { - schema.clear(); - for (size_t i = 0; i < field_types.size(); ++i) - { - String name = "exchange_receiver_" + std::to_string(i); - ColumnInfo info = TiDB::fieldTypeToColumnInfo(field_types[i]); - schema.emplace_back(std::move(name), std::move(info)); - } - } - - Request makeRequest(int index) const - { - return {index, index, -1}; - } - - std::shared_ptr makeReader(const Request & request) - { - return std::make_shared(queues[request.send_task_id]); - } - - static Status getStatusOK() - { - return {0, ""}; - } - - std::vector queues; - std::vector field_types; -}; - -using MockExchangeReceiver = ExchangeReceiverBase; -using MockExchangeReceiverPtr = std::shared_ptr; -using MockExchangeReceiverInputStream = TiRemoteBlockInputStream; - -struct MockWriter -{ - explicit MockWriter(PacketQueuePtr queue_) - : queue(std::move(queue_)) - {} - - bool Write(const Packet & packet) - { - queue->push(std::make_shared(packet)); - return true; - } - - void finish() - { - queue->finish(); - } - - PacketQueuePtr queue; -}; - -using MockWriterPtr = std::shared_ptr; -using MockTunnel = MPPTunnelBase; -using MockTunnelPtr = std::shared_ptr; -using MockTunnelSet = MPPTunnelSetBase; -using MockTunnelSetPtr = std::shared_ptr; - -struct MockBlockInputStream : public IProfilingBlockInputStream -{ - const std::vector & blocks; - Block header; - std::mt19937 mt; - std::uniform_int_distribution dist; - StopFlag & stop_flag; - - MockBlockInputStream(const std::vector & blocks_, StopFlag & stop_flag_) - : blocks(blocks_) - , header(blocks[0].cloneEmpty()) - , mt(rd()) - , dist(0, blocks.size() - 1) - , stop_flag(stop_flag_) - {} - - String getName() const override { return "MockBlockInputStream"; } - Block getHeader() const override { return header; } - - Block readImpl() override - { - if (stop_flag.load(std::memory_order_relaxed)) - return Block{}; - return blocks[dist(mt)]; - } -}; - -Block makeBlock(int row_num) -{ - std::mt19937 mt(rd()); - std::uniform_int_distribution int64_dist; - std::uniform_int_distribution len_dist(10, 20); - std::uniform_int_distribution char_dist; - - InferredDataVector> int64_vec; - InferredDataVector> int64_vec2; - for (int i = 0; i < row_num; ++i) - { - int64_vec.emplace_back(int64_dist(mt)); - int64_vec2.emplace_back(int64_dist(mt)); - } - - InferredDataVector> string_vec; - for (int i = 0; i < row_num; ++i) - { - int len = len_dist(mt); - String s; - for (int j = 0; j < len; ++j) - s.push_back(char_dist(mt)); - string_vec.push_back(std::move(s)); - } - - auto int64_data_type = makeDataType>(); - ColumnWithTypeAndName int64_column(makeColumn>(int64_data_type, int64_vec), int64_data_type, "int64_1"); - ColumnWithTypeAndName int64_column2(makeColumn>(int64_data_type, int64_vec2), int64_data_type, "int64_2"); - - auto string_data_type = makeDataType>(); - ColumnWithTypeAndName string_column(makeColumn>(string_data_type, string_vec), string_data_type, "string"); - - return Block({int64_column, string_column, int64_column2}); -} - -std::vector makeBlocks(int block_num, int row_num) -{ - std::vector blocks; - for (int i = 0; i < block_num; ++i) - blocks.push_back(makeBlock(row_num)); - return blocks; -} - -mpp::MPPDataPacket makePacket(ChunkCodecStream & codec, int row_num) -{ - auto block = makeBlock(row_num); - codec.encode(block, 0, row_num); - - mpp::MPPDataPacket packet; - packet.add_chunks(codec.getString()); - codec.clear(); - - return packet; -} - -std::vector makePackets(ChunkCodecStream & codec, int packet_num, int row_num) -{ - std::vector packets; - for (int i = 0; i < packet_num; ++i) - packets.push_back(std::make_shared(makePacket(codec, row_num))); - return packets; -} - -std::vector makePacketQueues(int source_num, int queue_size) -{ - std::vector queues; - for (int i = 0; i < source_num; ++i) - queues.push_back(std::make_shared(queue_size)); - return queues; -} - -std::vector makeFields() -{ - std::vector fields(3); - fields[0].set_tp(TiDB::TypeLongLong); - fields[1].set_tp(TiDB::TypeString); - fields[2].set_tp(TiDB::TypeLongLong); - return fields; -} - -void printException(const Exception & e) -{ - std::string text = e.displayText(); - - auto embedded_stack_trace_pos = text.find("Stack trace"); - std::cerr << "Code: " << e.code() << ". " << text << std::endl - << std::endl; - if (std::string::npos == embedded_stack_trace_pos) - std::cerr << "Stack trace:" << std::endl - << e.getStackTrace().toString() << std::endl; -} - -void sendPacket(const std::vector & packets, const PacketQueuePtr & queue, StopFlag & stop_flag) -{ - std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, packets.size() - 1); - - while (!stop_flag.load()) - { - int i = dist(mt); - queue->tryPush(packets[i], std::chrono::milliseconds(10)); - } - queue->finish(); -} - -void receivePacket(const PacketQueuePtr & queue) -{ - while (true) - { - PacketPtr packet; - if (queue->pop(packet)) - received_data_size.fetch_add(packet->ByteSizeLong()); - else - break; - } -} - -template -void readBlock(BlockInputStreamPtr stream) -{ - [[maybe_unused]] auto get_rate = [](auto count, auto duration) { - return count * 1000 / duration.count(); - }; - - [[maybe_unused]] auto get_mib = [](auto v) { - return v / 1024 / 1024; - }; - - [[maybe_unused]] auto start = std::chrono::high_resolution_clock::now(); - [[maybe_unused]] auto second_ago = start; - [[maybe_unused]] Int64 block_count = 0; - [[maybe_unused]] Int64 last_block_count = 0; - [[maybe_unused]] Int64 last_data_size = received_data_size.load(); - try - { - stream->readPrefix(); - while (auto block = stream->read()) - { - if constexpr (print_progress) - { - ++block_count; - auto cur = std::chrono::high_resolution_clock::now(); - auto duration = std::chrono::duration_cast(cur - second_ago); - if (duration.count() >= 1000) - { - Int64 data_size = received_data_size.load(); - std::cout - << fmt::format( - "Blocks: {:<10} Data(MiB): {:<8} Block/s: {:<6} Data/s(MiB): {:<6}", - block_count, - get_mib(data_size), - get_rate(block_count - last_block_count, duration), - get_mib(get_rate(data_size - last_data_size, duration))) - << std::endl; - second_ago = cur; - last_block_count = block_count; - last_data_size = data_size; - } - } - } - stream->readSuffix(); - - if constexpr (print_progress) - { - auto cur = std::chrono::high_resolution_clock::now(); - auto duration = std::chrono::duration_cast(cur - start); - Int64 data_size = received_data_size.load(); - std::cout - << fmt::format( - "End. Blocks: {:<10} Data(MiB): {:<8} Block/s: {:<6} Data/s(MiB): {:<6}", - block_count, - get_mib(data_size), - get_rate(block_count, duration), - get_mib(get_rate(data_size, duration))) - << std::endl; - } - } - catch (const Exception & e) - { - printException(e); - throw; - } -} - -struct ReceiverHelper -{ - const int source_num; - tipb::ExchangeReceiver pb_exchange_receiver; - std::vector fields; - mpp::TaskMeta task_meta; - std::vector queues; - std::shared_ptr join_ptr; - - explicit ReceiverHelper(int source_num_) - : source_num(source_num_) - { - pb_exchange_receiver.set_tp(tipb::Hash); - for (int i = 0; i < source_num; ++i) - { - mpp::TaskMeta task; - task.set_start_ts(0); - task.set_task_id(i); - task.set_partition_id(i); - task.set_address(""); - - String encoded_task; - task.SerializeToString(&encoded_task); - - pb_exchange_receiver.add_encoded_task_meta(encoded_task); - } - - fields = makeFields(); - *pb_exchange_receiver.add_field_types() = fields[0]; - *pb_exchange_receiver.add_field_types() = fields[1]; - *pb_exchange_receiver.add_field_types() = fields[2]; - - task_meta.set_task_id(100); - - queues = makePacketQueues(source_num, 10); - } - - MockExchangeReceiverPtr buildReceiver() - { - return std::make_shared( - std::make_shared(queues, fields), - source_num, - source_num * 5, - nullptr); - } - - BlockInputStreamPtr buildUnionStream(int concurrency) - { - auto receiver = buildReceiver(); - std::vector streams; - for (int i = 0; i < concurrency; ++i) - streams.push_back(std::make_shared(receiver, nullptr)); - return std::make_shared>(streams, nullptr, concurrency, /*req_id=*/""); - } - - BlockInputStreamPtr buildUnionStreamWithHashJoinBuildStream(int concurrency) - { - auto receiver = buildReceiver(); - std::vector streams; - for (int i = 0; i < concurrency; ++i) - streams.push_back(std::make_shared(receiver, nullptr)); - - auto receiver_header = streams.front()->getHeader(); - auto key_name = receiver_header.getByPosition(0).name; - - join_ptr = std::make_shared( - Names{key_name}, - Names{key_name}, - true, - SizeLimits(0, 0, OverflowMode::THROW), - ASTTableJoin::Kind::Inner, - ASTTableJoin::Strictness::All, - /*req_id=*/"", - TiDB::TiDBCollators{nullptr}, - "", - "", - "", - "", - nullptr, - 65536); - - join_ptr->init(receiver_header, concurrency); - - for (int i = 0; i < concurrency; ++i) - streams[i] = std::make_shared(streams[i], join_ptr, i, /*req_id=*/""); - - return std::make_shared>(streams, nullptr, concurrency, /*req_id=*/""); - } - - void finish() - { - if (join_ptr) - { - join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED); - std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl; - } - } -}; - -struct SenderHelper -{ - const int source_num; - const int concurrency; - - std::vector queues; - std::vector mock_writers; - std::vector tunnels; - MockTunnelSetPtr tunnel_set; - std::unique_ptr dag_context; - - SenderHelper( - int source_num_, - int concurrency_, - const std::vector & queues_, - const std::vector & fields) - : source_num(source_num_) - , concurrency(concurrency_) - , queues(queues_) - { - mpp::TaskMeta task_meta; - tunnel_set = std::make_shared(); - for (int i = 0; i < source_num; ++i) - { - auto writer = std::make_shared(queues[i]); - mock_writers.push_back(writer); - - auto tunnel = std::make_shared( - task_meta, - task_meta, - std::chrono::seconds(60), - concurrency, - false); - tunnel->connect(writer.get()); - tunnels.push_back(tunnel); - tunnel_set->addTunnel(tunnel); - } - - tipb::DAGRequest dag_request; - tipb::Executor root_executor; - root_executor.set_executor_id("ExchangeSender_100"); - *dag_request.mutable_root_executor() = root_executor; - - dag_context = std::make_unique(dag_request); - dag_context->is_mpp_task = true; - dag_context->is_root_mpp_task = false; - dag_context->encode_type = tipb::EncodeType::TypeCHBlock; - dag_context->result_field_types = fields; - } - - BlockInputStreamPtr buildUnionStream( - StopFlag & stop_flag, - const std::vector & blocks) - { - std::vector send_streams; - for (int i = 0; i < concurrency; ++i) - { - BlockInputStreamPtr stream = std::make_shared(blocks, stop_flag); - std::unique_ptr response_writer( - new StreamingDAGResponseWriter( - tunnel_set, - {0, 1, 2}, - TiDB::TiDBCollators(3), - tipb::Hash, - -1, - -1, - true, - *dag_context)); - send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); - } - - return std::make_shared>(send_streams, nullptr, concurrency, /*req_id=*/""); - } - - void finish() - { - for (size_t i = 0; i < tunnels.size(); ++i) - { - tunnels[i]->writeDone(); - tunnels[i]->waitForFinish(); - mock_writers[i]->finish(); - } - } -}; - -void testOnlyReceiver(int concurrency, int source_num, int block_rows, int seconds) -{ - ReceiverHelper receiver_helper(source_num); - auto union_input_stream = receiver_helper.buildUnionStream(concurrency); - - auto chunk_codec_stream = CHBlockChunkCodec().newCodecStream(receiver_helper.fields); - auto packets = makePackets(*chunk_codec_stream, 100, block_rows); - - StopFlag stop_flag(false); - - std::vector threads; - for (const auto & queue : receiver_helper.queues) - threads.emplace_back(sendPacket, std::cref(packets), queue, std::ref(stop_flag)); - threads.emplace_back(readBlock, union_input_stream); - - std::this_thread::sleep_for(std::chrono::seconds(seconds)); - stop_flag.store(true); - for (auto & thread : threads) - thread.join(); - - receiver_helper.finish(); -} - -template -void testSenderReceiver(int concurrency, int source_num, int block_rows, int seconds) -{ - ReceiverHelper receiver_helper(source_num); - BlockInputStreamPtr union_receive_stream; - if constexpr (with_join) - union_receive_stream = receiver_helper.buildUnionStreamWithHashJoinBuildStream(concurrency); - else - union_receive_stream = receiver_helper.buildUnionStream(concurrency); - - StopFlag stop_flag(false); - auto blocks = makeBlocks(100, block_rows); - - SenderHelper sender_helper(source_num, concurrency, receiver_helper.queues, receiver_helper.fields); - auto union_send_stream = sender_helper.buildUnionStream(stop_flag, blocks); - - auto write_thread = std::thread(readBlock, union_send_stream); - auto read_thread = std::thread(readBlock, union_receive_stream); - - std::this_thread::sleep_for(std::chrono::seconds(seconds)); - stop_flag.store(true); - - write_thread.join(); - sender_helper.finish(); - - read_thread.join(); - receiver_helper.finish(); -} - -void testOnlySender(int concurrency, int source_num, int block_rows, int seconds) -{ - auto queues = makePacketQueues(source_num, 10); - auto fields = makeFields(); - - StopFlag stop_flag(false); - auto blocks = makeBlocks(100, block_rows); - - SenderHelper sender_helper(source_num, concurrency, queues, fields); - auto union_send_stream = sender_helper.buildUnionStream(stop_flag, blocks); - - auto write_thread = std::thread(readBlock, union_send_stream); - std::vector read_threads; - for (int i = 0; i < source_num; ++i) - read_threads.emplace_back(receivePacket, queues[i]); - - std::this_thread::sleep_for(std::chrono::seconds(seconds)); - stop_flag.store(true); - - write_thread.join(); - sender_helper.finish(); - - for (auto & t : read_threads) - t.join(); -} - -} // namespace -} // namespace DB::tests - -int main(int argc [[maybe_unused]], char ** argv [[maybe_unused]]) -{ - if (argc < 2 || argc > 6) - { - std::cerr << fmt::format("Usage: {} [receiver|sender|sender_receiver|sender_receiver_join] ", argv[0]) << std::endl; - exit(1); - } - - String method = argv[1]; - int concurrency = argc >= 3 ? atoi(argv[2]) : 5; - int source_num = argc >= 4 ? atoi(argv[3]) : 2; - int block_rows = argc >= 5 ? atoi(argv[4]) : 5000; - int seconds = argc >= 6 ? atoi(argv[5]) : 10; - - using TestHandler = std::function; - std::unordered_map handlers = { - {"receiver", DB::tests::testOnlyReceiver}, - {"sender", DB::tests::testOnlySender}, - {"sender_receiver", DB::tests::testSenderReceiver}, - {"sender_receiver_join", DB::tests::testSenderReceiver}, - }; - - auto it = handlers.find(method); - if (it != handlers.end()) - { - std::cout - << fmt::format( - "{}. concurrency = {}. source_num = {}. block_rows = {}. seconds = {}", - method, - concurrency, - source_num, - block_rows, - seconds) - << std::endl; - it->second(concurrency, source_num, block_rows, seconds); - } - else - { - std::cerr << "Unknown method: " << method << std::endl; - exit(1); - } -} diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index e1ccbdbb010..2cf8a939b58 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -440,4 +440,5 @@ DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name) } return builder; } -} // namespace DB::tests \ No newline at end of file + +} // namespace DB::tests diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index d52b5ec674a..c11635ac93e 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -188,4 +188,4 @@ MockWindowFrame buildDefaultRowsFrame(); #define Rank() makeASTFunction("Rank") #define DenseRank() makeASTFunction("DenseRank") -} // namespace DB::tests \ No newline at end of file +} // namespace DB::tests diff --git a/libs/libcommon/include/common/types.h b/libs/libcommon/include/common/types.h index 139fc10e980..87c7215d91f 100644 --- a/libs/libcommon/include/common/types.h +++ b/libs/libcommon/include/common/types.h @@ -25,6 +25,7 @@ #if defined(__clang__) #pragma GCC diagnostic ignored "-Wunknown-warning-option" #pragma GCC diagnostic ignored "-Wdeprecated-copy" +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" #pragma GCC diagnostic ignored "-Wtautological-constant-out-of-range-compare" #endif #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"