diff --git a/src/Interpreters/Streaming/WindowCommon.cpp b/src/Interpreters/Streaming/WindowCommon.cpp index 84fa63c3300..6cfb7fc0e37 100644 --- a/src/Interpreters/Streaming/WindowCommon.cpp +++ b/src/Interpreters/Streaming/WindowCommon.cpp @@ -172,7 +172,7 @@ ASTs checkAndExtractTumbleArguments(const ASTFunction * func_ast) /// tumble(table, [timestamp_expr], win_interval, [timezone]) if (func_ast->children.size() != 1) - throw Exception(HOP_HELP_MESSAGE, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + throw Exception(TUMBLE_HELP_MESSAGE, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); const auto & args = func_ast->arguments->children; if (args.size() < 2) @@ -315,7 +315,7 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast) { assert(isTableFunctionSession(func_ast)); - /// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [range_comparision]) + /// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [range_comparision] | [start_prediction, end_prediction]) if (func_ast->children.size() != 1) throw Exception(SESSION_HELP_MESSAGE, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -347,11 +347,6 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast) /// Case: session(stream, timestamp, INTERVAL 5 SECOND, ...) time_expr = args[i++]; } - else - { - /// Case: session(stream, INTERVAL 5 SECOND, ...) - time_expr = std::make_shared(ProtonConsts::RESERVED_EVENT_TIME); - } if (isIntervalAST(args[i])) { @@ -372,12 +367,6 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast) /// session will emit. max_session_size = args[i++]; } - else - { - /// Set default max session size. - auto [unit_nums, unit] = extractInterval(timeout_interval->as()); - max_session_size = makeASTInterval(unit_nums * ProtonConsts::SESSION_SIZE_MULTIPLIER, unit); - } /// Handle optional start_condition/end_condition if (i < args.size()) @@ -407,14 +396,6 @@ ASTs checkAndExtractSessionArguments(const ASTFunction * func_ast) end_with_inclusion = std::make_shared(true); } } - else - { - /// OPT-3: If range predication is not assigned, any incoming event should be able to start a session window. - start_condition = std::make_shared(true); - start_with_inclusion = std::make_shared(true); - end_condition = std::make_shared(false); - end_with_inclusion = std::make_shared(true); - } if (i != args.size()) break; diff --git a/src/Parsers/Streaming/ASTSessionRangeComparision.cpp b/src/Parsers/Streaming/ASTSessionRangeComparision.cpp index b89e32b946e..e9ee2206423 100644 --- a/src/Parsers/Streaming/ASTSessionRangeComparision.cpp +++ b/src/Parsers/Streaming/ASTSessionRangeComparision.cpp @@ -5,13 +5,22 @@ namespace DB { +ASTPtr ASTSessionRangeComparision::clone() const +{ + auto res = std::make_shared(*this); + res->children.clear(); + for (auto & child : children) + res->children.push_back(child->clone()); + + return res; +} + void ASTSessionRangeComparision::updateTreeHashImpl(SipHash & hash_state) const { - if (children.size() == 2) - { - children[0]->updateTreeHashImpl(hash_state); - children[1]->updateTreeHashImpl(hash_state); - } + assert(children.size() == 2); + + children[0]->updateTreeHashImpl(hash_state); + children[1]->updateTreeHashImpl(hash_state); hash_state.update(start_with_inclusion); hash_state.update(end_with_inclusion); @@ -21,26 +30,24 @@ void ASTSessionRangeComparision::updateTreeHashImpl(SipHash & hash_state) const void ASTSessionRangeComparision::formatImplWithoutAlias(const FormatSettings & settings, FormatState &, FormatStateStacked) const { - if (children.size() == 2) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << (start_with_inclusion ? "[" : "(") << (settings.hilite ? hilite_none : ""); - children[0]->format(settings); - settings.ostr << (settings.hilite ? hilite_keyword : "") << "," << (settings.hilite ? hilite_none : ""); - children[1]->format(settings); - settings.ostr << (settings.hilite ? hilite_keyword : "") << (end_with_inclusion ? "]" : ")") << (settings.hilite ? hilite_none : ""); - } + assert(children.size() == 2); + + settings.ostr << (settings.hilite ? hilite_keyword : "") << (start_with_inclusion ? "[" : "(") << (settings.hilite ? hilite_none : ""); + children[0]->format(settings); + settings.ostr << (settings.hilite ? hilite_keyword : "") << "," << (settings.hilite ? hilite_none : ""); + children[1]->format(settings); + settings.ostr << (settings.hilite ? hilite_keyword : "") << (end_with_inclusion ? "]" : ")") << (settings.hilite ? hilite_none : ""); } void ASTSessionRangeComparision::appendColumnNameImpl(WriteBuffer & ostr) const { - if (children.size() == 2) - { - writeString((start_with_inclusion ? "[" : "("), ostr); - children[0]->appendColumnName(ostr); - writeString(", ", ostr); - children[1]->appendColumnName(ostr); - writeString((end_with_inclusion ? "]" : ")"), ostr); - } + assert(children.size() == 2); + + writeString((start_with_inclusion ? "[" : "("), ostr); + children[0]->appendColumnName(ostr); + writeString(", ", ostr); + children[1]->appendColumnName(ostr); + writeString((end_with_inclusion ? "]" : ")"), ostr); } } diff --git a/src/Parsers/Streaming/ASTSessionRangeComparision.h b/src/Parsers/Streaming/ASTSessionRangeComparision.h index a1b6ee43fc4..20726216eb2 100644 --- a/src/Parsers/Streaming/ASTSessionRangeComparision.h +++ b/src/Parsers/Streaming/ASTSessionRangeComparision.h @@ -12,7 +12,7 @@ struct ASTSessionRangeComparision : public ASTWithAlias String getID(char) const override { return "SessionRangeComparision"; } - ASTPtr clone() const override { return std::make_shared(*this); } + ASTPtr clone() const override; void updateTreeHashImpl(SipHash & hash_state) const override; diff --git a/src/TableFunctions/TableFunctionHop.cpp b/src/TableFunctions/TableFunctionHop.cpp index b1b9044a374..217886b704d 100644 --- a/src/TableFunctions/TableFunctionHop.cpp +++ b/src/TableFunctions/TableFunctionHop.cpp @@ -38,6 +38,9 @@ ASTs TableFunctionHop::checkAndExtractArguments(ASTFunction * node) const void TableFunctionHop::postArgs(ASTs & args) const { + /// __hop(timestamp_expr, hop_interval, win_interval, [timezone]) + assert(args.size() == 4); + //// [timezone] /// Prune the empty timezone if user doesn't specify one if (!args.back()) diff --git a/src/TableFunctions/TableFunctionSession.cpp b/src/TableFunctions/TableFunctionSession.cpp index 599db0b2829..ea6d64eee9c 100644 --- a/src/TableFunctions/TableFunctionSession.cpp +++ b/src/TableFunctions/TableFunctionSession.cpp @@ -32,12 +32,43 @@ void TableFunctionSession::parseArguments(const ASTPtr & func_ast, ContextPtr co ASTs TableFunctionSession::checkAndExtractArguments(ASTFunction * node) const { - /// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [range_comparision]) - /// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [start_cond, end_cond]) - /// session(stream, [timestamp_expr], timeout_interval, [max_emit_interval], [start_cond, start_with_inclusion, end_cond, end_with_inclusion]) + /// session(stream, [timestamp_expr], timeout_interval, [max_session_size], [range_comparision]) + /// session(stream, [timestamp_expr], timeout_interval, [max_session_size], [start_cond, end_cond]) return checkAndExtractSessionArguments(node); } +void TableFunctionSession::postArgs(ASTs & args) const +{ + /// __session(timestamp_expr, timeout_interval, [max_session_size], [: start_cond, start_with_inclusion, end_cond, end_with_inclusion]) + assert(args.size() == 7); + + auto timeout_interval = extractInterval(args[1]->as()); + /// Set default max_session_size if not provided + if (!args[2]) + args[2] = makeASTInterval(timeout_interval.interval * ProtonConsts::SESSION_SIZE_MULTIPLIER, timeout_interval.unit); + + /// If range predication is not assigned, any incoming event should be able to start a session window. + if (!args[3]) + { + args[3] = std::make_shared(true); + args[4] = std::make_shared(true); + args[5] = std::make_shared(false); + args[6] = std::make_shared(true); + } + assert(args[3] && args[4] && args[5] && args[6]); + + /// They may be used in aggregation transform, so set an internal alias for them + args[3]->setAlias(ProtonConsts::STREAMING_SESSION_START); + args[5]->setAlias(ProtonConsts::STREAMING_SESSION_END); + + /// Try do the same scale conversion of timeout_interval and max_session_size + convertToSameKindIntervalAST( + BaseScaleInterval::toBaseScale(timeout_interval), + BaseScaleInterval::toBaseScale(extractInterval(args[2]->as())), + args[1], + args[2]); +} + NamesAndTypesList TableFunctionSession::getAdditionalResultColumns(const ColumnsWithTypeAndName & arguments) const { /// __session(timestamp_expr, timeout_interval, max_session_size, start_cond, start_with_inclusion, end_cond, end_with_inclusion) diff --git a/src/TableFunctions/TableFunctionSession.h b/src/TableFunctions/TableFunctionSession.h index 8549c9d6008..28fda1b7b27 100644 --- a/src/TableFunctions/TableFunctionSession.h +++ b/src/TableFunctions/TableFunctionSession.h @@ -15,6 +15,7 @@ class TableFunctionSession final : public TableFunctionWindow const char * getStorageTypeName() const override { return "session"; } void parseArguments(const ASTPtr & func_ast, ContextPtr context) override; ASTs checkAndExtractArguments(ASTFunction * node) const override; + void postArgs(ASTs & args) const override; NamesAndTypesList getAdditionalResultColumns(const ColumnsWithTypeAndName & arguments) const final; }; } diff --git a/src/TableFunctions/TableFunctionTumble.cpp b/src/TableFunctions/TableFunctionTumble.cpp index a4575ff1a15..342da1970c6 100644 --- a/src/TableFunctions/TableFunctionTumble.cpp +++ b/src/TableFunctions/TableFunctionTumble.cpp @@ -37,6 +37,9 @@ ASTs TableFunctionTumble::checkAndExtractArguments(ASTFunction * node) const void TableFunctionTumble::postArgs(ASTs & args) const { + /// __tumble(timestamp_expr, win_interval, [timezone]) + assert(args.size() == 3); + //// [timezone] /// Prune the empty timezone if user doesn't specify one if (!args.back()) diff --git a/tests/queries_ported/0_stateless/99004_session_with_range_comparision_bug.reference b/tests/queries_ported/0_stateless/99004_session_with_range_comparision_bug.reference new file mode 100644 index 00000000000..4a3eb184cf0 --- /dev/null +++ b/tests/queries_ported/0_stateless/99004_session_with_range_comparision_bug.reference @@ -0,0 +1 @@ +CREATE MATERIALIZED VIEW default.`99004_v`\n(\n `start` datetime64(3, \'UTC\'),\n `end` datetime64(3, \'UTC\'),\n `avg(speed)` float64,\n `_tp_time` datetime64(3, \'UTC\') DEFAULT now64(3, \'UTC\') CODEC(DoubleDelta, LZ4),\n INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 2\n) AS\nSELECT\n window_start AS start, window_end AS end, avg(speed)\nFROM\n session(default.`99004_speed`, 1m, [speed > 50,speed < 50))\nGROUP BY\n window_start, window_end diff --git a/tests/queries_ported/0_stateless/99004_session_with_range_comparision_bug.sql b/tests/queries_ported/0_stateless/99004_session_with_range_comparision_bug.sql new file mode 100644 index 00000000000..fbb4a177017 --- /dev/null +++ b/tests/queries_ported/0_stateless/99004_session_with_range_comparision_bug.sql @@ -0,0 +1,19 @@ +DROP VIEW IF EXISTS default.99004_v; + +DROP STREAM IF EXISTS default.99004_speed; + +CREATE STREAM default.99004_speed (speed float64); + +CREATE MATERIALIZED VIEW default.99004_v AS +SELECT + window_start AS start, window_end AS end, avg(speed) +FROM + session(default.99004_speed, 1m, [speed > 50,speed < 50)) +GROUP BY + window_start, window_end; + +SHOW CREATE default.99004_v; + +DROP VIEW default.99004_v; + +DROP STREAM default.99004_speed;