From 73de35f60a81895c37037bf7534b833acb59d0f3 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Thu, 16 Nov 2023 17:55:44 +0800 Subject: [PATCH 01/11] fix deadlock and improve code styles --- .../Operator/BlockCoalesceOperator.cpp | 5 + .../Operator/BlockCoalesceOperator.h | 9 +- .../Shuffle/CachedShuffleWriter.cpp | 42 ++-- .../Shuffle/CachedShuffleWriter.h | 6 +- .../local-engine/Shuffle/NativeSplitter.cpp | 19 +- cpp-ch/local-engine/Shuffle/NativeSplitter.h | 27 +-- .../local-engine/Shuffle/PartitionWriter.cpp | 190 +++++++++--------- cpp-ch/local-engine/Shuffle/PartitionWriter.h | 32 +-- .../local-engine/Shuffle/ShuffleSplitter.cpp | 118 +++++------ cpp-ch/local-engine/Shuffle/ShuffleSplitter.h | 59 ++++-- .../IO/AggregateSerializationUtils.cpp | 4 + cpp-ch/local-engine/local_engine_jni.cpp | 6 +- .../tests/benchmark_local_engine.cpp | 4 +- 13 files changed, 289 insertions(+), 232 deletions(-) diff --git a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp index f3f05b4dcf0c..756249e8a571 100644 --- a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp +++ b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp @@ -19,24 +19,29 @@ namespace local_engine { + void BlockCoalesceOperator::mergeBlock(DB::Block & block) { block_buffer.add(block, 0, static_cast(block.rows())); } + bool BlockCoalesceOperator::isFull() { return block_buffer.size() >= buf_size; } + DB::Block * BlockCoalesceOperator::releaseBlock() { clearCache(); cached_block = new DB::Block(block_buffer.releaseColumns()); return cached_block; } + BlockCoalesceOperator::~BlockCoalesceOperator() { clearCache(); } + void BlockCoalesceOperator::clearCache() { if (cached_block) diff --git a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h index 4e4dcaf4b8ff..2b67b40cea1b 100644 --- a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h +++ b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h @@ -25,20 +25,23 @@ class Block; namespace local_engine { + class BlockCoalesceOperator { public: - BlockCoalesceOperator(size_t buf_size_) : buf_size(buf_size_) { } - virtual ~BlockCoalesceOperator(); + explicit BlockCoalesceOperator(size_t buf_size_) : buf_size(buf_size_) { } + ~BlockCoalesceOperator(); + void mergeBlock(DB::Block & block); bool isFull(); DB::Block * releaseBlock(); private: + void clearCache(); + size_t buf_size; ColumnsBuffer block_buffer; DB::Block * cached_block = nullptr; - void clearCache(); }; } diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index 16d56e9bb8e8..b66b5915401a 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -35,13 +35,14 @@ namespace ErrorCodes namespace local_engine { + using namespace DB; -CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions & options_, jobject rss_pusher) + +CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitOptions & options_, jobject rss_pusher) : options(options_) { - options = options_; if (short_name == "rr") { - partitioner = std::make_unique(options.partition_nums); + partitioner = std::make_unique(options.partition_num); } else if (short_name == "hash") { @@ -51,16 +52,16 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions { hash_fields.push_back(std::stoi(expr)); } - partitioner = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); + partitioner = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm); } else if (short_name == "single") { - options.partition_nums = 1; - partitioner = std::make_unique(options.partition_nums); + options.partition_num = 1; + partitioner = std::make_unique(options.partition_num); } else if (short_name == "range") { - partitioner = std::make_unique(options.hash_exprs, options.partition_nums); + partitioner = std::make_unique(options.hash_exprs, options.partition_num); } else { @@ -72,6 +73,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions { output_columns_indicies.push_back(std::stoi(iter)); } + if (rss_pusher) { GET_JNIENV(env) @@ -87,44 +89,45 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions { partition_writer = std::make_unique(this); } - split_result.partition_length.resize(options.partition_nums, 0); - split_result.raw_partition_length.resize(options.partition_nums, 0); + + split_result.partition_length.resize(options.partition_num, 0); + split_result.raw_partition_length.resize(options.partition_num, 0); } void CachedShuffleWriter::split(DB::Block & block) { initOutputIfNeeded(block); + Stopwatch split_time_watch; - split_time_watch.start(); block = convertAggregateStateInBlock(block); split_result.total_split_time += split_time_watch.elapsedNanoseconds(); Stopwatch compute_pid_time_watch; - compute_pid_time_watch.start(); - partition_info = partitioner->build(block); + PartitionInfo partition_info = partitioner->build(block); split_result.total_compute_pid_time += compute_pid_time_watch.elapsedNanoseconds(); DB::Block out_block; - for (size_t col = 0; col < output_header.columns(); ++col) + for (size_t col_i = 0; col_i < output_header.columns(); ++col_i) { - out_block.insert(block.getByPosition(output_columns_indicies[col])); + out_block.insert(block.getByPosition(output_columns_indicies[col_i])); } partition_writer->write(partition_info, out_block); if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold) { - partition_writer->evictPartitions(); + partition_writer->evictPartitions(false); } } void CachedShuffleWriter::initOutputIfNeeded(Block & block) { - if (output_header.columns() == 0) [[unlikely]] + if (!output_header) { output_header = block.cloneEmpty(); if (output_columns_indicies.empty()) { + /// TODO 这里逻辑有问题?output_header被赋值了两遍 output_header = block.cloneEmpty(); for (size_t i = 0; i < block.columns(); ++i) { @@ -135,18 +138,19 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & block) { ColumnsWithTypeAndName cols; for (const auto & index : output_columns_indicies) - { cols.push_back(block.getByPosition(index)); - } - output_header = DB::Block(cols); + + output_header = DB::Block(std::move(cols)); } } } + SplitResult CachedShuffleWriter::stop() { partition_writer->stop(); return split_result; } + size_t CachedShuffleWriter::evictPartitions() { auto size = partition_writer->totalCacheSize(); diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h index 3b13288f0b9e..5b69c2bf66c7 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h @@ -35,8 +35,10 @@ class CachedShuffleWriter : public ShuffleWriterBase friend class PartitionWriter; friend class LocalPartitionWriter; friend class CelebornPartitionWriter; - explicit CachedShuffleWriter(const String & short_name, SplitOptions & options, jobject rss_pusher = nullptr); + + explicit CachedShuffleWriter(const String & short_name, const SplitOptions & options, jobject rss_pusher = nullptr); ~CachedShuffleWriter() override = default; + void split(DB::Block & block) override; size_t evictPartitions() override; SplitResult stop() override; @@ -45,7 +47,7 @@ class CachedShuffleWriter : public ShuffleWriterBase void initOutputIfNeeded(DB::Block & block); bool stopped = false; - PartitionInfo partition_info; + // PartitionInfo partition_info; DB::Block output_header; SplitOptions options; SplitResult split_result; diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp index d05f6633ca8a..6cd74c8af343 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp @@ -82,7 +82,7 @@ void NativeSplitter::split(DB::Block & block) } } - for (size_t i = 0; i < options.partition_nums; ++i) + for (size_t i = 0; i < options.partition_num; ++i) { if (partition_buffer[i]->size() >= options.buffer_size) { @@ -95,8 +95,8 @@ NativeSplitter::NativeSplitter(Options options_, jobject input_) : options(optio { GET_JNIENV(env) input = env->NewGlobalRef(input_); - partition_buffer.reserve(options.partition_nums); - for (size_t i = 0; i < options.partition_nums; ++i) + partition_buffer.reserve(options.partition_num); + for (size_t i = 0; i < options.partition_num; ++i) { partition_buffer.emplace_back(std::make_shared(options.buffer_size)); } @@ -120,7 +120,7 @@ bool NativeSplitter::hasNext() } else { - for (size_t i = 0; i < options.partition_nums; ++i) + for (size_t i = 0; i < options.partition_num; ++i) { auto buffer = partition_buffer.at(i); if (buffer->size() > 0) @@ -150,7 +150,7 @@ DB::Block * NativeSplitter::next() return ¤tBlock(); } -int32_t NativeSplitter::nextPartitionId() +int32_t NativeSplitter::nextPartitionId() const { return next_partition_id; } @@ -170,6 +170,7 @@ int64_t NativeSplitter::inputNext() CLEAN_JNIENV return result; } + std::unique_ptr NativeSplitter::create(const std::string & short_name, Options options_, jobject input) { if (short_name == "rr") @@ -182,7 +183,7 @@ std::unique_ptr NativeSplitter::create(const std::string & short } else if (short_name == "single") { - options_.partition_nums = 1; + options_.partition_num = 1; return std::make_unique(options_, input); } else if (short_name == "range") @@ -210,7 +211,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); + selector_builder = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm); } void HashNativeSplitter::computePartitionId(Block & block) @@ -225,7 +226,7 @@ RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options optio { output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options_.partition_nums); + selector_builder = std::make_unique(options_.partition_num); } void RoundRobinNativeSplitter::computePartitionId(Block & block) @@ -241,7 +242,7 @@ RangePartitionNativeSplitter::RangePartitionNativeSplitter(NativeSplitter::Optio { output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options_.exprs_buffer, options_.partition_nums); + selector_builder = std::make_unique(options_.exprs_buffer, options_.partition_num); } void RangePartitionNativeSplitter::computePartitionId(DB::Block & block) diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.h b/cpp-ch/local-engine/Shuffle/NativeSplitter.h index c30f235b6550..d9916b91c2ef 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.h +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.h @@ -38,7 +38,7 @@ class NativeSplitter : BlockIterator struct Options { size_t buffer_size = DEFAULT_BLOCK_SIZE; - size_t partition_nums; + size_t partition_num; std::string exprs_buffer; std::string schema_buffer; std::string hash_algorithm; @@ -55,15 +55,15 @@ class NativeSplitter : BlockIterator static std::unique_ptr create(const std::string & short_name, Options options, jobject input); NativeSplitter(Options options, jobject input); + virtual ~NativeSplitter(); + bool hasNext(); DB::Block * next(); - int32_t nextPartitionId(); - - - virtual ~NativeSplitter(); + int32_t nextPartitionId() const; protected: - virtual void computePartitionId(DB::Block &) { } + virtual void computePartitionId(DB::Block &) = 0; + Options options; PartitionInfo partition_info; std::vector output_columns_indicies; @@ -74,7 +74,6 @@ class NativeSplitter : BlockIterator int64_t inputNext(); bool inputHasNext(); - std::vector> partition_buffer; std::stack>> output_buffer; int32_t next_partition_id = -1; @@ -83,35 +82,37 @@ class NativeSplitter : BlockIterator class HashNativeSplitter : public NativeSplitter { - void computePartitionId(DB::Block & block) override; - public: HashNativeSplitter(NativeSplitter::Options options_, jobject input); + ~HashNativeSplitter() override = default; private: + void computePartitionId(DB::Block & block) override; + std::unique_ptr selector_builder; }; class RoundRobinNativeSplitter : public NativeSplitter { - void computePartitionId(DB::Block & block) override; - public: RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input); + ~RoundRobinNativeSplitter() override = default; private: + void computePartitionId(DB::Block & block) override; + std::unique_ptr selector_builder; }; class RangePartitionNativeSplitter : public NativeSplitter { - void computePartitionId(DB::Block & block) override; - public: RangePartitionNativeSplitter(NativeSplitter::Options options_, jobject input); ~RangePartitionNativeSplitter() override = default; private: + void computePartitionId(DB::Block & block) override; + std::unique_ptr selector_builder; }; diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 11347e9aeb79..dfe7e2b0fe09 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -35,70 +35,79 @@ using namespace DB; namespace local_engine { -void local_engine::PartitionWriter::write(const PartitionInfo& partition_info, DB::Block & data) + +void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) { Stopwatch time; - time.start(); - for (size_t col = 0; col < data.columns(); ++col) - { - for (size_t j = 0; j < partition_info.partition_num; ++j) - { - size_t from = partition_info.partition_start_points[j]; - size_t length = partition_info.partition_start_points[j + 1] - from; - if (length == 0) - continue; // no data for this partition continue; - partition_block_buffer[j].appendSelective(col, data, partition_info.partition_selector, from, length); - } - } - for (size_t i = 0; i < shuffle_writer->options.partition_nums; ++i) + for (size_t partition_i = 0; partition_i < partition_info.partition_num; ++partition_i) { - ColumnsBuffer & buffer = partition_block_buffer[i]; - if (buffer.size() >= shuffle_writer->options.split_size) + size_t from = partition_info.partition_start_points[partition_i]; + size_t length = partition_info.partition_start_points[partition_i + 1] - from; + + std::unique_lock lock(mtxs[partition_i]); + + /// Make sure buffer size is no greater than split_size + auto & buffer = partition_block_buffer[partition_i]; + if (buffer->size() && buffer->size() + length >= shuffle_writer->options.split_size) { - Block block = buffer.releaseColumns(); - auto bytes = block.allocatedBytes(); + std::cout << "split_size:" << shuffle_writer->options.split_size << " buffer size:" << buffer->size() + << " buffer bytes:" << buffer->bytes() << " buffer allocatedBytes:" << buffer->allocatedBytes() << std::endl; + + Block block = buffer->releaseColumns(); + auto bytes = block.bytes(); total_partition_buffer_size += bytes; - shuffle_writer->split_result.raw_partition_length[i] += bytes; - partition_buffer[i].addBlock(block); + shuffle_writer->split_result.raw_partition_length[partition_i] += bytes; + partition_buffer[partition_i]->addBlock(std::move(block)); } + + for (size_t col_i = 0; col_i < data.columns(); ++col_i) + partition_block_buffer[partition_i]->appendSelective(col_i, data, partition_info.partition_selector, from, length); } + shuffle_writer->split_result.total_split_time += time.elapsedNanoseconds(); } void LocalPartitionWriter::evictPartitions(bool for_memory_spill) { - auto spill_to_file = [this]() -> void { auto file = getNextSpillFile(); WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); + SpillInfo info; info.spilled_file = file; - size_t partition_id = 0; + Stopwatch serialization_time_watch; - serialization_time_watch.start(); - for (auto & partition : partition_buffer) + for (size_t partition_id = 0; partition_id < partition_buffer.size(); ++partition_id) { PartitionSpillInfo partition_spill_info; partition_spill_info.start = output.count(); - size_t raw_size = partition.spill(writer); + + size_t raw_size = 0; + { + std::unique_lock lock(mtxs[partition_id]); + + auto & partition = partition_buffer[partition_id]; + raw_size = partition->spill(writer); + } + compressed_output.sync(); partition_spill_info.length = output.count() - partition_spill_info.start; shuffle_writer->split_result.raw_partition_length[partition_id] += raw_size; partition_spill_info.partition_id = partition_id; - partition_id++; info.partition_spill_infos.emplace_back(partition_spill_info); } + spill_infos.emplace_back(info); shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); shuffle_writer->split_result.total_write_time += compressed_output.getWriteTime(); shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; + Stopwatch spill_time_watch; - spill_time_watch.start(); if (for_memory_spill) { // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again @@ -111,14 +120,10 @@ void LocalPartitionWriter::evictPartitions(bool for_memory_spill) spill_to_file(); } shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - - for (auto & partition : partition_buffer) - { - partition.clear(); - } shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; total_partition_buffer_size = 0; } + std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) { auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); @@ -126,7 +131,7 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) NativeWriter writer(compressed_output, shuffle_writer->output_header); std::vector partition_length; - partition_length.resize(shuffle_writer->options.partition_nums, 0); + partition_length.resize(shuffle_writer->options.partition_num, 0); std::vector spill_inputs; spill_inputs.reserve(spill_infos.size()); for (const auto & spill : spill_infos) @@ -136,13 +141,14 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) } Stopwatch write_time_watch; - write_time_watch.start(); Stopwatch io_time_watch; + Stopwatch serialization_time_watch; size_t merge_io_time = 0; String buffer; - for (size_t partition_id = 0; partition_id < shuffle_writer->options.partition_nums; ++partition_id) + for (size_t partition_id = 0; partition_id < partition_block_buffer.size(); ++partition_id) { auto size_before = data_file.count(); + io_time_watch.restart(); for (size_t i = 0; i < spill_infos.size(); ++i) { @@ -153,20 +159,27 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) } merge_io_time += io_time_watch.elapsedNanoseconds(); - if (partition_block_buffer[partition_id].size() > 0) + serialization_time_watch.restart(); + size_t raw_size = 0; { - Block block = partition_block_buffer[partition_id].releaseColumns(); - partition_buffer[partition_id].addBlock(block); + std::unique_lock lock(mtxs[partition_id]); + + if (!partition_block_buffer[partition_id]->empty()) + { + Block block = partition_block_buffer[partition_id]->releaseColumns(); + partition_buffer[partition_id]->addBlock(std::move(block)); + } + + raw_size = partition_buffer[partition_id]->spill(writer); } - Stopwatch serialization_time_watch; - serialization_time_watch.start(); - size_t raw_size = partition_buffer[partition_id].spill(writer); + compressed_output.sync(); partition_length[partition_id] = data_file.count() - size_before; shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_bytes_written += partition_length[partition_id]; shuffle_writer->split_result.raw_partition_length[partition_id] += raw_size; } + shuffle_writer->split_result.total_write_time += write_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); shuffle_writer->split_result.total_disk_time += compressed_output.getWriteTime(); @@ -177,12 +190,14 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) { std::filesystem::remove(spill.spilled_file); } + return partition_length; } -LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter * shuffle_writer_) - : PartitionWriter(shuffle_writer_) + +LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter * shuffle_writer_) : PartitionWriter(shuffle_writer_) { } + String LocalPartitionWriter::getNextSpillFile() { auto file_name = std::to_string(options->shuffle_id) + "_" + std::to_string(options->map_id) + "_" + std::to_string(spill_infos.size()); @@ -204,39 +219,47 @@ void LocalPartitionWriter::stop() } PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) + : shuffle_writer(shuffle_writer_) + , options(&shuffle_writer->options) + , mtxs(options->partition_num) + , partition_block_buffer(options->partition_num) + , partition_buffer(options->partition_num) { - shuffle_writer = shuffle_writer_; - options = &shuffle_writer->options; - for (size_t i = 0; i < options->partition_nums; ++i) + for (size_t partition_i = 0; partition_i < options->partition_num; ++partition_i) { - partition_block_buffer.emplace_back(ColumnsBuffer(options->split_size)); + partition_block_buffer[partition_i] = std::make_shared(options->split_size); + partition_buffer[partition_i] = std::make_shared(); } - partition_buffer.resize(options->partition_nums); } + CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client_) : PartitionWriter(shuffleWriter), celeborn_client(std::move(celeborn_client_)) { - } void CelebornPartitionWriter::evictPartitions(bool for_memory_spill) { - auto spill_to_celeborn = [this]() -> void { + auto spill_to_celeborn = [this]() + { Stopwatch serialization_time_watch; - serialization_time_watch.start(); for (size_t partition_id = 0; partition_id < partition_buffer.size(); ++partition_id) { - auto & partition = partition_buffer[partition_id]; - if (partition.empty()) continue; WriteBufferFromOwnString output; auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); - size_t raw_size = partition.spill(writer); + + size_t raw_size = 0; + { + std::unique_lock lock(mtxs[partition_id]); + auto & partition = partition_buffer[partition_id]; + raw_size = partition->spill(writer); + } compressed_output.sync(); + Stopwatch push_time_watch; - push_time_watch.start(); celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size()); + shuffle_writer->split_result.partition_length[partition_id] += output.str().size(); shuffle_writer->split_result.raw_partition_length[partition_id] += raw_size; shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); @@ -246,8 +269,8 @@ void CelebornPartitionWriter::evictPartitions(bool for_memory_spill) } shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; + Stopwatch spill_time_watch; - spill_time_watch.start(); if (for_memory_spill) { // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again @@ -260,70 +283,55 @@ void CelebornPartitionWriter::evictPartitions(bool for_memory_spill) IgnoreMemoryTracker ignore(2 * 1024 * 1024); spill_to_celeborn(); } + shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - for (auto & partition : partition_buffer) - { - partition.clear(); - } shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; total_partition_buffer_size = 0; } void CelebornPartitionWriter::stop() { - for (size_t partition_id = 0; partition_id < shuffle_writer->options.partition_nums; ++partition_id) + for (size_t partition_id = 0; partition_id < partition_block_buffer.size(); ++partition_id) { - if (partition_block_buffer[partition_id].size() > 0) + std::unique_lock lock(mtxs[partition_id]); + + if (!partition_block_buffer[partition_id]->empty()) { - Block block = partition_block_buffer[partition_id].releaseColumns(); - partition_buffer[partition_id].addBlock(block); + Block block = partition_block_buffer[partition_id]->releaseColumns(); + partition_buffer[partition_id]->addBlock(std::move(block)); } } + evictPartitions(false); + for (const auto & item : shuffle_writer->split_result.partition_length) { shuffle_writer->split_result.total_bytes_written += item; } } -void Partition::addBlock(DB::Block & block) +void Partition::addBlock(DB::Block block) { /// Do not insert empty blocks, otherwise will cause the shuffle read terminate early. if (!block.rows()) return; - std::unique_lock lock(mtx); - blocks.emplace_back(std::move(block)); -} -bool Partition::empty() const -{ - return blocks.empty(); -} - -void Partition::clear() -{ - std::unique_lock lock(mtx, std::try_to_lock); - if (lock.owns_lock()) - blocks.clear(); + blocks.emplace_back(std::move(block)); } size_t Partition::spill(NativeWriter & writer) { - std::unique_lock lock(mtx, std::try_to_lock); - if (lock.owns_lock()) + size_t total_size = 0; + for (auto & block : blocks) { - size_t raw_size = 0; - while (!blocks.empty()) - { - auto & block = blocks.back(); - raw_size += writer.write(block); - blocks.pop_back(); - } - blocks.clear(); - return raw_size; + total_size += writer.write(block); + + /// Clear each block once it is serialized to reduce peak memory + DB::Block().swap(block); } - else - return 0; + + blocks.clear(); + return total_size; } } diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 0a457e39415e..62e3830501c1 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -37,25 +37,26 @@ struct SpillInfo { std::vector partition_spill_infos; }; -class CachedShuffleWriter; class Partition { public: - Partition() {} - Partition(const Partition & p) : blocks(p.blocks) {} + Partition() = default; ~Partition() = default; - void addBlock(DB::Block & block); - bool empty() const; - void clear(); + + Partition(Partition && other) noexcept : blocks(std::move(other.blocks)) { } + + void addBlock(DB::Block block); size_t spill(NativeWriter & writer); private: std::vector blocks; - std::mutex mtx; }; -class PartitionWriter { +class CachedShuffleWriter; +using PartitionPtr = std::shared_ptr; +class PartitionWriter : boost::noncopyable +{ public: explicit PartitionWriter(CachedShuffleWriter* shuffle_writer_); virtual ~PartitionWriter() = default; @@ -72,10 +73,18 @@ class PartitionWriter { } protected: - std::vector partition_block_buffer; - std::vector partition_buffer; - SplitOptions * options; CachedShuffleWriter * shuffle_writer; + SplitOptions * options; + + /// The mutex is used to protect partition_block_buffer and partition_buffer + /// It may be accessed by multiple threads in which "CachedShuffleWriter::split" and "CachedShuffleWriter::spill" are invoked simutanously. + /// + /// Notice: the mutex must support being recursively acquired by the same thread, which happens during memory spilling. + /// For more details, pls refer to https://github.com/oap-project/gluten/issues/3722 + mutable std::vector mtxs; + std::vector partition_block_buffer; + std::vector partition_buffer; + size_t total_partition_buffer_size = 0; }; @@ -99,7 +108,6 @@ class CelebornPartitionWriter : public PartitionWriter void evictPartitions(bool for_memory_spill) override; void stop() override; - private: std::unique_ptr celeborn_client; }; diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp index e45b9d32218e..cf19b92861f9 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp @@ -33,6 +33,7 @@ namespace local_engine { + void ShuffleSplitter::split(DB::Block & block) { if (block.rows() == 0) @@ -42,17 +43,16 @@ void ShuffleSplitter::split(DB::Block & block) initOutputIfNeeded(block); computeAndCountPartitionId(block); Stopwatch split_time_watch; - split_time_watch.start(); block = convertAggregateStateInBlock(block); split_result.total_split_time += split_time_watch.elapsedNanoseconds(); splitBlockByPartition(block); } + SplitResult ShuffleSplitter::stop() { // spill all buffers Stopwatch watch; - watch.start(); - for (size_t i = 0; i < options.partition_nums; i++) + for (size_t i = 0; i < options.partition_num; i++) { spillPartition(i); partition_outputs[i]->flush(); @@ -104,7 +104,6 @@ void ShuffleSplitter::initOutputIfNeeded(Block & block) void ShuffleSplitter::splitBlockByPartition(DB::Block & block) { Stopwatch split_time_watch; - split_time_watch.start(); DB::Block out_block; for (size_t col = 0; col < output_header.columns(); ++col) { @@ -118,50 +117,54 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block) size_t length = partition_info.partition_start_points[j + 1] - from; if (length == 0) continue; // no data for this partition continue; - partition_buffer[j].appendSelective(col, out_block, partition_info.partition_selector, from, length); + partition_buffer[j]->appendSelective(col, out_block, partition_info.partition_selector, from, length); } } split_result.total_split_time += split_time_watch.elapsedNanoseconds(); - for (size_t i = 0; i < options.partition_nums; ++i) + for (size_t i = 0; i < options.partition_num; ++i) { - ColumnsBuffer & buffer = partition_buffer[i]; - if (buffer.size() >= options.split_size) + auto & buffer = partition_buffer[i]; + if (buffer->size() >= options.split_size) { + std::cout << "split_size:" << options.split_size << " buffer size:" << buffer->size() << " buffer bytes:" << buffer->bytes() + << " buffer allocatedBytes:" << buffer->allocatedBytes() << std::endl; spillPartition(i); } } } + +ShuffleSplitter::ShuffleSplitter(const SplitOptions & options_) : options(options_) +{ + init(); +} + void ShuffleSplitter::init() { - partition_buffer.reserve(options.partition_nums); - partition_outputs.reserve(options.partition_nums); - partition_write_buffers.reserve(options.partition_nums); - partition_cached_write_buffers.reserve(options.partition_nums); - split_result.partition_length.reserve(options.partition_nums); - split_result.raw_partition_length.reserve(options.partition_nums); - for (size_t i = 0; i < options.partition_nums; ++i) + partition_buffer.resize(options.partition_num); + partition_outputs.resize(options.partition_num); + partition_write_buffers.resize(options.partition_num); + partition_cached_write_buffers.resize(options.partition_num); + split_result.partition_length.resize(options.partition_num); + split_result.raw_partition_length.resize(options.partition_num); + for (size_t partition_i = 0; partition_i < options.partition_num; ++partition_i) { - partition_buffer.emplace_back(ColumnsBuffer()); - split_result.partition_length.emplace_back(0); - split_result.raw_partition_length.emplace_back(0); - partition_outputs.emplace_back(nullptr); - partition_write_buffers.emplace_back(nullptr); - partition_cached_write_buffers.emplace_back(nullptr); + partition_buffer[partition_i] = std::make_shared(options.split_size); + split_result.partition_length[partition_i] = 0; + split_result.raw_partition_length[partition_i] = 0; } } void ShuffleSplitter::spillPartition(size_t partition_id) { Stopwatch watch; - watch.start(); if (!partition_outputs[partition_id]) { partition_write_buffers[partition_id] = getPartitionWriteBuffer(partition_id); partition_outputs[partition_id] = std::make_unique(*partition_write_buffers[partition_id], output_header); } - DB::Block result = partition_buffer[partition_id].releaseColumns(); + DB::Block result = partition_buffer[partition_id]->releaseColumns(); if (result.rows() > 0) { partition_outputs[partition_id]->write(result); @@ -173,12 +176,11 @@ void ShuffleSplitter::spillPartition(size_t partition_id) void ShuffleSplitter::mergePartitionFiles() { Stopwatch merge_io_time; - merge_io_time.start(); DB::WriteBufferFromFile data_write_buffer = DB::WriteBufferFromFile(options.data_file); std::string buffer; size_t buffer_size = options.io_buffer_size; buffer.reserve(buffer_size); - for (size_t i = 0; i < options.partition_nums; ++i) + for (size_t i = 0; i < options.partition_num; ++i) { auto file = getPartitionTempFile(i); DB::ReadBufferFromFile reader = DB::ReadBufferFromFile(file, options.io_buffer_size); @@ -196,34 +198,23 @@ void ShuffleSplitter::mergePartitionFiles() data_write_buffer.close(); } -ShuffleSplitter::ShuffleSplitter(SplitOptions && options_) : options(options_) -{ - init(); -} -ShuffleSplitter::Ptr ShuffleSplitter::create(const std::string & short_name, SplitOptions options_) +ShuffleSplitterPtr ShuffleSplitter::create(const std::string & short_name, const SplitOptions & options_) { if (short_name == "rr") - { - return RoundRobinSplitter::create(std::move(options_)); - } + return RoundRobinSplitter::create(options_); else if (short_name == "hash") - { - return HashSplitter::create(std::move(options_)); - } + return HashSplitter::create(options_); else if (short_name == "single") { - options_.partition_nums = 1; - return RoundRobinSplitter::create(std::move(options_)); + SplitOptions options = options_; + options.partition_num = 1; + return RoundRobinSplitter::create(options); } else if (short_name == "range") - { - return RangeSplitter::create(std::move(options_)); - } + return RangeSplitter::create(options_); else - { throw std::runtime_error("unsupported splitter " + short_name); - } } std::string ShuffleSplitter::getPartitionTempFile(size_t partition_id) @@ -260,8 +251,6 @@ std::unique_ptr ShuffleSplitter::getPartitionWriteBuffer(size_t } } -const std::vector ShuffleSplitter::compress_methods = {"", "ZSTD", "LZ4"}; - void ShuffleSplitter::writeIndexFile() { auto index_file = options.data_file + ".index"; @@ -329,9 +318,12 @@ void ColumnsBuffer::appendSelective( size_t ColumnsBuffer::size() const { - if (accumulated_columns.empty()) - return 0; - return accumulated_columns.at(0)->size(); + return accumulated_columns.empty() ? 0 : accumulated_columns[0]->size(); +} + +bool ColumnsBuffer::empty() const +{ + return accumulated_columns.empty() ? true : accumulated_columns[0]->empty(); } DB::Block ColumnsBuffer::releaseColumns() @@ -352,34 +344,34 @@ DB::Block ColumnsBuffer::getHeader() { return header; } + ColumnsBuffer::ColumnsBuffer(size_t prefer_buffer_size_) : prefer_buffer_size(prefer_buffer_size_) { } -RoundRobinSplitter::RoundRobinSplitter(SplitOptions options_) : ShuffleSplitter(std::move(options_)) +RoundRobinSplitter::RoundRobinSplitter(const SplitOptions & options_) : ShuffleSplitter(options_) { Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ","); for (auto iter = output_column_tokenizer.begin(); iter != output_column_tokenizer.end(); ++iter) { output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums); + selector_builder = std::make_unique(options.partition_num); } void RoundRobinSplitter::computeAndCountPartitionId(DB::Block & block) { Stopwatch watch; - watch.start(); partition_info = selector_builder->build(block); split_result.total_compute_pid_time += watch.elapsedNanoseconds(); } -std::unique_ptr RoundRobinSplitter::create(SplitOptions && options_) +ShuffleSplitterPtr RoundRobinSplitter::create(const SplitOptions & options_) { - return std::make_unique(std::move(options_)); + return std::make_unique(options_); } -HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(options_)) +HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(options_) { Poco::StringTokenizer exprs_list(options_.hash_exprs, ","); std::vector hash_fields; @@ -394,39 +386,39 @@ HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(op output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); + selector_builder = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm); } -std::unique_ptr HashSplitter::create(SplitOptions && options_) + +std::unique_ptr HashSplitter::create(const SplitOptions & options_) { - return std::make_unique(std::move(options_)); + return std::make_unique(options_); } void HashSplitter::computeAndCountPartitionId(DB::Block & block) { Stopwatch watch; - watch.start(); partition_info = selector_builder->build(block); split_result.total_compute_pid_time += watch.elapsedNanoseconds(); } -std::unique_ptr RangeSplitter::create(SplitOptions && options_) +ShuffleSplitterPtr RangeSplitter::create(const SplitOptions & options_) { - return std::make_unique(std::move(options_)); + return std::make_unique(options_); } -RangeSplitter::RangeSplitter(SplitOptions options_) : ShuffleSplitter(std::move(options_)) +RangeSplitter::RangeSplitter(const SplitOptions & options_) : ShuffleSplitter(options_) { Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ","); for (auto iter = output_column_tokenizer.begin(); iter != output_column_tokenizer.end(); ++iter) { output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.hash_exprs, options.partition_nums); + selector_builder = std::make_unique(options.hash_exprs, options.partition_num); } + void RangeSplitter::computeAndCountPartitionId(DB::Block & block) { Stopwatch watch; - watch.start(); partition_info = selector_builder->build(block); split_result.total_compute_pid_time += watch.elapsedNanoseconds(); } diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h index aad53508b81b..70f4f7ef7e1c 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h @@ -40,10 +40,9 @@ struct SplitOptions int num_sub_dirs; int shuffle_id; int map_id; - size_t partition_nums; + size_t partition_num; std::string hash_exprs; std::string out_exprs; - // std::vector exprs; std::string compress_method = "zstd"; int compress_level; size_t spill_threshold = 300 * 1024 * 1024; @@ -53,18 +52,40 @@ struct SplitOptions class ColumnsBuffer { public: - explicit ColumnsBuffer(size_t prefer_buffer_size = DEFAULT_BLOCK_SIZE); + ColumnsBuffer(size_t prefer_buffer_size = 8192); + ~ColumnsBuffer() = default; + void add(DB::Block & columns, int start, int end); void appendSelective(size_t column_idx, const DB::Block & source, const DB::IColumn::Selector & selector, size_t from, size_t length); + size_t size() const; + bool empty() const; + DB::Block releaseColumns(); DB::Block getHeader(); + size_t bytes() const + { + size_t res = 0; + for (const auto & col : accumulated_columns) + res += col->byteSize(); + return res; + } + + size_t allocatedBytes() const + { + size_t res = 0; + for (const auto & col : accumulated_columns) + res += col->allocatedBytes(); + return res; + } + private: DB::MutableColumns accumulated_columns; DB::Block header; size_t prefer_buffer_size; }; +using ColumnsBufferPtr = std::shared_ptr; struct SplitResult { @@ -81,18 +102,23 @@ struct SplitResult Int64 total_serialize_time = 0; }; +class ShuffleSplitter; +using ShuffleSplitterPtr = std::unique_ptr; class ShuffleSplitter : public ShuffleWriterBase { +private: + inline const static std::vector compress_methods = {"", "ZSTD", "LZ4"}; + public: - static const std::vector compress_methods; - using Ptr = std::unique_ptr; - static Ptr create(const std::string & short_name, SplitOptions options_); - explicit ShuffleSplitter(SplitOptions && options); + static ShuffleSplitterPtr create(const std::string & short_name, const SplitOptions & options_); + + explicit ShuffleSplitter(const SplitOptions & options); virtual ~ShuffleSplitter() override { if (!stopped) stop(); } + void split(DB::Block & block) override; virtual void computeAndCountPartitionId(DB::Block &) { } std::vector getPartitionLength() const { return split_result.partition_length; } @@ -111,7 +137,7 @@ class ShuffleSplitter : public ShuffleWriterBase protected: bool stopped = false; PartitionInfo partition_info; - std::vector partition_buffer; + std::vector partition_buffer; std::vector> partition_outputs; std::vector> partition_write_buffers; std::vector> partition_cached_write_buffers; @@ -125,11 +151,11 @@ class ShuffleSplitter : public ShuffleWriterBase class RoundRobinSplitter : public ShuffleSplitter { public: - static std::unique_ptr create(SplitOptions && options); + static ShuffleSplitterPtr create(const SplitOptions & options); - explicit RoundRobinSplitter(SplitOptions options_); + explicit RoundRobinSplitter(const SplitOptions & options_); + virtual ~RoundRobinSplitter() override = default; - ~RoundRobinSplitter() override = default; void computeAndCountPartitionId(DB::Block & block) override; private: @@ -139,11 +165,11 @@ class RoundRobinSplitter : public ShuffleSplitter class HashSplitter : public ShuffleSplitter { public: - static std::unique_ptr create(SplitOptions && options); + static ShuffleSplitterPtr create(const SplitOptions & options); explicit HashSplitter(SplitOptions options_); + virtual ~HashSplitter() override = default; - ~HashSplitter() override = default; void computeAndCountPartitionId(DB::Block & block) override; private: @@ -153,8 +179,11 @@ class HashSplitter : public ShuffleSplitter class RangeSplitter : public ShuffleSplitter { public: - static std::unique_ptr create(SplitOptions && options); - explicit RangeSplitter(SplitOptions options_); + static ShuffleSplitterPtr create(const SplitOptions & options); + + explicit RangeSplitter(const SplitOptions & options_); + virtual ~RangeSplitter() override = default; + void computeAndCountPartitionId(DB::Block & block) override; private: diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp index 84c32f4565f7..f9fe83037ed9 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp @@ -136,7 +136,11 @@ DB::Block convertAggregateStateInBlock(DB::Block& block) { columns.emplace_back(item); } + + std::cout << "column " << item.name << " from " << item.type->getName() << " with bytes " << item.column->allocatedBytes() << " to " + << columns.back().type->getName() << " with bytes " << columns.back().column->allocatedBytes() << std::endl; } + return columns; } } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 6813a5c9335f..4571635c79af 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -691,7 +691,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .num_sub_dirs = num_sub_dirs, .shuffle_id = shuffle_id, .map_id = static_cast(map_id), - .partition_nums = static_cast(num_partitions), + .partition_num = static_cast(num_partitions), .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), @@ -752,7 +752,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE, .shuffle_id = shuffle_id, .map_id = static_cast(map_id), - .partition_nums = static_cast(num_partitions), + .partition_num = static_cast(num_partitions), .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), @@ -1068,7 +1068,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Options options; - options.partition_nums = partition_num; + options.partition_num = partition_num; options.buffer_size = buffer_size; auto hash_algorithm_str = jstring2string(env, hash_algorithm); options.hash_algorithm.swap(hash_algorithm_str); diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp index 1900ee865786..6bfc443e97e6 100644 --- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp +++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp @@ -285,7 +285,7 @@ DB::ContextMutablePtr global_context; .io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE, .data_file = root + "/data.dat", .map_id = 1, - .partition_nums = 4, + .partition_num = 4, .compress_method = local_engine::ShuffleSplitter::compress_methods[state.range(1)]}; auto splitter = local_engine::ShuffleSplitter::create("rr", options); while (executor.pull(chunk)) @@ -374,7 +374,7 @@ DB::ContextMutablePtr global_context; .io_buffer_size = DBMS_DEFAULT_BUFFER_SIZE, .data_file = root + "/data.dat", .map_id = 1, - .partition_nums = 4, + .partition_num = 4, .compress_method = local_engine::ShuffleSplitter::compress_methods[state.range(1)]}; auto splitter = local_engine::ShuffleSplitter::create("hash", options); while (executor.pull(chunk)) From 99a2559c76c90d21935f9b412373a9b7cd39c9af Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Thu, 16 Nov 2023 22:10:37 +0800 Subject: [PATCH 02/11] remove useless codes --- .../Shuffle/CachedShuffleWriter.cpp | 5 +-- .../local-engine/Shuffle/PartitionWriter.cpp | 32 ++++++++++++++----- cpp-ch/local-engine/Shuffle/PartitionWriter.h | 6 ++-- .../local-engine/Shuffle/ShuffleSplitter.cpp | 2 -- .../local-engine/Shuffle/ShuffleWriterBase.h | 5 +-- .../IO/AggregateSerializationUtils.cpp | 5 ++- 6 files changed, 33 insertions(+), 22 deletions(-) diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index b66b5915401a..20df5952c159 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -153,10 +153,7 @@ SplitResult CachedShuffleWriter::stop() size_t CachedShuffleWriter::evictPartitions() { - auto size = partition_writer->totalCacheSize(); - if (size) - partition_writer->evictPartitions(true); - return size; + return partition_writer->evictPartitions(true); } } diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index dfe7e2b0fe09..f7bd8ca1216d 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -51,9 +51,6 @@ void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, auto & buffer = partition_block_buffer[partition_i]; if (buffer->size() && buffer->size() + length >= shuffle_writer->options.split_size) { - std::cout << "split_size:" << shuffle_writer->options.split_size << " buffer size:" << buffer->size() - << " buffer bytes:" << buffer->bytes() << " buffer allocatedBytes:" << buffer->allocatedBytes() << std::endl; - Block block = buffer->releaseColumns(); auto bytes = block.bytes(); total_partition_buffer_size += bytes; @@ -62,15 +59,17 @@ void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, } for (size_t col_i = 0; col_i < data.columns(); ++col_i) - partition_block_buffer[partition_i]->appendSelective(col_i, data, partition_info.partition_selector, from, length); + buffer->appendSelective(col_i, data, partition_info.partition_selector, from, length); } shuffle_writer->split_result.total_split_time += time.elapsedNanoseconds(); } -void LocalPartitionWriter::evictPartitions(bool for_memory_spill) +size_t LocalPartitionWriter::evictPartitions(bool for_memory_spill) { - auto spill_to_file = [this]() -> void { + size_t res = 0; + + auto spill_to_file = [this, &res]() -> void { auto file = getNextSpillFile(); WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); @@ -93,6 +92,7 @@ void LocalPartitionWriter::evictPartitions(bool for_memory_spill) auto & partition = partition_buffer[partition_id]; raw_size = partition->spill(writer); } + res += raw_size; compressed_output.sync(); partition_spill_info.length = output.count() - partition_spill_info.start; @@ -122,6 +122,8 @@ void LocalPartitionWriter::evictPartitions(bool for_memory_spill) shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; total_partition_buffer_size = 0; + + return res; } std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) @@ -237,9 +239,11 @@ CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWr { } -void CelebornPartitionWriter::evictPartitions(bool for_memory_spill) +size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) { - auto spill_to_celeborn = [this]() + size_t res = 0; + + auto spill_to_celeborn = [this, for_memory_spill, &res]() { Stopwatch serialization_time_watch; for (size_t partition_id = 0; partition_id < partition_buffer.size(); ++partition_id) @@ -250,12 +254,23 @@ void CelebornPartitionWriter::evictPartitions(bool for_memory_spill) NativeWriter writer(compressed_output, shuffle_writer->output_header); size_t raw_size = 0; + if (!for_memory_spill) { std::unique_lock lock(mtxs[partition_id]); auto & partition = partition_buffer[partition_id]; raw_size = partition->spill(writer); } + else + { + std::unique_lock lock(mtxs[partition_id], std::try_to_lock); + if (lock.owns_lock()) + { + auto & partition = partition_buffer[partition_id]; + raw_size = partition->spill(writer); + } + } compressed_output.sync(); + res += raw_size; Stopwatch push_time_watch; celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size()); @@ -287,6 +302,7 @@ void CelebornPartitionWriter::evictPartitions(bool for_memory_spill) shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; total_partition_buffer_size = 0; + return res; } void CelebornPartitionWriter::stop() diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 62e3830501c1..eb580350ee7d 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -63,7 +63,7 @@ class PartitionWriter : boost::noncopyable virtual void write(const PartitionInfo& info, DB::Block & data); - virtual void evictPartitions(bool for_memory_spill = false) = 0; + virtual size_t evictPartitions(bool for_memory_spill = false) = 0; virtual void stop() = 0; @@ -92,7 +92,7 @@ class LocalPartitionWriter : public PartitionWriter { public: explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer); - void evictPartitions(bool for_memory_spill) override; + size_t evictPartitions(bool for_memory_spill) override; void stop() override; std::vector mergeSpills(DB::WriteBuffer& data_file); @@ -105,7 +105,7 @@ class CelebornPartitionWriter : public PartitionWriter { public: CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client); - void evictPartitions(bool for_memory_spill) override; + size_t evictPartitions(bool for_memory_spill) override; void stop() override; private: diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp index cf19b92861f9..b6aabf59344b 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp @@ -127,8 +127,6 @@ void ShuffleSplitter::splitBlockByPartition(DB::Block & block) auto & buffer = partition_buffer[i]; if (buffer->size() >= options.split_size) { - std::cout << "split_size:" << options.split_size << " buffer size:" << buffer->size() << " buffer bytes:" << buffer->bytes() - << " buffer allocatedBytes:" << buffer->allocatedBytes() << std::endl; spillPartition(i); } } diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h b/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h index fd23f993d0c8..4c2eab853feb 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleWriterBase.h @@ -23,9 +23,10 @@ struct SplitResult; class ShuffleWriterBase { public: + virtual ~ShuffleWriterBase() = default; + virtual void split(DB::Block & block) = 0; - virtual size_t evictPartitions() {return 0;} + virtual size_t evictPartitions() { return 0; } virtual SplitResult stop() = 0; - virtual ~ShuffleWriterBase() = default; }; } diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp index f9fe83037ed9..6d663afc4fb3 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp @@ -118,6 +118,7 @@ DB::ColumnWithTypeAndName convertFixedStringToAggregateState(const DB::ColumnWit } return DB::ColumnWithTypeAndName(std::move(res_col), type, col.name); } + DB::Block convertAggregateStateInBlock(DB::Block& block) { ColumnsWithTypeAndName columns; @@ -136,12 +137,10 @@ DB::Block convertAggregateStateInBlock(DB::Block& block) { columns.emplace_back(item); } - - std::cout << "column " << item.name << " from " << item.type->getName() << " with bytes " << item.column->allocatedBytes() << " to " - << columns.back().type->getName() << " with bytes " << columns.back().column->allocatedBytes() << std::endl; } return columns; } + } From d8739ea01ed97205057df446a6cbe967290c22e1 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 22 Nov 2023 14:25:24 +0800 Subject: [PATCH 03/11] change as request --- cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index 20df5952c159..41a096ab5901 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -124,10 +124,8 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & block) { if (!output_header) { - output_header = block.cloneEmpty(); if (output_columns_indicies.empty()) { - /// TODO 这里逻辑有问题?output_header被赋值了两遍 output_header = block.cloneEmpty(); for (size_t i = 0; i < block.columns(); ++i) { From 925c2a827a5d03c496bb1ee129c2db88688182e2 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Thu, 23 Nov 2023 11:20:50 +0800 Subject: [PATCH 04/11] protect evict be invoke recursively --- .../local-engine/Shuffle/PartitionWriter.cpp | 26 ++++++++++++++----- cpp-ch/local-engine/Shuffle/PartitionWriter.h | 25 +++++++++++------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index f7bd8ca1216d..6447ef2d30a3 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -36,7 +36,10 @@ using namespace DB; namespace local_engine { -void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) +/// To avoid deadlock and data race, we need to make sure that PartitionWriter::evictPartitions won't be invoked recursively. +thread_local std::atomic already_inside_evict_partitions(false); + +void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) { Stopwatch time; @@ -65,7 +68,7 @@ void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, shuffle_writer->split_result.total_split_time += time.elapsedNanoseconds(); } -size_t LocalPartitionWriter::evictPartitions(bool for_memory_spill) +size_t LocalPartitionWriter::evictPartitionsImpl(bool for_memory_spill) { size_t res = 0; @@ -234,12 +237,23 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) } } +size_t PartitionWriter::evictPartitions(bool for_memory_spill) +{ + if (already_inside_evict_partitions) + return 0; + + already_inside_evict_partitions = true; + evictPartitionsImpl(for_memory_spill); + already_inside_evict_partitions = false; +} + + CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client_) : PartitionWriter(shuffleWriter), celeborn_client(std::move(celeborn_client_)) { } -size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) +size_t CelebornPartitionWriter::evictPartitionsImpl(bool for_memory_spill) { size_t res = 0; @@ -307,6 +321,7 @@ size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) void CelebornPartitionWriter::stop() { + /// Push the remaining data to Celeborn for (size_t partition_id = 0; partition_id < partition_block_buffer.size(); ++partition_id) { std::unique_lock lock(mtxs[partition_id]); @@ -317,12 +332,11 @@ void CelebornPartitionWriter::stop() partition_buffer[partition_id]->addBlock(std::move(block)); } } - evictPartitions(false); - for (const auto & item : shuffle_writer->split_result.partition_length) + for (const auto & length : shuffle_writer->split_result.partition_length) { - shuffle_writer->split_result.total_bytes_written += item; + shuffle_writer->split_result.total_bytes_written += length; } } diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index eb580350ee7d..b605be2606ae 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -63,16 +63,15 @@ class PartitionWriter : boost::noncopyable virtual void write(const PartitionInfo& info, DB::Block & data); - virtual size_t evictPartitions(bool for_memory_spill = false) = 0; - virtual void stop() = 0; - virtual size_t totalCacheSize() - { - return total_partition_buffer_size; - } + virtual size_t totalCacheSize() const { return total_partition_buffer_size; } + + size_t evictPartitions(bool for_memory_spill = false); protected: + virtual size_t evictPartitionsImpl(bool for_memory_spill = false) = 0; + CachedShuffleWriter * shuffle_writer; SplitOptions * options; @@ -92,12 +91,15 @@ class LocalPartitionWriter : public PartitionWriter { public: explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer); - size_t evictPartitions(bool for_memory_spill) override; + ~LocalPartitionWriter() override = default; + void stop() override; std::vector mergeSpills(DB::WriteBuffer& data_file); -private: +protected: + size_t evictPartitionsImpl(bool for_memory_spill) override; String getNextSpillFile(); + std::vector spill_infos; }; @@ -105,10 +107,13 @@ class CelebornPartitionWriter : public PartitionWriter { public: CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client); - size_t evictPartitions(bool for_memory_spill) override; + ~CelebornPartitionWriter() override = default; + void stop() override; -private: +protected: + size_t evictPartitionsImpl(bool for_memory_spill) override; + std::unique_ptr celeborn_client; }; } From 3da802fc04e0eebc1f4eab91cfe9085abd5e63da Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 27 Nov 2023 14:35:15 +0800 Subject: [PATCH 05/11] fix core dump --- .../local-engine/Shuffle/PartitionWriter.cpp | 34 +++++++------------ cpp-ch/local-engine/Shuffle/PartitionWriter.h | 14 ++++---- .../SubstraitSource/ReadBufferBuilder.cpp | 2 +- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 6447ef2d30a3..efa69c1872f3 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -36,10 +36,7 @@ using namespace DB; namespace local_engine { -/// To avoid deadlock and data race, we need to make sure that PartitionWriter::evictPartitions won't be invoked recursively. -thread_local std::atomic already_inside_evict_partitions(false); - -void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) +void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) { Stopwatch time; @@ -68,7 +65,7 @@ void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & da shuffle_writer->split_result.total_split_time += time.elapsedNanoseconds(); } -size_t LocalPartitionWriter::evictPartitionsImpl(bool for_memory_spill) +size_t LocalPartitionWriter::evictPartitions(bool for_memory_spill) { size_t res = 0; @@ -237,23 +234,12 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) } } -size_t PartitionWriter::evictPartitions(bool for_memory_spill) -{ - if (already_inside_evict_partitions) - return 0; - - already_inside_evict_partitions = true; - evictPartitionsImpl(for_memory_spill); - already_inside_evict_partitions = false; -} - - CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client_) : PartitionWriter(shuffleWriter), celeborn_client(std::move(celeborn_client_)) { } -size_t CelebornPartitionWriter::evictPartitionsImpl(bool for_memory_spill) +size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) { size_t res = 0; @@ -302,10 +288,16 @@ size_t CelebornPartitionWriter::evictPartitionsImpl(bool for_memory_spill) Stopwatch spill_time_watch; if (for_memory_spill) { - // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again - IgnoreMemoryTracker ignore(2 * 1024 * 1024); - ThreadFromGlobalPool thread(spill_to_celeborn); - thread.join(); + // // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again + auto n = evict_cnt.fetch_add(1); + if (n) + return 0; + + spill_to_celeborn(); + n = 0; + // IgnoreMemoryTracker ignore(2 * 1024 * 1024); + // ThreadFromGlobalPool thread(spill_to_celeborn); + // thread.join(); } else { diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index b605be2606ae..0ae4936b6b73 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -63,15 +63,13 @@ class PartitionWriter : boost::noncopyable virtual void write(const PartitionInfo& info, DB::Block & data); + virtual size_t evictPartitions(bool for_memory_spill = false) = 0; + virtual void stop() = 0; virtual size_t totalCacheSize() const { return total_partition_buffer_size; } - size_t evictPartitions(bool for_memory_spill = false); - protected: - virtual size_t evictPartitionsImpl(bool for_memory_spill = false) = 0; - CachedShuffleWriter * shuffle_writer; SplitOptions * options; @@ -93,11 +91,12 @@ class LocalPartitionWriter : public PartitionWriter explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer); ~LocalPartitionWriter() override = default; + size_t evictPartitions(bool for_memory_spill) override; + void stop() override; std::vector mergeSpills(DB::WriteBuffer& data_file); protected: - size_t evictPartitionsImpl(bool for_memory_spill) override; String getNextSpillFile(); std::vector spill_infos; @@ -109,11 +108,12 @@ class CelebornPartitionWriter : public PartitionWriter CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client); ~CelebornPartitionWriter() override = default; + size_t evictPartitions(bool for_memory_spill) override; + void stop() override; protected: - size_t evictPartitionsImpl(bool for_memory_spill) override; - + std::atomic evict_cnt = 0; std::unique_ptr celeborn_client; }; } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index c6ea8e80eb62..8fe6bf92884a 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -419,7 +419,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder /* restricted_seek */ true); }; - DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}}; + DB::StoredObjects stored_objects{DB::StoredObject{key, object_size}}; auto s3_impl = std::make_unique( std::move(read_buffer_creator), stored_objects, new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true); From ad4375616d9bedc49764f11ef386b26d55faca07 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 27 Nov 2023 16:41:48 +0800 Subject: [PATCH 06/11] protect evict/stop/write from being invoked recursively --- .../local-engine/Shuffle/PartitionWriter.cpp | 76 +++++++++++++------ cpp-ch/local-engine/Shuffle/PartitionWriter.h | 33 ++++---- 2 files changed, 70 insertions(+), 39 deletions(-) diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index efa69c1872f3..777f5a7f9d36 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -38,6 +38,12 @@ namespace local_engine void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) { + if (evicting_or_writing) + return; + + evicting_or_writing = true; + SCOPE_EXIT({evicting_or_writing = false;}); + Stopwatch time; for (size_t partition_i = 0; partition_i < partition_info.partition_num; ++partition_i) @@ -65,7 +71,7 @@ void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, shuffle_writer->split_result.total_split_time += time.elapsedNanoseconds(); } -size_t LocalPartitionWriter::evictPartitions(bool for_memory_spill) +size_t LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) { size_t res = 0; @@ -213,7 +219,8 @@ String LocalPartitionWriter::getNextSpillFile() std::filesystem::create_directories(dir); return std::filesystem::path(dir) / file_name; } -void LocalPartitionWriter::stop() + +void LocalPartitionWriter::unsafeStop() { WriteBufferFromFile output(options->data_file, options->io_buffer_size); auto offsets = mergeSpills(output); @@ -234,12 +241,32 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) } } +size_t PartitionWriter::evictPartitions(bool for_memory_spill) +{ + if (evicting_or_writing) + return 0; + + evicting_or_writing = true; + SCOPE_EXIT({evicting_or_writing = false;}); + return unsafeEvictPartitions(for_memory_spill); +} + +void PartitionWriter::stop() +{ + if (evicting_or_writing) + return; + + evicting_or_writing = true; + SCOPE_EXIT({evicting_or_writing = false;}); + return unsafeStop(); +} + CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client_) : PartitionWriter(shuffleWriter), celeborn_client(std::move(celeborn_client_)) { } -size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) +size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) { size_t res = 0; @@ -254,21 +281,21 @@ size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) NativeWriter writer(compressed_output, shuffle_writer->output_header); size_t raw_size = 0; - if (!for_memory_spill) - { + // if (!for_memory_spill) + // { std::unique_lock lock(mtxs[partition_id]); auto & partition = partition_buffer[partition_id]; raw_size = partition->spill(writer); - } - else - { - std::unique_lock lock(mtxs[partition_id], std::try_to_lock); - if (lock.owns_lock()) - { - auto & partition = partition_buffer[partition_id]; - raw_size = partition->spill(writer); - } - } + // } + // else + // { + // std::unique_lock lock(mtxs[partition_id], std::try_to_lock); + // if (lock.owns_lock()) + // { + // auto & partition = partition_buffer[partition_id]; + // raw_size = partition->spill(writer); + // } + // } compressed_output.sync(); res += raw_size; @@ -286,15 +313,10 @@ size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) }; Stopwatch spill_time_watch; + /* if (for_memory_spill) { // // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again - auto n = evict_cnt.fetch_add(1); - if (n) - return 0; - - spill_to_celeborn(); - n = 0; // IgnoreMemoryTracker ignore(2 * 1024 * 1024); // ThreadFromGlobalPool thread(spill_to_celeborn); // thread.join(); @@ -304,6 +326,13 @@ size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) IgnoreMemoryTracker ignore(2 * 1024 * 1024); spill_to_celeborn(); } + */ + // if (evicting_or_writing) + // return 0; + + // evicting_or_writing = true; + spill_to_celeborn(); + // evicting_or_writing = false; shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; @@ -311,7 +340,7 @@ size_t CelebornPartitionWriter::evictPartitions(bool for_memory_spill) return res; } -void CelebornPartitionWriter::stop() +void CelebornPartitionWriter::unsafeStop() { /// Push the remaining data to Celeborn for (size_t partition_id = 0; partition_id < partition_block_buffer.size(); ++partition_id) @@ -324,7 +353,8 @@ void CelebornPartitionWriter::stop() partition_buffer[partition_id]->addBlock(std::move(block)); } } - evictPartitions(false); + + unsafeEvictPartitions(false); for (const auto & length : shuffle_writer->split_result.partition_length) { diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 0ae4936b6b73..98b064753900 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -61,15 +61,16 @@ class PartitionWriter : boost::noncopyable explicit PartitionWriter(CachedShuffleWriter* shuffle_writer_); virtual ~PartitionWriter() = default; - virtual void write(const PartitionInfo& info, DB::Block & data); + void write(const PartitionInfo& info, DB::Block & data); + size_t evictPartitions(bool for_memory_spill = false); + void stop(); - virtual size_t evictPartitions(bool for_memory_spill = false) = 0; - - virtual void stop() = 0; - - virtual size_t totalCacheSize() const { return total_partition_buffer_size; } + size_t totalCacheSize() const { return total_partition_buffer_size; } protected: + virtual size_t unsafeEvictPartitions(bool for_memory_spill = false) = 0; + virtual void unsafeStop() = 0; + CachedShuffleWriter * shuffle_writer; SplitOptions * options; @@ -83,6 +84,8 @@ class PartitionWriter : boost::noncopyable std::vector partition_buffer; size_t total_partition_buffer_size = 0; + + std::atomic evicting_or_writing{false}; }; class LocalPartitionWriter : public PartitionWriter @@ -91,13 +94,12 @@ class LocalPartitionWriter : public PartitionWriter explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer); ~LocalPartitionWriter() override = default; - size_t evictPartitions(bool for_memory_spill) override; - - void stop() override; - std::vector mergeSpills(DB::WriteBuffer& data_file); - protected: + size_t unsafeEvictPartitions(bool for_memory_spill) override; + void unsafeStop() override; + String getNextSpillFile(); + std::vector mergeSpills(DB::WriteBuffer& data_file); std::vector spill_infos; }; @@ -108,12 +110,11 @@ class CelebornPartitionWriter : public PartitionWriter CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client); ~CelebornPartitionWriter() override = default; - size_t evictPartitions(bool for_memory_spill) override; - - void stop() override; - protected: - std::atomic evict_cnt = 0; + size_t unsafeEvictPartitions(bool for_memory_spill) override; + void unsafeStop() override; + + std::atomic evicting_or_writing = 0; std::unique_ptr celeborn_client; }; } From 8fa8d7e7c4d10201704dbc7db4c0766b341204bb Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 27 Nov 2023 16:54:55 +0800 Subject: [PATCH 07/11] remove locks --- .../local-engine/Shuffle/PartitionWriter.cpp | 38 ++++++------------- cpp-ch/local-engine/Shuffle/PartitionWriter.h | 6 --- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 777f5a7f9d36..20f5587a191e 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -51,8 +51,6 @@ void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, size_t from = partition_info.partition_start_points[partition_i]; size_t length = partition_info.partition_start_points[partition_i + 1] - from; - std::unique_lock lock(mtxs[partition_i]); - /// Make sure buffer size is no greater than split_size auto & buffer = partition_block_buffer[partition_i]; if (buffer->size() && buffer->size() + length >= shuffle_writer->options.split_size) @@ -91,13 +89,8 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) PartitionSpillInfo partition_spill_info; partition_spill_info.start = output.count(); - size_t raw_size = 0; - { - std::unique_lock lock(mtxs[partition_id]); - - auto & partition = partition_buffer[partition_id]; - raw_size = partition->spill(writer); - } + auto & partition = partition_buffer[partition_id]; + size_t raw_size = partition->spill(writer); res += raw_size; compressed_output.sync(); @@ -168,18 +161,12 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) merge_io_time += io_time_watch.elapsedNanoseconds(); serialization_time_watch.restart(); - size_t raw_size = 0; + if (!partition_block_buffer[partition_id]->empty()) { - std::unique_lock lock(mtxs[partition_id]); - - if (!partition_block_buffer[partition_id]->empty()) - { - Block block = partition_block_buffer[partition_id]->releaseColumns(); - partition_buffer[partition_id]->addBlock(std::move(block)); - } - - raw_size = partition_buffer[partition_id]->spill(writer); + Block block = partition_block_buffer[partition_id]->releaseColumns(); + partition_buffer[partition_id]->addBlock(std::move(block)); } + size_t raw_size = partition_buffer[partition_id]->spill(writer); compressed_output.sync(); partition_length[partition_id] = data_file.count() - size_before; @@ -230,7 +217,6 @@ void LocalPartitionWriter::unsafeStop() PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) : shuffle_writer(shuffle_writer_) , options(&shuffle_writer->options) - , mtxs(options->partition_num) , partition_block_buffer(options->partition_num) , partition_buffer(options->partition_num) { @@ -280,12 +266,9 @@ size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); - size_t raw_size = 0; // if (!for_memory_spill) // { - std::unique_lock lock(mtxs[partition_id]); - auto & partition = partition_buffer[partition_id]; - raw_size = partition->spill(writer); + // std::unique_lock lock(mtxs[partition_id]); // } // else // { @@ -296,8 +279,11 @@ size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) // raw_size = partition->spill(writer); // } // } - compressed_output.sync(); + + auto & partition = partition_buffer[partition_id]; + size_t raw_size = partition->spill(writer); res += raw_size; + compressed_output.sync(); Stopwatch push_time_watch; celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size()); @@ -345,8 +331,6 @@ void CelebornPartitionWriter::unsafeStop() /// Push the remaining data to Celeborn for (size_t partition_id = 0; partition_id < partition_block_buffer.size(); ++partition_id) { - std::unique_lock lock(mtxs[partition_id]); - if (!partition_block_buffer[partition_id]->empty()) { Block block = partition_block_buffer[partition_id]->releaseColumns(); diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 98b064753900..25a3514c67e2 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -74,12 +74,6 @@ class PartitionWriter : boost::noncopyable CachedShuffleWriter * shuffle_writer; SplitOptions * options; - /// The mutex is used to protect partition_block_buffer and partition_buffer - /// It may be accessed by multiple threads in which "CachedShuffleWriter::split" and "CachedShuffleWriter::spill" are invoked simutanously. - /// - /// Notice: the mutex must support being recursively acquired by the same thread, which happens during memory spilling. - /// For more details, pls refer to https://github.com/oap-project/gluten/issues/3722 - mutable std::vector mtxs; std::vector partition_block_buffer; std::vector partition_buffer; From 42fa73062c6c74bb6232e220e1b84eb6a11c970d Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 27 Nov 2023 18:33:36 +0800 Subject: [PATCH 08/11] respect throw_memory_if_exceed --- .../CHShuffleSplitterJniWrapper.java | 18 ++++++---- .../shuffle/CHColumnarShuffleWriter.scala | 4 ++- cpp-ch/local-engine/Common/CHUtil.cpp | 1 - .../Parser/SerializedPlanParser.cpp | 4 +-- .../local-engine/Shuffle/PartitionWriter.cpp | 36 ++++--------------- cpp-ch/local-engine/Shuffle/PartitionWriter.h | 3 +- cpp-ch/local-engine/Shuffle/ShuffleSplitter.h | 1 + cpp-ch/local-engine/local_engine_jni.cpp | 12 ++++--- ...lebornHashBasedColumnarShuffleWriter.scala | 3 +- 9 files changed, 37 insertions(+), 45 deletions(-) diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java index ec773343a09c..295159dceefc 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java @@ -32,7 +32,8 @@ public long make( int subDirsPerLocalDir, boolean preferSpill, long spillThreshold, - String hashAlgorithm) { + String hashAlgorithm, + boolean throwIfMemoryExceed) { return nativeMake( part.getShortName(), part.getNumPartitions(), @@ -47,7 +48,8 @@ public long make( subDirsPerLocalDir, preferSpill, spillThreshold, - hashAlgorithm); + hashAlgorithm, + throwIfMemoryExceed); } public long makeForRSS( @@ -58,7 +60,8 @@ public long makeForRSS( String codec, long spillThreshold, String hashAlgorithm, - Object pusher) { + Object pusher, + boolean throwIfMemoryExceed) { return nativeMakeForRSS( part.getShortName(), part.getNumPartitions(), @@ -70,7 +73,8 @@ public long makeForRSS( codec, spillThreshold, hashAlgorithm, - pusher); + pusher, + throwIfMemoryExceed); } public native long nativeMake( @@ -87,7 +91,8 @@ public native long nativeMake( int subDirsPerLocalDir, boolean preferSpill, long spillThreshold, - String hashAlgorithm); + String hashAlgorithm, + boolean throwIfMemoryExceed); public native long nativeMakeForRSS( String shortName, @@ -100,7 +105,8 @@ public native long nativeMakeForRSS( String codec, long spillThreshold, String hashAlgorithm, - Object pusher); + Object pusher, + boolean throwIfMemoryExceed); public native void split(long splitterId, long block); diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index 8af1a46875ec..9507d81c6ab0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -54,6 +54,7 @@ class CHColumnarShuffleWriter[K, V]( private val customizedCompressCodec = GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT) private val preferSpill = GlutenConfig.getConf.chColumnarShufflePreferSpill + private val throwIfMemoryExceed = GlutenConfig.getConf.chColumnarThrowIfMemoryExceed private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold private val jniWrapper = new CHShuffleSplitterJniWrapper // Are we in the process of stopping? Because map tasks can call stop() with success = true @@ -102,7 +103,8 @@ class CHColumnarShuffleWriter[K, V]( subDirsPerLocalDir, preferSpill, spillThreshold, - CHBackendSettings.shuffleHashAlgorithm + CHBackendSettings.shuffleHashAlgorithm, + throwIfMemoryExceed ) CHNativeMemoryAllocators.createSpillable( "ShuffleWriter", diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 6ac9b20e2a67..1bf4fe718d55 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -424,7 +424,6 @@ std::map BackendInitializerUtil::getBackendConfMap(std return ch_backend_conf; } - /// Parse backend configs from plan extensions do { diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index fd4388e8f169..aea3c4f1202e 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -440,8 +440,8 @@ QueryPlanPtr SerializedPlanParser::parse(std::unique_ptr plan) pb_util::JsonOptions options; std::string json; auto s = pb_util::MessageToJsonString(*plan, &json, options); - if (!s.ok()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert Substrait Plan to Json"); + // if (!s.ok()) + // throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert Substrait Plan to Json"); LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "substrait plan:\n{}", json); } parseExtensions(plan->extensions()); diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 20f5587a191e..e3ed2715644d 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -107,7 +107,7 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) }; Stopwatch spill_time_watch; - if (for_memory_spill) + if (for_memory_spill && options->throw_if_memory_exceed) { // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again IgnoreMemoryTracker ignore(2 * 1024 * 1024); @@ -220,6 +220,7 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) , partition_block_buffer(options->partition_num) , partition_buffer(options->partition_num) { + // std::cout << "throw_if_memory_exceed: " << options->throw_if_memory_exceed << std::endl; for (size_t partition_i = 0; partition_i < options->partition_num; ++partition_i) { partition_block_buffer[partition_i] = std::make_shared(options->split_size); @@ -266,20 +267,6 @@ size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); - // if (!for_memory_spill) - // { - // std::unique_lock lock(mtxs[partition_id]); - // } - // else - // { - // std::unique_lock lock(mtxs[partition_id], std::try_to_lock); - // if (lock.owns_lock()) - // { - // auto & partition = partition_buffer[partition_id]; - // raw_size = partition->spill(writer); - // } - // } - auto & partition = partition_buffer[partition_id]; size_t raw_size = partition->spill(writer); res += raw_size; @@ -299,26 +286,17 @@ size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) }; Stopwatch spill_time_watch; - /* - if (for_memory_spill) + if (for_memory_spill && options->throw_if_memory_exceed) { - // // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again - // IgnoreMemoryTracker ignore(2 * 1024 * 1024); - // ThreadFromGlobalPool thread(spill_to_celeborn); - // thread.join(); + // escape memory track from current thread status; add untracked memory limit for create thread object, avoid trigger memory spill again + IgnoreMemoryTracker ignore(2 * 1024 * 1024); + ThreadFromGlobalPool thread(spill_to_celeborn); + thread.join(); } else { - IgnoreMemoryTracker ignore(2 * 1024 * 1024); spill_to_celeborn(); } - */ - // if (evicting_or_writing) - // return 0; - - // evicting_or_writing = true; - spill_to_celeborn(); - // evicting_or_writing = false; shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 25a3514c67e2..6ae0492a66e0 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -32,7 +32,8 @@ struct PartitionSpillInfo { size_t length; // in Bytes }; -struct SpillInfo { +struct SpillInfo +{ std::string spilled_file; std::vector partition_spill_infos; }; diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h index 70f4f7ef7e1c..600b87d8c99b 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h @@ -47,6 +47,7 @@ struct SplitOptions int compress_level; size_t spill_threshold = 300 * 1024 * 1024; std::string hash_algorithm; + bool throw_if_memory_exceed = true; }; class ColumnsBuffer diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 4571635c79af..6659fe3c6b8b 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -654,7 +654,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jint num_sub_dirs, jboolean prefer_spill, jlong spill_threshold, - jstring hash_algorithm) + jstring hash_algorithm, + jboolean throw_if_memory_exceed) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; @@ -696,7 +697,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), .spill_threshold = static_cast(spill_threshold), - .hash_algorithm = jstring2string(env, hash_algorithm)}; + .hash_algorithm = jstring2string(env, hash_algorithm), + .throw_if_memory_exceed = static_cast(throw_if_memory_exceed)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; if (prefer_spill) @@ -724,7 +726,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jstring codec, jlong spill_threshold, jstring hash_algorithm, - jobject pusher) + jobject pusher, + jboolean throw_if_memory_exceed) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; @@ -757,7 +760,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), .spill_threshold = static_cast(spill_threshold), - .hash_algorithm = jstring2string(env, hash_algorithm)}; + .hash_algorithm = jstring2string(env, hash_algorithm), + .throw_if_memory_exceed = static_cast(throw_if_memory_exceed)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; splitter = new local_engine::SplitterHolder{.splitter = std::make_unique(name, options, pusher)}; diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 61938e263c5d..c607808ab76a 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -70,7 +70,8 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V]( customizedCompressCodec, GlutenConfig.getConf.chColumnarShuffleSpillThreshold, CHBackendSettings.shuffleHashAlgorithm, - celebornPartitionPusher + celebornPartitionPusher, + GlutenConfig.getConf.chColumnarThrowIfMemoryExceed ) CHNativeMemoryAllocators.createSpillable( "CelebornShuffleWriter", From 9caaebb0c0e053989a00f6e1e757d432f9693f58 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Mon, 27 Nov 2023 18:43:19 +0800 Subject: [PATCH 09/11] revert file --- .../local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index 8fe6bf92884a..c6ea8e80eb62 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -419,7 +419,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder /* restricted_seek */ true); }; - DB::StoredObjects stored_objects{DB::StoredObject{key, object_size}}; + DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}}; auto s3_impl = std::make_unique( std::move(read_buffer_creator), stored_objects, new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true); From 362677ceeb56ca54632bb15cbc6db48304169028 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 29 Nov 2023 19:42:07 +0800 Subject: [PATCH 10/11] remove useless codes --- cpp-ch/local-engine/Parser/SerializedPlanParser.cpp | 4 ++-- cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h | 1 - cpp-ch/local-engine/Shuffle/PartitionWriter.cpp | 1 - cpp-ch/local-engine/Shuffle/PartitionWriter.h | 3 +-- .../Storages/SubstraitSource/ReadBufferBuilder.cpp | 2 +- 5 files changed, 4 insertions(+), 7 deletions(-) diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index aea3c4f1202e..fd4388e8f169 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -440,8 +440,8 @@ QueryPlanPtr SerializedPlanParser::parse(std::unique_ptr plan) pb_util::JsonOptions options; std::string json; auto s = pb_util::MessageToJsonString(*plan, &json, options); - // if (!s.ok()) - // throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert Substrait Plan to Json"); + if (!s.ok()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert Substrait Plan to Json"); LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "substrait plan:\n{}", json); } parseExtensions(plan->extensions()); diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h index 5b69c2bf66c7..75caeddd29b7 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h @@ -47,7 +47,6 @@ class CachedShuffleWriter : public ShuffleWriterBase void initOutputIfNeeded(DB::Block & block); bool stopped = false; - // PartitionInfo partition_info; DB::Block output_header; SplitOptions options; SplitResult split_result; diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index e3ed2715644d..0195ecf3870b 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -220,7 +220,6 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) , partition_block_buffer(options->partition_num) , partition_buffer(options->partition_num) { - // std::cout << "throw_if_memory_exceed: " << options->throw_if_memory_exceed << std::endl; for (size_t partition_i = 0; partition_i < options->partition_num; ++partition_i) { partition_block_buffer[partition_i] = std::make_shared(options->split_size); diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 6ae0492a66e0..12ea581be6bd 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -80,7 +80,7 @@ class PartitionWriter : boost::noncopyable size_t total_partition_buffer_size = 0; - std::atomic evicting_or_writing{false}; + bool evicting_or_writing{false}; }; class LocalPartitionWriter : public PartitionWriter @@ -109,7 +109,6 @@ class CelebornPartitionWriter : public PartitionWriter size_t unsafeEvictPartitions(bool for_memory_spill) override; void unsafeStop() override; - std::atomic evicting_or_writing = 0; std::unique_ptr celeborn_client; }; } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index c6ea8e80eb62..8fe6bf92884a 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -419,7 +419,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder /* restricted_seek */ true); }; - DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}}; + DB::StoredObjects stored_objects{DB::StoredObject{key, object_size}}; auto s3_impl = std::make_unique( std::move(read_buffer_creator), stored_objects, new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true); From f2b90226cc88a7e2d7250f0fcefcb3db650409c4 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Wed, 29 Nov 2023 19:52:15 +0800 Subject: [PATCH 11/11] revert files --- .../local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index 8fe6bf92884a..c6ea8e80eb62 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -419,7 +419,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder /* restricted_seek */ true); }; - DB::StoredObjects stored_objects{DB::StoredObject{key, object_size}}; + DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}}; auto s3_impl = std::make_unique( std::move(read_buffer_creator), stored_objects, new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);