Merge pull request #24572 from bharathv/assert_ttor
datalake/translator: ensure cleanup of log reader in all cases
piyushredpanda authored Dec 14, 2024
2 parents be7d112 + 454ca47 commit 1628258
Showing 2 changed files with 36 additions and 28 deletions.
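
The diff below moves construction of the partition log reader out of do_translate_once(), where it was created before the abortable wait for _parallel_translations units, and into do_translation_for_range(), immediately before the reader is handed to task.translate(). With the old ordering, an abort during the semaphore wait could unwind the coroutine after the reader existed but before anything consumed it, skipping its cleanup; the new ordering (plus the in-code warning against adding abortable steps in between) ensures the reader is always dispatched to translation.

A minimal standalone sketch of that hazard (not Redpanda code; the reader, translate, risky, and safe names are illustrative assumptions only):

// Minimal sketch (C++17): a resource whose cleanup happens only when it is
// consumed, mirroring a record batch reader that must be dispatched to
// translation in all cases.
#include <iostream>
#include <optional>
#include <utility>

struct reader {
    bool consumed = false;
    reader() = default;
    reader(reader&& other) noexcept
      : consumed(std::exchange(other.consumed, true)) {}
    ~reader() {
        if (!consumed) {
            // Stand-in for the real failure mode: an undispatched reader is
            // destroyed without releasing what it holds.
            std::cerr << "reader destroyed without being consumed\n";
        }
    }
    void consume() { consumed = true; }
};

std::optional<int> translate(reader r) {
    r.consume(); // consuming the reader is what triggers its cleanup
    return 42;
}

// Old shape: the reader exists across an abortable step, so an early exit
// skips the cleanup that only happens on consumption.
std::optional<int> risky(bool abort_requested) {
    reader r; // resources acquired here
    if (abort_requested) {
        return std::nullopt; // r is destroyed unconsumed
    }
    return translate(std::move(r));
}

// New shape: the reader is created only once we are committed to handing it
// to the consumer, so no abortable code sits between creation and dispatch.
std::optional<int> safe(bool abort_requested) {
    if (abort_requested) {
        return std::nullopt; // nothing acquired yet
    }
    return translate(reader{});
}

int main() {
    risky(true); // prints the warning
    safe(true);  // exits cleanly
    return 0;
}

The takeaway matches the comment added in the diff: do not let a resource whose cleanup depends on consumption sit across code paths that can exit before the consumer runs.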
60 changes: 34 additions & 26 deletions src/v/datalake/translation/partition_translator.cc
@@ -210,13 +210,13 @@ partition_translator::max_offset_for_translation() const {
 ss::future<std::optional<coordinator::translated_offset_range>>
 partition_translator::do_translation_for_range(
   retry_chain_node& parent,
-  model::record_batch_reader rdr,
-  kafka::offset begin_offset) {
+  kafka::offset read_begin_offset,
+  kafka::offset read_end_offset) {
     // This configuration only writes a single row group per file but we limit
     // the bytes via the reader max_bytes.
     auto writer_factory = std::make_unique<local_parquet_file_writer_factory>(
-      local_path{_writer_scratch_space},    // storage temp files are written to
-      fmt::format("{}", begin_offset),      // file prefix
+      local_path{_writer_scratch_space},         // storage temp files are written to
+      fmt::format("{}", read_begin_offset),      // file prefix
       ss::make_shared<serde_parquet_writer_factory>());

     auto task = translation_task{
@@ -232,11 +232,36 @@ partition_translator::do_translation_for_range(
       return can_continue() ? std::nullopt
                             : std::make_optional("translator stopping");
     }};
+
+    vlog(
+      _logger.debug,
+      "translating data in kafka range: [{}, {}]",
+      read_begin_offset,
+      read_end_offset);
+
+    auto log_reader = co_await _partition_proxy->make_reader(
+      {kafka::offset_cast(read_begin_offset),
+       kafka::offset_cast(read_end_offset),
+       0,
+       _max_bytes_per_reader,
+       datalake_priority(),
+       std::nullopt,
+       std::nullopt,
+       _as});
+    auto tracker = kafka::aborted_transaction_tracker::create_default(
+      _partition_proxy.get(), std::move(log_reader.ot_state));
+    auto kafka_reader
+      = model::make_record_batch_reader<kafka::read_committed_reader>(
+        std::move(tracker), std::move(log_reader.reader));
+    // Be wary of introducing abortable code here that can skip cleanup
+    // of kafka_reader. The reader is cleaned up along with consumption,
+    // so we need to ensure that the reader is dispatched to translation
+    // in all cases.
     auto result = co_await task.translate(
       ntp,
       _partition->get_topic_revision_id(),
       std::move(writer_factory),
-      std::move(rdr),
+      std::move(kafka_reader),
       remote_path_prefix,
       parent,
       las);
@@ -289,29 +314,12 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) {
     // accumulate. The resulting parquet files are only performant
     // if there is a big chunk of data in them. Smaller parquet files
     // are an overhead for iceberg metadata.
-    auto log_reader = co_await _partition_proxy->make_reader(
-      {kafka::offset_cast(read_begin_offset),
-       kafka::offset_cast(read_end_offset),
-       0,
-       _max_bytes_per_reader,
-       datalake_priority(),
-       std::nullopt,
-       std::nullopt,
-       _as});

     auto units = co_await ss::get_units(**_parallel_translations, 1, _as);
-    vlog(
-      _logger.debug,
-      "translating data in kafka range: [{}, {}]",
-      read_begin_offset,
-      read_end_offset);
-    auto tracker = kafka::aborted_transaction_tracker::create_default(
-      _partition_proxy.get(), std::move(log_reader.ot_state));
-    auto kafka_reader
-      = model::make_record_batch_reader<kafka::read_committed_reader>(
-        std::move(tracker), std::move(log_reader.reader));

     auto translation_result = co_await do_translation_for_range(
-      parent_rcn, std::move(kafka_reader), read_begin_offset);
+      parent_rcn, read_begin_offset, read_end_offset);

     // release units and checkpoint outside of the lock.
     units.return_all();
     vlog(_logger.debug, "translation result: {}", translation_result);
     auto result = translation_success::no;
4 changes: 2 additions & 2 deletions src/v/datalake/translation/partition_translator.h
@@ -105,8 +105,8 @@ class partition_translator {
     ss::future<std::optional<coordinator::translated_offset_range>>
     do_translation_for_range(
       retry_chain_node& parent,
-      model::record_batch_reader,
-      kafka::offset begin_offset);
+      kafka::offset read_begin,
+      kafka::offset read_end);

     using checkpoint_result = ss::bool_class<struct checkpoint_result>;
     ss::future<checkpoint_result> checkpoint_translated_data(
