From d4f8d8556bbd3bf39117b03964e8e84b6750d32e Mon Sep 17 00:00:00 2001
From: wypb
Date: Thu, 11 Apr 2024 00:50:19 -0700
Subject: [PATCH] Fix the incorrect size calculation logic of FileSink. (#9429)

Summary:
I was writing unit tests for the Textfile writer internally and found that data written to `MemorySink` may be overwritten. I investigated the reason and found that `FileSink::writeImpl` can accept multiple buffers.
https://github.com/facebookincubator/velox/blob/main/velox/dwio/common/FileSink.cpp#L65-L80

Every time the data of a buffer is written, we should update `FileSink#size_` instead of updating it after writing all buffers, because `MemorySink::write` relies on `FileSink#size_` to write data to `MemorySink#data_`.
https://github.com/facebookincubator/velox/blob/main/velox/dwio/common/FileSink.cpp#L184-L189

CC: mbasmanova xiaoxmeng

Pull Request resolved: https://github.com/facebookincubator/velox/pull/9429

Reviewed By: amitkdutta, kewang1024

Differential Revision: D56004384

Pulled By: xiaoxmeng

fbshipit-source-id: c824c5e481866abef12f99d570d41a852bb8b2e0
---
 velox/dwio/common/FileSink.cpp             | 10 ++--
 velox/dwio/common/tests/CMakeLists.txt     |  1 +
 velox/dwio/common/tests/MemorySinkTest.cpp | 57 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 4 deletions(-)
 create mode 100644 velox/dwio/common/tests/MemorySinkTest.cpp

diff --git a/velox/dwio/common/FileSink.cpp b/velox/dwio/common/FileSink.cpp
index 26873abbfc04..1d0648ce1b2d 100644
--- a/velox/dwio/common/FileSink.cpp
+++ b/velox/dwio/common/FileSink.cpp
@@ -66,13 +66,15 @@ void FileSink::writeImpl(
     std::vector<DataBuffer<char>>& buffers,
     const std::function<uint64_t(const DataBuffer<char>&)>& callback) {
   DWIO_ENSURE(!isClosed(), "Cannot write to closed sink.");
-  uint64_t size = 0;
+  const uint64_t oldSize = size_;
   for (auto& buf : buffers) {
-    size += callback(buf);
+    // NOTE: we need to update 'size_' after each 'callback' invocation as some
+    // file sink implementations like MemorySink depend on the updated 'size_'
+    // for the next write.
+    size_ += callback(buf);
   }
-  size_ += size;
   if (stats_ != nullptr) {
-    stats_->incRawBytesWritten(size);
+    stats_->incRawBytesWritten(size_ - oldSize);
   }
   // Writing buffer is treated as transferring ownership. So clearing the
   // buffers after all buffers are written.
diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt
index 4add4168ddf1..c250547272cd 100644
--- a/velox/dwio/common/tests/CMakeLists.txt
+++ b/velox/dwio/common/tests/CMakeLists.txt
@@ -25,6 +25,7 @@ add_executable(
   ExecutorBarrierTest.cpp
   OnDemandUnitLoaderTests.cpp
   LocalFileSinkTest.cpp
+  MemorySinkTest.cpp
   LoggedExceptionTest.cpp
   ParallelForTest.cpp
   RangeTests.cpp
diff --git a/velox/dwio/common/tests/MemorySinkTest.cpp b/velox/dwio/common/tests/MemorySinkTest.cpp
new file mode 100644
index 000000000000..4f348811d938
--- /dev/null
+++ b/velox/dwio/common/tests/MemorySinkTest.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/dwio/common/FileSink.h"
+
+#include <gtest/gtest.h>
+
+namespace facebook::velox::dwio::common {
+// Fixture: sets the MemoryManager test instance and owns a leaf memory pool.
+class MemorySinkTest : public testing::Test {
+ protected:
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  std::shared_ptr<memory::MemoryPool> pool_{
+      memory::memoryManager()->addLeafPool()};
+};
+// Regression: buffers in a single write() call must be stored back to back.
+TEST_F(MemorySinkTest, create) {
+  std::string chars("abcdefghijklmnopqrst");
+  std::vector<DataBuffer<char>> buffers;
+
+  // Add 'abcdefghij' to the first buffer.
+  buffers.emplace_back(*pool_);
+  buffers.back().append(0, chars.data(), 10);
+
+  // Add 'klmnopqrst' to the second buffer.
+  buffers.emplace_back(*pool_);
+  buffers.back().append(0, chars.data() + 10, 10);
+
+  ASSERT_EQ(buffers.size(), 2);
+
+  auto memorySink = std::make_unique<MemorySink>(
+      1024, dwio::common::FileSink::Options{.pool = pool_.get()});
+
+  ASSERT_TRUE(memorySink->isBuffered());
+  // Write both buffers in one call; the sink must hold them in order.
+  memorySink->write(buffers);
+  ASSERT_EQ(memorySink->size(), chars.length());
+  ASSERT_EQ(memorySink->data(), chars);
+  memorySink->close();
+}
+} // namespace facebook::velox::dwio::common