Skip to content

Commit

Permalink
Merge pull request redpanda-data#23765 from andrwng/iceberg-uris
Browse files Browse the repository at this point in the history
iceberg: serialize path style URIs in metadata
  • Loading branch information
dotnwat authored Oct 15, 2024
2 parents a7647a2 + 97f357d commit 073aadc
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 69 deletions.
28 changes: 28 additions & 0 deletions src/v/iceberg/manifest_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
using namespace std::chrono_literals;

namespace iceberg {
ss::future<checked<manifest, metadata_io::errc>>
manifest_io::download_manifest_uri(
const ss::sstring& uri, const partition_key_type& pk_type) {
auto path = manifest_path(from_uri(uri));
co_return co_await download_manifest(path, pk_type);
}

ss::future<checked<manifest_list, metadata_io::errc>>
manifest_io::download_manifest_list_uri(const ss::sstring& uri) {
auto path = manifest_list_path(from_uri(uri));
co_return co_await download_manifest_list(path);
}

ss::future<checked<manifest, metadata_io::errc>> manifest_io::download_manifest(
const manifest_path& path, const partition_key_type& pk_type) {
Expand Down Expand Up @@ -55,4 +67,20 @@ manifest_io::upload_manifest_list(
});
}

ss::sstring manifest_io::to_uri(const std::filesystem::path& p) const {
return fmt::format("{}{}", uri_base(), p.native());
}

std::filesystem::path manifest_io::from_uri(const ss::sstring& s) const {
const auto base = uri_base();
if (!s.starts_with(base)) {
return std::filesystem::path{s};
}
return std::filesystem::path{s.substr(base.size())};
}

ss::sstring manifest_io::uri_base() const {
return fmt::format("s3://{}/", bucket_);
}

} // namespace iceberg
16 changes: 16 additions & 0 deletions src/v/iceberg/manifest_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,29 @@ class manifest_io : public metadata_io {

ss::future<checked<manifest, metadata_io::errc>> download_manifest(
const manifest_path& path, const partition_key_type& pk_type);
ss::future<checked<manifest, metadata_io::errc>> download_manifest_uri(
const ss::sstring& uri, const partition_key_type& pk_type);

ss::future<checked<manifest_list, metadata_io::errc>>
download_manifest_list(const manifest_list_path& path);
ss::future<checked<manifest_list, metadata_io::errc>>
download_manifest_list_uri(const ss::sstring& uri);

ss::future<checked<size_t, metadata_io::errc>>
upload_manifest(const manifest_path& path, const manifest&);
ss::future<checked<size_t, metadata_io::errc>>
upload_manifest_list(const manifest_list_path& path, const manifest_list&);

ss::sstring to_uri(const std::filesystem::path& p) const;

private:
// TODO: make URIs less fragile with an explicit type?
// E.g. s3://bucket/
ss::sstring uri_base() const;

// E.g. s3://bucket/path/to/file => path/to/file
// Leaves the path as is if it doesn't match the expected URI base.
std::filesystem::path from_uri(const ss::sstring& s) const;
};

} // namespace iceberg
14 changes: 7 additions & 7 deletions src/v/iceberg/merge_append_action.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ ss::future<action::action_outcome> merge_append_action::build_updates() && {
table_cur_snap_id);
co_return action::errc::unexpected_state;
}
auto mlist_res = co_await io_.download_manifest_list(
manifest_list_path{snap_it->manifest_list_path});
auto mlist_res = co_await io_.download_manifest_list_uri(
snap_it->manifest_list_path);
if (mlist_res.has_error()) {
co_return to_action_errc(mlist_res.error());
}
Expand Down Expand Up @@ -294,7 +294,7 @@ ss::future<action::action_outcome> merge_append_action::build_updates() && {
.operation = snapshot_operation::append,
.other = {},
},
.manifest_list_path = new_mlist_path().native(),
.manifest_list_path = io_.to_uri(new_mlist_path()),
.schema_id = schema.schema_id,
};
updates_and_reqs ret;
Expand Down Expand Up @@ -382,7 +382,7 @@ merge_append_action::maybe_merge_mfiles_and_new_data(
// Since this bin was too small to merge, we won't do anything else to
// its manifests, just add them back to the returned container.
ret.emplace_back(manifest_file{
.manifest_path = new_manifest_path().native(),
.manifest_path = io_.to_uri(new_manifest_path()),
.manifest_length = mfile_up_res.value(),
.partition_spec_id = ctx.pspec.spec_id,
.content = manifest_file_content::data,
Expand Down Expand Up @@ -437,8 +437,8 @@ merge_append_action::merge_mfiles(
for (const auto& mfile : to_merge) {
// Download the manifest file and collect the entries into the merged
// container.
auto mfile_res = co_await io_.download_manifest(
manifest_path{mfile.manifest_path}, ctx.pk_type);
auto mfile_res = co_await io_.download_manifest_uri(
mfile.manifest_path, ctx.pk_type);
if (mfile_res.has_error()) {
co_return mfile_res.error();
}
Expand Down Expand Up @@ -473,7 +473,7 @@ merge_append_action::merge_mfiles(
co_return mfile_up_res.error();
}
manifest_file merged_file{
.manifest_path = merged_manifest_path().native(),
.manifest_path = io_.to_uri(merged_manifest_path()),
.manifest_length = mfile_up_res.value(),
.partition_spec_id = ctx.pspec.spec_id,
.content = manifest_file_content::data,
Expand Down
2 changes: 1 addition & 1 deletion src/v/iceberg/metadata_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class metadata_io {
}
}

private:
protected:
cloud_io::remote& io_;
const cloud_storage_clients::bucket_name bucket_;
};
Expand Down
9 changes: 4 additions & 5 deletions src/v/iceberg/metadata_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ do_execute_query(
collector.collect(s);
continue;
}
auto m_list_result = co_await io.download_manifest_list(
manifest_list_path{s.manifest_list_path});
auto m_list_result = co_await io.download_manifest_list_uri(
s.manifest_list_path);

if (m_list_result.has_error()) {
vlog(
Expand Down Expand Up @@ -217,9 +217,8 @@ do_execute_query(
if (pk_result.has_error()) {
co_return pk_result.error();
}
auto m_result = co_await io.download_manifest(
manifest_path(manifest_file.manifest_path),
std::move(pk_result.value()));
auto m_result = co_await io.download_manifest_uri(
manifest_file.manifest_path, std::move(pk_result.value()));

if (m_result.has_error()) {
vlog(
Expand Down
125 changes: 87 additions & 38 deletions src/v/iceberg/tests/manifest_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,13 @@ class ManifestIOTest
set_expectations_and_listen({});
}
auto& remote() { return sr->remote.local(); }

std::unique_ptr<cloud_io::scoped_remote> sr;
};

TEST_F(ManifestIOTest, TestManifestRoundtrip) {
schema s{
.schema_struct = std::get<struct_type>(test_nested_schema_type()),
.schema_id = schema::id_t{12},
.identifier_field_ids = {nested_field::id_t{1}},
};
partition_spec p{
manifest make_manifest() const {
schema s{
.schema_struct = std::get<struct_type>(test_nested_schema_type()),
.schema_id = schema::id_t{12},
.identifier_field_ids = {nested_field::id_t{1}},
};
partition_spec p{
.spec_id = partition_spec::id_t{8},
.fields = {
partition_field{
Expand All @@ -53,17 +49,44 @@ TEST_F(ManifestIOTest, TestManifestRoundtrip) {
},
},
};
manifest_metadata meta{
.schema = std::move(s),
.partition_spec = std::move(p),
.format_version = format_version::v1,
.manifest_content_type = manifest_content_type::data,
};
manifest m{
.metadata = std::move(meta),
.entries = {},
};
manifest_metadata meta{
.schema = std::move(s),
.partition_spec = std::move(p),
.format_version = format_version::v1,
.manifest_content_type = manifest_content_type::data,
};
return manifest{
.metadata = std::move(meta),
.entries = {},
};
}

manifest_list make_manifest_list() const {
manifest_list m;
for (int i = 0; i < 1024; i++) {
manifest_file file;
file.manifest_path = "path/to/file";
file.partition_spec_id = partition_spec::id_t{1};
file.content = manifest_file_content::data;
file.seq_number = sequence_number{3};
file.min_seq_number = sequence_number{4};
file.added_snapshot_id = snapshot_id{5};
file.added_files_count = 6;
file.existing_files_count = 7;
file.deleted_files_count = 8;
file.added_rows_count = 9;
file.existing_rows_count = 10;
file.deleted_rows_count = 11;
m.files.emplace_back(std::move(file));
}
return m;
}

std::unique_ptr<cloud_io::scoped_remote> sr;
};

TEST_F(ManifestIOTest, TestManifestRoundtrip) {
auto m = make_manifest();
// Missing manifest.
auto io = manifest_io(remote(), bucket_name);
auto test_path = manifest_path{"foo/bar/baz"};
Expand All @@ -83,23 +106,7 @@ TEST_F(ManifestIOTest, TestManifestRoundtrip) {
}

TEST_F(ManifestIOTest, TestManifestListRoundtrip) {
manifest_list m;
for (int i = 0; i < 1024; i++) {
manifest_file file;
file.manifest_path = "path/to/file";
file.partition_spec_id = partition_spec::id_t{1};
file.content = manifest_file_content::data;
file.seq_number = sequence_number{3};
file.min_seq_number = sequence_number{4};
file.added_snapshot_id = snapshot_id{5};
file.added_files_count = 6;
file.existing_files_count = 7;
file.deleted_files_count = 8;
file.added_rows_count = 9;
file.existing_rows_count = 10;
file.deleted_rows_count = 11;
m.files.emplace_back(std::move(file));
}
manifest_list m = make_manifest_list();

// Missing manifest list.
auto io = manifest_io(remote(), bucket_name);
Expand All @@ -118,6 +125,48 @@ TEST_F(ManifestIOTest, TestManifestListRoundtrip) {
ASSERT_EQ(m, m_roundtrip);
}

TEST_F(ManifestIOTest, TestManifestRoundtripURIs) {
auto m = make_manifest();
auto io = manifest_io(remote(), bucket_name);
auto path = manifest_path{"foo/bar/baz"};
auto test_uri = io.to_uri(path);
ASSERT_TRUE(test_uri.starts_with("s3://"));

auto up_res = io.upload_manifest(path, m).get();
ASSERT_FALSE(up_res.has_error());

// Use the URI string.
auto dl_res = io.download_manifest_uri(test_uri, empty_pk_type()).get();
ASSERT_FALSE(dl_res.has_error());
ASSERT_EQ(m, dl_res.value());

// As a safety measure, we'll still parse the raw path if given.
dl_res = io.download_manifest_uri(path().native(), empty_pk_type()).get();
ASSERT_FALSE(dl_res.has_error());
ASSERT_EQ(m, dl_res.value());
}

TEST_F(ManifestIOTest, TestManifestListRoundtripURIs) {
auto m = make_manifest_list();
auto io = manifest_io(remote(), bucket_name);
auto path = manifest_list_path{"foo/bar/baz"};
auto test_uri = io.to_uri(path);
ASSERT_TRUE(test_uri.starts_with("s3://"));

auto up_res = io.upload_manifest_list(path, m).get();
ASSERT_FALSE(up_res.has_error());

// Use the URI string.
auto dl_res = io.download_manifest_list_uri(test_uri).get();
ASSERT_FALSE(dl_res.has_error());
ASSERT_EQ(m, dl_res.value());

// As a safety measure, we'll still parse the raw path if given.
dl_res = io.download_manifest_list_uri(path().native()).get();
ASSERT_FALSE(dl_res.has_error());
ASSERT_EQ(m, dl_res.value());
}

TEST_F(ManifestIOTest, TestShutdown) {
auto test_path = manifest_path{"foo/bar/baz"};
sr->request_stop();
Expand Down
23 changes: 9 additions & 14 deletions src/v/iceberg/tests/merge_append_action_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,8 @@ TEST_F(MergeAppendActionTest, TestMergeByCount) {

auto latest_mlist_path
= table.snapshots.value().back().manifest_list_path;
auto latest_mlist = io.download_manifest_list(
manifest_list_path{latest_mlist_path})
.get();
auto latest_mlist
= io.download_manifest_list_uri(latest_mlist_path).get();
ASSERT_TRUE(latest_mlist.has_value());
ASSERT_EQ(latest_mlist.value().files.size(), expected_manifests);
}
Expand All @@ -144,8 +143,7 @@ TEST_F(MergeAppendActionTest, TestMergeByCount) {

// Validate that the latest snapshot indeed contains a single manifest.
auto latest_mlist_path = table.snapshots.value().back().manifest_list_path;
auto latest_mlist
= io.download_manifest_list(manifest_list_path{latest_mlist_path}).get();
auto latest_mlist = io.download_manifest_list_uri(latest_mlist_path).get();
ASSERT_TRUE(latest_mlist.has_value());
ASSERT_EQ(latest_mlist.value().files.size(), 1);
const auto& merged_mfile = latest_mlist.value().files[0];
Expand Down Expand Up @@ -186,9 +184,8 @@ TEST_F(MergeAppendActionTest, TestMergeByBytes) {

auto latest_mlist_path
= table.snapshots.value().back().manifest_list_path;
auto latest_mlist = io.download_manifest_list(
manifest_list_path{latest_mlist_path})
.get();
auto latest_mlist
= io.download_manifest_list_uri(latest_mlist_path).get();
ASSERT_TRUE(latest_mlist.has_value());
ASSERT_EQ(latest_mlist.value().files.size(), expected_manifests);
}
Expand All @@ -207,8 +204,7 @@ TEST_F(MergeAppendActionTest, TestMergeByBytes) {
// - the one containing 8 merged 1MB paths
// - the two we added that weren't merged
auto latest_mlist_path = table.snapshots.value().back().manifest_list_path;
auto latest_mlist
= io.download_manifest_list(manifest_list_path{latest_mlist_path}).get();
auto latest_mlist = io.download_manifest_list_uri(latest_mlist_path).get();
ASSERT_TRUE(latest_mlist.has_value());
ASSERT_EQ(latest_mlist.value().files.size(), 3);

Expand Down Expand Up @@ -296,9 +292,8 @@ TEST_F(MergeAppendActionTest, TestPartitionSummaries) {
// manifest (no merge yet).
auto latest_mlist_path
= table.snapshots.value().back().manifest_list_path;
auto latest_mlist_res = io.download_manifest_list(
manifest_list_path{latest_mlist_path})
.get();
auto latest_mlist_res
= io.download_manifest_list_uri(latest_mlist_path).get();
ASSERT_TRUE(latest_mlist_res.has_value());
ASSERT_EQ(expected_manifests, latest_mlist_res.value().files.size());

Expand All @@ -325,7 +320,7 @@ TEST_F(MergeAppendActionTest, TestPartitionSummaries) {
ASSERT_FALSE(res.has_error()) << res.error();
auto latest_mlist_path = table.snapshots.value().back().manifest_list_path;
auto latest_mlist_res
= io.download_manifest_list(manifest_list_path{latest_mlist_path}).get();
= io.download_manifest_list_uri(latest_mlist_path).get();
ASSERT_TRUE(latest_mlist_res.has_value());
ASSERT_EQ(1, latest_mlist_res.value().files.size());

Expand Down
8 changes: 4 additions & 4 deletions src/v/iceberg/tests/metadata_query_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ class MetadataQueryTest
chunked_vector<iceberg::manifest_file> files;

for (auto& s : *table.snapshots) {
auto m_list = co_await io.download_manifest_list(
manifest_list_path(s.manifest_list_path));
auto m_list = co_await io.download_manifest_list_uri(
s.manifest_list_path);
for (auto& f : m_list.assume_value().files) {
if (!paths.contains(f.manifest_path)) {
paths.emplace(f.manifest_path);
Expand All @@ -136,8 +136,8 @@ class MetadataQueryTest
chunked_vector<iceberg::manifest> manifests;
auto files = co_await collect_all_manifest_files(table);
for (auto& f : files) {
auto m = co_await io.download_manifest(
manifest_path(f.manifest_path), make_partition_key_type(table));
auto m = co_await io.download_manifest_uri(
f.manifest_path, make_partition_key_type(table));
manifests.push_back(std::move(m.assume_value()));
}
co_return manifests;
Expand Down

0 comments on commit 073aadc

Please sign in to comment.