Fix CI failure in test_concurrent_append_flush #15271

Merged
7 changes: 5 additions & 2 deletions src/v/storage/segment_appender.cc
@@ -697,9 +697,12 @@ std::ostream& operator<<(std::ostream& o, const segment_appender& a) {
return o << "{no_of_chunks:" << a._opts.number_of_chunks
<< ", closed:" << a._closed
<< ", fallocation_offset:" << a._fallocation_offset
<< ", stable_offset:" << a._stable_offset
<< ", flushed_offset:" << a._flushed_offset
<< ", committed_offset:" << a._committed_offset
<< ", inflight_writes:" << a._inflight.size()
<< ", dispatched_writes:" << a._inflight_dispatched
<< ", inflight:" << a._inflight.size()
<< ", dispatched:" << a._inflight_dispatched
<< ", merged:" << a._merged_writes
<< ", bytes_flush_pending:" << a._bytes_flush_pending << "}";
}

305 changes: 216 additions & 89 deletions src/v/storage/tests/log_segment_appender_test.cc
@@ -9,13 +9,16 @@

#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "config/configuration.h"
#include "random/generators.h"
#include "seastarx.h"
#include "storage/chunk_cache.h"
#include "storage/segment_appender.h"
#include "storage/storage_resources.h"

#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/testing/thread_test_case.hh>

@@ -24,16 +27,38 @@
#include <seastar/util/defer.hh>
#include <seastar/util/later.hh>

#include <boost/test/results_collector.hpp>
#include <boost/test/tools/interface.hpp>
#include <boost/test/tools/old/interface.hpp>
#include <fmt/format.h>

#include <algorithm>
#include <chrono>
#include <random>
#include <ranges>
#include <string_view>
#include <vector>

using namespace storage; // NOLINT
using namespace std::chrono;

// miscellaneous captured info about a segment appender such as
// its offsets and other counters
struct segment_appender_info {
size_t committed_offset, stable_offset, flushed_offset, bytes_flush_pending,
inflight_dispatched;

ss::sstring to_string() const {
return fmt::format(
"co {} : so {} : fo {} : fbp {} : bfp {}",
committed_offset,
stable_offset,
flushed_offset,
bytes_flush_pending,
inflight_dispatched);
}
};

struct storage::segment_appender_test_accessor {
segment_appender& sa; // NOLINT

@@ -54,6 +79,14 @@ struct storage::segment_appender_test_accessor {
auto inflight_dispatched() { return sa._inflight_dispatched; }
auto total_dispatched() { return sa._dispatched_writes; }
auto total_merged() { return sa._merged_writes; }
auto info() {
return segment_appender_info{
.committed_offset = sa._committed_offset,
.stable_offset = sa._stable_offset,
.flushed_offset = sa._flushed_offset,
.bytes_flush_pending = sa._bytes_flush_pending,
.inflight_dispatched = sa._inflight_dispatched};
}
};

namespace {
@@ -284,121 +317,215 @@ SEASTAR_THREAD_TEST_CASE(
run_test_can_append_10MB_sequential_write_sequential_read(32_MiB);
}

/**
* @brief Returns true iff the current test is currently passing.
*
* From https://stackoverflow.com/a/22102899
* https://creativecommons.org/licenses/by-sa/3.0/
*/
bool current_test_passing() {
using namespace boost::unit_test;
test_case::id_t id = framework::current_test_case().p_id;
test_results rez = results_collector.results(id);
return rez.passed();
}

static void run_concurrent_append_flush(
size_t fallocate_size,
const size_t max_buf_size,
const size_t buf_count = 10000) {
size_t fallocate_size, const size_t max_buf_size, const size_t buf_count) {
auto filename = fmt::format(
"run_concurrent_append_flush_{}_{}.log", fallocate_size, max_buf_size);
auto f = open_file(filename);
storage::storage_resources resources(
config::mock_binding<size_t>(std::move(fallocate_size)));
auto appender = make_segment_appender(f, resources);
auto seg_file = open_file(filename);
storage::storage_resources resources(config::mock_binding(+fallocate_size));
Contributor:
nit: stray + in +fallocate_size

Member (author):
@andijcr - actually it's not stray, it's needed to make it an rvalue, because config::mock_binding is declared in a way that requires an rvalue argument if you want to rely on template parameter deduction, unfortunately. I do plan to fix this, but this is one workaround for now.

Contributor:
🤯

Member (author):
> I do plan to fix this

To clarify, I mean mock_binding could be fixed to avoid this problem.
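For context on the workaround above, here is a minimal, self-contained sketch of how the unary `+` changes template argument deduction. The `deduced` helper is an assumed forwarding-reference factory standing in for `config::mock_binding`, not its real declaration:

```cpp
// Minimal sketch: an assumed forwarding-reference factory, NOT the real
// config::mock_binding declaration, used only to show the deduction difference.
#include <cstddef>
#include <type_traits>

template<typename T>
T deduced(T&&); // declaration only; used inside decltype, never actually called

int main() {
    std::size_t fallocate_size = 16384;
    (void)fallocate_size; // only read in unevaluated contexts below

    // Passing the lvalue deduces T = std::size_t& ...
    static_assert(
      std::is_same_v<decltype(deduced(fallocate_size)), std::size_t&>);
    // ... while unary '+' yields a prvalue, so T = std::size_t, which is the
    // kind of type a binding<T>-style return type can be instantiated with.
    static_assert(
      std::is_same_v<decltype(deduced(+fallocate_size)), std::size_t>);
    return 0;
}
```

Under that assumption, the `+` is simply the cheapest way to hand the factory a prvalue without spelling out `size_t{fallocate_size}` or an explicit template argument.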

auto appender = make_segment_appender(seg_file, resources);
auto close = ss::defer([&appender] { appender.close().get(); });

std::vector<iobuf> bufs(buf_count);
unsigned char v = 1;
for (auto& buf : bufs) {
buf = make_iobuf_with_char(random_generators::get_int(max_buf_size), v);
if (++v == 0) {
v = 1;
auto seed = random_generators::get_int<size_t>();
std::default_random_engine rng(seed);

BOOST_TEST_CONTEXT(
"run_concurrent_append_flush, seed: "
<< seed << ", fallocate_size: " << fallocate_size
<< ", max_buf_size: " << max_buf_size << ", buf_count :" << buf_count) {
// the basic idea is we create a bunch of random buffers, then randomly
// perform actions on the segment appender, like appending one of the
// random buffers, flushing the appender, yielding, etc.

std::vector<iobuf> bufs(buf_count);
unsigned char v = 1;
std::uniform_int_distribution<size_t> bufdist(0, max_buf_size);
for (auto& buf : bufs) {
buf = make_iobuf_with_char(bufdist(rng), v);
if (++v == 0) {
v = 1;
}
}
}

// we do one of the following actions with equal probability,
// respecting the rule that any previous append must have resolved before a
// new one is invoked
enum action { APPEND, FLUSH, WAIT_APPEND, YIELD, LAST };

std::optional<ss::future<>> last_append;
std::vector<ss::future<>> futs;

size_t max_inflight = 0, max_dispatched = 0;

for (size_t buf_index = 0; buf_index < bufs.size();) {
auto next_action = (action)random_generators::get_int(LAST - 1);

max_inflight = std::max(
max_inflight, access(appender).inflight().size());
max_dispatched = std::max(
max_dispatched, access(appender).inflight_dispatched());

switch (next_action) {
case APPEND:
if (!last_append) { // only if the previous append has finished
// At each iteration we choose an action to perform with equal
// probability, respecting the rules of the appender, e.g., that any
// previous append must have resolved before a new one is invoked.
struct action {
enum kind_enum { APPEND, FLUSH, WAIT_APPEND, SLEEP, LAST = SLEEP };

action(int kind)
: kind{(kind_enum)kind} {}

kind_enum kind;
segment_appender_info info{};
ss::sstring extra;
ss::future<> flush_future
= ss::make_ready_future<>(); // if kind == FLUSH

ss::sstring to_string() const {
ss::sstring astr = [this]() {
switch (kind) {
case APPEND:
return "APPEND";
case FLUSH:
return "FLUSH";
case WAIT_APPEND:
return "WAIT_APPEND";
case SLEEP:
return "SLEEP";
}
vassert(false, "bad kind");
}();

return fmt::format("{:12}: {}", astr + extra, info.to_string());
Contributor:
not related, but one day we should bring magic_enum into the codebase or reimplement part of the functionality

Member (author):
@andijcr absolutely! I've never used magic_enum specifically but I definitely feel the pain of enum boilerplate every time I create a new enum in C++.
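As a rough sketch of that suggestion (assuming magic_enum's header-only `enum_name()` API, which is not currently vendored in this codebase), the hand-written switch in `to_string()` could shrink to something like:

```cpp
// Hypothetical sketch only: magic_enum is not in this repo; this assumes its
// single-header enum_name() API.
#include <magic_enum.hpp>

#include <string>

// mirrors action::kind_enum from the test above
enum class kind_enum { APPEND, FLUSH, WAIT_APPEND, SLEEP };

std::string kind_to_string(kind_enum k) {
    // enum_name() returns the enumerator's identifier as a std::string_view
    return std::string{magic_enum::enum_name(k)};
}
```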

};
};

std::optional<ss::future<>> last_append;
std::vector<ss::future<>> futs;

size_t max_inflight = 0, max_dispatched = 0;

std::uniform_int_distribution<int> dist(0, action::LAST);

std::vector<action> all_actions;

for (size_t buf_index = 0; buf_index < bufs.size();) {
auto& current_action = all_actions.emplace_back(dist(rng));

max_inflight = std::max(
max_inflight, access(appender).inflight().size());
max_dispatched = std::max(
max_dispatched, access(appender).inflight_dispatched());

switch (current_action.kind) {
case action::APPEND:
if (last_append) {
// skip, as we already have an unawaited append in progress
all_actions.pop_back(); // delete action
continue;
}
last_append = appender.append(bufs[buf_index++]);
}
break;
case FLUSH:
futs.push_back(appender.flush());
break;
case WAIT_APPEND:
if (last_append) {
break;
case action::FLUSH:
futs.push_back(appender.flush());
break;
case action::WAIT_APPEND:
if (!last_append) {
// no append to wait for, skip
all_actions.pop_back();
continue;
}
last_append->get();
last_append.reset();
break;
case action::SLEEP: {
// yield 99% of the time, sleep for 0-10 us the other 1%
auto sleep_us = std::uniform_int_distribution<int>(0, 1000)(rng)
* 1us;
(sleep_us > 10us ? ss::yield() : ss::sleep(sleep_us)).get();
current_action.extra += fmt::format(" ({} ms)", sleep_us);
break;
}
default:
BOOST_TEST_FAIL("bad action");
}
break;
case YIELD:
ss::yield().get();
break;
default:
BOOST_TEST_FAIL("bad action");
current_action.info = access(appender).info();
}
}

// check that we got some visible inflight and dispatched IOs
BOOST_CHECK_GT(max_inflight, 0);
BOOST_CHECK_GT(max_dispatched, 0);

// now we need to wait for the last append, if any
if (last_append) {
last_append->get();
}

// do a final flush and wait for it
appender.flush().get();
// append a final flush, so we are in a known flushed state for the
// following checks
futs.emplace_back(appender.flush());

// now there should be nothing in-flight
BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0);

// NOTE this is not 0 always, see issue/13035
BOOST_TEST_INFO(fmt::format(
"appender inflight operations (should be empty): {}",
access(appender).inflight_str()));
BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0);

// now we expect all the prior flush futures to be available
// we don't guarantee this is in the API currently but it is how it
// works currently and we might as well assert it
for (auto& f : futs) {
BOOST_REQUIRE(f.available());
f.get(); // propagate any exception
}
for (auto& f : futs) {
// get all the flush futures
// whp these are all available except possibly the last one
// (appended above) but this is not actually guaranteed, see
// redpanda#13035
f.get();
}
auto sa_state = fmt::format("{}", appender);

// check that we got some writes and merges (we don't know how many)
BOOST_CHECK_GT(access(appender).total_dispatched(), 0);
BOOST_CHECK_GT(access(appender).total_merged(), 0);

// verify the output
auto in = make_file_input_stream(f);
auto closefile = ss::defer([&] { in.close().get(); });
for (auto& buf : bufs) {
size_t sz = buf.size_bytes();
iobuf result = read_iobuf_exactly(in, sz).get();
BOOST_REQUIRE_EQUAL(buf, result);
// now there should be nothing in-flight
BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0);

BOOST_TEST_INFO(fmt::format(
"appender inflight operations (should be empty): {}",
access(appender).inflight_str()));
BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0);

// check that we got some writes and merges (we don't know how many)
BOOST_CHECK_GT(access(appender).total_dispatched(), 0);
BOOST_CHECK_GT(access(appender).total_merged(), 0);

// now we expect all the prior flush futures to be available
// we don't guarantee this is in the API currently but it is how it
// works currently and we might as well assert it

// verify the output
auto in = make_file_input_stream(seg_file);
auto closefile = ss::defer([&] { in.close().get(); });
for (auto& buf : bufs) {
size_t sz = buf.size_bytes();
iobuf result = read_iobuf_exactly(in, sz).get();
BOOST_CHECK_EQUAL(buf, result);
}

if (!current_test_passing()) {
// test is about to fail, print details
// we jump through these hoops because I can't find a better way
// to defer generating the entire diagnosis string (which may be
// very large) until a test actually fails
std::string astr;
for (size_t aid = std::max(0, (int)all_actions.size() - 50);
aid < all_actions.size();
aid++) {
auto& ar = all_actions.at(aid);
astr += fmt::format("action[{}]: {}\n", aid, ar.to_string());
}
BOOST_TEST_INFO("actions: \n" << astr);

BOOST_TEST_INFO("last_append: " << last_append.has_value());
BOOST_TEST_INFO("fsize: " << futs.size());
BOOST_TEST_INFO("segment_appender: " << sa_state);
BOOST_TEST_FAIL("failed see above");
}
}
}

SEASTAR_THREAD_TEST_CASE(test_concurrent_append_flush) {
// we use smaller buffer counts for the large buffer size tests
// to keep the runtime manageable (less than ~2 seconds for this test)
run_concurrent_append_flush(16_KiB, 1);
run_concurrent_append_flush(16_KiB, 1000);

run_concurrent_append_flush(16_KiB, 1, 1000);
run_concurrent_append_flush(16_KiB, 1000, 100);
run_concurrent_append_flush(16_KiB, 20000, 100);

run_concurrent_append_flush(64_KiB, 1000);
run_concurrent_append_flush(64_KiB, 1000, 100);

run_concurrent_append_flush(32_MiB, 1000, 1000);
run_concurrent_append_flush(32_MiB, 1000, 100);
}

static void run_test_can_append_little_data(size_t fallocate_size) {