k/record_batcher: Optionally inject a ss::logger
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
oleiman committed Oct 17, 2024
1 parent 11b8a23 commit 37414a7
Showing 3 changed files with 18 additions and 9 deletions.
20 changes: 13 additions & 7 deletions src/v/kafka/client/record_batcher.cc
@@ -23,8 +23,12 @@ namespace detail {
 class batcher_impl {
 public:
     batcher_impl() = delete;
-    explicit batcher_impl(size_t batch_max_bytes)
-      : _batch_max_bytes(batch_max_bytes) {}
+    explicit batcher_impl(
+      size_t batch_max_bytes, std::optional<ss::logger*> log)
+      : _batch_max_bytes(batch_max_bytes)
+      , _log(log.value_or(&kclog)) {
+        vassert(_log != nullptr, "Injected logger must not be nullptr");
+    }
     ~batcher_impl() = default;
     batcher_impl(const batcher_impl&) = delete;
     batcher_impl& operator=(const batcher_impl&) = delete;
@@ -48,7 +52,7 @@ class batcher_impl {
     size_t record_size = record.size_bytes();
     if (record_size > max_records_bytes()) {
         vlog(
-          kclog.info,
+          _log->info,
           "Dropped record: size exceeds configured batch max "
           "size: {} > {}",
           human::bytes{static_cast<double>(record_size)},
@@ -93,14 +97,14 @@ class batcher_impl {
           - static_cast<int64_t>(batch.size_bytes());
         if (diff < 0) {
             vlog(
-              kclog.debug,
+              _log->debug,
               "Underestimated batch size {} - {} = {}",
               human::bytes{static_cast<double>(batch_size_bytes())},
               human::bytes{static_cast<double>(batch.size_bytes())},
               diff);
         } else {
             vlog(
-              kclog.trace,
+              _log->trace,
               "Building record batch. Actual size: {} (estimated: {}, err:{})",
               human::bytes{static_cast<double>(batch.size_bytes())},
               human::bytes{static_cast<double>(batch_size_bytes())},
@@ -120,15 +124,17 @@
     }
 
     size_t _batch_max_bytes;
+    ss::logger* _log;
     storage::record_batch_builder _builder{bb_init()};
     ss::chunked_fifo<model::record_batch> _record_batches;
     size_t _curr_batch_size{0};
 };
 
 } // namespace detail
 
-record_batcher::record_batcher(size_t max_batch_size)
-  : _impl(std::make_unique<detail::batcher_impl>(max_batch_size)) {}
+record_batcher::record_batcher(
+  size_t max_batch_size, std::optional<ss::logger*> log)
+  : _impl(std::make_unique<detail::batcher_impl>(max_batch_size, log)) {}
 
 record_batcher::~record_batcher() = default;
 
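Note on the pattern above: the constructor accepts std::optional<ss::logger*> and falls back to the module-level kclog via value_or, so existing call sites behave exactly as before while callers may redirect the batcher's logging. A minimal standalone sketch of the same idea, assuming only Seastar's log.hh (the widget and default_log names are illustrative, not from this commit):

#include <seastar/util/log.hh>

#include <cassert>
#include <optional>

namespace ss = seastar;

// Stand-in for a module-level default logger such as kclog.
ss::logger default_log{"my_module"};

class widget {
public:
    explicit widget(std::optional<ss::logger*> log = std::nullopt)
      // Use the injected logger when present; otherwise fall back to
      // the module default.
      : _log(log.value_or(&default_log)) {
        // A caller could still inject an engaged optional holding nullptr,
        // which value_or passes through; hence the vassert in the commit.
        assert(_log != nullptr);
    }

    void work() {
        // All logging goes through the pointer, so an injected logger
        // receives every message this class emits.
        _log->info("doing work");
    }

private:
    ss::logger* _log;
};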
5 changes: 4 additions & 1 deletion src/v/kafka/client/record_batcher.h
@@ -14,6 +14,8 @@
 #include "bytes/iobuf.h"
 #include "model/record.h"
 
+#include <seastar/util/log.hh>
+
 #include <optional>
 
 namespace kafka::client {
@@ -34,7 +36,8 @@ class batcher_impl;
 class record_batcher {
 public:
     record_batcher() = delete;
-    explicit record_batcher(size_t batch_max_bytes);
+    explicit record_batcher(
+      size_t batch_max_bytes, std::optional<ss::logger*> log = std::nullopt);
     ~record_batcher();
     record_batcher(const record_batcher&) = delete;
     record_batcher& operator=(const record_batcher&) = delete;
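Since the new logger parameter defaults to std::nullopt, the change is source-compatible: existing callers compile unchanged and keep logging to kclog. A hedged usage sketch of both call shapes (logger name and batch size here are illustrative):

// Omitting the logger: batcher messages go to the default kclog.
kafka::client::record_batcher plain{1024 * 1024};

// Injecting a logger: batcher messages are attributed to the caller's
// subsystem instead, as rpc_client.cc does below with &tlg_log.
ss::logger my_log{"my_subsystem"};
kafka::client::record_batcher scoped{1024 * 1024, &my_log};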
2 changes: 1 addition & 1 deletion src/v/transform/logging/rpc_client.cc
@@ -41,7 +41,7 @@ rpc_client::write(model::partition_id pid, io::json_batches batches) {
     auto max_batch_size = cfg->properties.batch_max_bytes.value_or(
       config::shard_local_cfg().kafka_batch_max_bytes());
 
-    kafka::client::record_batcher batcher{max_batch_size};
+    kafka::client::record_batcher batcher{max_batch_size, &tlg_log};
 
     while (!batches.empty()) {
         auto json_batch = std::move(batches.front());
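For context, tlg_log is the transform-logging subsystem's logger; its declaration is not part of this diff. A subsystem logger of this kind is typically defined once at namespace scope, roughly as in this assumed sketch:

#include <seastar/util/log.hh>

namespace ss = seastar;

namespace transform::logging {
// Presumed shape of the logger injected into the batcher above.
ss::logger tlg_log{"transform/logging"};
} // namespace transform::logging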
