Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support aggregation function pushdown in window #7812

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
9 changes: 7 additions & 2 deletions dbms/src/AggregateFunctions/AggregateFunctionArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class AggregateFunctionArray final : public IAggregateFunctionHelper<AggregateFu
for (size_t i = 0; i < num_arguments; ++i)
nested[i] = &static_cast<const ColumnArray &>(*columns[i]).getData();

const ColumnArray & first_array_column = static_cast<const ColumnArray &>(*columns[0]);
const auto & first_array_column = static_cast<const ColumnArray &>(*columns[0]);
const IColumn::Offsets & offsets = first_array_column.getOffsets();

size_t begin = row_num == 0 ? 0 : offsets[row_num - 1];
Expand All @@ -101,7 +101,7 @@ class AggregateFunctionArray final : public IAggregateFunctionHelper<AggregateFu
/// Sanity check. NOTE We can implement specialization for a case with single argument, if the check will hurt performance.
for (size_t i = 1; i < num_arguments; ++i)
{
const ColumnArray & ith_column = static_cast<const ColumnArray &>(*columns[i]);
const auto & ith_column = static_cast<const ColumnArray &>(*columns[i]);
const IColumn::Offsets & ith_offsets = ith_column.getOffsets();

if (ith_offsets[row_num] != end || (row_num != 0 && ith_offsets[row_num - 1] != begin))
Expand Down Expand Up @@ -132,6 +132,11 @@ class AggregateFunctionArray final : public IAggregateFunctionHelper<AggregateFu
nested_func->insertResultInto(place, to, arena);
}

/// Delegates to the nested function's insertMergeResultInto, passing `place`, `to` and `arena` through unchanged.
void insertMergeResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
nested_func->insertMergeResultInto(place, to, arena);
}

bool allocatesMemoryInArena() const override
{
return nested_func->allocatesMemoryInArena();
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/AggregateFunctions/AggregateFunctionForEach.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class AggregateFunctionForEach final : public IAggregateFunctionDataHelper<Aggre
for (size_t i = 0; i < num_arguments; ++i)
nested[i] = &static_cast<const ColumnArray &>(*columns[i]).getData();

const ColumnArray & first_array_column = static_cast<const ColumnArray &>(*columns[0]);
const auto & first_array_column = static_cast<const ColumnArray &>(*columns[0]);
const IColumn::Offsets & offsets = first_array_column.getOffsets();

size_t begin = row_num == 0 ? 0 : offsets[row_num - 1];
Expand All @@ -165,7 +165,7 @@ class AggregateFunctionForEach final : public IAggregateFunctionDataHelper<Aggre
/// Sanity check. NOTE We can implement specialization for a case with single argument, if the check will hurt performance.
for (size_t i = 1; i < num_arguments; ++i)
{
const ColumnArray & ith_column = static_cast<const ColumnArray &>(*columns[i]);
const auto & ith_column = static_cast<const ColumnArray &>(*columns[i]);
const IColumn::Offsets & ith_offsets = ith_column.getOffsets();

if (ith_offsets[row_num] != end || (row_num != 0 && ith_offsets[row_num - 1] != begin))
Expand Down Expand Up @@ -229,11 +229,12 @@ class AggregateFunctionForEach final : public IAggregateFunctionDataHelper<Aggre
}
}

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool merge>
void insertResultIntoImpl(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
const AggregateFunctionForEachData & state = data(place);

ColumnArray & arr_to = static_cast<ColumnArray &>(to);
auto & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & elems_to = arr_to.getData();

Expand All @@ -247,6 +248,16 @@ class AggregateFunctionForEach final : public IAggregateFunctionDataHelper<Aggre
offsets_to.push_back(offsets_to.empty() ? state.dynamic_array_size : offsets_to.back() + state.dynamic_array_size);
}

/// Inserts the result by calling the shared implementation insertResultIntoImpl with merge = false.
void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}

/// Inserts the result by calling the shared implementation insertResultIntoImpl with merge = true.
void insertMergeResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}

bool allocatesMemoryInArena() const override
{
return true;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionIf.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ class AggregateFunctionIf final : public IAggregateFunctionHelper<AggregateFunct
nested_func->insertResultInto(place, to, arena);
}

/// Delegates to the nested function's insertMergeResultInto, passing `place`, `to` and `arena` through unchanged.
void insertMergeResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
nested_func->insertMergeResultInto(place, to, arena);
}

bool allocatesMemoryInArena() const override
{
return nested_func->allocatesMemoryInArena();
Expand Down
23 changes: 20 additions & 3 deletions dbms/src/AggregateFunctions/AggregateFunctionNull.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,18 @@ class AggregateFunctionNullBase : public IAggregateFunctionHelper<Derived>
}
}

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
template <bool merge>
void insertResultIntoImpl(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
if constexpr (result_is_nullable)
{
auto & to_concrete = static_cast<ColumnNullable &>(to);
if (getFlag(place))
{
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
if constexpr (merge)
nested_function->insertMergeResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
else
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
to_concrete.getNullMapData().push_back(0);
}
else
Expand All @@ -193,10 +197,23 @@ class AggregateFunctionNullBase : public IAggregateFunctionHelper<Derived>
}
else
{
nested_function->insertResultInto(nestedPlace(place), to, arena);
if constexpr (merge)
nested_function->insertMergeResultInto(nestedPlace(place), to, arena);
else
nested_function->insertResultInto(nestedPlace(place), to, arena);
}
}

/// Inserts the result by calling the shared implementation insertResultIntoImpl with merge = false,
/// i.e. the nested function's insertResultInto is the one that gets used.
void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<false>(place, to, arena);
}

/// Inserts the result by calling the shared implementation insertResultIntoImpl with merge = true,
/// i.e. the nested function's insertMergeResultInto is the one that gets used.
void insertMergeResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
insertResultIntoImpl<true>(place, to, arena);
}

bool allocatesMemoryInArena() const override
{
return nested_function->allocatesMemoryInArena();
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionState.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ class AggregateFunctionState final : public IAggregateFunctionHelper<AggregateFu
static_cast<ColumnAggregateFunction &>(to).getData().push_back(const_cast<AggregateDataPtr>(place));
}

/// Inserts the state at `place` into `to`, which is cast to ColumnAggregateFunction, by calling its insertFrom().
/// The arena argument is not used.
/// NOTE(review): insertFrom() presumably merges `place` into a newly created state instead of storing
/// the raw pointer - confirm against ColumnAggregateFunction.
void insertMergeResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
assert_cast<ColumnAggregateFunction &>(to).insertFrom(place);
}

/// Aggregate function or aggregate function state.
/// This class always reports true.
bool isState() const override { return true; }

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/AggregateFunctions/AggregateFunctionSumMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class AggregateFunctionSumMap final : public IAggregateFunctionDataHelper<Aggreg
void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
// Column 0 contains array of keys of known type
const ColumnArray & array_column = static_cast<const ColumnArray &>(*columns[0]);
const auto & array_column = static_cast<const ColumnArray &>(*columns[0]);
const IColumn::Offsets & offsets = array_column.getOffsets();
const auto & keys_vec = static_cast<const ColumnVector<T> &>(array_column.getData());
const size_t keys_vec_offset = row_num == 0 ? 0 : offsets[row_num - 1];
Expand All @@ -99,7 +99,7 @@ class AggregateFunctionSumMap final : public IAggregateFunctionDataHelper<Aggreg
for (size_t col = 0, size = values_types.size(); col < size; ++col)
{
Field value;
const ColumnArray & array_column = static_cast<const ColumnArray &>(*columns[col + 1]);
const auto & array_column = static_cast<const ColumnArray &>(*columns[col + 1]);
const IColumn::Offsets & offsets = array_column.getOffsets();
const size_t values_vec_offset = row_num == 0 ? 0 : offsets[row_num - 1];
const size_t values_vec_size = (offsets[row_num] - values_vec_offset);
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/AggregateFunctions/IAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ class IAggregateFunction
/// Inserts results into a column.
virtual void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const = 0;

/// Variant of insertResultInto meant for aggregate functions with the -State combinator. Used in WindowTransform.
/// It behaves the same way as insertResultInto, except that when AggregateData has to be inserted into
/// a ColumnAggregateFunction, a default value is inserted and then merged with the provided AggregateData,
/// instead of just copying the pointer to this AggregateData.
/// The default implementation only forwards to insertResultInto. A function for which isState() returns true
/// has to override this method; otherwise an Exception with NOT_IMPLEMENTED is thrown.
virtual void insertMergeResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const
{
    const bool is_state_function = isState();
    if (!is_state_function)
    {
        /// Ordinary functions have nothing to merge: the plain result insertion is enough.
        insertResultInto(place, to, arena);
        return;
    }

    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Function is marked as State but method insertMergeResultInto is not implemented");
}

/** Returns true for aggregate functions of type -State.
* They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another).
*/
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/AggregateFunctions/IAggregateFunctionCombinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class IAggregateFunctionCombinator
const DataTypes & arguments,
const Array & params) const = 0;

virtual ~IAggregateFunctionCombinator() {}
virtual ~IAggregateFunctionCombinator() = default;
};

using AggregateFunctionCombinatorPtr = std::shared_ptr<const IAggregateFunctionCombinator>;
Expand Down
62 changes: 62 additions & 0 deletions dbms/src/Common/AlignedBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/AlignedBuffer.h>
#include <Common/Exception.h>

namespace DB
{

namespace ErrorCodes
{
extern const int CANNOT_ALLOCATE_MEMORY;
}


/// Allocates `size` bytes with posix_memalign and stores the resulting pointer in `buf`.
/// The requested alignment is raised to at least sizeof(void *) before the call.
/// A non-zero status from posix_memalign is reported through throwFromErrno with
/// ErrorCodes::CANNOT_ALLOCATE_MEMORY.
/// `buf` is overwritten only after a successful allocation; its previous value is not freed here.
void AlignedBuffer::alloc(size_t size, size_t alignment)
{
    const size_t effective_alignment = std::max(alignment, sizeof(void *));

    void * allocated = nullptr;
    const int status = ::posix_memalign(&allocated, effective_alignment, size);
    if (status != 0)
        throwFromErrno(
            fmt::format("Cannot allocate memory (posix_memalign), size: {}, alignment: {}.", size, alignment),
            ErrorCodes::CANNOT_ALLOCATE_MEMORY,
            status);

    buf = allocated;
}

/// Releases the memory owned by `buf` (if any) and resets `buf` to nullptr.
/// Clearing the pointer matters for reset(): it calls dealloc() and then alloc(), and alloc() leaves `buf`
/// untouched when the allocation fails. Without the reset to nullptr, the destructor would then free
/// the already released pointer a second time.
void AlignedBuffer::dealloc()
{
    /// free(nullptr) is a no-op, so a separate null check is not needed.
    ::free(buf);
    buf = nullptr;
}

/// Frees the currently held memory (if any) and then allocates a new block of `size` bytes
/// with the given alignment. The old contents are not copied into the new block.
void AlignedBuffer::reset(size_t size, size_t alignment)
{
dealloc();
alloc(size, alignment);
}

/// Constructs a buffer and immediately allocates `size` bytes with the given alignment.
AlignedBuffer::AlignedBuffer(size_t size, size_t alignment)
{
alloc(size, alignment);
}

/// Frees the held memory, if any.
AlignedBuffer::~AlignedBuffer()
{
dealloc();
}

} // namespace DB
48 changes: 48 additions & 0 deletions dbms/src/Common/AlignedBuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <boost/noncopyable.hpp>
#include <cstdlib>
#include <utility>

namespace DB
{

/** Aligned piece of memory.
* It can only be allocated and destroyed.
* MemoryTracker is not used. AlignedBuffer is intended for small pieces of memory.
*/
class AlignedBuffer : private boost::noncopyable
{
private:
void * buf = nullptr;

void alloc(size_t size, size_t alignment);
void dealloc();

public:
AlignedBuffer() = default;
AlignedBuffer(size_t size, size_t alignment);
AlignedBuffer(AlignedBuffer && old) noexcept { std::swap(buf, old.buf); }
~AlignedBuffer();

void reset(size_t size, size_t alignment);

char * data() { return static_cast<char *>(buf); }
const char * data() const { return static_cast<const char *>(buf); }
};

} // namespace DB
Loading