Skip to content

Commit

Permalink
Interpreter: Run interpreter test with exchange executors (#4858)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored May 17, 2022
1 parent e03dcc1 commit 6b8ca3d
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 20 deletions.
57 changes: 57 additions & 0 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/MockExchangeReceiverInputStream.h>
#include <Flash/Coprocessor/ChunkCodec.h>

namespace DB
{
MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_)
: output_index(0)
, max_block_size(max_block_size)
, rows(rows_)
{
for (int i = 0; i < receiver.field_types_size(); ++i)
{
columns.emplace_back(
getDataTypeByColumnInfoForComputingLayer(TiDB::fieldTypeToColumnInfo(receiver.field_types(i))),
fmt::format("exchange_receiver_{}", i));
}
}

ColumnPtr MockExchangeReceiverInputStream::makeColumn(ColumnWithTypeAndName elem) const
{
auto column = elem.type->createColumn();
size_t row_count = 0;
for (size_t i = output_index; (i < rows) & (row_count < max_block_size); ++i)
{
column->insert((*elem.column)[i]);
++row_count;
}
return column;
}

Block MockExchangeReceiverInputStream::readImpl()
{
if (output_index >= rows)
return {};
ColumnsWithTypeAndName output_columns;
for (const auto & elem : columns)
{
output_columns.push_back({makeColumn(elem), elem.type, elem.name, elem.column_id});
}
output_index += max_block_size;
return Block(output_columns);
}
} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/Transaction/TypeMapping.h>
#include <tipb/executor.pb.h>

namespace DB
{
/// Mock the receiver like table scan, can mock blocks according to the receiver's schema.
/// TODO: Mock the receiver process
class MockExchangeReceiverInputStream : public IProfilingBlockInputStream
{
public:
MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_);
Block getHeader() const override { return Block(columns); }
String getName() const override { return "MockExchangeReceiver"; }
ColumnsWithTypeAndName columns;
size_t output_index;
size_t max_block_size;
size_t rows;

protected:
Block readImpl() override;
ColumnPtr makeColumn(ColumnWithTypeAndName elem) const;
};

} // namespace DB
46 changes: 46 additions & 0 deletions dbms/src/DataStreams/MockExchangeSenderInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/MockExchangeSenderInputStream.h>
namespace DB
{
MockExchangeSenderInputStream::MockExchangeSenderInputStream(
const BlockInputStreamPtr & input,
const String & req_id)
: log(Logger::get(NAME, req_id))
{
children.push_back(input);
}

Block MockExchangeSenderInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
}

return totals;
}

Block MockExchangeSenderInputStream::getHeader() const
{
return children.back()->getHeader();
}

Block MockExchangeSenderInputStream::readImpl()
{
return children.back()->read();
}

} // namespace DB
46 changes: 46 additions & 0 deletions dbms/src/DataStreams/MockExchangeSenderInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/Transaction/TypeMapping.h>
#include <tipb/executor.pb.h>

namespace DB
{
/// Currently, this operator do nothing with the block.
/// TODO: Mock the sender process
class MockExchangeSenderInputStream : public IProfilingBlockInputStream
{
private:
static constexpr auto NAME = "MockExchangeSender";

public:
MockExchangeSenderInputStream(
const BlockInputStreamPtr & input,
const String & req_id);

String getName() const override { return NAME; }
Block getTotals() override;
Block getHeader() const override;

protected:
Block readImpl() override;

private:
const LoggerPtr log;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/MockTableScanBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ColumnPtr MockTableScanBlockInputStream::makeColumn(ColumnWithTypeAndName elem)
{
auto column = elem.type->createColumn();
size_t row_count = 0;
for (size_t i = output_index; i < rows & row_count < max_block_size; ++i)
for (size_t i = output_index; (i < rows) & (row_count < max_block_size); ++i)
{
column->insert((*elem.column)[i]);
row_count++;
Expand Down
63 changes: 47 additions & 16 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/TiFlashException.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ExchangeSenderBlockInputStream.h>
Expand All @@ -23,6 +24,8 @@
#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MockExchangeReceiverInputStream.h>
#include <DataStreams/MockExchangeSenderInputStream.h>
#include <DataStreams/MockTableScanBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
Expand Down Expand Up @@ -51,6 +54,7 @@
#include <Storages/Transaction/TiDB.h>
#include <WindowFunctions/WindowFunctionFactory.h>


namespace DB
{
namespace FailPoints
Expand Down Expand Up @@ -190,7 +194,7 @@ void DAGQueryBlockInterpreter::prepareJoin(
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters,
String & filter_column_name)
{
std::vector<NameAndTypePair> source_columns;
NamesAndTypes source_columns;
for (auto const & p : pipeline.firstStream()->getHeader().getNamesAndTypesList())
source_columns.emplace_back(p.name, p.type);
DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context);
Expand All @@ -203,7 +207,7 @@ void DAGQueryBlockInterpreter::prepareJoin(

ExpressionActionsPtr DAGQueryBlockInterpreter::genJoinOtherConditionAction(
const tipb::Join & join,
std::vector<NameAndTypePair> & source_columns,
NamesAndTypes & source_columns,
String & filter_column_for_other_condition,
String & filter_column_for_other_eq_condition)
{
Expand Down Expand Up @@ -326,7 +330,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
right_pipeline.streams = input_streams_vec[1];
}

std::vector<NameAndTypePair> join_output_columns;
NamesAndTypes join_output_columns;
/// columns_for_other_join_filter is a vector of columns used
/// as the input columns when compiling other join filter.
/// Note the order in the column vector is very important:
Expand All @@ -336,7 +340,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
/// append the extra columns afterwards. In order to figure out
/// whether a given column is already in the column vector or
/// not quickly, we use another set to store the column names
std::vector<NameAndTypePair> columns_for_other_join_filter;
NamesAndTypes columns_for_other_join_filter;
std::unordered_set<String> column_set_for_other_join_filter;
bool make_nullable = join.join_type() == tipb::JoinType::TypeRightOuterJoin;
for (auto const & p : input_streams_vec[0][0]->getHeader().getNamesAndTypesList())
Expand Down Expand Up @@ -479,7 +483,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
right_query.join = join_ptr;
right_query.join->setSampleBlock(right_query.source->getHeader());

std::vector<NameAndTypePair> source_columns;
NamesAndTypes source_columns;
for (const auto & p : left_pipeline.streams[0]->getHeader().getNamesAndTypesList())
source_columns.emplace_back(p.name, p.type);
DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context);
Expand Down Expand Up @@ -617,7 +621,7 @@ void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDe
orderStreams(pipeline, sort_desc, 0);
}

void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const std::vector<NameAndTypePair> & order_columns)
void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns)
{
Int64 limit = query_block.limit_or_topn->topn().limit();
orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit);
Expand Down Expand Up @@ -673,25 +677,39 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, log->identifier());
pipeline.streams.push_back(stream);
}
std::vector<NameAndTypePair> source_columns;
Block block = pipeline.firstStream()->getHeader();
for (const auto & col : block.getColumnsWithTypeAndName())
NamesAndTypes source_columns;
for (const auto & col : pipeline.firstStream()->getHeader())
{
source_columns.emplace_back(col.name, col.type);
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

void DAGQueryBlockInterpreter::handleMockExchangeReceiver(DAGPipeline & pipeline)
{
for (size_t i = 0; i < max_streams; ++i)
{
// use max_block_size / 10 to determine the mock block's size
pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
}
NamesAndTypes source_columns;
for (const auto & col : pipeline.firstStream()->getHeader())
{
source_columns.emplace_back(NameAndTypePair(col.name, col.type));
source_columns.emplace_back(col.name, col.type);
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection)
{
std::vector<NameAndTypePair> input_columns;
NamesAndTypes input_columns;
pipeline.streams = input_streams_vec[0];
for (auto const & p : pipeline.firstStream()->getHeader().getNamesAndTypesList())
input_columns.emplace_back(p.name, p.type);
DAGExpressionAnalyzer dag_analyzer(std::move(input_columns), context);
ExpressionActionsChain chain;
auto & last_step = dag_analyzer.initAndGetLastStep(chain);
std::vector<NameAndTypePair> output_columns;
NamesAndTypes output_columns;
NamesWithAliases project_cols;
UniqueNameGenerator unique_name_generator;
for (const auto & expr : projection.exprs())
Expand Down Expand Up @@ -758,7 +776,10 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
}
else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
{
handleExchangeReceiver(pipeline);
if (unlikely(dagContext().isTest()))
handleMockExchangeReceiver(pipeline);
else
handleExchangeReceiver(pipeline);
recordProfileStreams(pipeline, query_block.source_name);
}
else if (query_block.source->tp() == tipb::ExecType::TypeProjection)
Expand All @@ -769,7 +790,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
else if (query_block.isTableScanSource())
{
TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
if (dagContext().isTest())
if (unlikely(dagContext().isTest()))
handleMockTableScan(table_scan, pipeline);
else
handleTableScan(table_scan, pipeline);
Expand Down Expand Up @@ -850,7 +871,10 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
// execute exchange_sender
if (query_block.exchange_sender)
{
handleExchangeSender(pipeline);
if (unlikely(dagContext().isTest()))
handleMockExchangeSender(pipeline);
else
handleExchangeSender(pipeline);
recordProfileStreams(pipeline, query_block.exchange_sender_name);
}
}
Expand Down Expand Up @@ -901,6 +925,13 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
});
}

void DAGQueryBlockInterpreter::handleMockExchangeSender(DAGPipeline & pipeline)
{
pipeline.transform([&](auto & stream) {
stream = std::make_shared<MockExchangeSenderInputStream>(stream, log->identifier());
});
}

void DAGQueryBlockInterpreter::restorePipelineConcurrency(DAGPipeline & pipeline)
{
if (query_block.can_restore_pipeline_concurrency)
Expand All @@ -919,4 +950,4 @@ BlockInputStreams DAGQueryBlockInterpreter::execute()

return pipeline.streams;
}
} // namespace DB
} // namespace DB
Loading

0 comments on commit 6b8ca3d

Please sign in to comment.