Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add HashJoinProbeBlockInputStream #4246

Merged
merged 12 commits into from
Mar 16, 2022
61 changes: 61 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <Flash/Mpp/getMPPTaskLog.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
{
/// Builds a stream that reads from `input` and applies `expression_` to its blocks.
///
/// `input` becomes the only child of this stream. `expression_` is accepted only if it
/// is non-null and at least one of its actions has type JOIN; otherwise an Exception
/// with ErrorCodes::LOGICAL_ERROR is thrown. The logger is obtained from
/// getMPPTaskLog(log_, NAME).
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
    const BlockInputStreamPtr & input,
    const ExpressionActionsPtr & expression_,
    const LogWithPrefixPtr & log_)
    : log(getMPPTaskLog(log_, NAME))
{
    children.push_back(input);

    // True as soon as a JOIN action is found; a null expression never qualifies.
    const auto contains_join_action = [&expression_]() -> bool {
        if (!expression_)
            return false;
        for (const auto & candidate : expression_->getActions())
        {
            if (candidate.type == ExpressionAction::Type::JOIN)
                return true;
        }
        return false;
    };

    if (!contains_join_action())
        throw Exception("join probe expression should have join action", ErrorCodes::LOGICAL_ERROR);

    // Only store the expression once it has passed validation.
    expression = expression_;
}

/// Returns the totals block of this stream.
///
/// When the last child is an IProfilingBlockInputStream, its totals are fetched,
/// stored in `totals` and passed through expression->executeOnTotals(). Otherwise
/// the current value of `totals` is returned unchanged.
Block HashJoinProbeBlockInputStream::getTotals()
{
    auto * profiling_child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back());
    if (profiling_child != nullptr)
    {
        totals = profiling_child->getTotals();
        expression->executeOnTotals(totals);
    }

    return totals;
}

/// Returns the header of the last child after `expression` has been executed on it.
Block HashJoinProbeBlockInputStream::getHeader() const
{
    const auto & source = children.back();
    Block header = source->getHeader();
    expression->execute(header);
    return header;
}

/// Reads the next block from the last child and executes `expression` on it.
/// A block that evaluates to false is handed back untouched.
Block HashJoinProbeBlockInputStream::readImpl()
{
    Block block = children.back()->read();
    const bool nothing_read = !block;
    if (!nothing_read)
        expression->execute(block);
    return block;
}

} // namespace DB
36 changes: 36 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>

namespace DB
{
class ExpressionActions;

/** Executes a given expression over each block read from the input stream.
* The expression must contain a JOIN action.
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
*/
/// Wraps one input stream and runs `expression` on every block read from it.
class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Stream name: returned by getName() and passed to getMPPTaskLog() when the logger is created.
static constexpr auto NAME = "HashJoinProbe";
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

public:
/// `input` becomes the only child stream.
/// Throws Exception(ErrorCodes::LOGICAL_ERROR) when `expression_` is null or
/// none of its actions has type ExpressionAction::Type::JOIN.
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & expression_,
const LogWithPrefixPtr & log_);

/// Returns NAME ("HashJoinProbe").
String getName() const override { return NAME; }
/// Totals of the child (if it is an IProfilingBlockInputStream) after expression->executeOnTotals().
Block getTotals() override;
/// Header of the child after `expression` has been executed on it.
Block getHeader() const override;

protected:
/// Reads one block from the child; a block that evaluates to false is returned as is,
/// any other block is returned after `expression` has been executed on it.
Block readImpl() override;

private:
/// Logger obtained from getMPPTaskLog(log_, NAME) in the constructor.
const LogWithPrefixPtr log;
/// Expression applied to every block; assigned in the constructor only after the JOIN-action check passes.
ExpressionActionsPtr expression;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since its name is HashJoinProbeBlockInputStream, how about just put a single join expressionAction here? I think this will be the first step for #3436

Copy link
Contributor Author

@SeaRise SeaRise Mar 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to use a single ExpressionAction...
But we can check that actions.size() == 1 && actions.back().type == JOIN...

};

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
Expand Down Expand Up @@ -724,7 +724,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
}
}
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions(), taskLogger());
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, chain.getLastActions(), taskLogger());

/// add a project to remove all the useless column
NamesWithAliases project_cols;
Expand Down