diff --git a/velox/dwio/common/FileSink.cpp b/velox/dwio/common/FileSink.cpp index 26873abbfc04..1d0648ce1b2d 100644 --- a/velox/dwio/common/FileSink.cpp +++ b/velox/dwio/common/FileSink.cpp @@ -66,13 +66,15 @@ void FileSink::writeImpl( std::vector>& buffers, const std::function&)>& callback) { DWIO_ENSURE(!isClosed(), "Cannot write to closed sink."); - uint64_t size = 0; + const uint64_t oldSize = size_; for (auto& buf : buffers) { - size += callback(buf); + // NOTE: we need to update 'size_' after each 'callback' invocation as some + // file sink implementation like MemorySink depends on the updated 'size_' + // for new write. + size_ += callback(buf); } - size_ += size; if (stats_ != nullptr) { - stats_->incRawBytesWritten(size); + stats_->incRawBytesWritten(size_ - oldSize); } // Writing buffer is treated as transferring ownership. So clearing the // buffers after all buffers are written. diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index 4add4168ddf1..c250547272cd 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable( ExecutorBarrierTest.cpp OnDemandUnitLoaderTests.cpp LocalFileSinkTest.cpp + MemorySinkTest.cpp LoggedExceptionTest.cpp ParallelForTest.cpp RangeTests.cpp diff --git a/velox/dwio/common/tests/MemorySinkTest.cpp b/velox/dwio/common/tests/MemorySinkTest.cpp new file mode 100644 index 000000000000..4f348811d938 --- /dev/null +++ b/velox/dwio/common/tests/MemorySinkTest.cpp @@ -0,0 +1,57 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/FileSink.h" + +#include + +namespace facebook::velox::dwio::common { + +class MemorySinkTest : public testing::Test { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + std::shared_ptr pool_{ + memory::memoryManager()->addLeafPool()}; +}; + +TEST_F(MemorySinkTest, create) { + std::string chars("abcdefghijklmnopqrst"); + std::vector> buffers; + + // Add 'abcdefghij' to first buffer + buffers.emplace_back(*pool_); + buffers.back().append(0, chars.data(), 10); + + // Add 'klmnopqrst' to second buffer + buffers.emplace_back(*pool_); + buffers.back().append(0, chars.data() + 10, 10); + + ASSERT_EQ(buffers.size(), 2); + + auto memorySink = std::make_unique( + 1024, dwio::common::FileSink::Options{.pool = pool_.get()}); + + ASSERT_TRUE(memorySink->isBuffered()); + // Write data to MemorySink. + memorySink->write(buffers); + ASSERT_EQ(memorySink->size(), chars.length()); + ASSERT_EQ(memorySink->data(), chars); + memorySink->close(); +} +} // namespace facebook::velox::dwio::common