Skip to content

Commit

Permalink
Merge pull request redpanda-data#24053 from BenPope/core-7999-tiered-…
Browse files Browse the repository at this point in the history
…storage-sanctioning

[CORE-7999] Tiered storage sanctioning for topic creation
  • Loading branch information
BenPope authored Nov 7, 2024
2 parents fc3f05a + c6e9d02 commit b9059b2
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 38 deletions.
33 changes: 33 additions & 0 deletions src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,27 @@
#include <sstream>
#include <system_error>

namespace {

std::vector<std::string_view> get_enterprise_features(
const cluster::custom_assignable_topic_configuration& cfg) {
std::vector<std::string_view> features;
const auto si_disabled = model::shadow_indexing_mode::disabled;
if (
cfg.cfg.properties.shadow_indexing.value_or(si_disabled) != si_disabled) {
features.emplace_back("tiered storage");
}
if (cfg.is_recovery_enabled()) {
features.emplace_back("topic recovery");
}
if (cfg.is_read_replica()) {
features.emplace_back("remote read replicas");
}
return features;
}

} // namespace

namespace cluster {

topics_frontend::topics_frontend(
Expand Down Expand Up @@ -437,6 +458,18 @@ errc topics_frontend::validate_topic_configuration(
}
}

if (
_features.local().should_sanction()
&& is_user_topic(assignable_config.cfg.tp_ns)) {
if (auto f = get_enterprise_features(assignable_config); !f.empty()) {
vlog(
clusterlog.warn,
"An enterprise license is required to enable {}.",
f);
return errc::topic_invalid_config;
}
}

return errc::success;
}

Expand Down
120 changes: 82 additions & 38 deletions src/v/kafka/server/tests/create_topics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@
#include "container/fragmented_vector.h"
#include "kafka/protocol/create_topics.h"
#include "kafka/protocol/metadata.h"
#include "kafka/server/handlers/configs/config_response_utils.h"
#include "kafka/server/handlers/topics/types.h"
#include "redpanda/tests/fixture.h"
#include "resource_mgmt/io_priority.h"
#include "kafka/server/tests/topic_properties_helpers.h"

#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>

#include <algorithm>
#include <limits>

// rougly equivalent to the test harness:
// https://github.com/apache/kafka/blob/8e16158/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
class create_topic_fixture : public redpanda_thread_fixture {
class create_topic_fixture : public topic_properties_test_fixture {
public:
kafka::create_topics_request make_req(
chunked_vector<kafka::creatable_topic> topics,
Expand Down Expand Up @@ -251,8 +248,6 @@ class create_topic_fixture : public redpanda_thread_fixture {
// This is rougly equivalent to
// https://github.com/apache/kafka/blob/8e16158/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala#L27
FIXTURE_TEST(create_topics, create_topic_fixture) {
wait_for_controller_leadership().get();

test_create_topic(make_req({make_topic("topic1")}));

// FIXME: these all crash with undefined behavior
Expand Down Expand Up @@ -306,17 +301,15 @@ FIXTURE_TEST(read_replica_and_remote_write, create_topic_fixture) {
client.connect().get();
auto resp = client.dispatch(make_req({topic}), kafka::api_version(2)).get();

BOOST_CHECK(
resp.data.topics[0].error_code == kafka::error_code::invalid_config);
BOOST_CHECK(
resp.data.topics[0].error_message
== "remote read and write are not supported for read replicas");
BOOST_CHECK(resp.data.topics[0].name == "topic1");
BOOST_CHECK_EQUAL(
resp.data.topics[0].error_code, kafka::error_code::invalid_config);
BOOST_CHECK_EQUAL(
resp.data.topics[0].error_message,
"remote read and write are not supported for read replicas");
BOOST_CHECK_EQUAL(resp.data.topics[0].name, "topic1");
}

FIXTURE_TEST(test_v5_validate_configs_resp, create_topic_fixture) {
wait_for_controller_leadership().get();

/// Test conditions in create_topic_fixture::verify_metadata will run
test_create_topic(
make_req({make_topic("topicA"), make_topic("topicB")}, true),
Expand Down Expand Up @@ -356,14 +349,14 @@ FIXTURE_TEST(create_multiple_topics_mixed_invalid, create_topic_fixture) {
.dispatch(make_req({topic_a, topic_b}), kafka::api_version(5))
.get();

BOOST_CHECK(resp.data.topics.size() == 2);
BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 2);

BOOST_CHECK(resp.data.topics[0].error_code == kafka::error_code::none);
BOOST_CHECK(resp.data.topics[0].name == "topic_a");
BOOST_CHECK_EQUAL(resp.data.topics[0].error_code, kafka::error_code::none);
BOOST_CHECK_EQUAL(resp.data.topics[0].name, "topic_a");

BOOST_CHECK(
resp.data.topics[1].error_code == kafka::error_code::invalid_config);
BOOST_CHECK(resp.data.topics[1].name == "topic_b");
BOOST_CHECK_EQUAL(
resp.data.topics[1].error_code, kafka::error_code::invalid_config);
BOOST_CHECK_EQUAL(resp.data.topics[1].name, "topic_b");
}

FIXTURE_TEST(create_multiple_topics_all_invalid, create_topic_fixture) {
Expand Down Expand Up @@ -395,19 +388,19 @@ FIXTURE_TEST(create_multiple_topics_all_invalid, create_topic_fixture) {
kafka::api_version(5))
.get();

BOOST_CHECK(resp.data.topics.size() == 3);
BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 3);

BOOST_CHECK(resp.data.topics[0].name == "topic_a");
BOOST_CHECK(
resp.data.topics[0].error_code == kafka::error_code::invalid_config);
BOOST_CHECK_EQUAL(resp.data.topics[0].name, "topic_a");
BOOST_CHECK_EQUAL(
resp.data.topics[0].error_code, kafka::error_code::invalid_config);

BOOST_CHECK(resp.data.topics[1].name == "topic_b");
BOOST_CHECK(
resp.data.topics[1].error_code == kafka::error_code::invalid_config);
BOOST_CHECK_EQUAL(resp.data.topics[1].name, "topic_b");
BOOST_CHECK_EQUAL(
resp.data.topics[1].error_code, kafka::error_code::invalid_config);

BOOST_CHECK(resp.data.topics[2].name == "topic_c");
BOOST_CHECK(
resp.data.topics[2].error_code == kafka::error_code::invalid_config);
BOOST_CHECK_EQUAL(resp.data.topics[2].name, "topic_c");
BOOST_CHECK_EQUAL(
resp.data.topics[2].error_code, kafka::error_code::invalid_config);
}

FIXTURE_TEST(invalid_boolean_property, create_topic_fixture) {
Expand All @@ -422,11 +415,12 @@ FIXTURE_TEST(invalid_boolean_property, create_topic_fixture) {
client.connect().get();
auto resp = client.dispatch(make_req({topic}), kafka::api_version(5)).get();

BOOST_CHECK(
resp.data.topics[0].error_code == kafka::error_code::invalid_config);
BOOST_CHECK(
resp.data.topics[0].error_message == "Configuration is invalid");
BOOST_CHECK(resp.data.topics[0].name == "topic1");
BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 1);
BOOST_CHECK_EQUAL(
resp.data.topics[0].error_code, kafka::error_code::invalid_config);
BOOST_CHECK_EQUAL(
resp.data.topics[0].error_message, "Configuration is invalid");
BOOST_CHECK_EQUAL(resp.data.topics[0].name, "topic1");
}

FIXTURE_TEST(case_insensitive_boolean_property, create_topic_fixture) {
Expand All @@ -441,6 +435,56 @@ FIXTURE_TEST(case_insensitive_boolean_property, create_topic_fixture) {
client.connect().get();
auto resp = client.dispatch(make_req({topic}), kafka::api_version(5)).get();

BOOST_CHECK(resp.data.topics[0].error_code == kafka::error_code::none);
BOOST_CHECK(resp.data.topics[0].name == "topic1");
BOOST_CHECK_EQUAL(resp.data.topics[0].error_code, kafka::error_code::none);
BOOST_CHECK_EQUAL(resp.data.topics[0].name, "topic1");
}

FIXTURE_TEST(unlicensed_rejected, create_topic_fixture) {
revoke_license();
auto si_props = {
ss::sstring{kafka::topic_property_remote_read},
ss::sstring{kafka::topic_property_remote_write},
ss::sstring{kafka::topic_property_recovery},
ss::sstring{kafka::topic_property_read_replica},
};

auto client = make_kafka_client().get();
client.connect().get();

for (const auto& prop : si_props) {
auto topic = make_topic(
ssx::sformat("topic_{}", prop),
std::nullopt,
std::nullopt,
std::map<ss::sstring, ss::sstring>{{prop, "true"}});

auto resp
= client.dispatch(make_req({topic}), kafka::api_version(5)).get();

BOOST_CHECK_EQUAL(
resp.data.topics[0].error_code, kafka::error_code::invalid_config);
}
}

FIXTURE_TEST(unlicensed_reject_defaults, create_topic_fixture) {
revoke_license();

const std::initializer_list<std::string_view> si_configs{
lconf().cloud_storage_enable_remote_read.name(),
lconf().cloud_storage_enable_remote_write.name()};

auto client = make_kafka_client().get();
client.connect().get();

for (const auto& config : si_configs) {
update_cluster_config(config, "true");
auto topic = make_topic(ssx::sformat("topic_{}", config));

auto resp
= client.dispatch(make_req({topic}), kafka::api_version(5)).get();

BOOST_CHECK_EQUAL(
resp.data.topics[0].error_code, kafka::error_code::invalid_config);
update_cluster_config(config, "false");
}
}
33 changes: 33 additions & 0 deletions src/v/kafka/server/tests/topic_properties_helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "config_frontend.h"
#include "redpanda/tests/fixture.h"

class topic_properties_test_fixture : public redpanda_thread_fixture {
public:
topic_properties_test_fixture() { wait_for_controller_leadership().get(); }

void revoke_license() {
app.controller->get_feature_table()
.invoke_on_all([](auto& ft) { return ft.revoke_license(); })
.get();
}

void update_cluster_config(std::string_view k, std::string_view v) {
app.controller->get_config_frontend()
.local()
.patch(
cluster::config_update_request{
.upsert{{ss::sstring{k}, ss::sstring{v}}},
},
model::timeout_clock::now() + 5s)
.get();
}
};

0 comments on commit b9059b2

Please sign in to comment.