Skip to content

Commit

Permalink
config rewirte tool
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 3, 2023
1 parent 448cb2e commit e1be49e
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/v/raft/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,17 @@ rp_test(
# the args below are just to keep it fast
ARGS "-c 1 --duration=1 --runs=1 --memory=1G"
LABELS raft
)
)

set(srcs
config_rewrite_test.cc
)

rp_test(
UNIT_TEST
BINARY_NAME test_raft_cw
SOURCES ${srcs}
LIBRARIES v::seastar_testing_main v::raft v::storage_test_utils v::model_test_utils v::features
LABELS raft
ARGS "-- -c 8"
)
136 changes: 136 additions & 0 deletions src/v/raft/tests/config_rewrite_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@

// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "bytes/iobuf_parser.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "model/record.h"
#include "model/record_batch_reader.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "net/unresolved_address.h"
#include "raft/consensus_utils.h"
#include "raft/group_configuration.h"
#include "raft/heartbeats.h"
#include "raft/types.h"
#include "random/generators.h"
#include "reflection/adl.h"
#include "storage/api.h"
#include "storage/kvstore.h"
#include "storage/log_manager.h"
#include "storage/ntp_config.h"
#include "storage/types.h"
#include "test_utils/randoms.h"

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sstring.hh>
#include <seastar/testing/thread_test_case.hh>

#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test_log.hpp>
#include <fmt/ostream.h>
#include <fmt/ranges.h>

#include <optional>
#include <utility>
#include <vector>

using namespace std::chrono_literals;

static ss::sstring data_dir = "/home/mmaslanka/dev/support/schemas_data";
SEASTAR_THREAD_TEST_CASE(rewrite_raft_configuration) {
ss::sharded<features::feature_table> ft;
storage::api st_api(
[]() {
return storage::kvstore_config(
1_MiB,
config::mock_binding(10ms),
data_dir,
storage::make_sanitized_file_config());
},
[]() {
return storage::log_config(
data_dir,
100_MiB,
ss::default_priority_class(),
storage::make_sanitized_file_config());
},
ft);
st_api.start().get();
model::revision_id target_rev(10);
auto src_log = st_api.log_mgr()
.manage(storage::ntp_config(
model::ntp(
model::kafka_namespace,
model::topic("schemas"),
model::partition_id(0)),
data_dir,
nullptr,
model::revision_id(34)))
.get();
auto target_log = st_api.log_mgr()
.manage(storage::ntp_config(
model::ntp(
model::kafka_namespace,
model::topic("schemas_override"),
model::partition_id(0)),
data_dir,
nullptr,
target_rev))
.get();
auto reader = src_log
->make_reader(storage::log_reader_config(
model::offset(0),
model::offset::max(),
ss::default_priority_class()))
.get();

auto batches = model::consume_reader_to_memory(
std::move(reader), model::no_timeout)
.get();
ss::circular_buffer<model::record_batch> transformed;
auto br = model::broker(
model::node_id(0),
net::unresolved_address("localhost", 9092),
net::unresolved_address("localhost", 33145),
std::nullopt,
model::broker_properties{});
raft::group_configuration new_cfg({br}, target_rev);

for (auto& b : batches) {
if (b.header().type == model::record_batch_type::raft_configuration) {
auto records = b.copy_records();
auto cfg = reflection::from_iobuf<raft::group_configuration>(
records.front().release_value());
fmt::print("config: {}\n", cfg);
auto cfg_batches
= raft::details::serialize_configuration_as_batches(cfg);
cfg_batches.begin()->set_term(b.term());
transformed.push_back(std::move(*cfg_batches.begin()));
} else {
transformed.push_back(std::move(b));
}
}

auto tr_reader = model::make_memory_record_batch_reader(
std::move(transformed));

auto appender = target_log->make_appender(storage::log_append_config{
.should_fsync = storage::log_append_config::fsync::yes,
.io_priority = ss::default_priority_class(),
});
tr_reader.consume(std::move(appender), model::no_timeout).get();
st_api.stop().get();
}

0 comments on commit e1be49e

Please sign in to comment.