Skip to content

Commit

Permalink
Merge pull request #24514 from ballard26/kv-cache
Browse files Browse the repository at this point in the history
Add a general key-value cache to utils
  • Loading branch information
ballard26 authored Dec 18, 2024
2 parents 2bc1658 + f98cb9c commit 925707c
Show file tree
Hide file tree
Showing 15 changed files with 408 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/v/io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ redpanda_cc_library(
"scheduler.cc",
],
hdrs = [
"cache.h",
"interval_map.h",
"io_queue.h",
"page.h",
Expand All @@ -33,6 +32,7 @@ redpanda_cc_library(
"//src/v/container:intrusive",
"//src/v/ssx:future_util",
"//src/v/ssx:semaphore",
"//src/v/utils:s3_fifo",
"@abseil-cpp//absl/container:btree",
"@abseil-cpp//absl/container:flat_hash_map",
"@boost//:intrusive",
Expand Down
2 changes: 1 addition & 1 deletion src/v/io/page.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ page::page(uint64_t offset, seastar::temporary_buffer<char> data)
page::page(
uint64_t offset,
seastar::temporary_buffer<char> data,
const class cache_hook& hook)
const class utils::s3_fifo::cache_hook& hook)
: cache_hook(hook)
, offset_(offset)
, size_(data.size())
Expand Down
6 changes: 3 additions & 3 deletions src/v/io/page.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#pragma once

#include "container/intrusive_list_helpers.h"
#include "io/cache.h"
#include "utils/s3_fifo.h"

#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
Expand Down Expand Up @@ -40,7 +40,7 @@ class page : public seastar::enable_lw_shared_from_this<page> {
page(
uint64_t offset,
seastar::temporary_buffer<char> data,
const cache_hook& hook);
const utils::s3_fifo::cache_hook& hook);

page(const page&) = delete;
page& operator=(const page&) = delete;
Expand Down Expand Up @@ -123,7 +123,7 @@ class page : public seastar::enable_lw_shared_from_this<page> {
* Page cache entry intrusive list hook.
*/
// NOLINTNEXTLINE(*-non-private-member-variables-in-classes)
cache_hook cache_hook;
utils::s3_fifo::cache_hook cache_hook;

struct waiter {
intrusive_list_hook waiter;
Expand Down
5 changes: 3 additions & 2 deletions src/v/io/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
* by the Apache License, Version 2.0
*/
#pragma once
#include "io/cache.h"
#include "io/page.h"
#include "utils/s3_fifo.h"

namespace experimental::io {

Expand All @@ -26,7 +26,8 @@ class page_cache {
size_t operator()(const page&) noexcept;
};

using cache_type = cache<page, &page::cache_hook, evict, cost>;
using cache_type
= utils::s3_fifo::cache<page, &page::cache_hook, evict, cost>;

public:
using config = cache_type::config;
Expand Down
4 changes: 2 additions & 2 deletions src/v/io/pager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ seastar::future<> pager::close() noexcept {
}
}

seastar::lw_shared_ptr<page>
pager::alloc_page(uint64_t offset, std::optional<cache_hook> hook) noexcept {
seastar::lw_shared_ptr<page> pager::alloc_page(
uint64_t offset, std::optional<utils::s3_fifo::cache_hook> hook) noexcept {
auto buf = seastar::temporary_buffer<char>::aligned(page_size, page_size);
if (hook.has_value()) {
return seastar::make_lw_shared<page>(
Expand Down
5 changes: 3 additions & 2 deletions src/v/io/pager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
*/
#pragma once

#include "io/cache.h"
#include "io/page_set.h"
#include "io/scheduler.h"
#include "utils/s3_fifo.h"

#include <seastar/core/future.hh>
#include <seastar/core/temporary_buffer.hh>
Expand Down Expand Up @@ -92,7 +92,8 @@ class pager {

private:
static seastar::lw_shared_ptr<page> alloc_page(
uint64_t offset, std::optional<cache_hook> hook = std::nullopt) noexcept;
uint64_t offset,
std::optional<utils::s3_fifo::cache_hook> hook = std::nullopt) noexcept;

/*
* Read a page from the underlying file.
Expand Down
10 changes: 0 additions & 10 deletions src/v/io/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ redpanda_cc_gtest(
],
)

redpanda_cc_gtest(
name = "cache_test",
timeout = "short",
srcs = ["cache_test.cc"],
deps = [
"//src/v/io",
"//src/v/test_utils:gtest",
],
)

redpanda_cc_gtest(
name = "interval_map_test",
timeout = "short",
Expand Down
1 change: 0 additions & 1 deletion src/v/io/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ rp_test(
SOURCES
common.cc
common_test.cc
cache_test.cc
interval_map_test.cc
persistence_test.cc
page_test.cc
Expand Down
25 changes: 25 additions & 0 deletions src/v/utils/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -652,3 +652,28 @@ redpanda_cc_library(
"@seastar",
],
)

redpanda_cc_library(
name = "s3_fifo",
hdrs = [
"s3_fifo.h",
],
include_prefix = "utils",
deps = [
"@boost//:intrusive",
"@fmt",
],
)

redpanda_cc_library(
name = "chunked_kv_cache",
hdrs = [
"chunked_kv_cache.h",
],
include_prefix = "utils",
deps = [
"//src/v/container:chunked_hash_map",
"//src/v/utils:s3_fifo",
"@boost//:intrusive",
],
)
195 changes: 195 additions & 0 deletions src/v/utils/chunked_kv_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "container/chunked_hash_map.h"
#include "utils/s3_fifo.h"

#include <seastar/core/shared_ptr.hh>
#include <seastar/util/optimized_optional.hh>

#include <boost/intrusive/list.hpp>
#include <boost/intrusive/options.hpp>
#include <boost/intrusive/parent_from_member.hpp>

namespace utils {

/**
* A basic key-value cache implementation built on top of the s3_fifo::cache.
*/
template<
typename Key,
typename Value,
typename Hash = std::conditional_t<
detail::has_absl_hash<Key>,
detail::avalanching_absl_hash<Key>,
ankerl::unordered_dense::hash<Key>>,
typename EqualTo = std::equal_to<Key>>
class chunked_kv_cache {
struct cached_value;
struct evict;
using cache_t = s3_fifo::cache<
cached_value,
&cached_value::hook,
evict,
s3_fifo::default_cache_cost>;

public:
using config = cache_t::config;

explicit chunked_kv_cache(config config)
: _cache{config, evict{*this}} {}

~chunked_kv_cache() noexcept = default;

// These contructors need to be deleted to ensure a stable `this` pointer.
chunked_kv_cache(chunked_kv_cache&&) = delete;
chunked_kv_cache& operator=(chunked_kv_cache&&) noexcept = delete;
chunked_kv_cache(const chunked_kv_cache&) = delete;
chunked_kv_cache& operator=(const chunked_kv_cache&) noexcept = delete;

/**
* Inserts a value for a given key into the cache.
*
* Returns true if the value was inserted and false if there was already a
* value for the given key in the cache.
*/
bool try_insert(const Key& key, ss::shared_ptr<Value> val);

/**
* Gets the key's corresponding value from the cache.
*
* Returns std::nullopt if the key doesn't have a value in the cache.
*/
ss::optimized_optional<ss::shared_ptr<Value>> get_value(const Key& key);

using cache_stat = struct cache_t::stat;
/**
* Cache statistics.
*/
struct stat : public cache_stat {
/// Current size of the cache index.
size_t index_size;
};

/**
* Returns the current cache statistics.
*/
[[nodiscard]] stat stat() const noexcept;

private:
using ghost_hook_t = boost::intrusive::list_member_hook<
boost::intrusive::link_mode<boost::intrusive::safe_link>>;

struct cached_value {
Key key;
ss::shared_ptr<Value> value;
s3_fifo::cache_hook hook;
ghost_hook_t ghost_hook;
};

using entry_t = std::unique_ptr<cached_value>;
using ghost_fifo_t = boost::intrusive::list<
cached_value,
boost::intrusive::
member_hook<cached_value, ghost_hook_t, &cached_value::ghost_hook>>;

chunked_hash_map<Key, entry_t, Hash, EqualTo> _map;
cache_t _cache;
ghost_fifo_t _ghost_fifo;

void gc_ghost_fifo();
};

template<typename Key, typename Value, typename Hash, typename EqualTo>
struct chunked_kv_cache<Key, Value, Hash, EqualTo>::evict {
chunked_kv_cache& kv_c;

bool operator()(cached_value& e) noexcept {
e.value = nullptr;
kv_c._ghost_fifo.push_back(e);
return true;
}
};

template<typename Key, typename Value, typename Hash, typename EqualTo>
bool chunked_kv_cache<Key, Value, Hash, EqualTo>::try_insert(
const Key& key, ss::shared_ptr<Value> val) {
gc_ghost_fifo();

auto e_it = _map.find(key);
if (e_it == _map.end()) {
auto [e_it, succ] = _map.try_emplace(
key, std::make_unique<cached_value>(key, std::move(val)));
if (!succ) {
return false;
}

_cache.insert(*e_it->second);
return true;
}

auto& entry = *e_it->second;
if (entry.hook.evicted()) {
entry.value = std::move(val);
_ghost_fifo.erase(_ghost_fifo.iterator_to(entry));
_cache.insert(entry);
return true;
}

return false;
}

template<typename Key, typename Value, typename Hash, typename EqualTo>
ss::optimized_optional<ss::shared_ptr<Value>>
chunked_kv_cache<Key, Value, Hash, EqualTo>::get_value(const Key& key) {
gc_ghost_fifo();

auto e_it = _map.find(key);
if (e_it == _map.end()) {
return std::nullopt;
}

auto& entry = *e_it->second;
if (entry.hook.evicted()) {
return std::nullopt;
}

entry.hook.touch();
return entry.value;
}

template<typename Key, typename Value, typename Hash, typename EqualTo>
void chunked_kv_cache<Key, Value, Hash, EqualTo>::gc_ghost_fifo() {
for (auto it = _ghost_fifo.begin(); it != _ghost_fifo.end();) {
auto& entry = *it;
if (_cache.ghost_queue_contains(entry)) {
// The ghost queue is in fifo-order so any entry that comes after an
// entry that hasn't been evicted will also not be evicted.
return;
}

it = _ghost_fifo.erase(it);
_map.erase(entry.key);
}
}
template<typename Key, typename Value, typename Hash, typename EqualTo>
struct chunked_kv_cache<Key, Value, Hash, EqualTo>::stat
chunked_kv_cache<Key, Value, Hash, EqualTo>::stat() const noexcept {
struct stat s {
_cache.stat()
};
s.index_size = _map.size();
return s;
}

} // namespace utils
14 changes: 7 additions & 7 deletions src/v/io/cache.h → src/v/utils/s3_fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
* @{
*/

namespace experimental::io {
namespace utils::s3_fifo {

namespace testing_details {
class cache_hook_accessor;
Expand Down Expand Up @@ -674,18 +674,18 @@ bool cache<T, Hook, Evictor, Cost>::evict_main() noexcept {
* @}
*/

} // namespace experimental::io
} // namespace utils::s3_fifo

template<
typename T,
experimental::io::cache_hook T::*Hook,
experimental::io::cache_evictor<T> Evictor,
experimental::io::cache_cost<T> Cost>
struct fmt::formatter<experimental::io::cache<T, Hook, Evictor, Cost>>
utils::s3_fifo::cache_hook T::*Hook,
utils::s3_fifo::cache_evictor<T> Evictor,
utils::s3_fifo::cache_cost<T> Cost>
struct fmt::formatter<utils::s3_fifo::cache<T, Hook, Evictor, Cost>>
: fmt::formatter<std::string_view> {
template<typename FormatContext>
auto format(
const experimental::io::cache<T, Hook, Evictor, Cost>& cache,
const utils::s3_fifo::cache<T, Hook, Evictor, Cost>& cache,
FormatContext& ctx) const {
const auto stat = cache.stat();
return fmt::format_to(
Expand Down
Loading

0 comments on commit 925707c

Please sign in to comment.