Skip to content

Commit

Permalink
Merge pull request #16963 from mmaslankaprv/vbotbuildovich/backport-1…
Browse files Browse the repository at this point in the history
…6846-v23.3.x-53

[v23.3.x] Made `storage::readers_cache` size limited
  • Loading branch information
mmaslankaprv authored Mar 8, 2024
2 parents fe1c9ac + 412d589 commit 3fd6014
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 50 deletions.
9 changes: 9 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ configuration::configuration()
"Duration after which inactive readers will be evicted from cache",
{.visibility = visibility::tunable},
30s)
, readers_cache_target_max_size(
*this,
"readers_cache_target_max_size",
"Maximum desired number of readers cached per ntp. This a soft limit, a "
"number of readers in cache may temporary increase as cleanup is done in "
"background",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
200,
{.min = 0, .max = 10000})
, log_segment_ms(
*this,
"log_segment_ms",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct configuration final : public config_store {
bounded_property<uint16_t> log_segment_size_jitter_percent;
bounded_property<uint64_t> compacted_log_segment_size;
property<std::chrono::milliseconds> readers_cache_eviction_timeout_ms;
bounded_property<size_t> readers_cache_target_max_size;

bounded_property<std::optional<std::chrono::milliseconds>> log_segment_ms;
property<std::chrono::milliseconds> log_segment_ms_min;
property<std::chrono::milliseconds> log_segment_ms_max;
Expand Down
4 changes: 3 additions & 1 deletion src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ disk_log_impl::disk_log_impl(
, _probe(std::make_unique<storage::probe>())
, _max_segment_size(compute_max_segment_size())
, _readers_cache(std::make_unique<readers_cache>(
config().ntp(), _manager.config().readers_cache_eviction_timeout)) {
config().ntp(),
_manager.config().readers_cache_eviction_timeout,
config::shard_local_cfg().readers_cache_target_max_size.bind())) {
const bool is_compacted = config().is_compacted();
for (auto& s : _segs) {
_probe->add_initial_segment(*s);
Expand Down
50 changes: 41 additions & 9 deletions src/v/storage/readers_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "model/fundamental.h"
#include "ssx/future-util.h"
#include "storage/types.h"
#include "utils/intrusive_list_helpers.h"
#include "utils/mutex.h"
#include "vlog.h"

Expand All @@ -26,22 +27,27 @@
namespace storage {

readers_cache::readers_cache(
model::ntp ntp, std::chrono::milliseconds eviction_timeout)
model::ntp ntp,
std::chrono::milliseconds eviction_timeout,
config::binding<size_t> target_max_size)
: _ntp(std::move(ntp))
, _eviction_timeout(eviction_timeout) {
, _eviction_timeout(eviction_timeout)
, _target_max_size(std::move(target_max_size))
, _eviction_jitter(eviction_timeout) {
_probe.setup_metrics(_ntp);
// setup eviction timer
_eviction_timer.set_callback([this] {
ssx::spawn_with_gate(_gate, [this] {
return maybe_evict().finally([this] {
if (!_gate.is_closed()) {
_eviction_timer.arm(_eviction_timeout);
_eviction_timer.arm(
_eviction_jitter.next_jitter_duration());
}
});
});
});

_eviction_timer.arm(_eviction_timeout);
_eviction_timer.arm(_eviction_jitter.next_jitter_duration());
}

model::record_batch_reader
Expand Down Expand Up @@ -76,6 +82,7 @@ readers_cache::put(std::unique_ptr<log_reader> reader) {
auto ptr = new entry{.reader = std::move(reader)}; // NOLINT
_in_use.push_back(*ptr);
_probe.reader_added();
maybe_evict_size();
return ptr->make_cached_reader(this);
}

Expand All @@ -98,7 +105,7 @@ readers_cache::get_reader(const log_reader_config& cfg) {
return std::nullopt;
}
vlog(stlog.trace, "{} - trying to get reader for: {}", _ntp, cfg);
intrusive_list<entry, &entry::_hook> to_evict;
uncounted_intrusive_list<entry, &entry::_hook> to_evict;
/**
* We use linear search since _readers intrusive list is small.
*/
Expand Down Expand Up @@ -136,7 +143,7 @@ readers_cache::get_reader(const log_reader_config& cfg) {

// we use cached_reader wrapper to track reader usage, when cached_reader is
// destroyed we unlock reader and trigger eviction
e._hook.unlink();
_readers.erase(_readers.iterator_to(e));
_in_use.push_back(e);
return e.make_cached_reader(this);
}
Expand Down Expand Up @@ -276,8 +283,8 @@ readers_cache::~readers_cache() {
"readers cache have to be closed before destorying");
}

ss::future<>
readers_cache::dispose_entries(intrusive_list<entry, &entry::_hook> entries) {
ss::future<> readers_cache::dispose_entries(
uncounted_intrusive_list<entry, &entry::_hook> entries) {
for (auto& e : entries) {
co_await e.reader->finally();
}
Expand All @@ -296,7 +303,7 @@ readers_cache::dispose_entries(intrusive_list<entry, &entry::_hook> entries) {
}

void readers_cache::dispose_in_background(
intrusive_list<entry, &entry::_hook> entries) {
uncounted_intrusive_list<entry, &entry::_hook> entries) {
ssx::spawn_with_gate(_gate, [this, entries = std::move(entries)]() mutable {
return dispose_entries(std::move(entries));
});
Expand Down Expand Up @@ -341,4 +348,29 @@ ss::future<> readers_cache::maybe_evict() {
});
}

inline bool readers_cache::over_size_limit() const {
return !_readers.empty()
&& _readers.size() + _in_use.size() > _target_max_size();
}

void readers_cache::maybe_evict_size() {
/**
* exit early if there is nothing to clean
*/
if (!over_size_limit()) [[likely]] {
return;
}

uncounted_intrusive_list<entry, &entry::_hook> to_evict;
_readers.pop_front_and_dispose(
[&to_evict](entry* e) { to_evict.push_back(*e); });

dispose_in_background(std::move(to_evict));
}

readers_cache::stats readers_cache::get_stats() const {
return readers_cache::stats{
.in_use_readers = _in_use.size(), .cached_readers = _readers.size()};
}

} // namespace storage
34 changes: 24 additions & 10 deletions src/v/storage/readers_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
* by the Apache License, Version 2.0
*/
#pragma once
#include "config/property.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "model/record_batch_reader.h"
#include "random/generators.h"
#include "random/simple_time_jitter.h"
#include "storage/log_reader.h"
#include "storage/readers_cache_probe.h"
#include "storage/types.h"
Expand All @@ -35,6 +37,10 @@ namespace storage {
*/
class readers_cache {
public:
struct stats {
size_t in_use_readers;
size_t cached_readers;
};
using offset_range = std::pair<model::offset, model::offset>;
class range_lock_holder {
public:
Expand Down Expand Up @@ -68,12 +74,15 @@ class readers_cache {
std::optional<offset_range> _range;
readers_cache* _cache;
};
explicit readers_cache(model::ntp, std::chrono::milliseconds);
explicit readers_cache(
model::ntp, std::chrono::milliseconds, config::binding<size_t>);
std::optional<model::record_batch_reader>
get_reader(const log_reader_config&);

model::record_batch_reader put(std::unique_ptr<log_reader> reader);

stats get_stats() const;

/**
* Evict readers. No new readers holding log to given offset can be added to
* the cache under range_lock_holder is destroyed
Expand All @@ -85,7 +94,6 @@ class readers_cache {
ss::future<range_lock_holder> evict_range(model::offset, model::offset);

ss::future<> stop();
readers_cache() noexcept = default;
readers_cache(readers_cache&&) = delete;
readers_cache(const readers_cache&) = delete;
readers_cache& operator=(readers_cache&&) = delete;
Expand All @@ -98,7 +106,7 @@ class readers_cache {
struct entry;
void touch(entry* e) {
e->last_used = ss::lowres_clock::now();
e->_hook.unlink();
_readers.erase(_readers.iterator_to(*e));
_readers.push_back(*e);
};

Expand All @@ -110,7 +118,7 @@ class readers_cache {
std::unique_ptr<log_reader> reader;
ss::lowres_clock::time_point last_used = ss::lowres_clock::now();
bool valid = true;
intrusive_list_hook _hook;
safe_intrusive_list_hook _hook;
};
/**
* RAII based entry lock guard, it touches entry in a cache and handles
Expand All @@ -128,7 +136,7 @@ class readers_cache {
, _cache(c) {}

~entry_guard() noexcept {
_e->_hook.unlink();
_cache->_in_use.erase(_cache->_in_use.iterator_to(*_e));
/**
* we only return reader to cache if it is reusable and wasn't
* requested to be evicted
Expand All @@ -147,13 +155,14 @@ class readers_cache {
};

ss::future<> maybe_evict();
ss::future<> dispose_entries(intrusive_list<entry, &entry::_hook>);
void dispose_in_background(intrusive_list<entry, &entry::_hook>);
ss::future<>
dispose_entries(uncounted_intrusive_list<entry, &entry::_hook>);
void dispose_in_background(uncounted_intrusive_list<entry, &entry::_hook>);
void dispose_in_background(entry* e);
ss::future<> wait_for_no_inuse_readers();
template<typename Predicate>
ss::future<> evict_if(Predicate predicate) {
intrusive_list<entry, &entry::_hook> to_evict;
uncounted_intrusive_list<entry, &entry::_hook> to_evict;
// lock reders to make sure no new readers will be added
for (auto it = _readers.begin(); it != _readers.end();) {
auto should_evict = predicate(*it);
Expand All @@ -174,6 +183,8 @@ class readers_cache {
}
co_await dispose_entries(std::move(to_evict));
}
inline bool over_size_limit() const;
void maybe_evict_size();

bool intersects_with_locked_range(model::offset, model::offset) const;

Expand All @@ -186,13 +197,16 @@ class readers_cache {
* when reader is in use we push it to _in_use intrusive list, otherwise it
* is stored in _readers.
*/
intrusive_list<entry, &entry::_hook> _readers;
intrusive_list<entry, &entry::_hook> _in_use;
counted_intrusive_list<entry, &entry::_hook> _readers;
counted_intrusive_list<entry, &entry::_hook> _in_use;
config::binding<size_t> _target_max_size;
/**
* When offset range is locked any new readers for given offset will not be
* added to cache.
*/
std::vector<offset_range> _locked_offset_ranges;
ss::condition_variable _in_use_reader_destroyed;

simple_time_jitter<ss::lowres_clock> _eviction_jitter;
};
} // namespace storage
2 changes: 1 addition & 1 deletion src/v/storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ rp_test(
timequery_test.cc
kvstore_test.cc
backlog_controller_test.cc
readers_cache_test.cc
concat_segment_reader_test.cc
offset_to_filepos_test.cc
offset_translator_state_test.cc
Expand Down Expand Up @@ -142,6 +141,7 @@ rp_test(
SOURCES
scoped_file_tracker_test.cc
segment_deduplication_test.cc
readers_cache_test.cc
LIBRARIES v::storage v::storage_test_utils v::gtest_main
LABELS storage
ARGS "-- -c 1"
Expand Down
Loading

0 comments on commit 3fd6014

Please sign in to comment.