Skip to content

Commit

Permalink
Merge pull request #24525 from michael-redpanda/sasl-plain/core-8407
Browse files Browse the repository at this point in the history
[CORE-8436]: Add support for SASL/PLAIN
  • Loading branch information
michael-redpanda authored Dec 19, 2024
2 parents c32a996 + b2d8fa9 commit 5809e0d
Show file tree
Hide file tree
Showing 27 changed files with 859 additions and 104 deletions.
52 changes: 40 additions & 12 deletions src/v/cluster/feature_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "config/configuration.h"
#include "config/endpoint_tls_config.h"
#include "config/node_config.h"
#include "config/tls_config.h"
#include "config/types.h"
#include "config/validators.h"
#include "features/enterprise_feature_messages.h"
Expand All @@ -34,9 +36,9 @@

#include <seastar/core/semaphore.hh>

#include <absl/algorithm/container.h>
#include <fmt/format.h>

#include <algorithm>
#include <stdexcept>

namespace cluster {
Expand Down Expand Up @@ -184,7 +186,7 @@ feature_manager::start(std::vector<model::node_id>&& cluster_founder_nodes) {
ssx::background = ssx::spawn_with_gate_then(_gate, [this] {
return ss::do_until(
[this] { return _as.local().abort_requested(); },
[this] { return maybe_log_license_check_info(); });
[this] { return maybe_log_periodic_reminders(); });
});

for (const model::node_id n : cluster_founder_nodes) {
Expand All @@ -208,7 +210,7 @@ feature_manager::report_enterprise_features() const {
const auto& cfg = config::shard_local_cfg();
const auto& node_cfg = config::node();
auto has_gssapi = [&cfg]() {
return absl::c_any_of(
return std::ranges::any_of(
cfg.sasl_mechanisms(), [](const auto& m) { return m == "GSSAPI"; });
};
auto has_oidc = []() {
Expand Down Expand Up @@ -274,32 +276,38 @@ feature_manager::report_enterprise_features() const {
return report;
}

ss::future<> feature_manager::maybe_log_license_check_info() {
auto license_check_retry = std::chrono::seconds(60 * 5);
ss::future<> feature_manager::maybe_log_periodic_reminders() {
auto reminder_period = std::chrono::seconds(60 * 5);
auto interval_override = std::getenv(
"__REDPANDA_LICENSE_CHECK_INTERVAL_SEC");
"__REDPANDA_PERIODIC_REMINDER_INTERVAL_SEC");
if (interval_override != nullptr) {
try {
license_check_retry = std::min(
std::chrono::seconds{license_check_retry},
reminder_period = std::min(
std::chrono::seconds{reminder_period},
std::chrono::seconds{std::stoi(interval_override)});
vlog(
clusterlog.info,
"Overriding default license log annoy interval to: {}s",
license_check_retry.count());
"Overriding default reminder period interval to: {}s",
reminder_period.count());
} catch (...) {
vlog(
clusterlog.error,
"Invalid license check interval override '{}'",
"Invalid reminder period interval override '{}'",
interval_override);
}
}
try {
co_await ss::sleep_abortable(license_check_retry, _as.local());
co_await ss::sleep_abortable(reminder_period, _as.local());
} catch (const ss::sleep_aborted&) {
// Shutting down - next iteration will drop out
co_return;
}

maybe_log_license_nag();
maybe_log_security_nag();
}

void feature_manager::maybe_log_license_nag() {
auto enterprise_features = report_enterprise_features();
if (enterprise_features.any()) {
if (_feature_table.local().should_sanction()) {
Expand All @@ -312,6 +320,26 @@ ss::future<> feature_manager::maybe_log_license_check_info() {
}
}

void feature_manager::maybe_log_security_nag() {
if (std::ranges::any_of(
config::shard_local_cfg().sasl_mechanisms(),
[](const auto& m) { return m == "PLAIN"; })) {
const bool any_tls_disabled
= std::ranges::any_of(
config::node_config().kafka_api_tls.value(),
[](const config::endpoint_tls_config& cfg) {
return !cfg.config.is_enabled();
})
|| config::node_config().kafka_api_tls.value().empty();

vlogl(
clusterlog,
any_tls_disabled ? ss::log_level::error : ss::log_level::warn,
"SASL/PLAIN is enabled. This is insecure and not recommended for "
"production.");
}
}

bool feature_manager::need_to_verify_enterprise_license() {
return features::is_major_version_upgrade(
_feature_table.local().get_active_version(),
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/feature_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ class feature_manager {
&& _am_controller_leader;
}

ss::future<> maybe_log_license_check_info();
ss::future<> maybe_log_periodic_reminders();
void maybe_log_license_nag();
void maybe_log_security_nag();
bool need_to_verify_enterprise_license();

// Compose a command struct, replicate it via raft and wait for apply.
Expand Down
2 changes: 1 addition & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ configuration::configuration()
std::vector<ss::sstring>{"GSSAPI", "OAUTHBEARER"},
"sasl_mechanisms",
"A list of supported SASL mechanisms. Accepted values: `SCRAM`, "
"`GSSAPI`, `OAUTHBEARER`.",
"`GSSAPI`, `OAUTHBEARER`, `PLAIN`.",
meta{
.needs_restart = needs_restart::no,
.visibility = visibility::user,
Expand Down
8 changes: 7 additions & 1 deletion src/v/config/validators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ std::optional<ss::sstring> validate_client_groups_byte_rate_quota(
std::optional<ss::sstring>
validate_sasl_mechanisms(const std::vector<ss::sstring>& mechanisms) {
constexpr auto supported = std::to_array<std::string_view>(
{"GSSAPI", "SCRAM", "OAUTHBEARER"});
{"GSSAPI", "SCRAM", "OAUTHBEARER", "PLAIN"});

// Validate results
for (const auto& m : mechanisms) {
Expand All @@ -124,6 +124,12 @@ validate_sasl_mechanisms(const std::vector<ss::sstring>& mechanisms) {
return ssx::sformat("'{}' is not a supported SASL mechanism", m);
}
}

if (mechanisms.size() == 1 && mechanisms[0] == "PLAIN") {
return "When PLAIN is enabled, at least one other mechanism must be "
"enabled";
}

return std::nullopt;
}

Expand Down
11 changes: 11 additions & 0 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "security/gssapi_authenticator.h"
#include "security/mtls.h"
#include "security/oidc_authenticator.h"
#include "security/plain_authenticator.h"
#include "security/scram_algorithm.h"
#include "security/scram_authenticator.h"
#include "ssx/future-util.h"
Expand Down Expand Up @@ -720,6 +721,16 @@ ss::future<response_ptr> sasl_handshake_handler::handle(
}
}

if (supports("PLAIN")) {
supported_sasl_mechanisms.emplace_back(
security::plain_authenticator::name);
if (request.data.mechanism == security::plain_authenticator::name) {
ctx.sasl()->set_mechanism(
std::make_unique<security::plain_authenticator>(
ctx.credentials()));
}
}

if (supports("GSSAPI")) {
supported_sasl_mechanisms.emplace_back(
security::gssapi_authenticator::name);
Expand Down
2 changes: 2 additions & 0 deletions src/v/security/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ redpanda_cc_library(
"oidc_authenticator.cc",
"oidc_principal_mapping_applicator.cc",
"oidc_service.cc",
"plain_authenticator.cc",
"role.cc",
"scram_algorithm.cc",
"scram_authenticator.cc",
Expand Down Expand Up @@ -118,6 +119,7 @@ redpanda_cc_library(
"oidc_error.h",
"oidc_principal_mapping_applicator.h",
"oidc_service.h",
"plain_authenticator.h",
"role.h",
"role_store.h",
"sasl_authentication.h",
Expand Down
1 change: 1 addition & 0 deletions src/v/security/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ v_cc_library(
oidc_authenticator.cc
oidc_principal_mapping_applicator.cc
oidc_service.cc
plain_authenticator.cc
request_auth.cc
role.cc
scram_algorithm.cc
Expand Down
108 changes: 108 additions & 0 deletions src/v/security/plain_authenticator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "security/plain_authenticator.h"

#include "base/vlog.h"
#include "security/acl.h"
#include "security/credential_store.h"
#include "security/errc.h"
#include "security/logger.h"
#include "security/scram_authenticator.h"
#include "security/types.h"
#include "strings/utf8.h"

#include <seastar/util/defer.hh>

namespace security {

ss::future<result<bytes>> plain_authenticator::authenticate(bytes auth_bytes) {
constexpr size_t max_length{255};
constexpr std::string_view sep{"\0", 1};

auto make_failed = ss::defer([this] { _state = state::failed; });

if (_state != state::init) {
vlog(
seclog.warn,
"invalid plain state: {}",
_state == state::failed ? "failed" : "complete");
co_return errc::invalid_credentials;
}

auto auth_str = std::string_view(
reinterpret_cast<char*>(auth_bytes.data()), auth_bytes.size());

if (!is_valid_utf8(auth_str)) {
vlog(seclog.warn, "invalid utf8");
co_return errc::invalid_credentials;
}

// [authorization identity] not supported
if (!auth_str.starts_with(sep)) {
vlog(seclog.warn, "[authorization identity] not supported");
co_return errc::invalid_credentials;
}
auth_str = auth_str.substr(sep.length());
auto it = auth_str.find(sep);
if (std::string_view::npos == it) {
vlog(seclog.warn, "seperator not found");
co_return errc::invalid_credentials;
}

credential_user username{auth_str.substr(0, it)};
credential_password password{auth_str.substr(it + sep.length())};

if (username().empty()) {
vlog(seclog.warn, "username not found");
co_return errc::invalid_credentials;
}

if (username().length() > max_length) {
vlog(seclog.warn, "username too long");
co_return errc::invalid_credentials;
}

if (password().empty()) {
vlog(seclog.warn, "password not found");
co_return errc::invalid_credentials;
}

if (password().length() > max_length) {
vlog(seclog.warn, "password too long");
co_return errc::invalid_credentials;
}

_audit_user.name = username;
auto cred = _credentials.get<scram_credential>(username);
if (!cred.has_value()) {
vlog(seclog.warn, "credential not found");
co_return errc::invalid_credentials;
}

if (!validate_scram_credential(*cred, password).has_value()) {
vlog(seclog.warn, "scram authentication failed");
co_return errc::invalid_credentials;
}

vlog(seclog.trace, "Authenticated user {}", username);

make_failed.cancel();

_principal = cred->principal().value_or(
acl_principal{principal_type::user, username()});
_audit_user.name = _principal.name();
_audit_user.type_id = audit::user::type::user;

_state = state::complete;
co_return bytes{};
}

} // namespace security
62 changes: 62 additions & 0 deletions src/v/security/plain_authenticator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once
#include "security/acl.h"
#include "security/fwd.h"
#include "security/sasl_authentication.h"

namespace security {

/**
* @class plain_authenticator
* @brief A class that implements SASL/PLAIN authentication mechanism.
*
* This class is responsible for handling the SASL/PLAIN authentication process.
* It authenticates the username and password provided by the client against
* SCRAM users in the credential store.
*/
class plain_authenticator final : public sasl_mechanism {
public:
static constexpr const char* name = "PLAIN";

explicit plain_authenticator(credential_store& credentials)
: _credentials(credentials) {}

ss::future<result<bytes>> authenticate(bytes auth_bytes) override;

bool complete() const override { return _state == state::complete; }
bool failed() const override { return _state == state::failed; }

const acl_principal& principal() const override {
vassert(
_state == state::complete,
"Authentication id is not valid until auth process complete");
return _principal;
}

const audit::user& audit_user() const override { return _audit_user; }

const char* mechanism_name() const override { return "SASL-PLAIN"; }

private:
enum class state {
init,
complete,
failed,
};

state _state{state::init};
credential_store& _credentials;
acl_principal _principal;
security::audit::user _audit_user;
};

} // namespace security
Loading

0 comments on commit 5809e0d

Please sign in to comment.