From 194ebc3adf254b8f8b14c95bad1bd44a6ae2d57a Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Tue, 19 Mar 2024 22:17:13 +0800 Subject: [PATCH] Resolve comments --- velox/exec/RankLikeWindowBuild.cpp | 66 ++++++----- velox/exec/RankLikeWindowBuild.h | 25 ++-- velox/exec/SortWindowBuild.cpp | 4 +- velox/exec/SortWindowBuild.h | 2 +- velox/exec/StreamingWindowBuild.cpp | 4 +- velox/exec/StreamingWindowBuild.h | 2 +- velox/exec/Window.cpp | 140 +++++++++++++---------- velox/exec/Window.h | 4 +- velox/exec/WindowBuild.h | 2 +- velox/exec/WindowPartition.cpp | 61 +++++++++- velox/exec/WindowPartition.h | 50 ++++++-- velox/exec/tests/WindowTest.cpp | 28 +++++ velox/expression/FunctionSignature.cpp | 9 +- velox/expression/FunctionSignature.h | 16 ++- velox/functions/lib/window/Rank.cpp | 15 ++- velox/functions/lib/window/RowNumber.cpp | 5 +- 16 files changed, 298 insertions(+), 135 deletions(-) diff --git a/velox/exec/RankLikeWindowBuild.cpp b/velox/exec/RankLikeWindowBuild.cpp index 5c6c5fb7fdbbb..e5b6e5f43174d 100644 --- a/velox/exec/RankLikeWindowBuild.cpp +++ b/velox/exec/RankLikeWindowBuild.cpp @@ -23,8 +23,30 @@ RankLikeWindowBuild::RankLikeWindowBuild( velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection) - : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) { - partitionOffsets_.push_back(0); + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} + +void RankLikeWindowBuild::buildNextInputOrPartition(bool isFinished) { + sortedRows_.push_back(inputRows_); + if (windowPartitions_.size() <= inputCurrentPartition_) { + auto partition = + folly::Range(sortedRows_.back().data(), sortedRows_.back().size()); + + windowPartitions_.push_back(std::make_shared( + data_.get(), partition, inputColumns_, sortKeyInfo_, true)); + } else { + windowPartitions_[inputCurrentPartition_]->insertNewBatch( + sortedRows_.back()); + } + + if (isFinished) { + windowPartitions_[inputCurrentPartition_]->setTotalNum( + currentPartitionNum_ - 1); + windowPartitions_[inputCurrentPartition_]->setFinished(); + + inputRows_.clear(); + inputCurrentPartition_++; + currentPartitionNum_ = 1; + } } void RankLikeWindowBuild::addInput(RowVectorPtr input) { @@ -33,6 +55,7 @@ void RankLikeWindowBuild::addInput(RowVectorPtr input) { } for (auto row = 0; row < input->size(); ++row) { + currentPartitionNum_++; char* newRow = data_->newRow(); for (auto col = 0; col < input->childrenSize(); ++col) { @@ -41,49 +64,34 @@ void RankLikeWindowBuild::addInput(RowVectorPtr input) { if (previousRow_ != nullptr && compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { - sortedRows_.push_back(inputRows_); - partitionOffsets_.push_back(0); - inputRows_.clear(); + buildNextInputOrPartition(true); } inputRows_.push_back(newRow); previousRow_ = newRow; } - partitionOffsets_.push_back(inputRows_.size()); - sortedRows_.push_back(inputRows_); + + buildNextInputOrPartition(false); + inputRows_.clear(); } void RankLikeWindowBuild::noMoreInput() { isFinished_ = true; + windowPartitions_[outputCurrentPartition_]->setTotalNum( + currentPartitionNum_ - 1); + windowPartitions_[outputCurrentPartition_]->setFinished(); inputRows_.clear(); } -std::unique_ptr RankLikeWindowBuild::nextPartition() { - currentPartition_++; - - if (currentPartition_ > 0) { - // Erase data_ and sortedRows; - data_->eraseRows(folly::Range( - sortedRows_[currentPartition_ - 1].data(), - sortedRows_[currentPartition_ - 1].size())); - sortedRows_[currentPartition_ - 1].clear(); - } - - auto partition = folly::Range( - sortedRows_[currentPartition_].data(), - sortedRows_[currentPartition_].size()); - - auto offset = 0; - for (auto i = currentPartition_; partitionOffsets_[i] != 0; i--) { - offset += partitionOffsets_[i]; - } - return std::make_unique( - data_.get(), partition, inputColumns_, sortKeyInfo_, offset); +std::shared_ptr RankLikeWindowBuild::nextPartition() { + outputCurrentPartition_++; + return windowPartitions_[outputCurrentPartition_]; } bool RankLikeWindowBuild::hasNextPartition() { - return sortedRows_.size() > 0 && currentPartition_ != sortedRows_.size() - 1; + return windowPartitions_.size() > 0 && + outputCurrentPartition_ != windowPartitions_.size() - 1; } } // namespace facebook::velox::exec diff --git a/velox/exec/RankLikeWindowBuild.h b/velox/exec/RankLikeWindowBuild.h index ab864284e820c..7507e31c06194 100644 --- a/velox/exec/RankLikeWindowBuild.h +++ b/velox/exec/RankLikeWindowBuild.h @@ -47,34 +47,37 @@ class RankLikeWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { return !isFinished_; } private: + void buildNextInputOrPartition(bool isFinished); + // Vector of pointers to each input row in the data_ RowContainer. - // Rows are erased from data_ when they are output from the - // Window operator. + // Rows are erased from data_ when they are processed in WindowPartition. std::vector> sortedRows_; // Holds input rows within the current partition. std::vector inputRows_; - // Indices of the start row (in sortedRows_) of each partition in - // the RowContainer data_. This auxiliary structure helps demarcate - // partitions. - std::vector partitionOffsets_; - // Used to compare rows based on partitionKeys. char* previousRow_ = nullptr; - // Current partition being output. Used to construct WindowPartitions - // during resetPartition. - vector_size_t currentPartition_ = -1; + // Current partition being output. Used to return the WidnowPartitions. + vector_size_t outputCurrentPartition_ = -1; bool isFinished_ = false; + + // Current partition when adding input. Used to construct WindowPartitions. + vector_size_t inputCurrentPartition_ = 0; + + std::vector> windowPartitions_; + + // Records the total rows number in each partition. + vector_size_t currentPartitionNum_ = 0; }; } // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 2c86ae062575b..5b06e6751c544 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -288,7 +288,7 @@ void SortWindowBuild::loadNextPartitionFromSpill() { } } -std::unique_ptr SortWindowBuild::nextPartition() { +std::shared_ptr SortWindowBuild::nextPartition() { if (merge_ != nullptr) { VELOX_CHECK(!sortedRows_.empty(), "No window partitions available") auto partition = folly::Range(sortedRows_.data(), sortedRows_.size()); @@ -310,7 +310,7 @@ std::unique_ptr SortWindowBuild::nextPartition() { auto partition = folly::Range( sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inputColumns_, sortKeyInfo_); } diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index bc73de0117b89..6658b95cb5c05 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -52,7 +52,7 @@ class SortWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; private: void ensureInputFits(const RowVectorPtr& input); diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp index cab3e17030189..99073ea762fa0 100644 --- a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/StreamingWindowBuild.cpp @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() { partitionStartRows_.push_back(sortedRows_.size()); } -std::unique_ptr StreamingWindowBuild::nextPartition() { +std::shared_ptr StreamingWindowBuild::nextPartition() { VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") @@ -89,7 +89,7 @@ std::unique_ptr StreamingWindowBuild::nextPartition() { sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inputColumns_, sortKeyInfo_); } diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h index 2573f1f7e8d56..1de94ad6a56eb 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/StreamingWindowBuild.h @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { // No partitions are available or the currentPartition is the last available diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 317fbe99fedd5..64f96c8b9553a 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/exec/Window.h" +#include #include "velox/exec/OperatorUtils.h" #include "velox/exec/RankLikeWindowBuild.h" #include "velox/exec/SortWindowBuild.h" @@ -22,6 +23,48 @@ namespace facebook::velox::exec { +Window::Window( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& windowNode) + : Operator( + driverCtx, + windowNode->outputType(), + operatorId, + windowNode->id(), + "Window", + windowNode->canSpill(driverCtx->queryConfig()) + ? driverCtx->makeSpillConfig(operatorId) + : std::nullopt), + numInputColumns_(windowNode->inputType()->size()), + windowNode_(windowNode), + currentPartition_(nullptr), + stringAllocator_(pool()) { + auto* spillConfig = + spillConfig_.has_value() ? &spillConfig_.value() : nullptr; + if (windowNode->inputsSorted()) { + if (supportRankWindowBuild()) { + windowBuild_ = std::make_unique( + windowNode_, pool(), spillConfig, &nonReclaimableSection_); + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } + + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } +} + +void Window::initialize() { + Operator::initialize(); + VELOX_CHECK_NOT_NULL(windowNode_); + createWindowFunctions(); + createPeerAndFrameBuffers(); + windowNode_.reset(); +} + namespace { void checkRowFrameBounds(const core::WindowNode::Frame& frame) { auto frameBoundCheck = [&](const core::TypedExprPtr& frameValue) -> void { @@ -71,70 +114,8 @@ void checkKRangeFrameBounds( frameBoundCheck(frame.endValue); } -// The RankLikeWindowBuild is designed to support 'rank', 'dense_rank', and -// 'row_number' functions with a default frame. -bool checkRankLikeWindowBuild( - const std::shared_ptr& windowNode) { - for (const auto& windowNodeFunction : windowNode->windowFunctions()) { - const auto& functionName = windowNodeFunction.functionCall->name(); - const auto& frame = windowNodeFunction.frame; - - bool isRankLikeFunction = - (functionName == "rank" || functionName == "row_number"); - bool isDefaultFrame = - (frame.startType == core::WindowNode::BoundType::kUnboundedPreceding && - frame.endType == core::WindowNode::BoundType::kCurrentRow); - - if (!isRankLikeFunction || !isDefaultFrame) { - return false; - } - } - return true; -} - } // namespace -Window::Window( - int32_t operatorId, - DriverCtx* driverCtx, - const std::shared_ptr& windowNode) - : Operator( - driverCtx, - windowNode->outputType(), - operatorId, - windowNode->id(), - "Window", - windowNode->canSpill(driverCtx->queryConfig()) - ? driverCtx->makeSpillConfig(operatorId) - : std::nullopt), - numInputColumns_(windowNode->inputType()->size()), - windowNode_(windowNode), - currentPartition_(nullptr), - stringAllocator_(pool()) { - auto* spillConfig = - spillConfig_.has_value() ? &spillConfig_.value() : nullptr; - if (windowNode->inputsSorted()) { - if (checkRankLikeWindowBuild(windowNode)) { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); - } else { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); - } - } else { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); - } -} - -void Window::initialize() { - Operator::initialize(); - VELOX_CHECK_NOT_NULL(windowNode_); - createWindowFunctions(); - createPeerAndFrameBuffers(); - windowNode_.reset(); -} - Window::WindowFrame Window::createWindowFrame( const std::shared_ptr& windowNode, const core::WindowNode::Frame& frame, @@ -214,6 +195,28 @@ void Window::createWindowFunctions() { } } +// The supportRankWindowBuild is designed to support 'rank' and +// 'row_number' functions with a default frame. +bool Window::supportRankWindowBuild() { + for (const auto& windowNodeFunction : windowNode_->windowFunctions()) { + bool isRankFunction = exec::getWindowFunctionSignatures( + windowNodeFunction.functionCall->name()) + .value()[0] + ->streaming(); + bool isDefaultFrame = + (windowNodeFunction.frame.startType == + core::WindowNode::BoundType::kUnboundedPreceding && + windowNodeFunction.frame.endType == + core::WindowNode::BoundType::kCurrentRow); + + if (!(isRankFunction) || !isDefaultFrame) { + return false; + } + } + + return true; +} + void Window::addInput(RowVectorPtr input) { windowBuild_->addInput(input); numRows_ += input->size(); @@ -574,7 +577,14 @@ vector_size_t Window::callApplyLoop( result); resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; - callResetPartition(); + if (currentPartition_->isFinished()) { + callResetPartition(); + } else { + // Break until the next getOutput call to handle the remaining data in + // currentPartition_. + break; + } + if (!currentPartition_) { // The WindowBuild doesn't have any more partitions to process right // now. So break until the next getOutput call. @@ -616,6 +626,10 @@ RowVectorPtr Window::getOutput() { } } + if (!currentPartition_->isFinished()) { + currentPartition_->buildNextBatch(); + } + auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = BaseVector::create( outputType_, numOutputRows, operatorCtx_->pool()); diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 9be9a011baae6..e0d49ddfa4672 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -88,6 +88,8 @@ class Window : public Operator { const std::optional end; }; + bool supportRankWindowBuild(); + // Creates WindowFunction and frame objects for this operator. void createWindowFunctions(); @@ -165,7 +167,7 @@ class Window : public Operator { // Used to access window partition rows and columns by the window // operator and functions. This structure is owned by the WindowBuild. - std::unique_ptr currentPartition_; + std::shared_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index 1ac8d3630ea61..11832c4cf5929 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -68,7 +68,7 @@ class WindowBuild { // the underlying columns of Window partition data. // Check hasNextPartition() before invoking this function. This function fails // if called when no partition is available. - virtual std::unique_ptr nextPartition() = 0; + virtual std::shared_ptr nextPartition() = 0; // Returns the average size of input rows in bytes stored in the // data container of the WindowBuild. diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 065588e225406..5ac5d73ea5abf 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -22,12 +22,40 @@ WindowPartition::WindowPartition( const folly::Range& rows, const std::vector& columns, const std::vector>& sortKeyInfo, - vector_size_t offsetInPartition) + bool streaming) : data_(data), partition_(rows), columns_(columns), sortKeyInfo_(sortKeyInfo), - offsetInPartition_(offsetInPartition) {} + streaming_(streaming) { + processedNum_ = partition_.size(); +} + +void WindowPartition::buildNextBatch() { + if (rows_.size() == 0 || + (currentBatchIndex_ >= 0 && currentBatchIndex_ == (rows_.size() - 1))) + return; + peerGroup_ = false; + currentBatchIndex_++; + // Compute whehter the last row in current batch is same with the first row + // in next batch. + auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { + return compareRowsWithSortKeys(lhs, rhs); + }; + if (!peerCompare( + partition_[partition_.size() - 1], rows_[currentBatchIndex_][0])) { + peerGroup_ = true; + } + + // Erase partition_ in data_ and itself. + data_->eraseRows(partition_); + offsetInPartition_ += partition_.size(); + partition_.clear(); + // Set new partition_. + partition_ = folly::Range( + rows_[currentBatchIndex_].data(), rows_[currentBatchIndex_].size()); + processedNum_ += partition_.size(); +} void WindowPartition::extractColumn( int32_t columnIndex, @@ -49,7 +77,7 @@ void WindowPartition::extractColumn( vector_size_t resultOffset, const VectorPtr& result) const { RowContainer::extractColumn( - partition_.data() + partitionOffset, + partition_.data() + partitionOffset - offsetInPartition_, numRows, columns_[columnIndex], resultOffset, @@ -144,7 +172,28 @@ std::pair WindowPartition::computePeerBuffers( auto lastPartitionRow = numRows() - 1; auto peerStart = prevPeerStart; auto peerEnd = prevPeerEnd; - for (auto i = start, j = 0; i < end; i++, j++) { + + auto nextStart = start; + if (peerGroup_) { + peerEnd++; + nextStart = start + 1; + while (nextStart <= lastPartitionRow) { + if (peerCompare( + partition_[start - offsetInPartition_], + partition_[nextStart - offsetInPartition_])) { + break; + } + peerEnd++; + nextStart++; + } + + for (auto j = start; j < nextStart; j++) { + rawPeerStarts[j - offsetInPartition_] = peerStart; + rawPeerEnds[j - offsetInPartition_] = peerEnd; + } + } + + for (auto i = nextStart, j = (nextStart - start); i < end; i++, j++) { // When traversing input partition rows, the peers are the rows // with the same values for the ORDER BY clause. These rows // are equal in some ways and affect the results of ranking functions. @@ -160,7 +209,9 @@ std::pair WindowPartition::computePeerBuffers( peerStart = i; peerEnd = i; while (peerEnd <= lastPartitionRow) { - if (peerCompare(partition_[peerStart], partition_[peerEnd])) { + if (peerCompare( + partition_[peerStart - offsetInPartition_], + partition_[peerEnd - offsetInPartition_])) { break; } peerEnd++; diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index ef30a6db20295..35dee9db31e06 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -33,25 +33,36 @@ class WindowPartition { /// 'columns' : Input rows of 'data' used for accessing column data from it. /// 'sortKeyInfo' : Order by columns used by the the Window operator. Used to /// get peer rows from the input partition. - /// 'offsetInPartition' : In RankLikeWindowBuild, record the current offset of - /// the partial WindowPartition within the entire Partition. + /// 'streaming' : Whether support streaming in WindowPartition. WindowPartition( RowContainer* data, const folly::Range& rows, const std::vector& columns, const std::vector>& sortKeyInfo, - vector_size_t offsetInPartition = 0); + bool streaming = false); /// Returns the number of rows in the current WindowPartition. vector_size_t numRows() const { - return partition_.size(); + return partition_.size() + offsetInPartition_; } - /// Returns the current offset of the partial WindowPartition within the - /// entire Partition. - vector_size_t offsetInPartition() const { - return offsetInPartition_; + void insertNewBatch(const std::vector& inputRows) { + rows_.push_back(inputRows); + } + + void setTotalNum(const vector_size_t& num) { + totalNum_ = num; + } + + void buildNextBatch(); + + void setFinished() { + isFinished_ = true; + } + + bool isFinished() { + return (!streaming_) || (isFinished_ && totalNum_ == processedNum_); } /// Copies the values at 'columnIndex' into 'result' (starting at @@ -191,6 +202,29 @@ class WindowPartition { // ORDER BY column info for this partition. const std::vector> sortKeyInfo_; + // The offset of every batch in partition. vector_size_t offsetInPartition_ = 0; + + // The processed num in current partition. + vector_size_t processedNum_ = 0; + + // Whether the last row of current batch is same with the first row of next + // batch. + bool peerGroup_ = false; + + // Whether support streaming in WindowPartition. + bool streaming_ = false; + + // Add new batch in WindowPartition. + std::vector> rows_; + + // The batch index in WindowPartition. + vector_size_t currentBatchIndex_ = -1; + + // Whether all the batches added. + bool isFinished_ = false; + + // The total num in WindowPartition. + vector_size_t totalNum_ = 0; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index db66246ee6248..a74efaa81117b 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -79,6 +79,34 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, rankLikeWithEqualValue) { + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = {"rank() over (order by c1)"}; + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults("SELECT *, rank() over (order by c1) FROM tmp"); +} + TEST_F(WindowTest, rankLikeOptimization) { const vector_size_t size = 1'000; auto data = makeRowVector( diff --git a/velox/expression/FunctionSignature.cpp b/velox/expression/FunctionSignature.cpp index 7c7d9dd401907..2ffad6ab29706 100644 --- a/velox/expression/FunctionSignature.cpp +++ b/velox/expression/FunctionSignature.cpp @@ -206,12 +206,14 @@ FunctionSignature::FunctionSignature( TypeSignature returnType, std::vector argumentTypes, std::vector constantArguments, - bool variableArity) + bool variableArity, + bool streaming) : variables_{std::move(variables)}, returnType_{std::move(returnType)}, argumentTypes_{std::move(argumentTypes)}, constantArguments_{std::move(constantArguments)}, - variableArity_{variableArity} { + variableArity_{variableArity}, + streaming_{streaming} { validate(variables_, returnType_, argumentTypes_, constantArguments_); } @@ -229,7 +231,8 @@ FunctionSignaturePtr FunctionSignatureBuilder::build() { returnType_.value(), std::move(argumentTypes_), std::move(constantArguments_), - variableArity_); + variableArity_, + streaming_); } FunctionSignatureBuilder& FunctionSignatureBuilder::knownTypeVariable( diff --git a/velox/expression/FunctionSignature.h b/velox/expression/FunctionSignature.h index e33a3c4d639c5..ef0370697b136 100644 --- a/velox/expression/FunctionSignature.h +++ b/velox/expression/FunctionSignature.h @@ -127,7 +127,8 @@ class FunctionSignature { TypeSignature returnType, std::vector argumentTypes, std::vector constantArguments, - bool variableArity); + bool variableArity, + bool streaming = false); virtual ~FunctionSignature() = default; @@ -152,6 +153,10 @@ class FunctionSignature { return variableArity_; } + bool streaming() const { + return streaming_; + } + virtual std::string toString() const; const auto& variables() const { @@ -164,7 +169,7 @@ class FunctionSignature { bool operator==(const FunctionSignature& rhs) const { return variables_ == rhs.variables_ && returnType_ == rhs.returnType_ && argumentTypes_ == rhs.argumentTypes_ && - variableArity_ == rhs.variableArity_; + variableArity_ == rhs.variableArity_ && streaming_ == rhs.streaming_; } protected: @@ -177,6 +182,7 @@ class FunctionSignature { const std::vector argumentTypes_; const std::vector constantArguments_; const bool variableArity_; + const bool streaming_; }; using FunctionSignaturePtr = std::shared_ptr; @@ -297,6 +303,11 @@ class FunctionSignatureBuilder { return *this; } + FunctionSignatureBuilder& streaming() { + streaming_ = true; + return *this; + } + FunctionSignaturePtr build(); private: @@ -305,6 +316,7 @@ class FunctionSignatureBuilder { std::vector argumentTypes_; std::vector constantArguments_; bool variableArity_{false}; + bool streaming_{false}; }; /// Convenience class for creating AggregageFunctionSignature instances. diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp index b51751577de63..2691d12d65df9 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -37,7 +37,7 @@ class RankFunction : public exec::WindowFunction { : WindowFunction(resultType, nullptr, nullptr) {} void resetPartition(const exec::WindowPartition* partition) override { - rank_ = 1 + partition->offsetInPartition(); + rank_ = 1; currentPeerGroupStart_ = 0; previousPeerCount_ = 0; numPartitionRows_ = partition->numRows(); @@ -93,9 +93,16 @@ template void registerRankInternal( const std::string& name, const std::string& returnType) { - std::vector signatures{ - exec::FunctionSignatureBuilder().returnType(returnType).build(), - }; + std::vector signatures; + if constexpr (TRank == RankType::kRank) { + signatures.push_back(exec::FunctionSignatureBuilder() + .returnType(returnType) + .streaming() + .build()); + } else { + signatures.push_back( + exec::FunctionSignatureBuilder().returnType(returnType).build()); + } exec::registerWindowFunction( name, diff --git a/velox/functions/lib/window/RowNumber.cpp b/velox/functions/lib/window/RowNumber.cpp index 60a564fee73db..15fa0dce0b9f5 100644 --- a/velox/functions/lib/window/RowNumber.cpp +++ b/velox/functions/lib/window/RowNumber.cpp @@ -28,8 +28,8 @@ class RowNumberFunction : public exec::WindowFunction { explicit RowNumberFunction(const TypePtr& resultType) : WindowFunction(resultType, nullptr, nullptr) {} - void resetPartition(const exec::WindowPartition* partition) override { - rowNumber_ = 1 + partition->offsetInPartition(); + void resetPartition(const exec::WindowPartition* /*partition*/) override { + rowNumber_ = 1; } void apply( @@ -69,6 +69,7 @@ void registerRowNumber(const std::string& name, TypeKind resultTypeKind) { std::vector signatures{ exec::FunctionSignatureBuilder() .returnType(mapTypeKindToName(resultTypeKind)) + .streaming() .build(), };