Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-3722][CH] Improve shuffle writer #3728

Merged
merged 11 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public long make(
int subDirsPerLocalDir,
boolean preferSpill,
long spillThreshold,
String hashAlgorithm) {
String hashAlgorithm,
boolean throwIfMemoryExceed) {
return nativeMake(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -47,7 +48,8 @@ public long make(
subDirsPerLocalDir,
preferSpill,
spillThreshold,
hashAlgorithm);
hashAlgorithm,
throwIfMemoryExceed);
}

public long makeForRSS(
Expand All @@ -58,7 +60,8 @@ public long makeForRSS(
String codec,
long spillThreshold,
String hashAlgorithm,
Object pusher) {
Object pusher,
boolean throwIfMemoryExceed) {
return nativeMakeForRSS(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -70,7 +73,8 @@ public long makeForRSS(
codec,
spillThreshold,
hashAlgorithm,
pusher);
pusher,
throwIfMemoryExceed);
}

public native long nativeMake(
Expand All @@ -87,7 +91,8 @@ public native long nativeMake(
int subDirsPerLocalDir,
boolean preferSpill,
long spillThreshold,
String hashAlgorithm);
String hashAlgorithm,
boolean throwIfMemoryExceed);

public native long nativeMakeForRSS(
String shortName,
Expand All @@ -100,7 +105,8 @@ public native long nativeMakeForRSS(
String codec,
long spillThreshold,
String hashAlgorithm,
Object pusher);
Object pusher,
boolean throwIfMemoryExceed);

/**
 * Native (JNI) method taking a splitter id and a block value, both passed as {@code long}.
 *
 * NOTE(review): splitterId is presumably the handle returned by nativeMake / nativeMakeForRSS and
 * block is presumably a native block address; confirm against the JNI implementation.
 */
public native void split(long splitterId, long block);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class CHColumnarShuffleWriter[K, V](
private val customizedCompressCodec =
GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
private val preferSpill = GlutenConfig.getConf.chColumnarShufflePreferSpill
private val throwIfMemoryExceed = GlutenConfig.getConf.chColumnarThrowIfMemoryExceed
private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold
private val jniWrapper = new CHShuffleSplitterJniWrapper
// Are we in the process of stopping? Because map tasks can call stop() with success = true
Expand Down Expand Up @@ -102,7 +103,8 @@ class CHColumnarShuffleWriter[K, V](
subDirsPerLocalDir,
preferSpill,
spillThreshold,
CHBackendSettings.shuffleHashAlgorithm
CHBackendSettings.shuffleHashAlgorithm,
throwIfMemoryExceed
)
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ std::map<std::string, std::string> BackendInitializerUtil::getBackendConfMap(std
return ch_backend_conf;
}


/// Parse backend configs from plan extensions
do
{
Expand Down
5 changes: 5 additions & 0 deletions cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,29 @@

namespace local_engine
{

/// Adds the whole incoming block to the internal buffer:
/// the range passed to ColumnsBuffer::add starts at 0 and spans block.rows() rows.
void BlockCoalesceOperator::mergeBlock(DB::Block & block)
{
    const auto row_count = static_cast<int>(block.rows());
    block_buffer.add(block, 0, row_count);
}

/// Returns true once block_buffer.size() has reached (or passed) the buf_size given at construction.
bool BlockCoalesceOperator::isFull()
{
    const bool reached_threshold = block_buffer.size() >= buf_size;
    return reached_threshold;
}

/// Builds a new heap-allocated block from the columns released by the buffer and returns it.
/// The pointer is also kept in cached_block, and clearCache() is called before each new allocation.
/// NOTE(review): clearCache() presumably frees the previously returned block, which would make the
/// returned pointer valid only until the next releaseBlock() or destruction; confirm in clearCache().
DB::Block * BlockCoalesceOperator::releaseBlock()
{
    clearCache();

    auto * fresh_block = new DB::Block(block_buffer.releaseColumns());
    cached_block = fresh_block;
    return fresh_block;
}

/// Runs clearCache() on destruction so the last block handed out by releaseBlock() is cleaned up.
BlockCoalesceOperator::~BlockCoalesceOperator()
{
    this->clearCache();
}

void BlockCoalesceOperator::clearCache()
{
if (cached_block)
Expand Down
9 changes: 6 additions & 3 deletions cpp-ch/local-engine/Operator/BlockCoalesceOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,23 @@ class Block;

namespace local_engine
{

class BlockCoalesceOperator
{
public:
BlockCoalesceOperator(size_t buf_size_) : buf_size(buf_size_) { }
virtual ~BlockCoalesceOperator();
explicit BlockCoalesceOperator(size_t buf_size_) : buf_size(buf_size_) { }
~BlockCoalesceOperator();

void mergeBlock(DB::Block & block);
bool isFull();
DB::Block * releaseBlock();

private:
void clearCache();

size_t buf_size;
ColumnsBuffer block_buffer;
DB::Block * cached_block = nullptr;

void clearCache();
};
}
47 changes: 23 additions & 24 deletions cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ namespace ErrorCodes

namespace local_engine
{

using namespace DB;
CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions & options_, jobject rss_pusher)

CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitOptions & options_, jobject rss_pusher) : options(options_)
{
options = options_;
if (short_name == "rr")
{
partitioner = std::make_unique<RoundRobinSelectorBuilder>(options.partition_nums);
partitioner = std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
}
else if (short_name == "hash")
{
Expand All @@ -51,16 +52,16 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
{
hash_fields.push_back(std::stoi(expr));
}
partitioner = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
partitioner = std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields, options_.hash_algorithm);
}
else if (short_name == "single")
{
options.partition_nums = 1;
partitioner = std::make_unique<RoundRobinSelectorBuilder>(options.partition_nums);
options.partition_num = 1;
partitioner = std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
}
else if (short_name == "range")
{
partitioner = std::make_unique<RangeSelectorBuilder>(options.hash_exprs, options.partition_nums);
partitioner = std::make_unique<RangeSelectorBuilder>(options.hash_exprs, options.partition_num);
}
else
{
Expand All @@ -72,6 +73,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
{
output_columns_indicies.push_back(std::stoi(iter));
}

if (rss_pusher)
{
GET_JNIENV(env)
Expand All @@ -87,42 +89,41 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
{
partition_writer = std::make_unique<LocalPartitionWriter>(this);
}
split_result.partition_length.resize(options.partition_nums, 0);
split_result.raw_partition_length.resize(options.partition_nums, 0);

split_result.partition_length.resize(options.partition_num, 0);
split_result.raw_partition_length.resize(options.partition_num, 0);
}


void CachedShuffleWriter::split(DB::Block & block)
{
initOutputIfNeeded(block);

Stopwatch split_time_watch;
split_time_watch.start();
block = convertAggregateStateInBlock(block);
split_result.total_split_time += split_time_watch.elapsedNanoseconds();

Stopwatch compute_pid_time_watch;
compute_pid_time_watch.start();
partition_info = partitioner->build(block);
PartitionInfo partition_info = partitioner->build(block);
split_result.total_compute_pid_time += compute_pid_time_watch.elapsedNanoseconds();

DB::Block out_block;
for (size_t col = 0; col < output_header.columns(); ++col)
for (size_t col_i = 0; col_i < output_header.columns(); ++col_i)
{
out_block.insert(block.getByPosition(output_columns_indicies[col]));
out_block.insert(block.getByPosition(output_columns_indicies[col_i]));
}
partition_writer->write(partition_info, out_block);

if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold)
{
partition_writer->evictPartitions();
partition_writer->evictPartitions(false);
}
}

void CachedShuffleWriter::initOutputIfNeeded(Block & block)
{
if (output_header.columns() == 0) [[unlikely]]
if (!output_header)
{
output_header = block.cloneEmpty();
if (output_columns_indicies.empty())
{
output_header = block.cloneEmpty();
Expand All @@ -135,24 +136,22 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & block)
{
ColumnsWithTypeAndName cols;
for (const auto & index : output_columns_indicies)
{
cols.push_back(block.getByPosition(index));
}
output_header = DB::Block(cols);

output_header = DB::Block(std::move(cols));
}
}
}

/// Stops the underlying partition writer, then returns a copy of the accumulated split_result.
SplitResult CachedShuffleWriter::stop()
{
    auto & writer = *partition_writer;
    writer.stop();
    return split_result;
}

size_t CachedShuffleWriter::evictPartitions()
{
auto size = partition_writer->totalCacheSize();
if (size)
partition_writer->evictPartitions(true);
return size;
return partition_writer->evictPartitions(true);
}

}
5 changes: 3 additions & 2 deletions cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ class CachedShuffleWriter : public ShuffleWriterBase
friend class PartitionWriter;
friend class LocalPartitionWriter;
friend class CelebornPartitionWriter;
explicit CachedShuffleWriter(const String & short_name, SplitOptions & options, jobject rss_pusher = nullptr);

explicit CachedShuffleWriter(const String & short_name, const SplitOptions & options, jobject rss_pusher = nullptr);
~CachedShuffleWriter() override = default;

void split(DB::Block & block) override;
size_t evictPartitions() override;
SplitResult stop() override;
Expand All @@ -45,7 +47,6 @@ class CachedShuffleWriter : public ShuffleWriterBase
void initOutputIfNeeded(DB::Block & block);

bool stopped = false;
PartitionInfo partition_info;
DB::Block output_header;
SplitOptions options;
SplitResult split_result;
Expand Down
19 changes: 10 additions & 9 deletions cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void NativeSplitter::split(DB::Block & block)
}
}

for (size_t i = 0; i < options.partition_nums; ++i)
for (size_t i = 0; i < options.partition_num; ++i)
{
if (partition_buffer[i]->size() >= options.buffer_size)
{
Expand All @@ -95,8 +95,8 @@ NativeSplitter::NativeSplitter(Options options_, jobject input_) : options(optio
{
GET_JNIENV(env)
input = env->NewGlobalRef(input_);
partition_buffer.reserve(options.partition_nums);
for (size_t i = 0; i < options.partition_nums; ++i)
partition_buffer.reserve(options.partition_num);
for (size_t i = 0; i < options.partition_num; ++i)
{
partition_buffer.emplace_back(std::make_shared<ColumnsBuffer>(options.buffer_size));
}
Expand All @@ -120,7 +120,7 @@ bool NativeSplitter::hasNext()
}
else
{
for (size_t i = 0; i < options.partition_nums; ++i)
for (size_t i = 0; i < options.partition_num; ++i)
{
auto buffer = partition_buffer.at(i);
if (buffer->size() > 0)
Expand Down Expand Up @@ -150,7 +150,7 @@ DB::Block * NativeSplitter::next()
return &currentBlock();
}

int32_t NativeSplitter::nextPartitionId()
int32_t NativeSplitter::nextPartitionId() const
{
return next_partition_id;
}
Expand All @@ -170,6 +170,7 @@ int64_t NativeSplitter::inputNext()
CLEAN_JNIENV
return result;
}

std::unique_ptr<NativeSplitter> NativeSplitter::create(const std::string & short_name, Options options_, jobject input)
{
if (short_name == "rr")
Expand All @@ -182,7 +183,7 @@ std::unique_ptr<NativeSplitter> NativeSplitter::create(const std::string & short
}
else if (short_name == "single")
{
options_.partition_nums = 1;
options_.partition_num = 1;
return std::make_unique<RoundRobinNativeSplitter>(options_, input);
}
else if (short_name == "range")
Expand Down Expand Up @@ -210,7 +211,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, options_.hash_algorithm);
selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields, options_.hash_algorithm);
}

void HashNativeSplitter::computePartitionId(Block & block)
Expand All @@ -225,7 +226,7 @@ RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options optio
{
output_columns_indicies.push_back(std::stoi(*iter));
}
selector_builder = std::make_unique<RoundRobinSelectorBuilder>(options_.partition_nums);
selector_builder = std::make_unique<RoundRobinSelectorBuilder>(options_.partition_num);
}

void RoundRobinNativeSplitter::computePartitionId(Block & block)
Expand All @@ -241,7 +242,7 @@ RangePartitionNativeSplitter::RangePartitionNativeSplitter(NativeSplitter::Optio
{
output_columns_indicies.push_back(std::stoi(*iter));
}
selector_builder = std::make_unique<RangeSelectorBuilder>(options_.exprs_buffer, options_.partition_nums);
selector_builder = std::make_unique<RangeSelectorBuilder>(options_.exprs_buffer, options_.partition_num);
}

void RangePartitionNativeSplitter::computePartitionId(DB::Block & block)
Expand Down
Loading
Loading