Skip to content

Commit

Permalink
Merge pull request redpanda-data#24299 from ztlpn/iceberg-fs-catalog-delete
Browse files Browse the repository at this point in the history

iceberg: add simple drop_table support for filesystem_catalog
  • Loading branch information
rockwotj authored Nov 26, 2024
2 parents fc82e5a + 049dd7d commit 65b4186
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 3 deletions.
20 changes: 18 additions & 2 deletions src/v/iceberg/filesystem_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,24 @@ filesystem_catalog::load_table(const table_identifier& table_ident) {
}

ss::future<checked<void, catalog::errc>>
filesystem_catalog::drop_table(const table_identifier& table_id, bool) {
    // Check that the table exists first, so callers get a not_found-style
    // error from read_table_meta() instead of a silent no-op drop.
    auto current_tmeta = co_await read_table_meta(table_id);
    if (current_tmeta.has_error()) {
        co_return current_tmeta.error();
    }

    // Delete every object under the table's "metadata/" prefix (the
    // vN.metadata.json files and the version hint).
    auto delete_res = co_await table_io_.delete_all_metadata(
      metadata_location_path{
        fmt::format("{}/metadata/", table_location(table_id))});
    if (delete_res.has_error()) {
        vlog(log.warn, "dropping table {} failed", table_id);
        co_return to_catalog_errc(delete_res.error());
    }

    // TODO: support purging data. The unnamed bool parameter is the `purge`
    // flag from the catalog interface; data files are currently left behind.

    co_return outcome::success();
}

Expand Down
73 changes: 73 additions & 0 deletions src/v/iceberg/table_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,77 @@ table_io::version_hint_exists(const version_hint_path& path) {
return object_exists(path, "iceberg::version_hint");
}

// Deletes every object under the given metadata location prefix (i.e. the
// whole "<table-location>/metadata/" directory). Listing and deleting are
// repeated until the listing reports no truncation, all within a single
// 30-second retry budget.
//
// Returns std::nullopt on success, or:
//   - errc::failed        on a listing/deletion error or thrown exception
//   - errc::timedout      on upload_result::timedout or exhausted retry budget
//   - errc::shutting_down on upload_result::cancelled
ss::future<checked<std::nullopt_t, metadata_io::errc>>
table_io::delete_all_metadata(const metadata_location_path& path) {
// Root retry node bounds the whole operation: 30s total, polling strategy
// with a 100ms base delay. The per-step child nodes below share this budget.
retry_chain_node root_rcn(
io_.as(),
ss::lowres_clock::duration{30s},
100ms,
retry_strategy::polling);
retry_chain_logger ctxlog(log, root_rcn);

// Logs a failed future's exception. Shutdown exceptions are expected during
// teardown, so they are logged at debug rather than warn level.
auto log_exception = [&](
const std::exception_ptr& ex, std::string_view ctx) {
auto level = ssx::is_shutdown_exception(ex) ? ss::log_level::debug
: ss::log_level::warn;
vlogl(ctxlog, level, "exception while {} in {}: {}", ctx, path, ex);
};

// deleting may require several iterations if list_objects doesn't return
// everything at once.
while (true) {
// List one page of objects under the prefix (child retry node, backoff).
retry_chain_node list_rcn(10ms, retry_strategy::backoff, &root_rcn);
auto list_fut = co_await ss::coroutine::as_future(io_.list_objects(
bucket_, list_rcn, cloud_storage_clients::object_key{path}));
if (list_fut.failed()) {
log_exception(list_fut.get_exception(), "listing objects");
co_return errc::failed;
}

auto list_res = std::move(list_fut.get());
if (list_res.has_error()) {
co_return errc::failed;
}

// Move the listed keys into a single bulk-delete request for this page.
chunked_vector<cloud_storage_clients::object_key> to_delete;
to_delete.reserve(list_res.value().contents.size());
for (auto& obj : list_res.value().contents) {
vlog(ctxlog.debug, "deleting metadata object {}", obj.key);
to_delete.emplace_back(std::move(obj.key));
}

// Bulk delete with a no-op progress callback.
// NOTE(review): assumes delete_objects tolerates an empty key list when the
// prefix is already empty — confirm against the cloud_io implementation.
retry_chain_node delete_rcn(10ms, retry_strategy::backoff, &root_rcn);
auto delete_fut = co_await ss::coroutine::as_future(io_.delete_objects(
bucket_, std::move(to_delete), delete_rcn, [](size_t) {}));
if (delete_fut.failed()) {
log_exception(delete_fut.get_exception(), "deleting objects");
co_return errc::failed;
}

// Map the cloud_io upload_result onto metadata_io error codes.
switch (delete_fut.get()) {
case cloud_io::upload_result::success:
break;
case cloud_io::upload_result::timedout:
co_return errc::timedout;
case cloud_io::upload_result::cancelled:
co_return errc::shutting_down;
case cloud_io::upload_result::failed:
co_return errc::failed;
}

if (!list_res.value().is_truncated) {
// deleted everything
break;
}

// More pages remain: consume one unit of the root retry budget before
// listing again, so a perpetually-truncated listing cannot loop forever.
auto retry = root_rcn.retry();
if (!retry.is_allowed) {
co_return errc::timedout;
}
co_await ss::sleep_abortable(retry.delay, *retry.abort_source);
}

co_return std::nullopt;
}

} // namespace iceberg
8 changes: 8 additions & 0 deletions src/v/iceberg/table_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

namespace iceberg {

// Strongly-typed paths for the objects that make up a table's metadata.
// Each alias must use a distinct tag struct so the types are not
// interchangeable.

// /<table-location>/metadata/
using metadata_location_path
  = named_type<std::filesystem::path, struct metadata_location_path_tag>;
// /<table-location>/metadata/v0.metadata.json
using table_metadata_path
  = named_type<std::filesystem::path, struct table_metadata_path_tag>;
// /<table-location>/metadata/version-hint.text
// Fix: previously this reused table_metadata_path_tag, which made
// version_hint_path the *same type* as table_metadata_path and defeated
// the purpose of named_type. Give it its own tag.
using version_hint_path
  = named_type<std::filesystem::path, struct version_hint_path_tag>;

Expand All @@ -40,6 +45,9 @@ class table_io : public metadata_io {

ss::future<checked<bool, metadata_io::errc>>
version_hint_exists(const version_hint_path& path);

ss::future<checked<std::nullopt_t, metadata_io::errc>>
delete_all_metadata(const metadata_location_path& path);
};

} // namespace iceberg
74 changes: 74 additions & 0 deletions src/v/iceberg/tests/filesystem_catalog_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,77 @@ TEST_F(FileSystemCatalogTest, TestCommit) {
ASSERT_FALSE(hint_exists_res.has_error());
ASSERT_FALSE(hint_exists_res.value());
}

// Exercises filesystem_catalog::drop_table: dropping a missing table fails
// with not_found, dropping an existing table removes all of its metadata
// objects, and the table name can subsequently be reused.
TEST_F(FileSystemCatalogTest, TestDrop) {
    const table_identifier table_id{.ns = {"ns"}, .table = "table"};

    // A table that was never created cannot be dropped.
    {
        auto drop_res = catalog.drop_table(table_id, /*purge=*/true).get();
        ASSERT_TRUE(drop_res.has_error());
        ASSERT_EQ(drop_res.error(), catalog::errc::not_found);
    }

    // Create the table and commit one schema change so that both a v1
    // metadata file and a version hint exist in the bucket.
    {
        auto created
          = catalog.create_table(table_id, schema{}, partition_spec{}).get();
        ASSERT_FALSE(created.has_error());

        transaction tx(std::move(created.value()));
        auto schema_res = tx
                            .set_schema(schema{
                              .schema_struct = std::get<struct_type>(
                                test_nested_schema_type()),
                              .schema_id = schema::id_t{1},
                              .identifier_field_ids = {},
                            })
                            .get();
        ASSERT_FALSE(schema_res.has_error());
        auto commit_res = catalog.commit_txn(table_id, std::move(tx)).get();
        ASSERT_FALSE(commit_res.has_error());
    }

    table_io io(remote(), bucket_name);
    const auto meta_v1 = table_metadata_path{
      "test/ns/table/metadata/v1.metadata.json"};
    const auto hint = version_hint_path{
      "test/ns/table/metadata/version-hint.text"};

    // Both metadata objects must be present before the drop.
    {
        auto meta_dl = io.download_table_meta(meta_v1).get();
        ASSERT_FALSE(meta_dl.has_error());
        auto hint_dl = io.download_version_hint(hint).get();
        ASSERT_FALSE(hint_dl.has_error());
    }

    // Dropping the now-existing table succeeds.
    {
        auto drop_res = catalog.drop_table(table_id, /*purge=*/true).get();
        ASSERT_FALSE(drop_res.has_error());
    }

    // After the drop, neither the catalog nor the underlying object store
    // knows about the table.
    {
        auto loaded = catalog.load_table(table_id).get();
        ASSERT_TRUE(loaded.has_error());
        ASSERT_EQ(loaded.error(), catalog::errc::not_found);

        auto meta_dl = io.download_table_meta(meta_v1).get();
        ASSERT_TRUE(meta_dl.has_error());
        EXPECT_EQ(meta_dl.error(), metadata_io::errc::failed);

        auto hint_dl = io.download_version_hint(hint).get();
        ASSERT_TRUE(hint_dl.has_error());
        EXPECT_EQ(hint_dl.error(), metadata_io::errc::failed);
    }

    // The name is free again: re-creating the table starts from scratch.
    {
        auto recreated
          = catalog.create_table(table_id, schema{}, partition_spec{}).get();
        ASSERT_FALSE(recreated.has_error());
        ASSERT_EQ(recreated.value().last_sequence_number, sequence_number{0});
    }
}
2 changes: 1 addition & 1 deletion tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine):

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[False])
filesystem_catalog_mode=[True, False])
def test_topic_lifecycle(self, cloud_storage_type,
filesystem_catalog_mode):
count = 100
Expand Down

0 comments on commit 65b4186

Please sign in to comment.