Skip to content

Commit

Permalink
Fix the incorrect size calculation logic of FileSink.
Browse files Browse the repository at this point in the history
  • Loading branch information
wypb committed Apr 11, 2024
1 parent cb25213 commit bc656f3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
12 changes: 6 additions & 6 deletions velox/dwio/common/FileSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ void FileSink::writeImpl(
std::vector<DataBuffer<char>>& buffers,
const std::function<uint64_t(const DataBuffer<char>&)>& callback) {
DWIO_ENSURE(!isClosed(), "Cannot write to closed sink.");
uint64_t size = 0;
const uint64_t oldSize = size_;
for (auto& buf : buffers) {
const auto writtenSize = callback(buf);
size += writtenSize;
size_ += writtenSize;
// NOTE: we need to update 'size_' after each 'callback' invocation as some
// file sink implementation like MemorySink depends on the updated 'size_'
// for new write.
size_ += callback(buf);
}

if (stats_ != nullptr) {
stats_->incRawBytesWritten(size);
stats_->incRawBytesWritten(size_ - oldSize);
}
// Writing buffer is treated as transferring ownership. So clearing the
// buffers after all buffers are written.
Expand Down
7 changes: 4 additions & 3 deletions velox/dwio/common/tests/MemorySinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class MemorySinkTest : public testing::Test {
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
}

std::shared_ptr<velox::memory::MemoryPool> pool_{
memory::memoryManager()->addLeafPool()};
};
Expand All @@ -43,16 +44,16 @@ TEST_F(MemorySinkTest, create) {
buffers.back().append(chars[i]);
}

EXPECT_EQ(buffers.size(), 2);
ASSERT_EQ(buffers.size(), 2);

auto memorySink = std::make_unique<MemorySink>(
1024, dwio::common::FileSink::Options{.pool = pool_.get()});

ASSERT_TRUE(memorySink->isBuffered());
// Write data to MemorySink.
memorySink->write(buffers);
EXPECT_EQ(memorySink->size(), chars.length());
EXPECT_EQ(memorySink->data(), chars);
ASSERT_EQ(memorySink->size(), chars.length());
ASSERT_EQ(memorySink->data(), chars);
memorySink->close();
}
} // namespace facebook::velox::dwio::common

0 comments on commit bc656f3

Please sign in to comment.