Skip to content

Commit

Permalink
Add force path to MemoryManager::shrinkPools (#9020)
Browse files Browse the repository at this point in the history
Summary:
Add a force path to shrink pools so that, when it is turned on, memory is reclaimed by aborting the queries with the biggest memory usage.

Pull Request resolved: facebookincubator/velox#9020

Reviewed By: xiaoxmeng

Differential Revision: D54724098

Pulled By: tanjialiang

fbshipit-source-id: 090060ffa500500e9da74c9128df7155a19a6f4b
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Mar 12, 2024
1 parent 829132a commit 18deebe
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 100 deletions.
8 changes: 6 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,12 @@ bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
return arbitrator_->growCapacity(pool, getAlivePools(), incrementBytes);
}

uint64_t MemoryManager::shrinkPools(uint64_t targetBytes) {
return arbitrator_->shrinkCapacity(getAlivePools(), targetBytes);
uint64_t MemoryManager::shrinkPools(
uint64_t targetBytes,
bool allowSpill,
bool allowAbort) {
return arbitrator_->shrinkCapacity(
getAlivePools(), targetBytes, allowSpill, allowAbort);
}

void MemoryManager::dropPool(MemoryPool* pool) {
Expand Down
14 changes: 9 additions & 5 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,15 @@ class MemoryManager {

/// Invoked to shrink alive pools to free 'targetBytes' capacity. The function
/// returns the actual freed memory capacity in bytes. If 'targetBytes' is
/// zero, then try to reclaim all the memory from the alive pools.
///
/// TODO: add option to enable spilling or not. If spilling is disabled, then
/// the arbitrator might reclaim memory by killing queries.
uint64_t shrinkPools(uint64_t targetBytes = 0);
/// zero, then try to reclaim all the memory from the alive pools. If
/// 'allowSpill' is true, it reclaims the used memory by spilling. If
/// 'allowAbort' is true, it reclaims the used memory by aborting the queries
/// with the most memory usage. If both are true, it first reclaims the used
/// memory by spilling and then aborts queries to reach the reclaim target.
uint64_t shrinkPools(
uint64_t targetBytes = 0,
bool allowSpill = true,
bool allowAbort = false);

/// Default unmanaged leaf pool with no threadsafe stats support. Libraries
/// using this method can get a pool that is shared with other threads. The
Expand Down
4 changes: 3 additions & 1 deletion velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ class NoopArbitrator : public MemoryArbitrator {
// memory pool capacity shrink.
uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& /* unused */,
uint64_t /* unused */) override {
uint64_t /* unused */,
bool /* unused */,
bool /* unused */) override {
return 0;
}

Expand Down
10 changes: 8 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,16 @@ class MemoryArbitrator {
/// of memory pools by reclaiming free and used memory. The freed memory
/// capacity is given back to the arbitrator. If 'targetBytes' is zero, then
/// try to reclaim all the memory from 'pools'. The function returns the
/// actual freed memory capacity in bytes.
/// actual freed memory capacity in bytes. If 'allowSpill' is true, it
/// reclaims the used memory by spilling. If 'allowAbort' is true, it reclaims
/// the used memory by aborting the queries with the most memory usage. If
/// both are true, it first reclaims the used memory by spilling and then
/// aborts queries to reach the reclaim target.
virtual uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) = 0;
uint64_t targetBytes,
bool allowSpill = true,
bool allowAbort = false) = 0;

/// The internal execution stats of the memory arbitrator.
struct Stats {
Expand Down
152 changes: 107 additions & 45 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,31 @@ std::string memoryPoolAbortMessage(
<< victim->treeMemoryUsage();
return out.str();
}
} // namespace

SharedArbitrator::SharedArbitrator(const MemoryArbitrator::Config& config)
: MemoryArbitrator(config), freeCapacity_(capacity_) {
RECORD_METRIC_VALUE(kMetricArbitratorFreeCapacityBytes, freeCapacity_);
VELOX_CHECK_EQ(kind_, config.kind);
}

std::string SharedArbitrator::Candidate::toString() const {
return fmt::format(
"CANDIDATE[{} RECLAIMABLE[{}] RECLAIMABLE_BYTES[{}] FREE_BYTES[{}]]",
pool->root()->name(),
reclaimable,
succinctBytes(reclaimableBytes),
succinctBytes(freeBytes));
std::vector<SharedArbitrator::Candidate> getCandidateStats(
const std::vector<std::shared_ptr<MemoryPool>>& pools) {
std::vector<SharedArbitrator::Candidate> candidates;
candidates.reserve(pools.size());
for (const auto& pool : pools) {
auto reclaimableBytesOpt = pool->reclaimableBytes();
const uint64_t reclaimableBytes = reclaimableBytesOpt.value_or(0);
candidates.push_back(
{reclaimableBytesOpt.has_value(),
reclaimableBytes,
pool->freeBytes(),
pool->currentBytes(),
pool.get()});
}
return candidates;
}

void SharedArbitrator::sortCandidatesByFreeCapacity(
std::vector<Candidate>& candidates) const {
void sortCandidatesByFreeCapacity(
std::vector<SharedArbitrator::Candidate>& candidates) {
std::sort(
candidates.begin(),
candidates.end(),
[&](const Candidate& lhs, const Candidate& rhs) {
[&](const SharedArbitrator::Candidate& lhs,
const SharedArbitrator::Candidate& rhs) {
return lhs.freeBytes > rhs.freeBytes;
});

Expand All @@ -94,12 +96,13 @@ void SharedArbitrator::sortCandidatesByFreeCapacity(
&candidates);
}

void SharedArbitrator::sortCandidatesByReclaimableMemory(
std::vector<Candidate>& candidates) const {
void sortCandidatesByReclaimableMemory(
std::vector<SharedArbitrator::Candidate>& candidates) {
std::sort(
candidates.begin(),
candidates.end(),
[](const Candidate& lhs, const Candidate& rhs) {
[](const SharedArbitrator::Candidate& lhs,
const SharedArbitrator::Candidate& rhs) {
if (!lhs.reclaimable) {
return false;
}
Expand All @@ -114,11 +117,24 @@ void SharedArbitrator::sortCandidatesByReclaimableMemory(
&candidates);
}

const SharedArbitrator::Candidate&
SharedArbitrator::findCandidateWithLargestCapacity(
// Orders 'candidates' in place so that the entries with the largest captured
// memory usage ('currentBytes') come first.
void sortCandidatesByUsage(
    std::vector<SharedArbitrator::Candidate>& candidates) {
  const auto usesMoreMemory = [](const SharedArbitrator::Candidate& first,
                                 const SharedArbitrator::Candidate& second) {
    return first.currentBytes > second.currentBytes;
  };
  std::sort(candidates.begin(), candidates.end(), usesMoreMemory);
}

// Finds the candidate with the largest capacity. For 'requestor', the
// capacity used for comparison includes both its current capacity and the
// capacity it requests to grow.
const SharedArbitrator::Candidate& findCandidateWithLargestCapacity(
MemoryPool* requestor,
uint64_t targetBytes,
const std::vector<Candidate>& candidates) const {
const std::vector<SharedArbitrator::Candidate>& candidates) {
VELOX_CHECK(!candidates.empty());
int32_t candidateIdx{-1};
int64_t maxCapacity{-1};
Expand Down Expand Up @@ -150,6 +166,22 @@ SharedArbitrator::findCandidateWithLargestCapacity(
VELOX_CHECK_NE(candidateIdx, -1);
return candidates[candidateIdx];
}
} // namespace

// Constructs the arbitrator from 'config'. The free capacity is seeded from
// 'capacity_', i.e. the arbitrator starts with all of its capacity free.
// NOTE(review): 'capacity_' and 'kind_' are presumably set by the
// MemoryArbitrator base constructor from 'config' -- confirm.
SharedArbitrator::SharedArbitrator(const MemoryArbitrator::Config& config)
: MemoryArbitrator(config), freeCapacity_(capacity_) {
// Record the initial free capacity metric.
RECORD_METRIC_VALUE(kMetricArbitratorFreeCapacityBytes, freeCapacity_);
// Check that this arbitrator's kind matches the kind requested in 'config'.
VELOX_CHECK_EQ(kind_, config.kind);
}

std::string SharedArbitrator::Candidate::toString() const {
return fmt::format(
"CANDIDATE[{} RECLAIMABLE[{}] RECLAIMABLE_BYTES[{}] FREE_BYTES[{}]]",
pool->root()->name(),
reclaimable,
succinctBytes(reclaimableBytes),
succinctBytes(freeBytes));
}

SharedArbitrator::~SharedArbitrator() {
if (freeCapacity_ != capacity_) {
Expand Down Expand Up @@ -209,7 +241,9 @@ uint64_t SharedArbitrator::shrinkCapacity(

uint64_t SharedArbitrator::shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes) {
uint64_t targetBytes,
bool allowSpill,
bool allowAbort) {
ScopedArbitration scopedArbitration(this);
if (targetBytes == 0) {
targetBytes = capacity_;
Expand All @@ -218,12 +252,30 @@ uint64_t SharedArbitrator::shrinkCapacity(
}
std::vector<Candidate> candidates = getCandidateStats(pools);
auto freedBytes = reclaimFreeMemoryFromCandidates(candidates, targetBytes);
auto freeGuard = folly::makeGuard([&]() {
// Returns the freed memory capacity back to the arbitrator.
if (freedBytes > 0) {
incrementFreeCapacity(freedBytes);
}
});
if (freedBytes >= targetBytes) {
return freedBytes;
}
freedBytes += reclaimUsedMemoryFromCandidates(
nullptr, candidates, targetBytes - freedBytes);
incrementFreeCapacity(freedBytes);
if (allowSpill) {
freedBytes += reclaimUsedMemoryFromCandidatesBySpill(
nullptr, candidates, targetBytes - freedBytes);
if (freedBytes >= targetBytes) {
return freedBytes;
}
}
if (allowAbort) {
if (allowSpill) {
// Candidate stats may change after spilling.
candidates = getCandidateStats(pools);
}
freedBytes += reclaimUsedMemoryFromCandidatesByAbort(
candidates, targetBytes - freedBytes);
}
return freedBytes;
}

Expand All @@ -232,22 +284,6 @@ void SharedArbitrator::testingFreeCapacity(uint64_t capacity) {
incrementFreeCapacityLocked(capacity);
}

std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats(
const std::vector<std::shared_ptr<MemoryPool>>& pools) {
std::vector<SharedArbitrator::Candidate> candidates;
candidates.reserve(pools.size());
for (const auto& pool : pools) {
auto reclaimableBytesOpt = pool->reclaimableBytes();
const uint64_t reclaimableBytes = reclaimableBytesOpt.value_or(0);
candidates.push_back(
{reclaimableBytesOpt.has_value(),
reclaimableBytes,
pool->freeBytes(),
pool.get()});
}
return candidates;
}

bool SharedArbitrator::growCapacity(
MemoryPool* pool,
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
Expand Down Expand Up @@ -396,7 +432,7 @@ bool SharedArbitrator::arbitrateMemory(
}

VELOX_CHECK_LT(freedBytes, growTarget);
freedBytes += reclaimUsedMemoryFromCandidates(
freedBytes += reclaimUsedMemoryFromCandidatesBySpill(
requestor, candidates, growTarget - freedBytes);
if (requestor->aborted()) {
RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount);
Expand Down Expand Up @@ -447,7 +483,7 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
return freedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidates(
uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill(
MemoryPool* requestor,
std::vector<Candidate>& candidates,
uint64_t targetBytes) {
Expand All @@ -472,6 +508,32 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidates(
return freedBytes;
}

uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(
std::vector<Candidate>& candidates,
uint64_t targetBytes) {
sortCandidatesByUsage(candidates);

int64_t freedBytes{0};
for (const auto& candidate : candidates) {
VELOX_CHECK_LT(freedBytes, targetBytes);
if (candidate.currentBytes == 0) {
break;
}
try {
VELOX_MEM_POOL_ABORTED(fmt::format(
"Memory pool aborted to reclaim used memory, current usage {}",
succinctBytes(candidate.currentBytes)));
} catch (VeloxRuntimeError& ex) {
abort(candidate.pool, std::current_exception());
}
freedBytes += candidate.pool->shrink();
if (freedBytes >= targetBytes) {
break;
}
}
return freedBytes;
}

uint64_t SharedArbitrator::reclaim(
MemoryPool* pool,
uint64_t targetBytes) noexcept {
Expand Down
32 changes: 12 additions & 20 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class SharedArbitrator : public memory::MemoryArbitrator {

// Shrinks the capacity of 'pools' to free up to 'targetBytes' and returns the
// actually freed capacity in bytes. If 'allowSpill' is true, used memory is
// reclaimed by spilling. If 'allowAbort' is true, used memory is reclaimed by
// aborting the queries with the most memory usage. If both are true, spilling
// is tried first and queries are then aborted to reach the reclaim target.
//
// NOTE: the last parameter is named 'allowAbort' to match the base class
// declaration and the definition in SharedArbitrator.cpp.
uint64_t shrinkCapacity(
    const std::vector<std::shared_ptr<MemoryPool>>& pools,
    uint64_t targetBytes,
    bool allowSpill = true,
    bool allowAbort = false) override final;

Stats stats() const final;

Expand All @@ -65,6 +67,7 @@ class SharedArbitrator : public memory::MemoryArbitrator {
bool reclaimable{false};
uint64_t reclaimableBytes{0};
uint64_t freeBytes{0};
int64_t currentBytes{0};
MemoryPool* pool;

std::string toString() const;
Expand Down Expand Up @@ -110,23 +113,6 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// requestor capacity accordingly.
bool ensureCapacity(MemoryPool* requestor, uint64_t targetBytes);

// Invoked to capture the candidate memory pools stats for arbitration.
static std::vector<Candidate> getCandidateStats(
const std::vector<std::shared_ptr<MemoryPool>>& pools);

void sortCandidatesByReclaimableMemory(
std::vector<Candidate>& candidates) const;

void sortCandidatesByFreeCapacity(std::vector<Candidate>& candidates) const;

// Finds the candidate with the largest capacity. For 'requestor', the
// capacity for comparison including its current capacity and the capacity to
// grow.
const Candidate& findCandidateWithLargestCapacity(
MemoryPool* requestor,
uint64_t targetBytes,
const std::vector<Candidate>& candidates) const;

bool arbitrateMemory(
MemoryPool* requestor,
std::vector<Candidate>& candidates,
Expand All @@ -150,15 +136,21 @@ class SharedArbitrator : public memory::MemoryArbitrator {
std::vector<Candidate>& candidates,
uint64_t targetBytes);

// Invoked to reclaim used memory capacity from 'candidates'.
// Invoked to reclaim used memory capacity from 'candidates' by spilling.
//
// NOTE: the function might sort 'candidates' based on each candidate's
// reclaimable memory internally.
uint64_t reclaimUsedMemoryFromCandidates(
uint64_t reclaimUsedMemoryFromCandidatesBySpill(
MemoryPool* requestor,
std::vector<Candidate>& candidates,
uint64_t targetBytes);

// Invoked to reclaim used memory capacity from 'candidates' by aborting the
// top memory users' queries. Returns the freed memory capacity in bytes.
//
// NOTE: the function sorts 'candidates' based on each candidate's current
// memory usage internally.
uint64_t reclaimUsedMemoryFromCandidatesByAbort(
std::vector<Candidate>& candidates,
uint64_t targetBytes);

// Invoked to reclaim used memory from 'pool' with specified 'targetBytes'.
// The function returns the actually freed capacity.
uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) noexcept;
Expand Down
4 changes: 3 additions & 1 deletion velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ class FakeTestArbitrator : public MemoryArbitrator {

uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override {
uint64_t /*unused*/,
bool /*unused*/,
bool /*unused*/) override {
VELOX_NYI();
}

Expand Down
4 changes: 3 additions & 1 deletion velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ class FakeTestArbitrator : public MemoryArbitrator {

uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override {
uint64_t /*unused*/,
bool /*unused*/,
bool /*unused*/) override {
VELOX_NYI();
}

Expand Down
Loading

0 comments on commit 18deebe

Please sign in to comment.