iceberg: add simple drop_table support for filesystem_catalog #24299

Merged · 4 commits · Nov 26, 2024
20 changes: 18 additions & 2 deletions src/v/iceberg/filesystem_catalog.cc
@@ -106,8 +106,24 @@ filesystem_catalog::load_table(const table_identifier& table_ident) {
 }
 
 ss::future<checked<void, catalog::errc>>
-filesystem_catalog::drop_table(const table_identifier&, bool) {
-    // TODO: implement
+filesystem_catalog::drop_table(const table_identifier& table_id, bool) {
+    // Check that the table exists.
+    auto current_tmeta = co_await read_table_meta(table_id);
+    if (current_tmeta.has_error()) {
+        co_return current_tmeta.error();
+    }
+
+    // Delete the table's metadata files.
+    auto delete_res = co_await table_io_.delete_all_metadata(
+      metadata_location_path{
+        fmt::format("{}/metadata/", table_location(table_id))});
+    if (delete_res.has_error()) {
+        vlog(log.warn, "dropping table {} failed", table_id);
+        co_return to_catalog_errc(delete_res.error());
+    }
+
+    // TODO: support purging data
+
     co_return outcome::success();
 }

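For context, a minimal sketch of how a caller might exercise the new path. The names drop_table, table_identifier, and catalog::errc come from this diff; the wrapper function and table name are hypothetical.

// Hypothetical call site (not part of this PR): drop a table, treating a
// missing table as already dropped. errc::not_found comes from the
// existence check above; any other error bubbles up from table_io.
ss::future<bool> try_drop(iceberg::filesystem_catalog& catalog) {
    const iceberg::table_identifier id{.ns = {"ns"}, .table = "events"};
    auto res = co_await catalog.drop_table(id, /*purge=*/false);
    if (res.has_error()) {
        co_return res.error() == iceberg::catalog::errc::not_found;
    }
    co_return true;
}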
73 changes: 73 additions & 0 deletions src/v/iceberg/table_io.cc
@@ -59,4 +59,77 @@ table_io::version_hint_exists(const version_hint_path& path) {
return object_exists(path, "iceberg::version_hint");
}

ss::future<checked<std::nullopt_t, metadata_io::errc>>
table_io::delete_all_metadata(const metadata_location_path& path) {
retry_chain_node root_rcn(
io_.as(),
ss::lowres_clock::duration{30s},
100ms,
retry_strategy::polling);
retry_chain_logger ctxlog(log, root_rcn);

auto log_exception = [&](
const std::exception_ptr& ex, std::string_view ctx) {
auto level = ssx::is_shutdown_exception(ex) ? ss::log_level::debug
: ss::log_level::warn;
vlogl(ctxlog, level, "exception while {} in {}: {}", ctx, path, ex);
};

// deleting may require several iterations if list_objects doesn't return
// everything at once.
while (true) {
retry_chain_node list_rcn(10ms, retry_strategy::backoff, &root_rcn);
auto list_fut = co_await ss::coroutine::as_future(io_.list_objects(
bucket_, list_rcn, cloud_storage_clients::object_key{path}));
if (list_fut.failed()) {
log_exception(list_fut.get_exception(), "listing objects");
co_return errc::failed;
}

auto list_res = std::move(list_fut.get());
if (list_res.has_error()) {
co_return errc::failed;
}

chunked_vector<cloud_storage_clients::object_key> to_delete;
to_delete.reserve(list_res.value().contents.size());
for (auto& obj : list_res.value().contents) {
vlog(ctxlog.debug, "deleting metadata object {}", obj.key);
to_delete.emplace_back(std::move(obj.key));
}

retry_chain_node delete_rcn(10ms, retry_strategy::backoff, &root_rcn);
auto delete_fut = co_await ss::coroutine::as_future(io_.delete_objects(
bucket_, std::move(to_delete), delete_rcn, [](size_t) {}));
if (delete_fut.failed()) {
log_exception(delete_fut.get_exception(), "deleting objects");
co_return errc::failed;
}

switch (delete_fut.get()) {
case cloud_io::upload_result::success:
break;
case cloud_io::upload_result::timedout:
co_return errc::timedout;
case cloud_io::upload_result::cancelled:
co_return errc::shutting_down;
case cloud_io::upload_result::failed:
co_return errc::failed;
}

if (!list_res.value().is_truncated) {
// deleted everything
break;
}

auto retry = root_rcn.retry();
if (!retry.is_allowed) {
co_return errc::timedout;
}
co_await ss::sleep_abortable(retry.delay, *retry.abort_source);
}

co_return std::nullopt;
}

} // namespace iceberg
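The loop above is a standard paginated list-then-delete: each pass lists one page of keys under the prefix, batch-deletes that page, and repeats while the listing is truncated, all within the single 30-second retry budget held by root_rcn. A hedged usage sketch of the new helper follows; the wrapper function and location string are illustrative, not part of this PR.

// Hypothetical caller: delete every object under a table's metadata/
// prefix and collapse the metadata_io::errc result to a bool. The prefix
// ends with a slash, mirroring what drop_table builds.
ss::future<bool> wipe_metadata(iceberg::table_io& io, ss::sstring loc) {
    auto res = co_await io.delete_all_metadata(
      iceberg::metadata_location_path{fmt::format("{}/metadata/", loc)});
    co_return !res.has_error();
}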
8 changes: 8 additions & 0 deletions src/v/iceberg/table_io.h
@@ -16,8 +16,13 @@

namespace iceberg {

// /<table-location>/metadata/
using metadata_location_path
= named_type<std::filesystem::path, struct metadata_location_path_tag>;
// /<table-location>/metadata/v0.metadata.json
using table_metadata_path
= named_type<std::filesystem::path, struct table_metadata_path_tag>;
// /<table-location>/metadata/version-hint.text
using version_hint_path
  = named_type<std::filesystem::path, struct version_hint_path_tag>;

@@ -40,6 +45,9 @@ class table_io : public metadata_io {

ss::future<checked<bool, metadata_io::errc>>
version_hint_exists(const version_hint_path& path);

ss::future<checked<std::nullopt_t, metadata_io::errc>>
delete_all_metadata(const metadata_location_path& path);
};

} // namespace iceberg
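All three aliases wrap std::filesystem::path, so it is the named_type tags that keep a directory-style prefix from being confused with a single metadata file at compile time. A minimal illustration, with invented paths:

// A prefix type and a file type cannot be mixed up at a call site.
iceberg::metadata_location_path prefix{"test/ns/table/metadata/"};
iceberg::table_metadata_path file{"test/ns/table/metadata/v0.metadata.json"};
// io.delete_all_metadata(file); // would not compile: wrong named_type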
74 changes: 74 additions & 0 deletions src/v/iceberg/tests/filesystem_catalog_test.cc
@@ -170,3 +170,77 @@ TEST_F(FileSystemCatalogTest, TestCommit) {
ASSERT_FALSE(hint_exists_res.has_error());
ASSERT_FALSE(hint_exists_res.value());
}

TEST_F(FileSystemCatalogTest, TestDrop) {
const table_identifier id{.ns = {"ns"}, .table = "table"};

// Try dropping a non-existent table
{
auto res = catalog.drop_table(id, /*purge=*/true).get();
ASSERT_TRUE(res.has_error());
ASSERT_EQ(res.error(), catalog::errc::not_found);
}

// Create a table and do a metadata update
{
auto create_res
= catalog.create_table(id, schema{}, partition_spec{}).get();
ASSERT_FALSE(create_res.has_error());

transaction txn(std::move(create_res.value()));
auto set_schema_res = txn
.set_schema(schema{
.schema_struct = std::get<struct_type>(
test_nested_schema_type()),
.schema_id = schema::id_t{1},
.identifier_field_ids = {},
})
.get();
ASSERT_FALSE(set_schema_res.has_error());
auto tx_res = catalog.commit_txn(id, std::move(txn)).get();
ASSERT_FALSE(tx_res.has_error());
}

table_io io(remote(), bucket_name);
const auto v1_meta_path = table_metadata_path{
"test/ns/table/metadata/v1.metadata.json"};
const auto vhint_path = version_hint_path{
"test/ns/table/metadata/version-hint.text"};

// check the metadata files are there
{
auto meta_res = io.download_table_meta(v1_meta_path).get();
ASSERT_FALSE(meta_res.has_error());
auto vhint_res = io.download_version_hint(vhint_path).get();
ASSERT_FALSE(vhint_res.has_error());
}

// drop the table
{
auto res = catalog.drop_table(id, /*purge=*/true).get();
ASSERT_FALSE(res.has_error());
}

// check that the table is indeed deleted
{
auto load_res = catalog.load_table(id).get();
ASSERT_TRUE(load_res.has_error());
ASSERT_EQ(load_res.error(), catalog::errc::not_found);

auto meta_res = io.download_table_meta(v1_meta_path).get();
ASSERT_TRUE(meta_res.has_error());
EXPECT_EQ(meta_res.error(), metadata_io::errc::failed);

auto vhint_res = io.download_version_hint(vhint_path).get();
ASSERT_TRUE(vhint_res.has_error());
EXPECT_EQ(vhint_res.error(), metadata_io::errc::failed);
}

// check that we can create another instance of the same table
{
auto create_res
= catalog.create_table(id, schema{}, partition_spec{}).get();
ASSERT_FALSE(create_res.has_error());
ASSERT_EQ(create_res.value().last_sequence_number, sequence_number{0});
}
}
2 changes: 1 addition & 1 deletion tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -134,7 +134,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine):
 
     @cluster(num_nodes=4)
     @matrix(cloud_storage_type=supported_storage_types(),
-            filesystem_catalog_mode=[False])
+            filesystem_catalog_mode=[True, False])
     def test_topic_lifecycle(self, cloud_storage_type,
                              filesystem_catalog_mode):
         count = 100