Skip to content

Commit

Permalink
Add non-blocking support for DwrfReader (#8884)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator/velox#8884

Create a non-io-blocking interface for the reader. This way the main thread won't be blocked on IO if that thread isn't meant to be an IO thread.

Reviewed By: Yuhta

Differential Revision: D54282154

fbshipit-source-id: 8081578d5bc9c7a4d86ed34b24a188dc062eeff7
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Mar 12, 2024
1 parent 21242cb commit 49d1224
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 13 deletions.
14 changes: 14 additions & 0 deletions velox/dwio/common/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/dwio/common/InputStream.h"
#include "velox/dwio/common/Mutation.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/ReaderState.h"
#include "velox/dwio/common/Statistics.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/type/Type.h"
Expand Down Expand Up @@ -59,6 +60,14 @@ class RowReader {
velox::VectorPtr& result,
const Mutation* mutation = nullptr) = 0;

virtual StateAndResultOrIoActions<uint64_t> tryNext(
uint64_t size,
velox::VectorPtr& result,
const Mutation* mutation = nullptr) {
// Default to blocking
return {ReaderStepState::kSuccess, next(size, result, mutation)};
}

/**
* Return the next row number that will be scanned in the next next() call,
* kAtEnd when at end of file. This row number is relative to beginning of
Expand All @@ -71,6 +80,11 @@ class RowReader {
*/
virtual int64_t nextRowNumber() = 0;

virtual StateAndResultOrIoActions<int64_t> tryNextRowNumber() {
// Default to blocking
return {ReaderStepState::kSuccess, nextRowNumber()};
}

/**
* Given the max number of rows to read, return the actual number of rows that
* will be scanned, including any rows to be deleted or filtered. Return
Expand Down
40 changes: 40 additions & 0 deletions velox/dwio/common/ReaderState.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/common/ResultOrActions.h"

namespace facebook::velox::dwio::common {

enum class ReaderStepState {
// Success, will contain the result
kSuccess,

// IO is needed to read more data. There will be one or more actions
// (callbacks) to do the IO.
kNeedsIO,
};

template <typename ResultType>
struct StateAndResultOrIoActions {
ReaderStepState state;
ResultOrActions<ResultType> resultOrIoActions;
};

using StateAndIoActions = StateAndResultOrIoActions<folly::Unit>;

} // namespace facebook::velox::dwio::common
114 changes: 101 additions & 13 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,29 @@ using dwio::common::ColumnSelector;
using dwio::common::FileFormat;
using dwio::common::InputStream;
using dwio::common::ReaderOptions;
using dwio::common::ReaderStepState;
using dwio::common::ResultOrActions;
using dwio::common::RowReaderOptions;
using dwio::common::StateAndIoActions;
using dwio::common::StateAndResultOrIoActions;

namespace {

template <typename F>
auto performIO(F func) {
constexpr bool isVoid = std::is_same_v<StateAndIoActions, decltype(func())>;

auto result = func();
while (result.state == ReaderStepState::kNeedsIO) {
result.resultOrIoActions.runAllActions();
result = func();
}
if constexpr (!isVoid) {
return result.resultOrIoActions.result();
}
}

} // namespace

DwrfRowReader::DwrfRowReader(
const std::shared_ptr<ReaderBase>& reader,
Expand Down Expand Up @@ -374,11 +396,11 @@ void DwrfRowReader::readWithRowNumber(
std::move(children));
}

int64_t DwrfRowReader::nextRowNumber() {
StateAndResultOrIoActions<int64_t> DwrfRowReader::tryNextRowNumber() {
auto strideSize = getReader().getFooter().rowIndexStride();
while (currentStripe_ < stripeCeiling_) {
if (currentRowInStripe_ == 0) {
if (getReader().randomSkip()) {
if (getReader().randomSkip() && !retryNextRowNumberInProgress_) {
auto numStripeRows =
getReader().getFooter().stripes(currentStripe_).numberOfRows();
auto skip = getReader().randomSkip()->nextSkip();
Expand All @@ -389,19 +411,35 @@ int64_t DwrfRowReader::nextRowNumber() {
goto advanceToNextStripe;
}
}
startNextStripe();
auto result = tryStartNextStripe();
if (result.state == ReaderStepState::kNeedsIO) {
retryNextRowNumberInProgress_ = true;
ResultOrActions<int64_t> resultOrActions;
resultOrActions.moveActionsBack(
std::move(result.resultOrIoActions.actions()));
return {ReaderStepState::kNeedsIO, std::move(resultOrActions)};
}
}
checkSkipStrides(strideSize);
if (currentRowInStripe_ < rowsInCurrentStripe_) {
return firstRowOfStripe_[currentStripe_] + currentRowInStripe_;
retryNextRowNumberInProgress_ = false;
return {
ReaderStepState::kSuccess,
static_cast<int64_t>(
firstRowOfStripe_[currentStripe_] + currentRowInStripe_)};
}
advanceToNextStripe:
++currentStripe_;
currentRowInStripe_ = 0;
newStripeReadyForRead_ = false;
}
atEnd_ = true;
return kAtEnd;
retryNextRowNumberInProgress_ = false;
return {ReaderStepState::kSuccess, kAtEnd};
}

int64_t DwrfRowReader::nextRowNumber() {
return performIO([&]() { return tryNextRowNumber(); });
}

int64_t DwrfRowReader::nextReadSize(uint64_t size) {
Expand All @@ -420,29 +458,45 @@ int64_t DwrfRowReader::nextReadSize(uint64_t size) {
return rowsToRead;
}

uint64_t DwrfRowReader::next(
StateAndResultOrIoActions<uint64_t> DwrfRowReader::tryNext(
uint64_t size,
velox::VectorPtr& result,
const dwio::common::Mutation* mutation) {
auto nextRow = nextRowNumber();
if (nextRow == kAtEnd) {
auto nextRowResult = tryNextRowNumber();
if (nextRowResult.state == ReaderStepState::kNeedsIO) {
ResultOrActions<uint64_t> ret;
ret.moveActionsBack(std::move(nextRowResult.resultOrIoActions.actions()));
return {ReaderStepState::kNeedsIO, std::move(ret)};
}
if (nextRowResult.resultOrIoActions.result() == kAtEnd) {
if (!isEmptyFile()) {
previousRow_ = firstRowOfStripe_[stripeCeiling_ - 1] +
getReader().getFooter().stripes(stripeCeiling_ - 1).numberOfRows();
} else {
previousRow_ = 0;
}
return 0;
return {ReaderStepState::kSuccess, 0};
}
VELOX_CHECK(nextRowResult.resultOrIoActions.hasResult());
auto nextRow = nextRowResult.resultOrIoActions.result();

auto rowsToRead = nextReadSize(size);

previousRow_ = nextRow;
// Record strideIndex for use by the columnReader_ which may delay actual
// reading of the data.
auto strideSize = getReader().getFooter().rowIndexStride();
strideIndex_ = strideSize > 0 ? currentRowInStripe_ / strideSize : 0;
readNext(rowsToRead, mutation, result);
currentRowInStripe_ += rowsToRead;
return rowsToRead;
return {ReaderStepState::kSuccess, static_cast<uint64_t>(rowsToRead)};
}

uint64_t DwrfRowReader::next(
uint64_t size,
velox::VectorPtr& result,
const dwio::common::Mutation* mutation) {
return performIO([&]() { return tryNext(size, result, mutation); });
}

void DwrfRowReader::resetFilterCaches() {
Expand Down Expand Up @@ -472,6 +526,16 @@ DwrfRowReader::prefetchUnits() {
return res;
}

DwrfRowReader::FetchStatus DwrfRowReader::fetchStatus(
uint32_t stripeIndex) const {
return stripeLoadStatuses_.withRLock([&](auto& stripeLoadStatus) {
if (stripeIndex < 0 || stripeIndex >= stripeLoadStatus.size()) {
return FetchStatus::ERROR;
}
return stripeLoadStatus[stripeIndex];
});
}

DwrfRowReader::FetchResult DwrfRowReader::fetch(uint32_t stripeIndex) {
FetchStatus prevStatus;
stripeLoadStatuses_.withWLock([&](auto& stripeLoadStatus) {
Expand Down Expand Up @@ -599,6 +663,22 @@ DwrfRowReader::FetchResult DwrfRowReader::prefetch(uint32_t stripeToFetch) {
return fetch(stripeToFetch);
}

StateAndIoActions DwrfRowReader::trySafeFetchNextStripe() {
switch (fetchStatus(currentStripe_)) {
case FetchStatus::NOT_STARTED:
case FetchStatus::ERROR:
case FetchStatus::IN_PROGRESS:
return {
ReaderStepState::kNeedsIO,
std::function<void()>([this, currentStripe = currentStripe_]() {
fetch(currentStripe);
})};
default:
safeFetchNextStripe();
return {ReaderStepState::kSuccess, {}};
}
}

// Guarantee stripe we are currently on is available and loaded
void DwrfRowReader::safeFetchNextStripe() {
auto startTime = std::chrono::high_resolution_clock::now();
Expand Down Expand Up @@ -632,13 +712,16 @@ void DwrfRowReader::safeFetchNextStripe() {
DWIO_ENSURE(prefetchedStripeStates_.rlock()->contains(currentStripe_));
}

void DwrfRowReader::startNextStripe() {
StateAndIoActions DwrfRowReader::tryStartNextStripe() {
if (newStripeReadyForRead_ || currentStripe_ >= stripeCeiling_) {
return;
return {ReaderStepState::kSuccess, {}};
}
auto fetchResult = trySafeFetchNextStripe();
if (fetchResult.state != ReaderStepState::kSuccess) {
return fetchResult;
}
columnReader_.reset();
selectiveColumnReader_.reset();
safeFetchNextStripe();
prefetchedStripeStates_.withWLock([&](auto& prefetchedStripeStates) {
DWIO_ENSURE(prefetchedStripeStates.contains(currentStripe_));

Expand Down Expand Up @@ -668,6 +751,11 @@ void DwrfRowReader::startNextStripe() {
<< std::chrono::duration_cast<std::chrono::microseconds>(
endTime - startTime)
.count();
return {ReaderStepState::kSuccess, {}};
}

void DwrfRowReader::startNextStripe() {
performIO([this]() { return tryStartNextStripe(); });
}

size_t DwrfRowReader::estimatedReaderMemory() const {
Expand Down
14 changes: 14 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class DwrfRowReader : public StrideIndexProvider,
VectorPtr& result,
const dwio::common::Mutation* = nullptr) override;

dwio::common::StateAndResultOrIoActions<uint64_t> tryNext(
uint64_t size,
velox::VectorPtr& result,
const dwio::common::Mutation* mutation = nullptr) override;

void updateRuntimeStats(
dwio::common::RuntimeStatistics& stats) const override {
stats.skippedStrides += skippedStrides_;
Expand Down Expand Up @@ -122,12 +127,19 @@ class DwrfRowReader : public StrideIndexProvider,

int64_t nextRowNumber() override;

dwio::common::StateAndResultOrIoActions<int64_t> tryNextRowNumber() override;

int64_t nextReadSize(uint64_t size) override;

private:
dwio::common::StateAndIoActions tryStartNextStripe();

dwio::common::StateAndIoActions trySafeFetchNextStripe();

// Represents the status of a stripe being fetched.
enum class FetchStatus { NOT_STARTED, IN_PROGRESS, FINISHED, ERROR };

FetchStatus fetchStatus(uint32_t stripeIndex) const;
FetchResult fetch(uint32_t stripeIndex);
FetchResult prefetch(uint32_t stripeToFetch);

Expand Down Expand Up @@ -200,6 +212,8 @@ class DwrfRowReader : public StrideIndexProvider,

bool atEnd_{false};

bool retryNextRowNumberInProgress_{false};

// internal methods

std::optional<size_t> estimatedRowSizeHelper(
Expand Down

0 comments on commit 49d1224

Please sign in to comment.