Merge pull request redpanda-data#23872 from andrwng/datalake-partitioning-writer

datalake: add partitioning_writer
andrwng authored Oct 22, 2024
2 parents 0c1606c + d1b5b6e commit a66c47c
Showing 17 changed files with 370 additions and 20 deletions.
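
In brief, the change: adds datalake::partitioning_writer, a wrapper that fans incoming records out to one data writer per Iceberg partition key (hourly, using the default partition spec); switches the writer-factory interfaces from taking iceberg::struct_type by value to const reference, which lets a stored schema be reused across repeated create_writer calls; and wires the new class into the Bazel and CMake builds along with a gtest suite.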
31 changes: 31 additions & 0 deletions src/v/datalake/BUILD
@@ -62,6 +62,31 @@ redpanda_cc_library(
],
)

+redpanda_cc_library(
+    name = "partitioning_writer",
+    srcs = [
+        "partitioning_writer.cc",
+    ],
+    hdrs = [
+        "partitioning_writer.h",
+    ],
+    implementation_deps = [
+        ":logger",
+        ":table_definition",
+        "//src/v/base",
+        "//src/v/iceberg:struct_accessor",
+    ],
+    include_prefix = "datalake",
+    visibility = [":__subpackages__"],
+    deps = [
+        ":writer",
+        "//src/v/container:chunked_hash_map",
+        "//src/v/iceberg:datatypes",
+        "//src/v/iceberg:partition_key",
+        "//src/v/iceberg:values",
+    ],
+)

redpanda_cc_library(
name = "types",
hdrs = [
@@ -78,10 +103,16 @@ redpanda_cc_library(

redpanda_cc_library(
name = "writer",
+    srcs = [
+        "data_writer_interface.cc",
+    ],
hdrs = [
"data_writer_interface.h",
"schemaless_translator.h",
],
+    implementation_deps = [
+        "@fmt",
+    ],
include_prefix = "datalake",
visibility = [":__subpackages__"],
deps = [
1 change: 1 addition & 0 deletions src/v/datalake/CMakeLists.txt
@@ -24,6 +24,7 @@ v_cc_library(
schema_registry.cc
schemaless_translator.cc
schema_protobuf.cc
+  partitioning_writer.cc
protobuf_utils.cc
values_protobuf.cc
base_types.cc
4 changes: 2 additions & 2 deletions src/v/datalake/arrow_translator.cc
@@ -715,8 +715,8 @@ void map_converter::add_data(const iceberg::value& value) {
}
}

-arrow_translator::arrow_translator(iceberg::struct_type schema)
-  : _struct_converter(std::make_unique<struct_converter>(std::move(schema))) {}
+arrow_translator::arrow_translator(const iceberg::struct_type& schema)
+  : _struct_converter(std::make_unique<struct_converter>(schema)) {}

arrow_translator::~arrow_translator() = default;

2 changes: 1 addition & 1 deletion src/v/datalake/arrow_translator.h
@@ -24,7 +24,7 @@ class struct_converter;

class arrow_translator {
public:
-    explicit arrow_translator(iceberg::struct_type schema);
+    explicit arrow_translator(const iceberg::struct_type& schema);

~arrow_translator();

7 changes: 4 additions & 3 deletions src/v/datalake/batching_parquet_writer.cc
@@ -31,11 +31,11 @@
namespace datalake {

batching_parquet_writer::batching_parquet_writer(
-  iceberg::struct_type schema,
+  const iceberg::struct_type& schema,
size_t row_count_threshold,
size_t byte_count_threshold,
local_path output_file_path)
-  : _iceberg_to_arrow(std::move(schema))
+  : _iceberg_to_arrow(schema)
, _arrow_to_iobuf(_iceberg_to_arrow.build_arrow_schema())
, _row_count_threshold{row_count_threshold}
, _byte_count_threshold{byte_count_threshold}
@@ -217,7 +217,8 @@ local_path batching_parquet_writer_factory::create_filename() const {
/ fmt::format("{}-{}.parquet", _file_name_prefix, uuid_t::create())};
}
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
-batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
+batching_parquet_writer_factory::create_writer(
+  const iceberg::struct_type& schema) {
auto ret = std::make_unique<batching_parquet_writer>(
std::move(schema),
_row_count_threshold,
4 changes: 2 additions & 2 deletions src/v/datalake/batching_parquet_writer.h
@@ -40,7 +40,7 @@ namespace datalake {
class batching_parquet_writer : public data_writer {
public:
batching_parquet_writer(
-      iceberg::struct_type schema,
+      const iceberg::struct_type& schema,
size_t row_count_threshold,
size_t byte_count_threshold,
local_path output_file_path);
@@ -87,7 +87,7 @@ class batching_parquet_writer_factory : public data_writer_factory {
size_t byte_count_threshold);

ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
-    create_writer(iceberg::struct_type schema) override;
+    create_writer(const iceberg::struct_type& schema) override;

private:
local_path create_filename() const;
2 changes: 1 addition & 1 deletion src/v/datalake/data_writer_interface.h
@@ -68,7 +68,7 @@ class data_writer_factory {
virtual ~data_writer_factory() = default;

virtual ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
-    create_writer(iceberg::struct_type /* schema */) = 0;
+    create_writer(const iceberg::struct_type& /* schema */) = 0;
};

} // namespace datalake
98 changes: 98 additions & 0 deletions src/v/datalake/partitioning_writer.cc
@@ -0,0 +1,98 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/partitioning_writer.h"

#include "base/vlog.h"
#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"
#include "datalake/table_definition.h"
#include "iceberg/struct_accessor.h"

#include <exception>

namespace datalake {

namespace {
const auto hourly_spec = hour_partition_spec();
const auto default_schema = schemaless_struct_type();
const auto default_accessors = iceberg::struct_accessor::from_struct_type(
default_schema);
} // namespace

ss::future<data_writer_error>
partitioning_writer::add_data(iceberg::struct_value val, int64_t approx_size) {
iceberg::partition_key pk;
try {
pk = iceberg::partition_key::create(
val, default_accessors, hourly_spec);
} catch (...) {
vlog(
datalake_log.error,
"Error {} while partitioning value: {}",
std::current_exception(),
val);
co_return data_writer_error::parquet_conversion_error;
}
auto writer_iter = writers_.find(pk);
if (writer_iter == writers_.end()) {
auto writer_res = co_await writer_factory_.create_writer(type_);
if (writer_res.has_error()) {
vlog(
datalake_log.error,
"Failed to create new writer: {}",
writer_res.error());
co_return writer_res.error();
}
auto new_iter = writers_.emplace(
pk.copy(), std::move(writer_res.value()));
writer_iter = new_iter.first;
}
auto& writer = writer_iter->second;
auto write_res = co_await writer->add_data_struct(
std::move(val), approx_size);
if (write_res != data_writer_error::ok) {
vlog(datalake_log.error, "Failed to add data: {}", write_res);
co_return write_res;
}
co_return write_res;
}

ss::future<result<chunked_vector<local_file_metadata>, data_writer_error>>
partitioning_writer::finish() && {
chunked_vector<local_file_metadata> files;
auto first_error = data_writer_error::ok;
// TODO: parallelize me!
for (auto& [pk, writer] : writers_) {
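        // With the hourly partition spec, the partition key's first field
        // holds the hour; read it back out to tag the output file below.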
int hour = std::get<iceberg::int_value>(
std::get<iceberg::primitive_value>(*pk.val->fields[0]))
.val;
auto file_res = co_await writer->finish();
if (file_res.has_error()) {
vlog(
datalake_log.error,
"Failed to finish writer: {}",
file_res.error());
if (first_error == data_writer_error::ok) {
first_error = file_res.error();
}
// Even on error, move on so that we can close all the writers.
continue;
}
auto& file = file_res.value();
file.hour = hour;
files.emplace_back(std::move(file));
}
if (first_error != data_writer_error::ok) {
co_return first_error;
}
co_return files;
}

} // namespace datalake
62 changes: 62 additions & 0 deletions src/v/datalake/partitioning_writer.h
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "container/chunked_hash_map.h"
#include "datalake/data_writer_interface.h"
#include "iceberg/datatypes.h"
#include "iceberg/partition_key.h"
#include "iceberg/values.h"

namespace iceberg {
struct struct_type;
} // namespace iceberg

namespace datalake {

// A wrapper around multiple data writers that all share the same schema and
// are partitioned by an Iceberg partition key.
//
// Uses the default partition spec to partition. As such, this class expects
// that schemas and values given as inputs are constructed with the default
// ("schemaless") schema and fields at the front.
class partitioning_writer {
public:
explicit partitioning_writer(
data_writer_factory& factory, iceberg::struct_type type)
: writer_factory_(factory)
, type_(std::move(type)) {}

// Adds the given value to the writer corresponding to the value's
// partition key.
//
// Expects that the input value abides by the schema denoted by `type_`.
ss::future<data_writer_error>
add_data(iceberg::struct_value, int64_t approx_size);

// Finishes and returns the list of local files written by the underlying
// writers, with the appropriate partitioning metadata filled in.
ss::future<result<chunked_vector<local_file_metadata>, data_writer_error>>
finish() &&;

private:
// Factory for data writers.
data_writer_factory& writer_factory_;

// The Iceberg message type for the underlying writer. Expected to include
// Redpanda-specific fields, e.g. a timestamp field for partitioning.
const iceberg::struct_type type_;

// Map of partition keys to their corresponding data file writers.
chunked_hash_map<iceberg::partition_key, std::unique_ptr<data_writer>>
writers_;
};

} // namespace datalake
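
For orientation, here is a minimal usage sketch of the new class. It is not part of the commit: the helper name, the batch argument, and the size estimate are invented, and errors are handled with early returns.

#include "datalake/partitioning_writer.h"

#include <seastar/core/coroutine.hh>

// Hypothetical helper (not in this commit): fan a batch of values out to
// one data writer per hourly partition key, then collect the output files.
ss::future<> write_partitioned(
  datalake::data_writer_factory& factory,
  iceberg::struct_type schema,
  chunked_vector<iceberg::struct_value> batch) {
    datalake::partitioning_writer writer(factory, std::move(schema));
    for (auto& val : batch) {
        // Routes each value to (or lazily creates) the writer for its key.
        auto err = co_await writer.add_data(std::move(val), /*approx_size=*/512);
        if (err != datalake::data_writer_error::ok) {
            co_return;
        }
    }
    // finish() is rvalue-qualified: it consumes the writer and returns the
    // local files produced, one per partition key seen, each tagged with
    // its hour.
    auto files = co_await std::move(writer).finish();
    if (files.has_error()) {
        co_return;
    }
    // files.value() holds a chunked_vector<datalake::local_file_metadata>.
}

Note that finish() aggregates the first error but still finishes every underlying writer, so a failure in one partition does not leave the other partitions' files unclosed.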
3 changes: 1 addition & 2 deletions src/v/datalake/record_multiplexer.cc
@@ -107,8 +107,7 @@ record_multiplexer::get_writer() {
if (!_writer) {
auto& translator = get_translator();
auto schema = translator.get_schema();
-        auto writer_result = co_await _writer_factory->create_writer(
-          std::move(schema));
+        auto writer_result = co_await _writer_factory->create_writer(schema);
if (!writer_result.has_value()) {
co_return writer_result.error();
}
32 changes: 32 additions & 0 deletions src/v/datalake/tests/BUILD
@@ -66,6 +66,20 @@ redpanda_test_cc_library(
],
)

+redpanda_test_cc_library(
+    name = "test_data_writer",
+    hdrs = [
+        "test_data_writer.h",
+    ],
+    include_prefix = "datalake/tests",
+    deps = [
+        "//src/v/datalake:writer",
+        "//src/v/iceberg:datatypes",
+        "//src/v/iceberg:values",
+        "@seastar",
+    ],
+)

redpanda_cc_gtest(
name = "datalake_protobuf_test",
timeout = "short",
@@ -92,3 +106,21 @@ redpanda_cc_gtest(
"@seastar",
],
)

+redpanda_cc_gtest(
+    name = "partitioning_writer_test",
+    timeout = "short",
+    srcs = [
+        "partitioning_writer_test.cc",
+    ],
+    deps = [
+        ":test_data_writer",
+        "//src/v/datalake:partitioning_writer",
+        "//src/v/datalake:table_definition",
+        "//src/v/iceberg/tests:test_schemas",
+        "//src/v/iceberg/tests:value_generator",
+        "//src/v/model",
+        "//src/v/test_utils:gtest",
+        "@googletest//:gtest",
+    ],
+)
8 changes: 4 additions & 4 deletions src/v/datalake/tests/CMakeLists.txt
@@ -36,14 +36,15 @@ rp_test(
rp_test(
UNIT_TEST
GTEST
-  BINARY_NAME parquet_writer_test
-  SOURCES parquet_writer_test.cc
+  BINARY_NAME datalake
+  SOURCES
+    parquet_writer_test.cc
+    partitioning_writer_test.cc
LIBRARIES
v::gtest_main
v::datalake
v::iceberg_test_utils
LABELS datalake
-  ARGS "-- -c 1"
)

rp_test(
@@ -123,4 +124,3 @@ rp_test(
LABELS storage
ARGS "-- -c 1"
)
