Skip to content

Commit

Permalink
Merge pull request redpanda-data#23600 from mmaslankaprv/avro-gaps
Browse files Browse the repository at this point in the history
Added missing Avro values and schema conversion
  • Loading branch information
mmaslankaprv authored Oct 4, 2024
2 parents 617a3c7 + a20da80 commit f408244
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 25 deletions.
35 changes: 28 additions & 7 deletions src/v/iceberg/schema_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ struct avro_primitive_type_visitor {
avro::Schema operator()(const decimal_type& type) {
auto bytes_for_p = static_cast<int>(
std::ceil((type.precision * std::log2(10) + 1) / 8));
auto ret = avro::FixedSchema(bytes_for_p, "");
ret.root()->setLogicalType(
avro::LogicalType(avro::LogicalType::DECIMAL));
auto ret = avro::FixedSchema(bytes_for_p, "decimal");
avro::LogicalType l_type(avro::LogicalType::DECIMAL);
l_type.setPrecision(type.precision);
l_type.setScale(type.scale);
ret.root()->setLogicalType(l_type);

return ret;
}
avro::Schema operator()(const date_type&) {
Expand Down Expand Up @@ -99,13 +102,15 @@ struct avro_primitive_type_visitor {
}
avro::Schema operator()(const string_type&) { return avro::StringSchema(); }
avro::Schema operator()(const uuid_type&) {
static constexpr auto uuid_size_bytes = 16;
auto ret = avro::FixedSchema(uuid_size_bytes, "");
// Out of the two string and fixed, we choose to encode UUIDs as string
// values as this is the only way it is supported in Avro C++ library
auto ret = avro::StringSchema();
ret.root()->setLogicalType(avro::LogicalType(avro::LogicalType::UUID));
return ret;
}
avro::Schema operator()(const fixed_type& type) {
auto ret = avro::FixedSchema(static_cast<int>(type.length), "");
auto ret = avro::FixedSchema(
static_cast<int>(type.length), "fixed_bytes");
return ret;
}
avro::Schema operator()(const binary_type&) { return avro::BytesSchema(); }
Expand Down Expand Up @@ -235,6 +240,9 @@ field_type type_from_avro(const avro::NodePtr& n) {
const auto& type = n->type();
switch (type) {
case avro::AVRO_STRING:
if (n->logicalType().type() == avro::LogicalType::UUID) {
return uuid_type{};
}
return string_type{};
case avro::AVRO_BYTES:
return binary_type{};
Expand Down Expand Up @@ -313,8 +321,21 @@ field_type type_from_avro(const avro::NodePtr& n) {
case avro::AVRO_UNION:
// NOTE: should be handled by maybe_optional_from_avro().
throw std::invalid_argument("Avro union type not supported");
case avro::AVRO_FIXED:
case avro::AVRO_FIXED: {
auto logical_type = n->logicalType();
if (logical_type.type() == avro::LogicalType::DECIMAL) {
// casting to uint32_t should be safe, the only reason the values
// are signed is that unsigned counterparts are missing in java.
return decimal_type{
.precision = static_cast<uint32_t>(logical_type.precision()),
.scale = static_cast<uint32_t>(logical_type.scale()),
};
} else if (logical_type.type() == avro::LogicalType::UUID) {
// UUID can be either a string or a fixed type
return uuid_type{};
}
return fixed_type{n->fixedSize()};
}
case avro::AVRO_SYMBOLIC:
throw std::invalid_argument("Avro symbolic type not supported");
case avro::AVRO_UNKNOWN:
Expand Down
18 changes: 18 additions & 0 deletions src/v/iceberg/tests/test_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,28 @@ field_type test_nested_schema_type() {
nested_field::create(17, "age", field_required::yes, int_type{}));
nested_struct.fields.emplace_back(nested_field::create(
15, "person", field_required::no, std::move(person_struct)));

field_type nested_type = std::move(nested_struct);
return nested_type;
}

field_type test_nested_schema_type_avro() {
auto t = test_nested_schema_type();
auto s_type = std::get<struct_type>(std::move(t));
s_type.fields.emplace_back(nested_field::create(
18,
"some_decimal",
field_required::no,
decimal_type{.precision = 10, .scale = 2}));

s_type.fields.emplace_back(nested_field::create(
19, "the_fixed_64_bytes", field_required::no, fixed_type{.length = 64}));

s_type.fields.emplace_back(
nested_field::create(20, "an_uuid", field_required::no, uuid_type{}));
return s_type;
}

// Schema taken from
// https://github.com/apache/iceberg-go/blob/704a6e78c13ea63f1ff4bb387f7d4b365b5f0f82/schema_test.go#L644
const char* test_nested_schema_json_str = R"JSON({
Expand Down
4 changes: 4 additions & 0 deletions src/v/iceberg/tests/test_schemas.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ namespace iceberg {
// Expected type from test_nested_schema_json_str.
field_type test_nested_schema_type();

// Contains additional fields that are currently supported by avro:
// decimals, fixed and UUID
field_type test_nested_schema_type_avro();

// Nested schema taken from
// https://github.com/apache/iceberg-go/blob/704a6e78c13ea63f1ff4bb387f7d4b365b5f0f82/schema_test.go#L644
extern const char* test_nested_schema_json_str;
Expand Down
8 changes: 6 additions & 2 deletions src/v/iceberg/tests/value_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ struct generating_primitive_value_visitor {
spec_.forced_num_val.value_or(generate_numeric_val()))};
}
value operator()(const decimal_type&) {
if (spec_.forced_num_val) {
return decimal_value{absl::MakeInt128(0, generate_numeric_val())};
}
return decimal_value{
spec_.forced_num_val.value_or(generate_numeric_val())};
absl::MakeInt128(generate_numeric_val(), generate_numeric_val())};
}
value operator()(const date_type&) {
return date_value{static_cast<int>(
Expand Down Expand Up @@ -97,7 +100,8 @@ struct generating_primitive_value_visitor {
}
switch (spec_.pattern) {
case value_pattern::zeros:
return fixed_value{iobuf{}};
return fixed_value{
bytes_to_iobuf(bytes::from_string(std::string(t.length, 0)))};
case value_pattern::random:
return fixed_value{random_generators::make_iobuf(t.length)};
}
Expand Down
41 changes: 39 additions & 2 deletions src/v/iceberg/tests/values_avro_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
using namespace iceberg;

TEST(ValuesAvroTest, TestZeroVals) {
auto schema_field_type = test_nested_schema_type();
auto schema_field_type = test_nested_schema_type_avro();
auto schema = struct_type_to_avro(
std::get<struct_type>(schema_field_type), "nested");
auto zero_val = tests::make_value({}, schema_field_type);
Expand All @@ -34,7 +34,7 @@ TEST(ValuesAvroTest, TestZeroVals) {

TEST(ValuesAvroTest, TestRandomVals) {
constexpr int num_iterations = 10;
auto schema_field_type = test_nested_schema_type();
auto schema_field_type = test_nested_schema_type_avro();
auto schema = struct_type_to_avro(
std::get<struct_type>(schema_field_type), "nested");

Expand All @@ -51,3 +51,40 @@ TEST(ValuesAvroTest, TestRandomVals) {
ASSERT_EQ(roundtrip_val.value(), rand_val);
}
}

TEST(ValuesAvroTest, TestDecimal) {
struct_type st;
st.fields.push_back(nested_field::create(
0,
"decimal_val",
field_required::yes,
decimal_type{.precision = 10, .scale = 2}));

field_type schema_field{std::move(st)};

auto schema = struct_type_to_avro(
std::get<struct_type>(schema_field), "st_with_decimal");

auto make_struct = [](absl::int128 value) {
struct_value ret;
ret.fields.push_back(decimal_value{.val = value});
return ret;
};

for (auto& v : {
make_struct(std::numeric_limits<absl::int128>::max()),
make_struct(std::numeric_limits<absl::int128>::max()),
make_struct(0),
make_struct(-1),
make_struct(1),
}) {
auto datum = struct_to_avro(v, schema.root());
auto roundtrip_val = val_from_avro(
datum, schema_field, field_required::yes);

ASSERT_TRUE(roundtrip_val.has_value());
auto roundtrip_struct = std::get<std::unique_ptr<struct_value>>(
std::move(*roundtrip_val));
ASSERT_EQ(*roundtrip_struct, v);
}
}
104 changes: 90 additions & 14 deletions src/v/iceberg/values_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ void maybe_throw_wrong_type(
}
}

void maybe_throw_wrong_logical_type(
const avro::LogicalType& expected, const avro::LogicalType& actual) {
if (
expected.type() != actual.type() || expected.scale() != actual.scale()
|| expected.precision() != actual.precision()) {
throw std::invalid_argument(fmt::format(
"Expected (type: {}, precision: {}, scale: {}) logical_type but got: "
"(type: {}, precision: {}, scale: {})",
expected.type(),
expected.precision(),
expected.scale(),
actual.type(),
actual.precision(),
actual.scale()));
}
}

// Throws an exception if the given schema isn't valid for the given type.
void maybe_throw_invalid_schema(
const avro::NodePtr& actual_schema, const avro::Type& expected_type) {
Expand All @@ -50,9 +67,7 @@ void maybe_throw_invalid_schema(
const auto& actual_type = actual_schema->type();
maybe_throw_wrong_type(expected_type, actual_type);
}
// XXX: need to figure out how to correctly instantiate avro::GenericFixed to
// support the unimplemented fields. Or move to an impl that uses iobufs. Throw
// for now.

struct primitive_value_avro_visitor {
explicit primitive_value_avro_visitor(const avro::NodePtr& avro_schema)
: avro_schema_(avro_schema) {}
Expand Down Expand Up @@ -113,14 +128,27 @@ struct primitive_value_avro_visitor {
}
return {data};
}
avro::GenericDatum operator()(const decimal_value&) {
throw std::invalid_argument("XXX decimal not implemented");
avro::GenericDatum operator()(const decimal_value& v) {
std::vector<uint8_t> bytes(16);
auto low = absl::Int128Low64(v.val);
auto high = absl::Int128High64(v.val);
// Store in big-endian order (most significant byte at index 0)
for (size_t i = 0; i < 8; ++i) {
bytes[i + 8] = static_cast<uint8_t>(low >> ((7 - i) * 8));
bytes[i] = static_cast<uint8_t>(high >> ((7 - i) * 8));
}

return {avro_schema_, avro::GenericFixed(avro_schema_, bytes)};
}
avro::GenericDatum operator()(const fixed_value&) {
throw std::invalid_argument("XXX fixed not implemented");
avro::GenericDatum operator()(const fixed_value& fixed) {
std::vector<uint8_t> bytes(fixed.val.size_bytes());
iobuf::iterator_consumer it(fixed.val.cbegin(), fixed.val.cend());
it.consume_to(fixed.val.size_bytes(), bytes.data());
return {avro_schema_, avro::GenericFixed(avro_schema_, bytes)};
}
avro::GenericDatum operator()(const uuid_value&) {
throw std::invalid_argument("XXX uuid not implemented");
avro::GenericDatum operator()(const uuid_value& uuid) {
maybe_throw_invalid_schema(avro_schema_, avro::AVRO_STRING);
return {avro_schema_, fmt::to_string(uuid.val)};
}
};

Expand Down Expand Up @@ -289,11 +317,38 @@ struct primitive_value_parsing_visitor {
maybe_throw_wrong_type(data_.type(), avro::AVRO_STRING);
return string_value{iobuf::from(data_.value<std::string>())};
}
value operator()(const fixed_type&) {
throw std::invalid_argument("XXX fixed not implemented");
value operator()(const fixed_type& schema_type) {
maybe_throw_wrong_type(data_.type(), avro::AVRO_FIXED);
auto lt = avro::LogicalType(avro::LogicalType::NONE);
maybe_throw_wrong_logical_type(data_.logicalType(), lt);

const auto& v = data_.value<avro::GenericFixed>().value();
if (v.size() != schema_type.length) {
throw std::invalid_argument(fmt::format(
"Fixed type length mismatch, schema length: {}, current value "
"length: {}",
schema_type.length,
v.size()));
}
iobuf b;
b.append(v.data(), v.size());
return fixed_value{.val = std::move(b)};
}
value operator()(const uuid_type&) {
throw std::invalid_argument("XXX uuid not implemented");
// in Avro UUID can be either fixed or string type
if (data_.type() == avro::AVRO_FIXED) {
maybe_throw_wrong_logical_type(
data_.logicalType(), avro::LogicalType(avro::LogicalType::UUID));
const auto& v = data_.value<avro::GenericFixed>().value();
return uuid_value{.val = uuid_t(v)};
}
maybe_throw_wrong_type(data_.type(), avro::AVRO_STRING);
maybe_throw_wrong_logical_type(
data_.logicalType(), avro::LogicalType(avro::LogicalType::UUID));
const auto& v = data_.value<std::string>();

auto uuid = uuid_t::from_string(v);
return uuid_value{uuid};
}
value operator()(const binary_type&) {
maybe_throw_wrong_type(data_.type(), avro::AVRO_BYTES);
Expand All @@ -302,8 +357,29 @@ struct primitive_value_parsing_visitor {
b.append(v.data(), v.size());
return binary_value{std::move(b)};
}
value operator()(const decimal_type&) {
throw std::invalid_argument("XXX decimal not implemented");
value operator()(const decimal_type& dt) {
maybe_throw_wrong_type(data_.type(), avro::AVRO_FIXED);
auto lt = avro::LogicalType(avro::LogicalType::DECIMAL);
lt.setPrecision(dt.precision);
lt.setScale(dt.scale);
maybe_throw_wrong_logical_type(data_.logicalType(), lt);
const auto& v = data_.value<avro::GenericFixed>().value();
if (v.size() > 16) {
throw std::invalid_argument(fmt::format(
"decimals with more than 16 bytes are not supported, current "
"value size: {}",
v.size()));
}
int64_t high_half{0};
uint64_t low_half{0};
for (size_t i = 0; i < std::min<size_t>(v.size(), 8); ++i) {
high_half |= int64_t(v[i]) << (7 - i) * 8;
};
for (size_t i = 8; i < std::min<size_t>(v.size(), 16); ++i) {
low_half |= uint64_t(v[i]) << (15 - i) * 8;
};

return decimal_value{.val = absl::MakeInt128(high_half, low_half)};
}
};

Expand Down

0 comments on commit f408244

Please sign in to comment.