Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: base id_allocator_stm_test on raft_fixture #18280

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/v/base/include/base/outcome.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <system_error> // bring in std::error_code et al

// use the standard ones instead
#include <boost/outcome/basic_result.hpp>
#include <boost/outcome/std_outcome.hpp>
#include <boost/outcome/std_result.hpp>

Expand All @@ -39,3 +40,6 @@ using unchecked = outcome::std_result<R, S, outcome::policy::all_narrow>;
template<class R, class S = std::error_code>
using checked
= outcome::result<R, S, outcome::policy::throw_bad_result_access<S, void>>;

template<class T>
constexpr bool is_result_v = outcome::is_basic_result_v<T>;
16 changes: 8 additions & 8 deletions src/v/cluster/id_allocator_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ reset_id_handler::process(ss::shard_id shard, reset_id_allocator_request req) {
}
return stm->reset_next_id(id, timeout)
.then([](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
r.assume_error().message());
return reset_id_allocator_reply{errc::replication_error};
}

Expand All @@ -146,7 +146,7 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
"can't get partition by {} ntp",
model::id_allocator_ntp);
return ss::make_ready_future<allocate_id_reply>(
allocate_id_reply{0, errc::topic_not_exists});
0, errc::topic_not_exists);
}
auto stm = partition->id_allocator_stm();
if (!stm) {
Expand All @@ -155,19 +155,19 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
"can't get id allocator stm of the {}' partition",
model::id_allocator_ntp);
return ss::make_ready_future<allocate_id_reply>(
allocate_id_reply{0, errc::topic_not_exists});
0, errc::topic_not_exists);
}
return stm->allocate_id(timeout).then(
[](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
return allocate_id_reply{r.id, errc::replication_error};
r.assume_error().message());
return allocate_id_reply{-1, errc::replication_error};
}

return allocate_id_reply{r.id, errc::success};
return allocate_id_reply{r.assume_value(), errc::success};
});
});
}
Expand Down
30 changes: 16 additions & 14 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,28 @@ id_allocator_stm::reset_next_id(
return _lock
.with(
timeout, [this, id, timeout]() { return advance_state(id, timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
});
.handle_exception_type(
[](const ss::semaphore_timed_out&) -> stm_allocation_result {
return raft::make_error_code(raft::errc::timeout);
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::advance_state(
int64_t value, model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
if (value < _curr_id) {
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return _curr_id;
}
_curr_id = value;
auto success = co_await set_state(_curr_id + _batch_size, timeout);
if (!success) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
_curr_batch = _batch_size;
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return stm_allocation_result(_curr_id);
}

ss::future<bool> id_allocator_stm::set_state(
Expand All @@ -123,31 +124,32 @@ ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::allocate_id(model::timeout_clock::duration timeout) {
return _lock
.with(timeout, [this, timeout]() { return do_allocate_id(timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
});
.handle_exception_type(
[](const ss::semaphore_timed_out&) -> stm_allocation_result {
return raft::make_error_code(raft::errc::timeout);
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::do_allocate_id(model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}

if (_curr_batch == 0) {
_curr_id = _state;
if (!co_await set_state(_curr_id + _batch_size, timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
_curr_batch = _batch_size;
}

auto id = _curr_id;
int64_t id = _curr_id;

_curr_id += 1;
_curr_batch -= 1;

co_return stm_allocation_result{id, raft::errc::success};
co_return stm_allocation_result{id};
}

ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
Expand Down
5 changes: 1 addition & 4 deletions src/v/cluster/id_allocator_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ class id_allocator_stm final : public raft::persisted_stm<> {
public:
static constexpr std::string_view name = "id_allocator_stm";

struct stm_allocation_result {
int64_t id;
raft::errc raft_status{raft::errc::success};
};
using stm_allocation_result = result<int64_t>;

explicit id_allocator_stm(ss::logger&, raft::consensus*);

Expand Down
23 changes: 22 additions & 1 deletion src/v/cluster/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ set(srcs
distributed_kv_stm_tests.cc
rm_stm_tests.cc
rm_stm_compatibility_test.cc
id_allocator_stm_test.cc
local_monitor_test.cc
tx_compaction_tests.cc
producer_state_tests.cc
Expand All @@ -51,6 +50,28 @@ rp_test(
)
endforeach()

set(srcs
id_allocator_stm_test.cc
)

foreach(cluster_test_src ${srcs})
get_filename_component(test_name ${cluster_test_src} NAME_WE)
rp_test(
UNIT_TEST
GTEST
BINARY_NAME ${test_name}
SOURCES ${cluster_test_src}
LIBRARIES
v::gtest_main
v::storage_test_utils
v::cluster
v::http_test_utils
v::raft
v::raft_fixture
LABELS cluster
)
endforeach()

set(srcs
manual_log_deletion_test.cc
cluster_tests.cc
Expand Down
Loading
Loading