Skip to content

Commit

Permalink
fix session range comparision expression
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Aug 7, 2024
1 parent d5dc45f commit aba4709
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 46 deletions.
23 changes: 2 additions & 21 deletions src/Interpreters/Streaming/WindowCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ ASTs checkAndExtractTumbleArguments(const ASTFunction * func_ast)

/// tumble(table, [timestamp_expr], win_interval, [timezone])
if (func_ast->children.size() != 1)
throw Exception(HOP_HELP_MESSAGE, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(TUMBLE_HELP_MESSAGE, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

const auto & args = func_ast->arguments->children;
if (args.size() < 2)
Expand Down Expand Up @@ -315,7 +315,7 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast)
{
assert(isTableFunctionSession(func_ast));

/// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [range_comparision])
/// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [range_comparision] | [start_prediction, end_prediction])
if (func_ast->children.size() != 1)
throw Exception(SESSION_HELP_MESSAGE, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

Expand Down Expand Up @@ -347,11 +347,6 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast)
/// Case: session(stream, timestamp, INTERVAL 5 SECOND, ...)
time_expr = args[i++];
}
else
{
/// Case: session(stream, INTERVAL 5 SECOND, ...)
time_expr = std::make_shared<ASTIdentifier>(ProtonConsts::RESERVED_EVENT_TIME);
}

if (isIntervalAST(args[i]))
{
Expand All @@ -372,12 +367,6 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast)
/// session will emit.
max_session_size = args[i++];
}
else
{
/// Set default max session size.
auto [unit_nums, unit] = extractInterval(timeout_interval->as<ASTFunction>());
max_session_size = makeASTInterval(unit_nums * ProtonConsts::SESSION_SIZE_MULTIPLIER, unit);
}

/// Handle optional start_condition/end_condition
if (i < args.size())
Expand Down Expand Up @@ -407,14 +396,6 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast)
end_with_inclusion = std::make_shared<ASTLiteral>(true);
}
}
else
{
/// OPT-3: If range predication is not assigned, any incoming event should be able to start a session window.
start_condition = std::make_shared<ASTLiteral>(true);
start_with_inclusion = std::make_shared<ASTLiteral>(true);
end_condition = std::make_shared<ASTLiteral>(false);
end_with_inclusion = std::make_shared<ASTLiteral>(true);
}

if (i != args.size())
break;
Expand Down
49 changes: 28 additions & 21 deletions src/Parsers/Streaming/ASTSessionRangeComparision.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@

namespace DB
{
ASTPtr ASTSessionRangeComparision::clone() const
{
auto res = std::make_shared<ASTSessionRangeComparision>(*this);
res->children.clear();
for (auto & child : children)
res->children.push_back(child->clone());

return res;
}

void ASTSessionRangeComparision::updateTreeHashImpl(SipHash & hash_state) const
{
if (children.size() == 2)
{
children[0]->updateTreeHashImpl(hash_state);
children[1]->updateTreeHashImpl(hash_state);
}
assert(children.size() == 2);

children[0]->updateTreeHashImpl(hash_state);
children[1]->updateTreeHashImpl(hash_state);

hash_state.update(start_with_inclusion);
hash_state.update(end_with_inclusion);
Expand All @@ -21,26 +30,24 @@ void ASTSessionRangeComparision::updateTreeHashImpl(SipHash & hash_state) const

void ASTSessionRangeComparision::formatImplWithoutAlias(const FormatSettings & settings, FormatState &, FormatStateStacked) const
{
if (children.size() == 2)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << (start_with_inclusion ? "[" : "(") << (settings.hilite ? hilite_none : "");
children[0]->format(settings);
settings.ostr << (settings.hilite ? hilite_keyword : "") << "," << (settings.hilite ? hilite_none : "");
children[1]->format(settings);
settings.ostr << (settings.hilite ? hilite_keyword : "") << (end_with_inclusion ? "]" : ")") << (settings.hilite ? hilite_none : "");
}
assert(children.size() == 2);

settings.ostr << (settings.hilite ? hilite_keyword : "") << (start_with_inclusion ? "[" : "(") << (settings.hilite ? hilite_none : "");
children[0]->format(settings);
settings.ostr << (settings.hilite ? hilite_keyword : "") << "," << (settings.hilite ? hilite_none : "");
children[1]->format(settings);
settings.ostr << (settings.hilite ? hilite_keyword : "") << (end_with_inclusion ? "]" : ")") << (settings.hilite ? hilite_none : "");
}

void ASTSessionRangeComparision::appendColumnNameImpl(WriteBuffer & ostr) const
{
if (children.size() == 2)
{
writeString((start_with_inclusion ? "[" : "("), ostr);
children[0]->appendColumnName(ostr);
writeString(", ", ostr);
children[1]->appendColumnName(ostr);
writeString((end_with_inclusion ? "]" : ")"), ostr);
}
assert(children.size() == 2);

writeString((start_with_inclusion ? "[" : "("), ostr);
children[0]->appendColumnName(ostr);
writeString(", ", ostr);
children[1]->appendColumnName(ostr);
writeString((end_with_inclusion ? "]" : ")"), ostr);
}

}
2 changes: 1 addition & 1 deletion src/Parsers/Streaming/ASTSessionRangeComparision.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct ASTSessionRangeComparision : public ASTWithAlias

String getID(char) const override { return "SessionRangeComparision"; }

ASTPtr clone() const override { return std::make_shared<ASTSessionRangeComparision>(*this); }
ASTPtr clone() const override;

void updateTreeHashImpl(SipHash & hash_state) const override;

Expand Down
3 changes: 3 additions & 0 deletions src/TableFunctions/TableFunctionHop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ ASTs TableFunctionHop::checkAndExtractArguments(ASTFunction * node) const

void TableFunctionHop::postArgs(ASTs & args) const
{
/// __hop(timestamp_expr, hop_interval, win_interval, [timezone])
assert(args.size() == 4);

//// [timezone]
/// Prune the empty timezone if user doesn't specify one
if (!args.back())
Expand Down
37 changes: 34 additions & 3 deletions src/TableFunctions/TableFunctionSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,43 @@ void TableFunctionSession::parseArguments(const ASTPtr & func_ast, ContextPtr co

ASTs TableFunctionSession::checkAndExtractArguments(ASTFunction * node) const
{
/// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [range_comparision])
/// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [start_cond, end_cond])
/// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [start_cond, start_with_inclusion, end_cond, end_with_inclusion])
/// session(stream, [timestamp_expr], timeout_interval, [max_session_size], [range_comparision])
/// session(stream, [timestamp_expr], timeout_interval, [max_session_size], [start_cond, end_cond])
return checkAndExtractSessionArguments(node);
}

void TableFunctionSession::postArgs(ASTs & args) const
{
/// __session(timestamp_expr, timeout_interval, [max_session_size], [<range_comparision>: start_cond, start_with_inclusion, end_cond, end_with_inclusion])
assert(args.size() == 7);

auto timeout_interval = extractInterval(args[1]->as<ASTFunction>());
/// Set default max_session_size if not provided
if (!args[2])
args[2] = makeASTInterval(timeout_interval.interval * ProtonConsts::SESSION_SIZE_MULTIPLIER, timeout_interval.unit);

/// If range predication is not assigned, any incoming event should be able to start a session window.
if (!args[3])
{
args[3] = std::make_shared<ASTLiteral>(true);
args[4] = std::make_shared<ASTLiteral>(true);
args[5] = std::make_shared<ASTLiteral>(false);
args[6] = std::make_shared<ASTLiteral>(true);
}
assert(args[3] && args[4] && args[5] && args[6]);

/// They may be used in aggregation transform, so set an internal alias for them
args[3]->setAlias(ProtonConsts::STREAMING_SESSION_START);
args[5]->setAlias(ProtonConsts::STREAMING_SESSION_END);

/// Try do the same scale conversion of timeout_interval and max_session_size
convertToSameKindIntervalAST(
BaseScaleInterval::toBaseScale(timeout_interval),
BaseScaleInterval::toBaseScale(extractInterval(args[2]->as<ASTFunction>())),
args[1],
args[2]);
}

NamesAndTypesList TableFunctionSession::getAdditionalResultColumns(const ColumnsWithTypeAndName & arguments) const
{
/// __session(timestamp_expr, timeout_interval, max_session_size, start_cond, start_with_inclusion, end_cond, end_with_inclusion)
Expand Down
1 change: 1 addition & 0 deletions src/TableFunctions/TableFunctionSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TableFunctionSession final : public TableFunctionWindow
const char * getStorageTypeName() const override { return "session"; }
void parseArguments(const ASTPtr & func_ast, ContextPtr context) override;
ASTs checkAndExtractArguments(ASTFunction * node) const override;
void postArgs(ASTs & args) const override;
NamesAndTypesList getAdditionalResultColumns(const ColumnsWithTypeAndName & arguments) const final;
};
}
Expand Down
3 changes: 3 additions & 0 deletions src/TableFunctions/TableFunctionTumble.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ ASTs TableFunctionTumble::checkAndExtractArguments(ASTFunction * node) const

void TableFunctionTumble::postArgs(ASTs & args) const
{
/// __tumble(timestamp_expr, win_interval, [timezone])
assert(args.size() == 3);

//// [timezone]
/// Prune the empty timezone if user doesn't specify one
if (!args.back())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE MATERIALIZED VIEW default.`99004_v`\n(\n `start` datetime64(3, \'UTC\'),\n `end` datetime64(3, \'UTC\'),\n `avg(speed)` float64,\n `_tp_time` datetime64(3, \'UTC\') DEFAULT now64(3, \'UTC\') CODEC(DoubleDelta, LZ4),\n INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 2\n) AS\nSELECT\n window_start AS start, window_end AS end, avg(speed)\nFROM\n session(default.`99004_speed`, 1m, [speed > 50,speed < 50))\nGROUP BY\n window_start, window_end
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DROP VIEW IF EXISTS default.99004_v;

DROP STREAM IF EXISTS default.99004_speed;

CREATE STREAM default.99004_speed (speed float64);

CREATE MATERIALIZED VIEW default.99004_v AS
SELECT
window_start AS start, window_end AS end, avg(speed)
FROM
session(default.99004_speed, 1m, [speed > 50,speed < 50))
GROUP BY
window_start, window_end;

SHOW CREATE default.99004_v;

DROP VIEW default.99004_v;

DROP STREAM default.99004_speed;

0 comments on commit aba4709

Please sign in to comment.