Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-8436]: Add support for SASL/PLAIN #24525

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions src/v/cluster/feature_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "config/configuration.h"
#include "config/endpoint_tls_config.h"
#include "config/node_config.h"
#include "config/tls_config.h"
#include "config/types.h"
#include "config/validators.h"
#include "features/enterprise_feature_messages.h"
Expand All @@ -34,9 +36,9 @@

#include <seastar/core/semaphore.hh>

#include <absl/algorithm/container.h>
#include <fmt/format.h>

#include <algorithm>
#include <stdexcept>

namespace cluster {
Expand Down Expand Up @@ -184,7 +186,7 @@ feature_manager::start(std::vector<model::node_id>&& cluster_founder_nodes) {
ssx::background = ssx::spawn_with_gate_then(_gate, [this] {
return ss::do_until(
[this] { return _as.local().abort_requested(); },
[this] { return maybe_log_license_check_info(); });
[this] { return maybe_log_periodic_reminders(); });
});

for (const model::node_id n : cluster_founder_nodes) {
Expand All @@ -208,7 +210,7 @@ feature_manager::report_enterprise_features() const {
const auto& cfg = config::shard_local_cfg();
const auto& node_cfg = config::node();
auto has_gssapi = [&cfg]() {
return absl::c_any_of(
return std::ranges::any_of(
cfg.sasl_mechanisms(), [](const auto& m) { return m == "GSSAPI"; });
};
auto has_oidc = []() {
Expand Down Expand Up @@ -274,32 +276,38 @@ feature_manager::report_enterprise_features() const {
return report;
}

ss::future<> feature_manager::maybe_log_license_check_info() {
auto license_check_retry = std::chrono::seconds(60 * 5);
ss::future<> feature_manager::maybe_log_periodic_reminders() {
auto reminder_period = std::chrono::seconds(60 * 5);
auto interval_override = std::getenv(
"__REDPANDA_LICENSE_CHECK_INTERVAL_SEC");
"__REDPANDA_PERIODIC_REMINDER_INTERVAL_SEC");
if (interval_override != nullptr) {
try {
license_check_retry = std::min(
std::chrono::seconds{license_check_retry},
reminder_period = std::min(
std::chrono::seconds{reminder_period},
std::chrono::seconds{std::stoi(interval_override)});
vlog(
clusterlog.info,
"Overriding default license log annoy interval to: {}s",
license_check_retry.count());
"Overriding default reminder period interval to: {}s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably worth extracting out the duplicate _license_nag_is_set functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I missed changing those as well

reminder_period.count());
} catch (...) {
vlog(
clusterlog.error,
"Invalid license check interval override '{}'",
"Invalid reminder period interval override '{}'",
interval_override);
}
}
try {
co_await ss::sleep_abortable(license_check_retry, _as.local());
co_await ss::sleep_abortable(reminder_period, _as.local());
} catch (const ss::sleep_aborted&) {
// Shutting down - next iteration will drop out
co_return;
}

maybe_log_license_nag();
maybe_log_security_nag();
}

void feature_manager::maybe_log_license_nag() {
auto enterprise_features = report_enterprise_features();
if (enterprise_features.any()) {
if (_feature_table.local().should_sanction()) {
Expand All @@ -312,6 +320,26 @@ ss::future<> feature_manager::maybe_log_license_check_info() {
}
}

void feature_manager::maybe_log_security_nag() {
if (std::ranges::any_of(
pgellert marked this conversation as resolved.
Show resolved Hide resolved
config::shard_local_cfg().sasl_mechanisms(),
[](const auto& m) { return m == "PLAIN"; })) {
const bool any_tls_disabled
= std::ranges::any_of(
config::node_config().kafka_api_tls.value(),
[](const config::endpoint_tls_config& cfg) {
return !cfg.config.is_enabled();
})
|| config::node_config().kafka_api_tls.value().empty();

vlogl(
clusterlog,
any_tls_disabled ? ss::log_level::error : ss::log_level::warn,
"SASL/PLAIN is enabled. This is insecure and not recommended for "
"production.");
}
}

bool feature_manager::need_to_verify_enterprise_license() {
return features::is_major_version_upgrade(
_feature_table.local().get_active_version(),
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/feature_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ class feature_manager {
&& _am_controller_leader;
}

ss::future<> maybe_log_license_check_info();
ss::future<> maybe_log_periodic_reminders();
void maybe_log_license_nag();
void maybe_log_security_nag();
bool need_to_verify_enterprise_license();

// Compose a command struct, replicate it via raft and wait for apply.
Expand Down
2 changes: 1 addition & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ configuration::configuration()
std::vector<ss::sstring>{"GSSAPI", "OAUTHBEARER"},
"sasl_mechanisms",
"A list of supported SASL mechanisms. Accepted values: `SCRAM`, "
"`GSSAPI`, `OAUTHBEARER`.",
"`GSSAPI`, `OAUTHBEARER`, `PLAIN`.",
meta{
.needs_restart = needs_restart::no,
.visibility = visibility::user,
Expand Down
8 changes: 7 additions & 1 deletion src/v/config/validators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ std::optional<ss::sstring> validate_client_groups_byte_rate_quota(
std::optional<ss::sstring>
validate_sasl_mechanisms(const std::vector<ss::sstring>& mechanisms) {
constexpr auto supported = std::to_array<std::string_view>(
{"GSSAPI", "SCRAM", "OAUTHBEARER"});
{"GSSAPI", "SCRAM", "OAUTHBEARER", "PLAIN"});

// Validate results
for (const auto& m : mechanisms) {
Expand All @@ -124,6 +124,12 @@ validate_sasl_mechanisms(const std::vector<ss::sstring>& mechanisms) {
return ssx::sformat("'{}' is not a supported SASL mechanism", m);
}
}

if (mechanisms.size() == 1 && mechanisms[0] == "PLAIN") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the condition supposed to be "at least one other" or "at least SCRAM as well"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the condition supposed to be "at least one other" or "at least SCRAM as well"?

From SUCCESS-02: "We will not support a cluster configured with only a PLAIN mechanism", which I understand to mean that at least one other secure mechanism is required, which includes all of the others; SCRAM,OAUTHBEARER, GSSAPI.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The preceding sentence of SUCCESS-02 is "Users only need to create a single account to support either SCRAM or PLAIN authentication." which makes me think that it is SCRAM specifically that's also required. I think it's worth asking @deniscoady to see what he meant here. (I'd comment on the PRD, but I don't have access.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Users only need to create a single account to support either SCRAM or PLAIN authentication.",

SCRAM Users are the single source of truth for users which are managed internally. Historically I've pushed for them to be named "SCRAM Uses" over "SASL Users" (c.f., SASL/OAUTHBEARER, SASL/GSSAPI, which are managed externally), but although they are created as SCRAM users, they are supported via HTTP Basic Auth, and now, via SASL/PLAIN.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I've got that. I guess what I'm trying to say is that it's not clear to me is whether the goal is:
(a) For Redpanda to be opinionated and nudge customers to enable at least one secure sasl mechanism, or
(b) For Redpanda to signal that SCRAM users and SASL users are the same so you probably want to enable SCRAM as well if you enable PLAIN.

If (a), then this implementation makes sense. But I think (b) would also be reasonable, given that I would expect most customers to use PLAIN while migrating over to Redpanda and away from PLAIN over to SCRAM. In that case, I think they would start by enabling PLAIN+SCRAM, then migrate, and finally disable PLAIN.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed with @deniscoady and I think we may modify this requirement slightly to make it so if you enable PLAIN you must also enable SCRAM. This impacts our internal clients (PP/SR/Auditing) so I think this is reasonable.

return "When PLAIN is enabled, at least one other mechanism must be "
"enabled";
}

return std::nullopt;
}

Expand Down
11 changes: 11 additions & 0 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "security/gssapi_authenticator.h"
#include "security/mtls.h"
#include "security/oidc_authenticator.h"
#include "security/plain_authenticator.h"
#include "security/scram_algorithm.h"
#include "security/scram_authenticator.h"
#include "ssx/future-util.h"
Expand Down Expand Up @@ -720,6 +721,16 @@ ss::future<response_ptr> sasl_handshake_handler::handle(
}
}

if (supports("PLAIN")) {
supported_sasl_mechanisms.emplace_back(
security::plain_authenticator::name);
if (request.data.mechanism == security::plain_authenticator::name) {
ctx.sasl()->set_mechanism(
std::make_unique<security::plain_authenticator>(
ctx.credentials()));
}
}
Comment on lines +724 to +732
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussion: Getting clarification from REQ-01. Not sure if the wording was wrong or if PLAIN implies also enabling SCRAM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it "{PLAIN} implies {PLAIN, SCRAM}" or "{PLAIN} without SCRAM is invalid" or "{PLAIN} implies only {PLAIN} on the kafka procotol, but you can still manage SCRAM users through the admin API"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PLAIN means enable PLAIN. One other secure mechanism must be explicitly enabled.


if (supports("GSSAPI")) {
supported_sasl_mechanisms.emplace_back(
security::gssapi_authenticator::name);
Expand Down
2 changes: 2 additions & 0 deletions src/v/security/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ redpanda_cc_library(
"oidc_authenticator.cc",
"oidc_principal_mapping_applicator.cc",
"oidc_service.cc",
"plain_authenticator.cc",
"role.cc",
"scram_algorithm.cc",
"scram_authenticator.cc",
Expand Down Expand Up @@ -118,6 +119,7 @@ redpanda_cc_library(
"oidc_error.h",
"oidc_principal_mapping_applicator.h",
"oidc_service.h",
"plain_authenticator.h",
"role.h",
"role_store.h",
"sasl_authentication.h",
Expand Down
1 change: 1 addition & 0 deletions src/v/security/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ v_cc_library(
oidc_authenticator.cc
oidc_principal_mapping_applicator.cc
oidc_service.cc
plain_authenticator.cc
request_auth.cc
role.cc
scram_algorithm.cc
Expand Down
108 changes: 108 additions & 0 deletions src/v/security/plain_authenticator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "security/plain_authenticator.h"

#include "base/vlog.h"
#include "security/acl.h"
#include "security/credential_store.h"
#include "security/errc.h"
#include "security/logger.h"
#include "security/scram_authenticator.h"
#include "security/types.h"
#include "strings/utf8.h"

#include <seastar/util/defer.hh>

namespace security {

ss::future<result<bytes>> plain_authenticator::authenticate(bytes auth_bytes) {
constexpr size_t max_length{255};
constexpr std::string_view sep{"\0", 1};

auto make_failed = ss::defer([this] { _state = state::failed; });

if (_state != state::init) {
vlog(
seclog.warn,
"invalid plain state: {}",
_state == state::failed ? "failed" : "complete");
co_return errc::invalid_credentials;
}

auto auth_str = std::string_view(
reinterpret_cast<char*>(auth_bytes.data()), auth_bytes.size());

if (!is_valid_utf8(auth_str)) {
vlog(seclog.warn, "invalid utf8");
co_return errc::invalid_credentials;
}

// [authorization identity] not supported
if (!auth_str.starts_with(sep)) {
vlog(seclog.warn, "[authorization identity] not supported");
co_return errc::invalid_credentials;
}
auth_str = auth_str.substr(sep.length());
auto it = auth_str.find(sep);
if (std::string_view::npos == it) {
vlog(seclog.warn, "seperator not found");
co_return errc::invalid_credentials;
}

credential_user username{auth_str.substr(0, it)};
credential_password password{auth_str.substr(it + sep.length())};

if (username().empty()) {
vlog(seclog.warn, "username not found");
co_return errc::invalid_credentials;
}

if (username().length() > max_length) {
vlog(seclog.warn, "username too long");
co_return errc::invalid_credentials;
}

if (password().empty()) {
vlog(seclog.warn, "password not found");
co_return errc::invalid_credentials;
}

if (password().length() > max_length) {
vlog(seclog.warn, "password too long");
co_return errc::invalid_credentials;
}

_audit_user.name = username;
auto cred = _credentials.get<scram_credential>(username);
if (!cred.has_value()) {
vlog(seclog.warn, "credential not found");
co_return errc::invalid_credentials;
}

if (!validate_scram_credential(*cred, password).has_value()) {
vlog(seclog.warn, "scram authentication failed");
co_return errc::invalid_credentials;
}

vlog(seclog.trace, "Authenticated user {}", username);

make_failed.cancel();

_principal = cred->principal().value_or(
acl_principal{principal_type::user, username()});
_audit_user.name = _principal.name();
_audit_user.type_id = audit::user::type::user;

_state = state::complete;
co_return bytes{};
}

} // namespace security
62 changes: 62 additions & 0 deletions src/v/security/plain_authenticator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once
#include "security/acl.h"
#include "security/fwd.h"
#include "security/sasl_authentication.h"

namespace security {

/**
* @class plain_authenticator
* @brief A class that implements SASL/PLAIN authentication mechanism.
*
* This class is responsible for handling the SASL/PLAIN authentication process.
* It authenticates the username and password provided by the client against
* SCRAM users in the credential store.
*/
class plain_authenticator final : public sasl_mechanism {
public:
static constexpr const char* name = "PLAIN";

explicit plain_authenticator(credential_store& credentials)
: _credentials(credentials) {}

ss::future<result<bytes>> authenticate(bytes auth_bytes) override;

bool complete() const override { return _state == state::complete; }
bool failed() const override { return _state == state::failed; }

const acl_principal& principal() const override {
vassert(
_state == state::complete,
"Authentication id is not valid until auth process complete");
return _principal;
}

const audit::user& audit_user() const override { return _audit_user; }

const char* mechanism_name() const override { return "SASL-PLAIN"; }

private:
enum class state {
init,
complete,
failed,
};

state _state{state::init};
credential_store& _credentials;
acl_principal _principal;
security::audit::user _audit_user;
};

} // namespace security
Loading
Loading