diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index e35089b2583..013382df944 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -20,29 +20,18 @@ namespace DB { HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( const BlockInputStreamPtr & input, - const ExpressionActionsPtr & expression_, + const ExpressionActionsPtr & join_probe_actions_, const LogWithPrefixPtr & log_) : log(getMPPTaskLog(log_, name)) + , join_probe_actions(join_probe_actions_) { children.push_back(input); - bool has_join_probe_action = false; - if (expression_) + if (!join_probe_actions || join_probe_actions->getActions().size() != 1 + || join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN) { - for (const auto & action : expression_->getActions()) - { - if (action.type == ExpressionAction::Type::JOIN) - { - has_join_probe_action = true; - break; - } - } + throw Exception("isn't join probe actions", ErrorCodes::LOGICAL_ERROR); } - if (!has_join_probe_action) - { - throw Exception("join probe expression should have join probe action", ErrorCodes::LOGICAL_ERROR); - } - expression = expression_; } Block HashJoinProbeBlockInputStream::getTotals() @@ -50,7 +39,7 @@ Block HashJoinProbeBlockInputStream::getTotals() if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) { totals = child->getTotals(); - expression->executeOnTotals(totals); + join_probe_actions->executeOnTotals(totals); } return totals; @@ -59,7 +48,7 @@ Block HashJoinProbeBlockInputStream::getTotals() Block HashJoinProbeBlockInputStream::getHeader() const { Block res = children.back()->getHeader(); - expression->execute(res); + join_probe_actions->execute(res); return res; } @@ -68,7 +57,12 @@ Block HashJoinProbeBlockInputStream::readImpl() Block res = children.back()->read(); if (!res) return res; - expression->execute(res); + + join_probe_actions->execute(res); + + // TODO split block if block.size() > settings.max_block_size + // https://github.com/pingcap/tiflash/issues/3436 + return res; } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 2c1d0cf8c78..9c24ecd9190 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -37,7 +37,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream public: HashJoinProbeBlockInputStream( const BlockInputStreamPtr & input, - const ExpressionActionsPtr & expression_, + const ExpressionActionsPtr & join_probe_actions_, const LogWithPrefixPtr & log_); String getName() const override { return name; } @@ -49,7 +49,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream private: const LogWithPrefixPtr log; - ExpressionActionsPtr expression; + ExpressionActionsPtr join_probe_actions; }; } // namespace DB