From bc656f362102116736ec6eadaa63bd3a00e32450 Mon Sep 17 00:00:00 2001 From: wypb Date: Thu, 11 Apr 2024 11:38:17 +0800 Subject: [PATCH] Fix the incorrect size calculation logic of FileSink. --- velox/dwio/common/FileSink.cpp | 12 ++++++------ velox/dwio/common/tests/MemorySinkTest.cpp | 7 ++++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/velox/dwio/common/FileSink.cpp b/velox/dwio/common/FileSink.cpp index 25c28f2f7239..1d0648ce1b2d 100644 --- a/velox/dwio/common/FileSink.cpp +++ b/velox/dwio/common/FileSink.cpp @@ -66,15 +66,15 @@ void FileSink::writeImpl( std::vector>& buffers, const std::function&)>& callback) { DWIO_ENSURE(!isClosed(), "Cannot write to closed sink."); - uint64_t size = 0; + const uint64_t oldSize = size_; for (auto& buf : buffers) { - const auto writtenSize = callback(buf); - size += writtenSize; - size_ += writtenSize; + // NOTE: we need to update 'size_' after each 'callback' invocation as some + // file sink implementation like MemorySink depends on the updated 'size_' + // for new write. + size_ += callback(buf); } - if (stats_ != nullptr) { - stats_->incRawBytesWritten(size); + stats_->incRawBytesWritten(size_ - oldSize); } // Writing buffer is treated as transferring ownership. So clearing the // buffers after all buffers are written. diff --git a/velox/dwio/common/tests/MemorySinkTest.cpp b/velox/dwio/common/tests/MemorySinkTest.cpp index fe4bfe5fb459..466d0fc18114 100644 --- a/velox/dwio/common/tests/MemorySinkTest.cpp +++ b/velox/dwio/common/tests/MemorySinkTest.cpp @@ -25,6 +25,7 @@ class MemorySinkTest : public testing::Test { static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); } + std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -43,7 +44,7 @@ TEST_F(MemorySinkTest, create) { buffers.back().append(chars[i]); } - EXPECT_EQ(buffers.size(), 2); + ASSERT_EQ(buffers.size(), 2); auto memorySink = std::make_unique( 1024, dwio::common::FileSink::Options{.pool = pool_.get()}); @@ -51,8 +52,8 @@ TEST_F(MemorySinkTest, create) { ASSERT_TRUE(memorySink->isBuffered()); // Write data to MemorySink. memorySink->write(buffers); - EXPECT_EQ(memorySink->size(), chars.length()); - EXPECT_EQ(memorySink->data(), chars); + ASSERT_EQ(memorySink->size(), chars.length()); + ASSERT_EQ(memorySink->data(), chars); memorySink->close(); } } // namespace facebook::velox::dwio::common