Skip to content

Commit

Permalink
Merge pull request redpanda-data#18280 from bashtanov/base-id_allocat…
Browse files Browse the repository at this point in the history
…or_stm_test-on-raft_fixture

tests: base id_allocator_stm_test on raft_fixture
  • Loading branch information
mmaslankaprv authored May 13, 2024
2 parents a1ab255 + 3821433 commit cf0430c
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 91 deletions.
4 changes: 4 additions & 0 deletions src/v/base/include/base/outcome.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <system_error> // bring in std::error_code et al

// use the standard ones instead
#include <boost/outcome/basic_result.hpp>
#include <boost/outcome/std_outcome.hpp>
#include <boost/outcome/std_result.hpp>

Expand All @@ -39,3 +40,6 @@ using unchecked = outcome::std_result<R, S, outcome::policy::all_narrow>;
template<class R, class S = std::error_code>
using checked
= outcome::result<R, S, outcome::policy::throw_bad_result_access<S, void>>;

template<class T>
constexpr bool is_result_v = outcome::is_basic_result_v<T>;
16 changes: 8 additions & 8 deletions src/v/cluster/id_allocator_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ reset_id_handler::process(ss::shard_id shard, reset_id_allocator_request req) {
}
return stm->reset_next_id(id, timeout)
.then([](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
r.assume_error().message());
return reset_id_allocator_reply{errc::replication_error};
}

Expand All @@ -146,7 +146,7 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
"can't get partition by {} ntp",
model::id_allocator_ntp);
return ss::make_ready_future<allocate_id_reply>(
allocate_id_reply{0, errc::topic_not_exists});
0, errc::topic_not_exists);
}
auto stm = partition->id_allocator_stm();
if (!stm) {
Expand All @@ -155,19 +155,19 @@ allocate_id_handler::process(ss::shard_id shard, allocate_id_request req) {
"can't get id allocator stm of the {}' partition",
model::id_allocator_ntp);
return ss::make_ready_future<allocate_id_reply>(
allocate_id_reply{0, errc::topic_not_exists});
0, errc::topic_not_exists);
}
return stm->allocate_id(timeout).then(
[](id_allocator_stm::stm_allocation_result r) {
if (r.raft_status != raft::errc::success) {
if (!r) {
vlog(
clusterlog.warn,
"allocate id stm call failed with {}",
raft::make_error_code(r.raft_status).message());
return allocate_id_reply{r.id, errc::replication_error};
r.assume_error().message());
return allocate_id_reply{-1, errc::replication_error};
}

return allocate_id_reply{r.id, errc::success};
return allocate_id_reply{r.assume_value(), errc::success};
});
});
}
Expand Down
30 changes: 16 additions & 14 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,28 @@ id_allocator_stm::reset_next_id(
return _lock
.with(
timeout, [this, id, timeout]() { return advance_state(id, timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
});
.handle_exception_type(
[](const ss::semaphore_timed_out&) -> stm_allocation_result {
return raft::make_error_code(raft::errc::timeout);
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::advance_state(
int64_t value, model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
if (value < _curr_id) {
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return _curr_id;
}
_curr_id = value;
auto success = co_await set_state(_curr_id + _batch_size, timeout);
if (!success) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
_curr_batch = _batch_size;
co_return stm_allocation_result{_curr_id, raft::errc::success};
co_return stm_allocation_result(_curr_id);
}

ss::future<bool> id_allocator_stm::set_state(
Expand All @@ -123,31 +124,32 @@ ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::allocate_id(model::timeout_clock::duration timeout) {
return _lock
.with(timeout, [this, timeout]() { return do_allocate_id(timeout); })
.handle_exception_type([](const ss::semaphore_timed_out&) {
return stm_allocation_result{-1, raft::errc::timeout};
});
.handle_exception_type(
[](const ss::semaphore_timed_out&) -> stm_allocation_result {
return raft::make_error_code(raft::errc::timeout);
});
}

ss::future<id_allocator_stm::stm_allocation_result>
id_allocator_stm::do_allocate_id(model::timeout_clock::duration timeout) {
if (!co_await sync(timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}

if (_curr_batch == 0) {
_curr_id = _state;
if (!co_await set_state(_curr_id + _batch_size, timeout)) {
co_return stm_allocation_result{-1, raft::errc::timeout};
co_return raft::make_error_code(raft::errc::timeout);
}
_curr_batch = _batch_size;
}

auto id = _curr_id;
int64_t id = _curr_id;

_curr_id += 1;
_curr_batch -= 1;

co_return stm_allocation_result{id, raft::errc::success};
co_return stm_allocation_result{id};
}

ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
Expand Down
5 changes: 1 addition & 4 deletions src/v/cluster/id_allocator_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ class id_allocator_stm final : public raft::persisted_stm<> {
public:
static constexpr std::string_view name = "id_allocator_stm";

struct stm_allocation_result {
int64_t id;
raft::errc raft_status{raft::errc::success};
};
using stm_allocation_result = result<int64_t>;

explicit id_allocator_stm(ss::logger&, raft::consensus*);

Expand Down
23 changes: 22 additions & 1 deletion src/v/cluster/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ set(srcs
distributed_kv_stm_tests.cc
rm_stm_tests.cc
rm_stm_compatibility_test.cc
id_allocator_stm_test.cc
local_monitor_test.cc
tx_compaction_tests.cc
producer_state_tests.cc
Expand All @@ -51,6 +50,28 @@ rp_test(
)
endforeach()

set(srcs
id_allocator_stm_test.cc
)

foreach(cluster_test_src ${srcs})
get_filename_component(test_name ${cluster_test_src} NAME_WE)
rp_test(
UNIT_TEST
GTEST
BINARY_NAME ${test_name}
SOURCES ${cluster_test_src}
LIBRARIES
v::gtest_main
v::storage_test_utils
v::cluster
v::http_test_utils
v::raft
v::raft_fixture
LABELS cluster
)
endforeach()

set(srcs
manual_log_deletion_test.cc
cluster_tests.cc
Expand Down
Loading

0 comments on commit cf0430c

Please sign in to comment.