From 46ed6ac76d23f84b3823b2690af3c020b195ab28 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 14 Mar 2022 14:09:52 +0800 Subject: [PATCH 1/5] update --- .../HashJoinProbeBlockInputStream.cpp | 61 +++++++++++++++++++ .../HashJoinProbeBlockInputStream.h | 36 +++++++++++ .../Coprocessor/DAGQueryBlockInterpreter.cpp | 4 +- 3 files changed, 99 insertions(+), 2 deletions(-) create mode 100644 dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp create mode 100644 dbms/src/DataStreams/HashJoinProbeBlockInputStream.h diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp new file mode 100644 index 00000000000..3de72353f3d --- /dev/null +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -0,0 +1,61 @@ +#include +#include +#include + +namespace DB +{ +HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( + const BlockInputStreamPtr & input, + const ExpressionActionsPtr & expression_, + const LogWithPrefixPtr & log_) + : log(getMPPTaskLog(log_, NAME)) +{ + children.push_back(input); + + bool has_join_probe_action = false; + if (expression_) + { + for (const auto & action : expression_->getActions()) + { + if (action.type == ExpressionAction::Type::JOIN) + { + has_join_probe_action = true; + break; + } + } + } + if (!has_join_probe_action) + { + throw Exception("join probe expression should have join action", ErrorCodes::LOGICAL_ERROR); + } + expression = expression_; +} + +Block HashJoinProbeBlockInputStream::getTotals() +{ + if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) + { + totals = child->getTotals(); + expression->executeOnTotals(totals); + } + + return totals; +} + +Block HashJoinProbeBlockInputStream::getHeader() const +{ + Block res = children.back()->getHeader(); + expression->execute(res); + return res; +} + +Block HashJoinProbeBlockInputStream::readImpl() +{ + Block res = children.back()->read(); + if (!res) + return res; + expression->execute(res); + return res; +} + +} // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h new file mode 100644 index 00000000000..d1bc9953541 --- /dev/null +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -0,0 +1,36 @@ +#pragma once + +#include + +namespace DB +{ +class ExpressionActions; + +/** Executes a certain expression over the block. + * Expression should have join action. + */ +class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream +{ +private: + using ExpressionActionsPtr = std::shared_ptr; + static constexpr auto NAME = "HashJoinProbe"; + +public: + HashJoinProbeBlockInputStream( + const BlockInputStreamPtr & input, + const ExpressionActionsPtr & expression_, + const LogWithPrefixPtr & log_); + + String getName() const override { return NAME; } + Block getTotals() override; + Block getHeader() const override; + +protected: + Block readImpl() override; + +private: + const LogWithPrefixPtr log; + ExpressionActionsPtr expression; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 5ff09d6d8ca..2a4de3d013b 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -13,7 +14,6 @@ #include #include #include -#include #include #include #include @@ -724,7 +724,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & } } for (auto & stream : pipeline.streams) - stream = std::make_shared(stream, chain.getLastActions(), taskLogger()); + stream = std::make_shared(stream, chain.getLastActions(), taskLogger()); /// add a project to remove all the useless column NamesWithAliases project_cols; From 9c7860cdd1371036c3799120b740990fbcadbbe0 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 14 Mar 2022 14:32:20 +0800 Subject: [PATCH 2/5] address comments --- .../src/DataStreams/HashJoinProbeBlockInputStream.cpp | 4 ++-- dbms/src/DataStreams/HashJoinProbeBlockInputStream.h | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 3de72353f3d..00b8b618582 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -8,7 +8,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const LogWithPrefixPtr & log_) - : log(getMPPTaskLog(log_, NAME)) + : log(getMPPTaskLog(log_, name)) { children.push_back(input); @@ -26,7 +26,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( } if (!has_join_probe_action) { - throw Exception("join probe expression should have join action", ErrorCodes::LOGICAL_ERROR); + throw Exception("join probe expression should have join probe action", ErrorCodes::LOGICAL_ERROR); } expression = expression_; } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index d1bc9953541..620ef7d56ce 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -7,13 +7,18 @@ namespace DB class ExpressionActions; /** Executes a certain expression over the block. - * Expression should have join action. + * Basically the same as ExpressionBlockInputStream, + * but requires that there must be a join probe action in the Expression. + * + * The join probe action is different from the general expression + * and needs to be executed after join hash map building. + * We should separate it from the ExpressionBlockInputStream. */ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream { private: using ExpressionActionsPtr = std::shared_ptr; - static constexpr auto NAME = "HashJoinProbe"; + static constexpr auto name = "HashJoinProbe"; public: HashJoinProbeBlockInputStream( @@ -21,7 +26,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream const ExpressionActionsPtr & expression_, const LogWithPrefixPtr & log_); - String getName() const override { return NAME; } + String getName() const override { return name; } Block getTotals() override; Block getHeader() const override; From c37fb897ccb629a6870348eda62295f939239283 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 15 Mar 2022 18:02:05 +0800 Subject: [PATCH 3/5] add License --- .../DataStreams/HashJoinProbeBlockInputStream.cpp | 14 ++++++++++++++ .../DataStreams/HashJoinProbeBlockInputStream.h | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 00b8b618582..e35089b2583 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #include #include #include diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 620ef7d56ce..2c1d0cf8c78 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #pragma once #include From c63bc0f612e03ebc8fce543784d09aa54488efba Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 15 Mar 2022 21:45:52 +0800 Subject: [PATCH 4/5] address comments --- .../HashJoinProbeBlockInputStream.cpp | 32 ++++++++----------- .../HashJoinProbeBlockInputStream.h | 4 +-- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index e35089b2583..013382df944 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -20,29 +20,18 @@ namespace DB { HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( const BlockInputStreamPtr & input, - const ExpressionActionsPtr & expression_, + const ExpressionActionsPtr & join_probe_actions_, const LogWithPrefixPtr & log_) : log(getMPPTaskLog(log_, name)) + , join_probe_actions(join_probe_actions_) { children.push_back(input); - bool has_join_probe_action = false; - if (expression_) + if (!join_probe_actions || join_probe_actions->getActions().size() != 1 + || join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN) { - for (const auto & action : expression_->getActions()) - { - if (action.type == ExpressionAction::Type::JOIN) - { - has_join_probe_action = true; - break; - } - } + throw Exception("isn't join probe actions", ErrorCodes::LOGICAL_ERROR); } - if (!has_join_probe_action) - { - throw Exception("join probe expression should have join probe action", ErrorCodes::LOGICAL_ERROR); - } - expression = expression_; } Block HashJoinProbeBlockInputStream::getTotals() @@ -50,7 +39,7 @@ Block HashJoinProbeBlockInputStream::getTotals() if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) { totals = child->getTotals(); - expression->executeOnTotals(totals); + join_probe_actions->executeOnTotals(totals); } return totals; @@ -59,7 +48,7 @@ Block HashJoinProbeBlockInputStream::getTotals() Block HashJoinProbeBlockInputStream::getHeader() const { Block res = children.back()->getHeader(); - expression->execute(res); + join_probe_actions->execute(res); return res; } @@ -68,7 +57,12 @@ Block HashJoinProbeBlockInputStream::readImpl() Block res = children.back()->read(); if (!res) return res; - expression->execute(res); + + join_probe_actions->execute(res); + + // TODO split block if block.size() > settings.max_block_size + // https://github.com/pingcap/tiflash/issues/3436 + return res; } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 2c1d0cf8c78..9c24ecd9190 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -37,7 +37,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream public: HashJoinProbeBlockInputStream( const BlockInputStreamPtr & input, - const ExpressionActionsPtr & expression_, + const ExpressionActionsPtr & join_probe_actions_, const LogWithPrefixPtr & log_); String getName() const override { return name; } @@ -49,7 +49,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream private: const LogWithPrefixPtr log; - ExpressionActionsPtr expression; + ExpressionActionsPtr join_probe_actions; }; } // namespace DB From 850874d463e2ed28a096491dbb4745b37a01807b Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 16 Mar 2022 10:04:50 +0800 Subject: [PATCH 5/5] Update dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp --- dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 013382df944..3657f099b74 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -30,7 +30,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( if (!join_probe_actions || join_probe_actions->getActions().size() != 1 || join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN) { - throw Exception("isn't join probe actions", ErrorCodes::LOGICAL_ERROR); + throw Exception("isn't valid join probe actions", ErrorCodes::LOGICAL_ERROR); } }