Skip to content

Commit

Permalink
audit: Perform record batching and partition assignment in manager
Browse files Browse the repository at this point in the history
Previous implementation used a very high value for retries on the
internal kafka client, which prevents the client from recovering
certain types of errors.

Instead, we batch up drained records on the manager side, allowing
us to hold a copy of each batch in memory and retry failed produce
calls from "scratch".

This also allows us to be _much_ more aggressive about batching.
The internal kafka client will calculate a destination partition
for each record, round robin style over the number of partitions.
In the new scheme, we shoot for a maximally sized batch first, then
select a destination, still round-robin style, but biasing heavily
toward locally led partitions. In this way, given the default audit
per-shard queue limit and default max batch size (both 1MiB), the
most common drain operation should result in exactly one produce
request.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
  • Loading branch information
oleiman committed Oct 17, 2024
1 parent 5c7ed51 commit e5fd326
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 65 deletions.
4 changes: 4 additions & 0 deletions src/v/security/audit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ redpanda_cc_library(
"schemas/utils.h",
"types.h",
],
implementation_deps = [
"//src/v/kafka/client:record_batcher",
"@abseil-cpp//absl/algorithm:container",
],
include_prefix = "security/audit",
visibility = ["//visibility:public"],
deps = [
Expand Down
1 change: 1 addition & 0 deletions src/v/security/audit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ v_cc_library(
v::utils
v::config
v::cluster
v::kc_record_batcher
)

add_subdirectory(schemas)
Expand Down
236 changes: 171 additions & 65 deletions src/v/security/audit/audit_log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

#include "security/audit/audit_log_manager.h"

#include "base/outcome.h"
#include "cluster/controller.h"
#include "cluster/ephemeral_credential_frontend.h"
#include "cluster/metadata_cache.h"
#include "cluster/security_frontend.h"
#include "config/configuration.h"
#include "kafka/client/client.h"
#include "kafka/client/config_utils.h"
#include "kafka/client/record_batcher.h"
#include "kafka/protocol/produce.h"
#include "kafka/protocol/schemata/produce_response.h"
#include "kafka/protocol/topic_properties.h"
Expand All @@ -32,15 +34,26 @@
#include "storage/parser_utils.h"
#include "utils/retry.h"

#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/smp.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <absl/algorithm/container.h>

#include <memory>
#include <optional>

namespace security::audit {

namespace {
struct partition_batch {
model::partition_id pid;
model::record_batch batch;
std::optional<ssx::semaphore_units> send_units{};
};
} // namespace

static constexpr std::string_view subsystem_name = "Audit System";

std::ostream& operator<<(std::ostream& os, event_type t) {
Expand Down Expand Up @@ -88,10 +101,9 @@ class audit_client {
/// kafka::config::produce_shutdown_delay_ms to complete
ss::future<> shutdown();

/// Produces to the audit topic, internal partitioner assigns partitions
/// to the batches provided. Blocks if semaphore is exhausted.
ss::future<>
produce(std::vector<kafka::client::record_essence>, audit_probe&);
/// Produces to the audit topic partition specified with each batch.
/// Blocks if semaphore is exhausted.
ss::future<> produce(chunked_vector<partition_batch>, audit_probe&);

/// Returns true if the configuration phase has completed which includes:
/// - Connecting to the broker(s) w/ ephemeral creds
Expand All @@ -100,8 +112,8 @@ class audit_client {
bool is_initialized() const { return _is_initialized; }

private:
ss::future<> do_produce(
std::vector<kafka::client::record_essence> records, audit_probe& probe);
ss::future<>
do_produce(model::record_batch, model::partition_id, audit_probe&);
ss::future<> update_status(kafka::error_code);
ss::future<> update_status(kafka::produce_response);
ss::future<> configure();
Expand Down Expand Up @@ -147,7 +159,7 @@ class audit_sink {
/// Produce to the audit topic within the context of the internal locks,
/// ensuring toggling of the audit master switch happens in lock step with
/// calls to produce()
ss::future<> produce(std::vector<kafka::client::record_essence> records);
ss::future<> produce(chunked_vector<partition_batch> records);

/// Allocates and connects, or deallocates and shuts down the audit client
void toggle(bool enabled);
Expand Down Expand Up @@ -247,9 +259,13 @@ ss::future<> audit_client::configure() {
co_await create_internal_topic();
co_await _client.connect();

/// Retries should be "infinite", to avoid dropping data, there is a
/// known issue within the client setting this value to size_t::max
_client.config().retries.set_value(10000);
/// To avoid dropping data, retries should be functionally infinite,
/// but we handle this logic at the produce call site. Individual
/// requests should fail after a few attempts, allowing the kafka
/// client to refresh its metadata on a subsequent request.
/// Explicitly set `client::config::retries` to its default value.
/// We might want to make this tunable at some point.
_client.config().retries.reset();
vlog(adtlog.info, "Audit log client initialized");
} catch (...) {
vlog(
Expand Down Expand Up @@ -476,35 +492,60 @@ ss::future<> audit_client::shutdown() {
}

ss::future<> audit_client::produce(
std::vector<kafka::client::record_essence> records, audit_probe& probe) {
/// TODO: Produce with acks=1, atm -1 is hardcoded into client
const auto records_size = [](const auto& records) {
std::size_t size = 0;
for (const auto& r : records) {
if (r.value) {
/// auditing does not fill in any of the fields of the
/// record_essence other then the value member
size += r.value->size_bytes();
}
chunked_vector<partition_batch> records, audit_probe& probe) {
auto total_size = absl::c_accumulate(
records, size_t{0}, [](size_t acc, const partition_batch& b) {
return acc + b.batch.size_bytes();
});

vlog(
adtlog.trace,
"Producing {} batches, totaling {}B, wait for semaphore units...",
records.size(),
total_size);

auto reserved = co_await ss::get_units(_send_sem, total_size);

absl::c_for_each(records, [&reserved](partition_batch& pb) {
try {
pb.send_units.emplace(reserved.split(pb.batch.size_bytes()));
} catch (const std::invalid_argument& e) {
// NOTE(oren): we should never reach here because reserved should
// always begin with precisely the number of units needed for all
// input batches.
vassert(false, "Error getting units for batch: {}", e.what());
}
return size;
};
});

// limit concurrency to the number of max-sized batches that the
// audit_client could handle. In the common case, the number of batches
// here should usually be 1-2, since the default per-shard queue limit
// is 1MiB, which is also the default for kafka_batch_max_bytes.
// TODO(oren): a configurabale ratio might be better
[[maybe_unused]] auto max_concurrency
= _max_buffer_size / config::shard_local_cfg().kafka_batch_max_bytes();

try {
const auto size_bytes = records_size(records);
vlog(
adtlog.trace,
"Obtaining {} units from auditing semaphore",
size_bytes);
auto units = co_await ss::get_units(_send_sem, size_bytes);
ssx::spawn_with_gate(
_gate,
[this,
&probe,
units = std::move(units),
max_concurrency,
records = std::move(records)]() mutable {
return do_produce(std::move(records), probe)
.finally([units = std::move(units)] {});
return ss::do_with(
std::move(records),
[this, &probe, max_concurrency](auto& records) mutable {
return ss::max_concurrent_for_each(
std::make_move_iterator(records.begin()),
std::make_move_iterator(records.end()),
max_concurrency,
[this,
&probe](partition_batch rec) mutable -> ss::future<> {
return do_produce(
std::move(rec.batch), rec.pid, probe)
.finally([units = std::move(rec.send_units)] {});
});
});
});
} catch (const ss::broken_semaphore&) {
vlog(
Expand All @@ -515,35 +556,34 @@ ss::future<> audit_client::produce(
}

ss::future<> audit_client::do_produce(
std::vector<kafka::client::record_essence> records, audit_probe& probe) {
const auto n_records = records.size();
kafka::produce_response r = co_await _client.produce_records(
model::kafka_audit_logging_topic, std::move(records));
bool errored = std::any_of(
r.data.responses.cbegin(),
r.data.responses.cend(),
[](const kafka::topic_produce_response& tp) {
return std::any_of(
tp.partitions.cbegin(),
tp.partitions.cend(),
[](const kafka::partition_produce_response& p) {
return p.error_code != kafka::error_code::none;
});
});
if (errored) {
if (_as.abort_requested()) {
vlog(
adtlog.warn,
"{} audit records dropped, shutting down",
n_records);
} else {
vlog(adtlog.error, "{} audit records dropped", n_records);
model::record_batch batch, model::partition_id pid, audit_probe& probe) {
// Effectively retry forever, but start a fresh request from batch data held
// in memory when each produce_record_batch's retries are exhausted. This
// way the kafka client should periodically refresh its internal metadata.
std::optional<kafka::error_code> ec;
while (!_as.abort_requested()) {
auto r = co_await _client.produce_record_batch(
model::topic_partition{model::kafka_audit_logging_topic, pid},
batch.copy());
ec.emplace(r.error_code);
co_await update_status(ec.value());
if (ec.value() == kafka::error_code::none) {
break;
}
}

// report unknown server error if we aborted before making any attempt
if (auto errc = ec.value_or(kafka::error_code::unknown_server_error);
errc != kafka::error_code::none) {
vlog(
adtlog.warn,
"{} audit records dropped, shutting down. Last error: {}",
batch.record_count(),
errc);
probe.audit_error();
} else {
probe.audit_event();
}
co_return co_await update_status(std::move(r));
}

/// audit_sink
Expand Down Expand Up @@ -575,8 +615,7 @@ audit_sink::update_auth_status(auth_misconfigured_t auth_misconfigured) {
});
}

ss::future<>
audit_sink::produce(std::vector<kafka::client::record_essence> records) {
ss::future<> audit_sink::produce(chunked_vector<partition_batch> records) {
/// No locks/gates since the calls to this method are done in controlled
/// context of other synchronization primitives
vassert(_client, "produce() called on a null client");
Expand All @@ -593,8 +632,12 @@ ss::future<> audit_sink::publish_app_lifecycle_event(
auto as_json = lifecycle_event->to_json();
iobuf b;
b.append(as_json.c_str(), as_json.size());
std::vector<kafka::client::record_essence> rs;
rs.push_back(kafka::client::record_essence{.value = std::move(b)});
auto batch
= kafka::client::
record_batcher{config::shard_local_cfg().kafka_batch_max_bytes(), &adtlog}
.make_batch_of_one(std::nullopt, std::move(b));
chunked_vector<partition_batch> rs;
rs.emplace_back(_audit_mgr->compute_partition_id(), std::move(batch));
co_await produce(std::move(rs));
}

Expand Down Expand Up @@ -847,6 +890,49 @@ bool audit_log_manager::report_redpanda_app_event(is_started app_started) {
: application_lifecycle::activity_id::stop);
}

model::partition_id audit_log_manager::compute_partition_id() {
static thread_local model::partition_id _next_pid{0};

model::topic_namespace_view ns_tp{model::kafka_audit_logging_nt};
auto cfg = _metadata_cache->local().get_topic_cfg(ns_tp);
if (!cfg.has_value()) {
vlog(
adtlog.debug,
"{} missing from metadata cache, fall back to round-robin candidate "
"{}",
ns_tp,
_next_pid);
return _next_pid;
}
auto n_partitions = cfg.value().partition_count;
vassert(n_partitions >= 0, "Invalid partition count {}", n_partitions);

auto inc_pid = [n_partitions](model::partition_id pid, int32_t inc = 1) {
return model::partition_id{(pid + inc) % n_partitions};
};

const auto& partition_leaders
= _controller->get_partition_leaders().local();

std::optional<model::partition_id> pid;
for (auto i : boost::irange(n_partitions)) {
auto try_pid = inc_pid(_next_pid, i);
auto leader = partition_leaders.get_leader(ns_tp, try_pid);
if (!leader.has_value()) {
continue;
} else if (leader.value() == _self) {
vlog(adtlog.debug, "Node {} leads partition {}", _self, try_pid);
pid.emplace(try_pid);
break;
}
}

// NOTE(oren): sort of arbitrary. if we didn't find a locally led partition,
// then at least advance the round robin to the next PID in natural order
_next_pid = inc_pid(pid.value_or(_next_pid));
return pid.value_or(_next_pid);
}

ss::future<> audit_log_manager::drain() {
if (_queue.empty()) {
co_return;
Expand All @@ -857,8 +943,10 @@ ss::future<> audit_log_manager::drain() {
"Attempting to drain {} audit events from sharded queue",
_queue.size());

/// Combine all batched audit msgs into record_essences
std::vector<kafka::client::record_essence> essences;
/// Combine all queued audit msgs into record_batches
kafka::client::record_batcher batcher{
config::shard_local_cfg().kafka_batch_max_bytes(), &adtlog};

auto records = std::exchange(_queue, underlying_t{});
auto& records_seq = records.get<underlying_list>();
while (!records_seq.empty()) {
Expand All @@ -867,19 +955,37 @@ ss::future<> audit_log_manager::drain() {
auto as_json = audit_msg->to_json();
iobuf b;
b.append(as_json.c_str(), as_json.size());
essences.push_back(
kafka::client::record_essence{.value = std::move(b)});
co_await ss::maybe_yield();
batcher.append(std::nullopt, std::move(b));

co_await ss::coroutine::maybe_yield();
}

auto batches = std::move(batcher).finish();

chunked_vector<partition_batch> p_batches;
p_batches.reserve(batches.size());

// attach a partition ID to each batch and call into the audit_sink

std::transform(
std::make_move_iterator(batches.begin()),
std::make_move_iterator(batches.end()),
std::back_inserter(p_batches),
[this](model::record_batch recs) {
return partition_batch{
.pid = compute_partition_id(),
.batch = std::move(recs),
};
});

/// This call may block if the audit_clients semaphore is exhausted,
/// this represents the amount of memory used within its kafka::client
/// produce batch queue. If the semaphore blocks it will apply
/// backpressure here, and the \ref _queue will begin to fill closer to
/// capacity. When it hits capacity, enqueue_audit_event() will block.
co_await container().invoke_on(
client_shard_id,
[recs = std::move(essences)](audit_log_manager& mgr) mutable {
[recs = std::move(p_batches)](audit_log_manager& mgr) mutable {
return mgr._sink->produce(std::move(recs));
});
}
Expand Down
6 changes: 6 additions & 0 deletions src/v/security/audit/audit_log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ class audit_log_manager
const security::acl_principal&,
const model::topic&) const;

/**
* Compute an output partition for some audit record batch by per-shard
* round-robin, biased toward partitions with a locally hosted leader.
*/
model::partition_id compute_partition_id();

ss::future<> drain();
ss::future<> pause();
ss::future<> resume();
Expand Down

0 comments on commit e5fd326

Please sign in to comment.