Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Mar 15, 2022
1 parent c37fb89 commit c63bc0f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 21 deletions.
32 changes: 13 additions & 19 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,26 @@ namespace DB
{
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & expression_,
const ExpressionActionsPtr & join_probe_actions_,
const LogWithPrefixPtr & log_)
: log(getMPPTaskLog(log_, name))
, join_probe_actions(join_probe_actions_)
{
children.push_back(input);

bool has_join_probe_action = false;
if (expression_)
if (!join_probe_actions || join_probe_actions->getActions().size() != 1
|| join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN)
{
for (const auto & action : expression_->getActions())
{
if (action.type == ExpressionAction::Type::JOIN)
{
has_join_probe_action = true;
break;
}
}
throw Exception("isn't join probe actions", ErrorCodes::LOGICAL_ERROR);
}
if (!has_join_probe_action)
{
throw Exception("join probe expression should have join probe action", ErrorCodes::LOGICAL_ERROR);
}
expression = expression_;
}

Block HashJoinProbeBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
expression->executeOnTotals(totals);
join_probe_actions->executeOnTotals(totals);
}

return totals;
Expand All @@ -59,7 +48,7 @@ Block HashJoinProbeBlockInputStream::getTotals()
Block HashJoinProbeBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
expression->execute(res);
join_probe_actions->execute(res);
return res;
}

Expand All @@ -68,7 +57,12 @@ Block HashJoinProbeBlockInputStream::readImpl()
Block res = children.back()->read();
if (!res)
return res;
expression->execute(res);

join_probe_actions->execute(res);

// TODO split block if block.size() > settings.max_block_size
// https://github.com/pingcap/tiflash/issues/3436

return res;
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
public:
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & expression_,
const ExpressionActionsPtr & join_probe_actions_,
const LogWithPrefixPtr & log_);

String getName() const override { return name; }
Expand All @@ -49,7 +49,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream

private:
const LogWithPrefixPtr log;
ExpressionActionsPtr expression;
ExpressionActionsPtr join_probe_actions;
};

} // namespace DB

0 comments on commit c63bc0f

Please sign in to comment.