From 37414a7d58a18e5f7f168444957238996216cf71 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 15 Oct 2024 14:28:03 -0700 Subject: [PATCH] k/record_batcher: Optionally inject a ss::logger Signed-off-by: Oren Leiman --- src/v/kafka/client/record_batcher.cc | 20 +++++++++++++------- src/v/kafka/client/record_batcher.h | 5 ++++- src/v/transform/logging/rpc_client.cc | 2 +- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/v/kafka/client/record_batcher.cc b/src/v/kafka/client/record_batcher.cc index a80361a235a4..370036bf5149 100644 --- a/src/v/kafka/client/record_batcher.cc +++ b/src/v/kafka/client/record_batcher.cc @@ -23,8 +23,12 @@ namespace detail { class batcher_impl { public: batcher_impl() = delete; - explicit batcher_impl(size_t batch_max_bytes) - : _batch_max_bytes(batch_max_bytes) {} + explicit batcher_impl( + size_t batch_max_bytes, std::optional log) + : _batch_max_bytes(batch_max_bytes) + , _log(log.value_or(&kclog)) { + vassert(_log != nullptr, "Injected logger must not be nullptr"); + } ~batcher_impl() = default; batcher_impl(const batcher_impl&) = delete; batcher_impl& operator=(const batcher_impl&) = delete; @@ -48,7 +52,7 @@ class batcher_impl { size_t record_size = record.size_bytes(); if (record_size > max_records_bytes()) { vlog( - kclog.info, + _log->info, "Dropped record: size exceeds configured batch max " "size: {} > {}", human::bytes{static_cast(record_size)}, @@ -93,14 +97,14 @@ class batcher_impl { - static_cast(batch.size_bytes()); if (diff < 0) { vlog( - kclog.debug, + _log->debug, "Underestimaged batch size {} - {} = {}", human::bytes{static_cast(batch_size_bytes())}, human::bytes{static_cast(batch.size_bytes())}, diff); } else { vlog( - kclog.trace, + _log->trace, "Building record batch. Actual size: {} (estimated: {}, err:{})", human::bytes{static_cast(batch.size_bytes())}, human::bytes{static_cast(batch_size_bytes())}, @@ -120,6 +124,7 @@ class batcher_impl { } size_t _batch_max_bytes; + ss::logger* _log; storage::record_batch_builder _builder{bb_init()}; ss::chunked_fifo _record_batches; size_t _curr_batch_size{0}; @@ -127,8 +132,9 @@ class batcher_impl { } // namespace detail -record_batcher::record_batcher(size_t max_batch_size) - : _impl(std::make_unique(max_batch_size)) {} +record_batcher::record_batcher( + size_t max_batch_size, std::optional log) + : _impl(std::make_unique(max_batch_size, log)) {} record_batcher::~record_batcher() = default; diff --git a/src/v/kafka/client/record_batcher.h b/src/v/kafka/client/record_batcher.h index 7a02e4f592f2..df6071ec2a8c 100644 --- a/src/v/kafka/client/record_batcher.h +++ b/src/v/kafka/client/record_batcher.h @@ -14,6 +14,8 @@ #include "bytes/iobuf.h" #include "model/record.h" +#include + #include namespace kafka::client { @@ -34,7 +36,8 @@ class batcher_impl; class record_batcher { public: record_batcher() = delete; - explicit record_batcher(size_t batch_max_bytes); + explicit record_batcher( + size_t batch_max_bytes, std::optional log = std::nullopt); ~record_batcher(); record_batcher(const record_batcher&) = delete; record_batcher& operator=(const record_batcher&) = delete; diff --git a/src/v/transform/logging/rpc_client.cc b/src/v/transform/logging/rpc_client.cc index d506d8daada8..6286327c734f 100644 --- a/src/v/transform/logging/rpc_client.cc +++ b/src/v/transform/logging/rpc_client.cc @@ -41,7 +41,7 @@ rpc_client::write(model::partition_id pid, io::json_batches batches) { auto max_batch_size = cfg->properties.batch_max_bytes.value_or( config::shard_local_cfg().kafka_batch_max_bytes()); - kafka::client::record_batcher batcher{max_batch_size}; + kafka::client::record_batcher batcher{max_batch_size, &tlg_log}; while (!batches.empty()) { auto json_batch = std::move(batches.front());