diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp new file mode 100644 index 00000000000..3657f099b74 --- /dev/null +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -0,0 +1,69 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( + const BlockInputStreamPtr & input, + const ExpressionActionsPtr & join_probe_actions_, + const LogWithPrefixPtr & log_) + : log(getMPPTaskLog(log_, name)) + , join_probe_actions(join_probe_actions_) +{ + children.push_back(input); + + if (!join_probe_actions || join_probe_actions->getActions().size() != 1 + || join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN) + { + throw Exception("isn't valid join probe actions", ErrorCodes::LOGICAL_ERROR); + } +} + +Block HashJoinProbeBlockInputStream::getTotals() +{ + if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) + { + totals = child->getTotals(); + join_probe_actions->executeOnTotals(totals); + } + + return totals; +} + +Block HashJoinProbeBlockInputStream::getHeader() const +{ + Block res = children.back()->getHeader(); + join_probe_actions->execute(res); + return res; +} + +Block HashJoinProbeBlockInputStream::readImpl() +{ + Block res = children.back()->read(); + if (!res) + return res; + + join_probe_actions->execute(res); + + // TODO split block if block.size() > settings.max_block_size + // https://github.com/pingcap/tiflash/issues/3436 + + return res; +} + +} // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h new file mode 100644 index 00000000000..9c24ecd9190 --- /dev/null +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -0,0 +1,55 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +class ExpressionActions; + +/** Executes a certain expression over the block. + * Basically the same as ExpressionBlockInputStream, + * but requires that there must be a join probe action in the Expression. + * + * The join probe action is different from the general expression + * and needs to be executed after join hash map building. + * We should separate it from the ExpressionBlockInputStream. + */ +class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream +{ +private: + using ExpressionActionsPtr = std::shared_ptr; + static constexpr auto name = "HashJoinProbe"; + +public: + HashJoinProbeBlockInputStream( + const BlockInputStreamPtr & input, + const ExpressionActionsPtr & join_probe_actions_, + const LogWithPrefixPtr & log_); + + String getName() const override { return name; } + Block getTotals() override; + Block getHeader() const override; + +protected: + Block readImpl() override; + +private: + const LogWithPrefixPtr log; + ExpressionActionsPtr join_probe_actions; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index e24d57e138c..6f5dfd24ffd 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -27,7 +28,6 @@ #include #include #include -#include #include #include #include @@ -724,7 +724,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & } } for (auto & stream : pipeline.streams) - stream = std::make_shared(stream, chain.getLastActions(), taskLogger()); + stream = std::make_shared(stream, chain.getLastActions(), taskLogger()); /// add a project to remove all the useless column NamesWithAliases project_cols;