Skip to content

Commit

Permalink
Merge pull request #15271 from travisdowns/td-segment-appender-flush-…
Browse files Browse the repository at this point in the history
…order

Fix CI failure in test_concurrent_append_flush
  • Loading branch information
travisdowns authored Dec 18, 2023
2 parents 9f3aeea + cc82d0d commit 3e44a5d
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 91 deletions.
7 changes: 5 additions & 2 deletions src/v/storage/segment_appender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,9 +701,12 @@ std::ostream& operator<<(std::ostream& o, const segment_appender& a) {
return o << "{no_of_chunks:" << a._opts.number_of_chunks
<< ", closed:" << a._closed
<< ", fallocation_offset:" << a._fallocation_offset
<< ", stable_offset:" << a._stable_offset
<< ", flushed_offset:" << a._flushed_offset
<< ", committed_offset:" << a._committed_offset
<< ", inflight_writes:" << a._inflight.size()
<< ", dispatched_writes:" << a._inflight_dispatched
<< ", inflight:" << a._inflight.size()
<< ", dispatched:" << a._inflight_dispatched
<< ", merged:" << a._merged_writes
<< ", bytes_flush_pending:" << a._bytes_flush_pending << "}";
}

Expand Down
305 changes: 216 additions & 89 deletions src/v/storage/tests/log_segment_appender_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@

#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "config/configuration.h"
#include "random/generators.h"
#include "seastarx.h"
#include "storage/chunk_cache.h"
#include "storage/segment_appender.h"
#include "storage/storage_resources.h"

#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/testing/thread_test_case.hh>

Expand All @@ -24,16 +27,38 @@
#include <seastar/util/defer.hh>
#include <seastar/util/later.hh>

#include <boost/test/results_collector.hpp>
#include <boost/test/tools/interface.hpp>
#include <boost/test/tools/old/interface.hpp>
#include <fmt/format.h>

#include <algorithm>
#include <chrono>
#include <random>
#include <ranges>
#include <string_view>
#include <vector>

using namespace storage; // NOLINT
using namespace std::chrono;

// Miscellaneous captured info about a segment appender, such as
// its offsets and other counters.
struct segment_appender_info {
    size_t committed_offset;
    size_t stable_offset;
    size_t flushed_offset;
    size_t bytes_flush_pending;
    size_t inflight_dispatched;

    // Renders every captured field on a single line for test diagnostics.
    // Labels: co = committed_offset, so = stable_offset, fo = flushed_offset,
    // bfp = bytes_flush_pending, id = inflight_dispatched.
    ss::sstring to_string() const {
        return fmt::format(
          "co {} : so {} : fo {} : bfp {} : id {}",
          committed_offset,
          stable_offset,
          flushed_offset,
          bytes_flush_pending,
          inflight_dispatched);
    }
};

struct storage::segment_appender_test_accessor {
segment_appender& sa; // NOLINT

Expand All @@ -54,6 +79,14 @@ struct storage::segment_appender_test_accessor {
auto inflight_dispatched() { return sa._inflight_dispatched; }
auto total_dispatched() { return sa._dispatched_writes; }
auto total_merged() { return sa._merged_writes; }
auto info() {
return segment_appender_info{
.committed_offset = sa._committed_offset,
.stable_offset = sa._stable_offset,
.flushed_offset = sa._flushed_offset,
.bytes_flush_pending = sa._bytes_flush_pending,
.inflight_dispatched = sa._inflight_dispatched};
}
};

namespace {
Expand Down Expand Up @@ -284,121 +317,215 @@ SEASTAR_THREAD_TEST_CASE(
run_test_can_append_10MB_sequential_write_sequential_read(32_MiB);
}

/**
* @brief Returns true iff the current test is currently passing.
*
* From https://stackoverflow.com/a/22102899
* https://creativecommons.org/licenses/by-sa/3.0/
*/
bool current_test_passing() {
using namespace boost::unit_test;
test_case::id_t id = framework::current_test_case().p_id;
test_results rez = results_collector.results(id);
return rez.passed();
}

static void run_concurrent_append_flush(
size_t fallocate_size,
const size_t max_buf_size,
const size_t buf_count = 10000) {
size_t fallocate_size, const size_t max_buf_size, const size_t buf_count) {
auto filename = fmt::format(
"run_concurrent_append_flush_{}_{}.log", fallocate_size, max_buf_size);
auto f = open_file(filename);
storage::storage_resources resources(
config::mock_binding<size_t>(std::move(fallocate_size)));
auto appender = make_segment_appender(f, resources);
auto seg_file = open_file(filename);
storage::storage_resources resources(config::mock_binding(+fallocate_size));
auto appender = make_segment_appender(seg_file, resources);
auto close = ss::defer([&appender] { appender.close().get(); });

std::vector<iobuf> bufs(buf_count);
unsigned char v = 1;
for (auto& buf : bufs) {
buf = make_iobuf_with_char(random_generators::get_int(max_buf_size), v);
if (++v == 0) {
v = 1;
auto seed = random_generators::get_int<size_t>();
std::default_random_engine rng(seed);

BOOST_TEST_CONTEXT(
"run_concurrent_append_flush, seed: "
<< seed << ", fallocate_size: " << fallocate_size
<< ", max_buf_size: " << max_buf_size << ", buf_count :" << buf_count) {
// the basic idea is we create a bunch of random buffers, then randomly
// perform actions on the segment appender, like appending one of the
// random buffers, flushing the appender, yielding, etc.

std::vector<iobuf> bufs(buf_count);
unsigned char v = 1;
std::uniform_int_distribution<size_t> bufdist(0, max_buf_size);
for (auto& buf : bufs) {
buf = make_iobuf_with_char(bufdist(rng), v);
if (++v == 0) {
v = 1;
}
}
}

// we do one of the following actions with equal probability,
// respecting the rule that any previous append must have resolved before a
// new one is invoked
enum action { APPEND, FLUSH, WAIT_APPEND, YIELD, LAST };

std::optional<ss::future<>> last_append;
std::vector<ss::future<>> futs;

size_t max_inflight = 0, max_dispatched = 0;

for (size_t buf_index = 0; buf_index < bufs.size();) {
auto next_action = (action)random_generators::get_int(LAST - 1);

max_inflight = std::max(
max_inflight, access(appender).inflight().size());
max_dispatched = std::max(
max_dispatched, access(appender).inflight_dispatched());

switch (next_action) {
case APPEND:
if (!last_append) { // only if the previous append has finished
// At each iteration we choose an action to perform with equal
// probability, respecting the rules of the appender, e.g., that any
// previous append must have resolved before a new one is invoked.
//
// Every performed action is recorded together with a snapshot of the
// appender state taken right after it, so that the tail of the history
// can be printed if the test fails.
struct action {
    // LAST aliases the final real kind so that it can be used as the
    // inclusive upper bound of the random distribution.
    enum kind_enum { APPEND, FLUSH, WAIT_APPEND, SLEEP, LAST = SLEEP };

    // Builds an action from the raw integer drawn from the distribution.
    explicit action(int kind)
      : kind{static_cast<kind_enum>(kind)} {}

    kind_enum kind;
    segment_appender_info info{};
    ss::sstring extra;
    ss::future<> flush_future
      = ss::make_ready_future<>(); // if kind == FLUSH

    // Returns "<KIND><extra>: <appender info>", with the first column
    // padded to 12 characters.
    ss::sstring to_string() const {
        // Resolve the name first and assert afterwards, so that no code
        // path can fall off the end of a value-returning function.
        const char* name = nullptr;
        switch (kind) {
        case APPEND:
            name = "APPEND";
            break;
        case FLUSH:
            name = "FLUSH";
            break;
        case WAIT_APPEND:
            name = "WAIT_APPEND";
            break;
        case SLEEP:
            name = "SLEEP";
            break;
        }
        vassert(name != nullptr, "bad kind");

        return fmt::format(
          "{:12}: {}", ss::sstring(name) + extra, info.to_string());
    }
};

std::optional<ss::future<>> last_append;
std::vector<ss::future<>> futs;

size_t max_inflight = 0, max_dispatched = 0;

std::uniform_int_distribution<int> dist(0, action::LAST);

std::vector<action> all_actions;

for (size_t buf_index = 0; buf_index < bufs.size();) {
auto& current_action = all_actions.emplace_back(dist(rng));

max_inflight = std::max(
max_inflight, access(appender).inflight().size());
max_dispatched = std::max(
max_dispatched, access(appender).inflight_dispatched());

switch (current_action.kind) {
case action::APPEND:
if (last_append) {
// skip, as we already have an unawaited append in progress
all_actions.pop_back(); // delete action
continue;
}
last_append = appender.append(bufs[buf_index++]);
}
break;
case FLUSH:
futs.push_back(appender.flush());
break;
case WAIT_APPEND:
if (last_append) {
break;
case action::FLUSH:
futs.push_back(appender.flush());
break;
case action::WAIT_APPEND:
if (!last_append) {
// no append to wait for, skip
all_actions.pop_back();
continue;
}
last_append->get();
last_append.reset();
break;
case action::SLEEP: {
    // yield ~99% of the time, sleep for 0-10 us the other ~1%
    auto sleep_us = std::uniform_int_distribution<int>(0, 1000)(rng)
                    * 1us;
    (sleep_us > 10us ? ss::yield() : ss::sleep(sleep_us)).get();
    // The drawn value is in microseconds, so label it "us" and print the
    // raw tick count rather than relying on duration formatting. Values
    // above 10 mean this iteration yielded instead of sleeping.
    current_action.extra += fmt::format(" ({} us)", sleep_us.count());
    break;
}
default:
BOOST_TEST_FAIL("bad action");
}
break;
case YIELD:
ss::yield().get();
break;
default:
BOOST_TEST_FAIL("bad action");
current_action.info = access(appender).info();
}
}

// check that we got some visible inflight and dispatched IOs
BOOST_CHECK_GT(max_inflight, 0);
BOOST_CHECK_GT(max_dispatched, 0);
// check that we got some visible inflight and dispatched IOs
BOOST_CHECK_GT(max_inflight, 0);
BOOST_CHECK_GT(max_dispatched, 0);

// now we need to wait for the last append, if any
if (last_append) {
last_append->get();
}
// now we need to wait for the last append, if any
if (last_append) {
last_append->get();
}

// do a final flush and wait for it
appender.flush().get();
// append a final flush, so we are in a known flushed state for the
// following checks
futs.emplace_back(appender.flush());

// now there should be nothing in-flight
BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0);

// NOTE this is not 0 always, see issue/13035
BOOST_TEST_INFO(fmt::format(
"appender inflight operations (should be empty): {}",
access(appender).inflight_str()));
BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0);

// now we expect all the prior flush futures to be available
// we don't guarantee this is in the API currently but it is how it
// works currently and we might as well assert it
for (auto& f : futs) {
BOOST_REQUIRE(f.available());
f.get(); // propagate any exception
}
for (auto& f : futs) {
// get all the flush futures
// with high probability these are all available except possibly the
// last one (appended above) but this is not actually guaranteed, see
// redpanda#13035
f.get();
}
auto sa_state = fmt::format("{}", appender);

// check that we got some writes and merges (we don't know how many)
BOOST_CHECK_GT(access(appender).total_dispatched(), 0);
BOOST_CHECK_GT(access(appender).total_merged(), 0);

// verify the output
auto in = make_file_input_stream(f);
auto closefile = ss::defer([&] { in.close().get(); });
for (auto& buf : bufs) {
size_t sz = buf.size_bytes();
iobuf result = read_iobuf_exactly(in, sz).get();
BOOST_REQUIRE_EQUAL(buf, result);
// now there should be nothing in-flight
BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0);

BOOST_TEST_INFO(fmt::format(
"appender inflight operations (should be empty): {}",
access(appender).inflight_str()));
BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0);

// check that we got some writes and merges (we don't know how many)
BOOST_CHECK_GT(access(appender).total_dispatched(), 0);
BOOST_CHECK_GT(access(appender).total_merged(), 0);

// note: we used to assert here that all the prior flush futures were
// already available, but the API does not guarantee that, so they are
// simply awaited above instead

// verify the output
auto in = make_file_input_stream(seg_file);
auto closefile = ss::defer([&] { in.close().get(); });
for (auto& buf : bufs) {
size_t sz = buf.size_bytes();
iobuf result = read_iobuf_exactly(in, sz).get();
BOOST_CHECK_EQUAL(buf, result);
}

if (!current_test_passing()) {
// test is about to fail, print details
// we jump through these hoops because I can't find a better way
// to defer generating the entire diagnosis string (which may be
// very large) until a test actually fails
std::string astr;
for (size_t aid = std::max(0, (int)all_actions.size() - 50);
aid < all_actions.size();
aid++) {
auto& ar = all_actions.at(aid);
astr += fmt::format("action[{}]: {}\n", aid, ar.to_string());
}
BOOST_TEST_INFO("actions: \n" << astr);

BOOST_TEST_INFO("last_append: " << last_append.has_value());
BOOST_TEST_INFO("fsize: " << futs.size());
BOOST_TEST_INFO("segment_appender: " << sa_state);
BOOST_TEST_FAIL("failed see above");
}
}
}

SEASTAR_THREAD_TEST_CASE(test_concurrent_append_flush) {
// we use smaller buffer counts for the large buffer size tests
// to keep the runtime manageable (less than ~2 seconds for this test)
run_concurrent_append_flush(16_KiB, 1);
run_concurrent_append_flush(16_KiB, 1000);

run_concurrent_append_flush(16_KiB, 1, 1000);
run_concurrent_append_flush(16_KiB, 1000, 100);
run_concurrent_append_flush(16_KiB, 20000, 100);

run_concurrent_append_flush(64_KiB, 1000);
run_concurrent_append_flush(64_KiB, 1000, 100);

run_concurrent_append_flush(32_MiB, 1000, 1000);
run_concurrent_append_flush(32_MiB, 1000, 100);
}

static void run_test_can_append_little_data(size_t fallocate_size) {
Expand Down

0 comments on commit 3e44a5d

Please sign in to comment.