From ae698ab24d7cd786e10d3ee4a29abd42f8276d90 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Tue, 15 Oct 2024 15:50:48 +0530 Subject: [PATCH 01/14] http: Add url-build-error type A named type is added to denote a url build error instead of ss::sstring. This avoids issues due to implicit type conversions when the error is part of a variant. --- src/v/http/BUILD | 1 + src/v/http/request_builder.cc | 2 +- src/v/http/request_builder.h | 8 +++++--- src/v/iceberg/rest_client/BUILD | 1 + src/v/iceberg/rest_client/types.h | 8 ++++++-- 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/v/http/BUILD b/src/v/http/BUILD index 0f8ab2bb2e1c9..53fcab2a2ef99 100644 --- a/src/v/http/BUILD +++ b/src/v/http/BUILD @@ -67,6 +67,7 @@ redpanda_cc_library( "//src/v/base", "//src/v/http:utils", "//src/v/thirdparty/ada", + "//src/v/utils:named_type", "@abseil-cpp//absl/container:flat_hash_map", "@abseil-cpp//absl/container:flat_hash_set", "@boost//:algorithm", diff --git a/src/v/http/request_builder.cc b/src/v/http/request_builder.cc index 415ef6c25f4e5..38bfd0d425579 100644 --- a/src/v/http/request_builder.cc +++ b/src/v/http/request_builder.cc @@ -34,7 +34,7 @@ namespace http { request_builder& request_builder::host(std::string_view host) { _url = ada::parse(host); if (!_url) { - _error = fmt::format("failed to parse host: {}", host); + _error = url_build_error{fmt::format("failed to parse host: {}", host)}; } else { // Mark url state as good, the default state is that the host is not set _error = std::nullopt; diff --git a/src/v/http/request_builder.h b/src/v/http/request_builder.h index f5260160d5a2c..716b7b606f720 100644 --- a/src/v/http/request_builder.h +++ b/src/v/http/request_builder.h @@ -13,6 +13,7 @@ #include "base/seastarx.h" #include "thirdparty/ada/ada.h" +#include "utils/named_type.h" #include @@ -22,15 +23,16 @@ namespace http { +using url_build_error = named_type; + // Builds a request using the builder pattern. Allows setting the host, target // (path), method, headers and query params. class request_builder { public: - using error_type = ss::sstring; static constexpr auto default_state{"host not set"}; using expected - = tl::expected, error_type>; + = tl::expected, url_build_error>; // The host supplied here is parsed and stored as a result. When the request // is finally built this is added as a host header. If the parse failed then @@ -81,7 +83,7 @@ class request_builder { boost::beast::http::request_header<> _request; absl::flat_hash_map _query_params_kv; absl::flat_hash_set _query_params; - std::optional _error{default_state}; + std::optional _error{default_state}; }; } // namespace http diff --git a/src/v/iceberg/rest_client/BUILD b/src/v/iceberg/rest_client/BUILD index 5dd326a6ab036..0939355c7b84e 100644 --- a/src/v/iceberg/rest_client/BUILD +++ b/src/v/iceberg/rest_client/BUILD @@ -10,6 +10,7 @@ redpanda_cc_library( include_prefix = "iceberg/rest_client", deps = [ "//src/v/base", + "//src/v/http:request_builder", "//src/v/thirdparty/ada", "//src/v/utils:named_type", "@boost//:beast", diff --git a/src/v/iceberg/rest_client/types.h b/src/v/iceberg/rest_client/types.h index 66cfd4a3c516b..451d6de619bfc 100644 --- a/src/v/iceberg/rest_client/types.h +++ b/src/v/iceberg/rest_client/types.h @@ -11,6 +11,7 @@ #pragma once #include "base/seastarx.h" +#include "http/request_builder.h" #include "utils/named_type.h" #include @@ -41,8 +42,11 @@ struct retries_exhausted { // Represents the sum of all error types which can be encountered during // rest-client operations. -using domain_error - = std::variant; +using domain_error = std::variant< + http::url_build_error, + json_parse_error, + http_call_error, + retries_exhausted>; // The core result type used by all operations in the iceberg/rest-client which // can fail. Allows chaining of operations together and short-circuiting when an From cbb011bcd2f5b2168470492541ffeadedbe4f1b7 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Mon, 14 Oct 2024 12:55:25 +0530 Subject: [PATCH 02/14] iceberg/rest-client: Add table rest entity A table entity exposes common operations on iceberg tables --- src/v/iceberg/CMakeLists.txt | 1 + src/v/iceberg/rest_client/BUILD | 16 +++++++++++++++ src/v/iceberg/rest_client/entities.cc | 26 ++++++++++++++++++++++++ src/v/iceberg/rest_client/entities.h | 29 +++++++++++++++++++++++++++ 4 files changed, 72 insertions(+) create mode 100644 src/v/iceberg/rest_client/entities.cc create mode 100644 src/v/iceberg/rest_client/entities.h diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 5a4683ae00e76..6a0d89f0a0f34 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -68,6 +68,7 @@ v_cc_library( rest_client/parsers.cc rest_client/retry_policy.cc rest_client/catalog_client.cc + rest_client/entities.cc DEPS Avro::avro v::bytes diff --git a/src/v/iceberg/rest_client/BUILD b/src/v/iceberg/rest_client/BUILD index 0939355c7b84e..dcba55cd39b8a 100644 --- a/src/v/iceberg/rest_client/BUILD +++ b/src/v/iceberg/rest_client/BUILD @@ -50,6 +50,22 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "entities", + srcs = [ + "entities.cc", + ], + hdrs = [ + "entities.h", + ], + include_prefix = "iceberg/rest_client", + deps = [ + "//src/v/container:fragmented_vector", + "//src/v/http:rest_entity", + "//src/v/ssx:sformat", + ], +) + redpanda_cc_library( name = "rest_catalog_client", srcs = [ diff --git a/src/v/iceberg/rest_client/entities.cc b/src/v/iceberg/rest_client/entities.cc new file mode 100644 index 0000000000000..6d19a68a050b2 --- /dev/null +++ b/src/v/iceberg/rest_client/entities.cc @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "iceberg/rest_client/entities.h" + +#include "ssx/sformat.h" + +namespace iceberg::rest_client { + +table::table( + std::string_view root_url, const chunked_vector& namespace_parts) + : rest_entity(root_url) + , _namespace{ssx::sformat("{}", fmt::join(namespace_parts, "\x1f"))} {} + +ss::sstring table::resource_name() const { + return fmt::format("namespaces/{}/tables", _namespace); +} + +} // namespace iceberg::rest_client diff --git a/src/v/iceberg/rest_client/entities.h b/src/v/iceberg/rest_client/entities.h new file mode 100644 index 0000000000000..f7ef7f88c7583 --- /dev/null +++ b/src/v/iceberg/rest_client/entities.h @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "container/fragmented_vector.h" +#include "http/rest_client/rest_entity.h" + +namespace iceberg::rest_client { + +struct table : public http::rest_client::rest_entity { + table( + std::string_view root_url, + const chunked_vector& namespace_parts); + + ss::sstring resource_name() const final; + +private: + ss::sstring _namespace; +}; + +} // namespace iceberg::rest_client From 73567a0acf13d08f47cbd66cfdc63dc3d2220d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 21 Oct 2024 15:30:40 +0200 Subject: [PATCH 03/14] iceberg/client: re-organized http client life cycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously an `http::client` underlying the Iceberg catalog client must have been recreated with every request. Change the life cycle of the underlying HTTP client so that the `iceberg::catalog_client` owns the underlying HTTP client instance. This way a caller can still control the client internals while the life cycle is being managed by the catalog_client. Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/catalog_client.cc | 16 +-- src/v/iceberg/rest_client/catalog_client.h | 14 +- .../rest_client/tests/catalog_client_tests.cc | 135 +++++++++--------- 3 files changed, 70 insertions(+), 95 deletions(-) diff --git a/src/v/iceberg/rest_client/catalog_client.cc b/src/v/iceberg/rest_client/catalog_client.cc index 036188af4b38a..ee4e3b28d3469 100644 --- a/src/v/iceberg/rest_client/catalog_client.cc +++ b/src/v/iceberg/rest_client/catalog_client.cc @@ -52,7 +52,7 @@ expected parse_json(iobuf&& raw_response) { } catalog_client::catalog_client( - client_source& client_source, + std::unique_ptr http_client, ss::sstring endpoint, credentials credentials, std::optional base_path, @@ -60,7 +60,7 @@ catalog_client::catalog_client( std::optional api_version, std::optional token, std::unique_ptr retry_policy) - : _client_source{client_source} + : _http_client(std::move(http_client)) , _endpoint{std::move(endpoint)} , _credentials{std::move(credentials)} , _path_components{std::move(base_path), std::move(prefix), std::move(api_version)} @@ -123,11 +123,9 @@ ss::future> catalog_client::perform_request( std::vector retriable_errors{}; - auto client_ptr = _client_source.get().acquire(); while (true) { const auto permit = rtc.retry(); if (!permit.is_allowed) { - co_await client_ptr->shutdown_and_stop(); co_return tl::unexpected( retries_exhausted{.errors = std::move(retriable_errors)}); } @@ -137,30 +135,22 @@ ss::future> catalog_client::perform_request( request_payload.emplace(payload->copy()); } auto response_f = co_await ss::coroutine::as_future( - client_ptr->request_and_collect_response( + _http_client->request_and_collect_response( std::move(request.value()), std::move(request_payload))); auto call_res = _retry_policy->should_retry(std::move(response_f)); if (call_res.has_value()) { - co_await client_ptr->shutdown_and_stop(); co_return std::move(call_res->body); } auto& error = call_res.error(); if (!error.can_be_retried) { - co_await client_ptr->shutdown_and_stop(); co_return tl::unexpected(std::move(error.err)); } - if (error.is_transport_error()) { - co_await client_ptr->shutdown_and_stop(); - client_ptr = _client_source.get().acquire(); - } - retriable_errors.emplace_back(std::move(error.err)); co_await ss::sleep_abortable(permit.delay, rtc.root_abort_source()); } - co_await client_ptr->shutdown_and_stop(); } path_components::path_components( diff --git a/src/v/iceberg/rest_client/catalog_client.h b/src/v/iceberg/rest_client/catalog_client.h index 5c233dfaedd5a..d3995ad6375d3 100644 --- a/src/v/iceberg/rest_client/catalog_client.h +++ b/src/v/iceberg/rest_client/catalog_client.h @@ -28,16 +28,6 @@ using base_path = named_type; using prefix_path = named_type; using api_version = named_type; -// A client source generates low level http clients for the catalog client to -// make API calls with. Once a generic http client pool is implemented, it can -// use the same interface and hand out client leases instead of unique ptrs. -struct client_source { - // A client returned by this method call is owned by the caller. It should - // be shut down after use by the caller. - virtual std::unique_ptr acquire() = 0; - virtual ~client_source() = default; -}; - // Holds parts of a root path used by catalog client struct path_components { path_components( @@ -87,7 +77,7 @@ class catalog_client { /// if valid. If expired, a new one will be acquired \param retry_policy a /// retry policy used to determine how failing calls will be retried catalog_client( - client_source& client_source, + std::unique_ptr client, ss::sstring endpoint, credentials credentials, std::optional base_path = std::nullopt, @@ -118,7 +108,7 @@ class catalog_client { http::request_builder request_builder, std::optional payload = std::nullopt); - std::reference_wrapper _client_source; + std::unique_ptr _http_client; ss::sstring _endpoint; credentials _credentials; path_components _path_components; diff --git a/src/v/iceberg/rest_client/tests/catalog_client_tests.cc b/src/v/iceberg/rest_client/tests/catalog_client_tests.cc index d36293bde8106..9d0452f9bf546 100644 --- a/src/v/iceberg/rest_client/tests/catalog_client_tests.cc +++ b/src/v/iceberg/rest_client/tests/catalog_client_tests.cc @@ -46,27 +46,12 @@ class mock_client : public http::abstract_client { MOCK_METHOD(ss::future<>, shutdown_and_stop, (), (override)); }; -// A client source for testing. Allows setting expectation on a client before -// returning to the catalog client. -struct client_source : public r::client_source { - client_source( - std::function)> expectation) - : _expectation{std::move(expectation)} {} - - std::unique_ptr acquire() final { - auto ptr = std::make_unique(); - _expectation(*(ptr.get())); - - // Client passed to caller should always be shut down, and exactly once - EXPECT_CALL(*(ptr.get()), shutdown_and_stop()) - .Times(1) - .WillOnce(t::Return(ss::make_ready_future())); - return ptr; - } - -private: - std::function)> _expectation; -}; +std::unique_ptr +make_http_client(std::function set_expectations) { + auto client = std::make_unique(); + set_expectations(*client); + return client; +} namespace iceberg::rest_client { @@ -109,9 +94,8 @@ TEST(path_components, path_with_prefix) { } TEST(client, root_url_computed) { - client_source cs{[](mock_client&) {}}; r::catalog_client cc{ - cs, + make_http_client([](mock_client&) {}), endpoint, credentials, r::base_path{"api/catalog/"}, @@ -146,11 +130,13 @@ ss::future validate_token_request( } TEST(token_tests, acquire_token) { - client_source cs{[](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) - .WillOnce(validate_token_request); - }}; - r::catalog_client cc{cs, endpoint, credentials}; + r::catalog_client cc{ + make_http_client([](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .WillOnce(validate_token_request); + }), + endpoint, + credentials}; r::catalog_client_tester t{cc}; auto token = t.get_current_token().get(); ASSERT_TRUE(token.has_value()); @@ -158,13 +144,13 @@ TEST(token_tests, acquire_token) { } TEST(token_tests, supplied_token_used) { - client_source cs{[](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)).Times(0); - }}; const r::oauth_token supplied_token{ .token = "t", .expires_at = ss::lowres_clock::now() + 1h}; r::catalog_client cc{ - cs, + make_http_client([](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .Times(0); + }), endpoint, credentials, std::nullopt, @@ -179,14 +165,13 @@ TEST(token_tests, supplied_token_used) { } TEST(token_tests, supplied_token_expired) { - client_source cs{[](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) - .WillOnce(validate_token_request); - }}; const r::oauth_token expired_token{ .token = "t", .expires_at = ss::lowres_clock::now()}; r::catalog_client cc{ - cs, + make_http_client([](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .WillOnce(validate_token_request); + }), endpoint, credentials, std::nullopt, @@ -202,13 +187,16 @@ TEST(token_tests, supplied_token_expired) { } TEST(token_tests, handle_bad_json) { - client_source cs{[](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) - .WillOnce(t::Return(ss::make_ready_future( - http::downloaded_response{ - .status = bh::status::ok, .body = iobuf::from(R"J({)J")}))); - }}; - r::catalog_client cc{cs, endpoint, credentials}; + r::catalog_client cc{ + make_http_client([](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .WillOnce( + t::Return(ss::make_ready_future( + http::downloaded_response{ + .status = bh::status::ok, .body = iobuf::from(R"J({)J")}))); + }), + endpoint, + credentials}; r::catalog_client_tester t{cc}; auto token = t.get_current_token().get(); ASSERT_FALSE(token.has_value()); @@ -219,14 +207,16 @@ TEST(token_tests, handle_bad_json) { } TEST(token_tests, handle_non_retriable_http_status) { - client_source cs{[](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) - .WillOnce(t::Return(ss::make_ready_future( - http::downloaded_response{ - .status = bh::status::bad_request, .body = iobuf()}))); - }}; - - r::catalog_client cc{cs, endpoint, credentials}; + r::catalog_client cc{ + make_http_client([](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .WillOnce( + t::Return(ss::make_ready_future( + http::downloaded_response{ + .status = bh::status::bad_request, .body = iobuf()}))); + }), + endpoint, + credentials}; r::catalog_client_tester t{cc}; auto token = t.get_current_token().get(); @@ -238,18 +228,22 @@ TEST(token_tests, handle_non_retriable_http_status) { } TEST(token_tests, handle_retriable_http_status) { - client_source cs{[](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) - .WillOnce(t::Return(ss::make_ready_future( - http::downloaded_response{ - .status = bh::status::gateway_timeout, .body = iobuf()}))) - .WillOnce(t::Return(ss::make_ready_future( - http::downloaded_response{ - .status = bh::status::ok, - .body = iobuf::from( - R"J({"access_token": "token", "expires_in": 1})J")}))); - }}; - r::catalog_client cc{cs, endpoint, credentials}; + r::catalog_client cc{ + make_http_client([](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .WillOnce( + t::Return(ss::make_ready_future( + http::downloaded_response{ + .status = bh::status::gateway_timeout, .body = iobuf()}))) + .WillOnce( + t::Return(ss::make_ready_future( + http::downloaded_response{ + .status = bh::status::ok, + .body = iobuf::from( + R"J({"access_token": "token", "expires_in": 1})J")}))); + }), + endpoint, + credentials}; r::catalog_client_tester t{cc}; auto token = t.get_current_token().get(); @@ -266,12 +260,13 @@ TEST(token_tests, handle_retries_exhausted) { http::downloaded_response{.status = bh::status::gateway_timeout}); }; - client_source cs{[&ret](mock_client& m) { - EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) - .WillRepeatedly(ret); - }}; - - r::catalog_client cc{cs, endpoint, credentials}; + r::catalog_client cc{ + make_http_client([&ret](mock_client& m) { + EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) + .WillRepeatedly(ret); + }), + endpoint, + credentials}; r::catalog_client_tester t{cc}; auto token = t.get_current_token().get(); From 7e57b2ec59546b9cbd4e06761ef4030df2c320d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 21 Oct 2024 15:47:47 +0200 Subject: [PATCH 04/14] iceberg/client: reorganized includes in types.h MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/types.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/iceberg/rest_client/types.h b/src/v/iceberg/rest_client/types.h index 451d6de619bfc..97503e9f46ba3 100644 --- a/src/v/iceberg/rest_client/types.h +++ b/src/v/iceberg/rest_client/types.h @@ -12,12 +12,12 @@ #include "base/seastarx.h" #include "http/request_builder.h" +#include "thirdparty/ada/ada.h" #include "utils/named_type.h" #include #include -#include #include namespace iceberg::rest_client { From a633aa38a30cf5ecec7866f5ea51640f627dfe65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 21 Oct 2024 15:48:35 +0200 Subject: [PATCH 05/14] iceberg/client: made error types printable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/CMakeLists.txt | 1 + src/v/iceberg/rest_client/BUILD | 3 ++ src/v/iceberg/rest_client/types.cc | 55 ++++++++++++++++++++++++++++++ src/v/iceberg/rest_client/types.h | 15 ++++++++ 4 files changed, 74 insertions(+) create mode 100644 src/v/iceberg/rest_client/types.cc diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 6a0d89f0a0f34..46477192c3b71 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -69,6 +69,7 @@ v_cc_library( rest_client/retry_policy.cc rest_client/catalog_client.cc rest_client/entities.cc + rest_client/types.cc DEPS Avro::avro v::bytes diff --git a/src/v/iceberg/rest_client/BUILD b/src/v/iceberg/rest_client/BUILD index dcba55cd39b8a..124c512beb53d 100644 --- a/src/v/iceberg/rest_client/BUILD +++ b/src/v/iceberg/rest_client/BUILD @@ -4,6 +4,9 @@ package(default_visibility = ["//src/v/iceberg:__subpackages__"]) redpanda_cc_library( name = "rest_client_types", + srcs = [ + "types.cc", + ], hdrs = [ "types.h", ], diff --git a/src/v/iceberg/rest_client/types.cc b/src/v/iceberg/rest_client/types.cc new file mode 100644 index 0000000000000..d58db1b27c2fc --- /dev/null +++ b/src/v/iceberg/rest_client/types.cc @@ -0,0 +1,55 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "iceberg/rest_client/types.h" + +namespace { +struct domain_error_printing_visitor { + fmt::format_context* ctx; + + auto operator()(const http::url_build_error& err) const { + return fmt::format_to(ctx->out(), "url_build_error: {}", err); + } + + auto operator()(const iceberg::rest_client::json_parse_error& err) const { + return fmt::format_to( + ctx->out(), + "json_parse_error: context: {}, error: {}", + err.context, + err.error); + } + + auto operator()(const iceberg::rest_client::http_call_error& err) const { + return fmt::format_to(ctx->out(), "{}", err); + } + + auto operator()(const iceberg::rest_client::retries_exhausted& err) const { + return fmt::format_to( + ctx->out(), "retries_exhausted:[{}]", fmt::join(err.errors, ", ")); + } +}; +} // namespace + +auto fmt::formatter::format( + const iceberg::rest_client::http_call_error& err, + fmt::format_context& ctx) const -> decltype(ctx.out()) { + return std::visit( + [&ctx](const auto& http_call_error) { + return fmt::format_to( + ctx.out(), "http_call_error: {}", http_call_error); + }, + err); +} + +auto fmt::formatter::format( + const iceberg::rest_client::domain_error& err, + fmt::format_context& ctx) const -> decltype(ctx.out()) { + return std::visit(domain_error_printing_visitor{.ctx = &ctx}, err); +} diff --git a/src/v/iceberg/rest_client/types.h b/src/v/iceberg/rest_client/types.h index 97503e9f46ba3..778159b5efa80 100644 --- a/src/v/iceberg/rest_client/types.h +++ b/src/v/iceberg/rest_client/types.h @@ -68,3 +68,18 @@ struct credentials { }; } // namespace iceberg::rest_client +template<> +struct fmt::formatter + : fmt::formatter { + auto format( + const iceberg::rest_client::domain_error&, + fmt::format_context& ctx) const -> decltype(ctx.out()); +}; + +template<> +struct fmt::formatter + : fmt::formatter { + auto format( + const iceberg::rest_client::http_call_error&, + fmt::format_context& ctx) const -> decltype(ctx.out()); +}; From 1b8ec09ef3362af3e119fcbbbe84464d64589b2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 14:43:11 +0200 Subject: [PATCH 06/14] iceberg: added parse_optional_str to json utils MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/json_utils.cc | 13 +++++++++++++ src/v/iceberg/json_utils.h | 3 +++ 2 files changed, 16 insertions(+) diff --git a/src/v/iceberg/json_utils.cc b/src/v/iceberg/json_utils.cc index 66529c5049e28..383f516d19da9 100644 --- a/src/v/iceberg/json_utils.cc +++ b/src/v/iceberg/json_utils.cc @@ -148,6 +148,19 @@ parse_optional_i64(const json::Value& v, std::string_view member_name) { return json->get().GetInt64(); } +std::optional +parse_optional_str(const json::Value& v, std::string_view member_name) { + const auto json = parse_optional(v, member_name); + if (!json.has_value()) { + return std::nullopt; + } + if (!json->get().IsString()) { + throw std::invalid_argument( + fmt::format("Expected string for field '{}'", member_name)); + } + return json->get().GetString(); +} + bool parse_required_bool(const json::Value& v, std::string_view member_name) { const auto& bool_json = parse_required(v, member_name); if (!bool_json.IsBool()) { diff --git a/src/v/iceberg/json_utils.h b/src/v/iceberg/json_utils.h index 2562f326f4610..5670d073a6eaf 100644 --- a/src/v/iceberg/json_utils.h +++ b/src/v/iceberg/json_utils.h @@ -39,6 +39,9 @@ parse_optional_i32(const json::Value& v, std::string_view member_name); std::optional parse_optional_i64(const json::Value& v, std::string_view member_name); +std::optional +parse_optional_str(const json::Value& v, std::string_view member_name); + bool parse_required_bool(const json::Value& v, std::string_view member_name); std::string_view From 0422391c559948bfdc564e104fab64aa34585f34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 14:22:46 +0200 Subject: [PATCH 07/14] iceberg/rc: moved credentials to separate file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/BUILD | 13 ++++++++++++ src/v/iceberg/rest_client/catalog_client.h | 1 + src/v/iceberg/rest_client/credentials.h | 24 ++++++++++++++++++++++ src/v/iceberg/rest_client/types.h | 7 ------- 4 files changed, 38 insertions(+), 7 deletions(-) create mode 100644 src/v/iceberg/rest_client/credentials.h diff --git a/src/v/iceberg/rest_client/BUILD b/src/v/iceberg/rest_client/BUILD index 124c512beb53d..03d202778d9c5 100644 --- a/src/v/iceberg/rest_client/BUILD +++ b/src/v/iceberg/rest_client/BUILD @@ -53,6 +53,18 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "credentials", + hdrs = [ + "credentials.h", + ], + include_prefix = "iceberg/rest_client", + deps = [ + "//src/v/base", + "@seastar", + ], +) + redpanda_cc_library( name = "entities", srcs = [ @@ -79,6 +91,7 @@ redpanda_cc_library( ], include_prefix = "iceberg/rest_client", deps = [ + ":credentials", ":rest_client_parsers", ":rest_client_types", ":retry_policy", diff --git a/src/v/iceberg/rest_client/catalog_client.h b/src/v/iceberg/rest_client/catalog_client.h index d3995ad6375d3..36ec24aa2d8ff 100644 --- a/src/v/iceberg/rest_client/catalog_client.h +++ b/src/v/iceberg/rest_client/catalog_client.h @@ -13,6 +13,7 @@ #include "bytes/iobuf.h" #include "http/client.h" #include "http/request_builder.h" +#include "iceberg/rest_client/credentials.h" #include "iceberg/rest_client/retry_policy.h" #include "iceberg/rest_client/types.h" #include "json/document.h" diff --git a/src/v/iceberg/rest_client/credentials.h b/src/v/iceberg/rest_client/credentials.h new file mode 100644 index 0000000000000..3d57b6d64c184 --- /dev/null +++ b/src/v/iceberg/rest_client/credentials.h @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "base/seastarx.h" + +#include + +#pragma once + +namespace iceberg::rest_client { +// Static credentials expected to be supplied by redpanda when requesting an +// oauth token +struct credentials { + ss::sstring client_id; + ss::sstring client_secret; +}; + +}; // namespace iceberg::rest_client diff --git a/src/v/iceberg/rest_client/types.h b/src/v/iceberg/rest_client/types.h index 778159b5efa80..ccaff251b08f7 100644 --- a/src/v/iceberg/rest_client/types.h +++ b/src/v/iceberg/rest_client/types.h @@ -60,13 +60,6 @@ struct oauth_token { ss::lowres_clock::time_point expires_at; }; -// Static credentials expected to be supplied by redpanda when requesting an -// oauth token -struct credentials { - ss::sstring client_id; - ss::sstring client_secret; -}; - } // namespace iceberg::rest_client template<> struct fmt::formatter From e890bc71a83d2906143ed5d819fedc9f2a069182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 14:27:17 +0200 Subject: [PATCH 08/14] iceberg/rc: moved oauth_token to separate file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/BUILD | 14 ++++++++++++ src/v/iceberg/rest_client/catalog_client.h | 1 + src/v/iceberg/rest_client/oauth_token.h | 25 ++++++++++++++++++++++ src/v/iceberg/rest_client/parsers.h | 1 + src/v/iceberg/rest_client/types.h | 6 ------ 5 files changed, 41 insertions(+), 6 deletions(-) create mode 100644 src/v/iceberg/rest_client/oauth_token.h diff --git a/src/v/iceberg/rest_client/BUILD b/src/v/iceberg/rest_client/BUILD index 03d202778d9c5..f6d487aae802a 100644 --- a/src/v/iceberg/rest_client/BUILD +++ b/src/v/iceberg/rest_client/BUILD @@ -31,6 +31,7 @@ redpanda_cc_library( ], include_prefix = "iceberg/rest_client", deps = [ + ":oauth_token", ":rest_client_types", "//src/v/json", "//src/v/utils:named_type", @@ -65,6 +66,18 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "oauth_token", + hdrs = [ + "oauth_token.h", + ], + include_prefix = "iceberg/rest_client", + deps = [ + "//src/v/base", + "@seastar", + ], +) + redpanda_cc_library( name = "entities", srcs = [ @@ -92,6 +105,7 @@ redpanda_cc_library( include_prefix = "iceberg/rest_client", deps = [ ":credentials", + ":oauth_token", ":rest_client_parsers", ":rest_client_types", ":retry_policy", diff --git a/src/v/iceberg/rest_client/catalog_client.h b/src/v/iceberg/rest_client/catalog_client.h index 36ec24aa2d8ff..dfb6fbb0853bd 100644 --- a/src/v/iceberg/rest_client/catalog_client.h +++ b/src/v/iceberg/rest_client/catalog_client.h @@ -14,6 +14,7 @@ #include "http/client.h" #include "http/request_builder.h" #include "iceberg/rest_client/credentials.h" +#include "iceberg/rest_client/oauth_token.h" #include "iceberg/rest_client/retry_policy.h" #include "iceberg/rest_client/types.h" #include "json/document.h" diff --git a/src/v/iceberg/rest_client/oauth_token.h b/src/v/iceberg/rest_client/oauth_token.h new file mode 100644 index 0000000000000..94137a6972789 --- /dev/null +++ b/src/v/iceberg/rest_client/oauth_token.h @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "base/seastarx.h" + +#include +#include + +namespace iceberg::rest_client { +// Oauth token returned by the catalog server, in exchange for credentials +struct oauth_token { + ss::sstring token; + ss::lowres_clock::time_point expires_at; +}; + +} // namespace iceberg::rest_client diff --git a/src/v/iceberg/rest_client/parsers.h b/src/v/iceberg/rest_client/parsers.h index a5c94fc7a1a92..dc783511f009c 100644 --- a/src/v/iceberg/rest_client/parsers.h +++ b/src/v/iceberg/rest_client/parsers.h @@ -10,6 +10,7 @@ #pragma once +#include "iceberg/rest_client/oauth_token.h" #include "iceberg/rest_client/types.h" #include "json/document.h" #include "json/schema.h" diff --git a/src/v/iceberg/rest_client/types.h b/src/v/iceberg/rest_client/types.h index ccaff251b08f7..ded563dbf7691 100644 --- a/src/v/iceberg/rest_client/types.h +++ b/src/v/iceberg/rest_client/types.h @@ -54,12 +54,6 @@ using domain_error = std::variant< template using expected = tl::expected; -// Oauth token returned by the catalog server, in exchange for credentials -struct oauth_token { - ss::sstring token; - ss::lowres_clock::time_point expires_at; -}; - } // namespace iceberg::rest_client template<> struct fmt::formatter From 32709064e0ce82a2048d02ec2cf4af0b72908a04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 14:44:51 +0200 Subject: [PATCH 09/14] iceberg/rc: added missing fields to oauth_token MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/oauth_token.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/v/iceberg/rest_client/oauth_token.h b/src/v/iceberg/rest_client/oauth_token.h index 94137a6972789..d815652b47630 100644 --- a/src/v/iceberg/rest_client/oauth_token.h +++ b/src/v/iceberg/rest_client/oauth_token.h @@ -19,7 +19,10 @@ namespace iceberg::rest_client { // Oauth token returned by the catalog server, in exchange for credentials struct oauth_token { ss::sstring token; + ss::sstring token_type; ss::lowres_clock::time_point expires_at; + std::optional refresh_token; + std::optional scope; }; } // namespace iceberg::rest_client From 4484073eec5bfe00e30d00861f02eca1bc31369d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 15:20:46 +0200 Subject: [PATCH 10/14] iceberg/rc: reorganized rest client package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reorganized Icebert REST client package to make more clear boundaries between the dependencies and make it easier to extend the client in future. Signed-off-by: Michał Maślanka --- src/v/iceberg/CMakeLists.txt | 4 +- src/v/iceberg/rest_client/BUILD | 23 +++--- src/v/iceberg/rest_client/catalog_client.cc | 23 +++++- src/v/iceberg/rest_client/catalog_client.h | 2 +- .../rest_client/{types.cc => error.cc} | 2 +- .../iceberg/rest_client/{types.h => error.h} | 0 src/v/iceberg/rest_client/json.cc | 32 +++++++++ .../iceberg/rest_client/{parsers.h => json.h} | 8 +-- src/v/iceberg/rest_client/parsers.cc | 72 ------------------- src/v/iceberg/rest_client/retry_policy.h | 2 +- src/v/iceberg/rest_client/tests/BUILD | 16 +---- .../iceberg/rest_client/tests/CMakeLists.txt | 12 ---- .../rest_client/tests/catalog_client_tests.cc | 15 ++-- .../iceberg/rest_client/tests/parser_tests.cc | 56 --------------- 14 files changed, 80 insertions(+), 187 deletions(-) rename src/v/iceberg/rest_client/{types.cc => error.cc} (97%) rename src/v/iceberg/rest_client/{types.h => error.h} (100%) create mode 100644 src/v/iceberg/rest_client/json.cc rename src/v/iceberg/rest_client/{parsers.h => json.h} (65%) delete mode 100644 src/v/iceberg/rest_client/parsers.cc delete mode 100644 src/v/iceberg/rest_client/tests/parser_tests.cc diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 46477192c3b71..49c9f5c304ce6 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -65,11 +65,11 @@ v_cc_library( values.cc values_avro.cc values_bytes.cc - rest_client/parsers.cc + rest_client/json.cc rest_client/retry_policy.cc rest_client/catalog_client.cc rest_client/entities.cc - rest_client/types.cc + rest_client/error.cc DEPS Avro::avro v::bytes diff --git a/src/v/iceberg/rest_client/BUILD b/src/v/iceberg/rest_client/BUILD index f6d487aae802a..dc6025e58a129 100644 --- a/src/v/iceberg/rest_client/BUILD +++ b/src/v/iceberg/rest_client/BUILD @@ -3,12 +3,12 @@ load("//bazel:build.bzl", "redpanda_cc_library") package(default_visibility = ["//src/v/iceberg:__subpackages__"]) redpanda_cc_library( - name = "rest_client_types", + name = "error", srcs = [ - "types.cc", + "error.cc", ], hdrs = [ - "types.h", + "error.h", ], include_prefix = "iceberg/rest_client", deps = [ @@ -22,17 +22,17 @@ redpanda_cc_library( ) redpanda_cc_library( - name = "rest_client_parsers", + name = "json", srcs = [ - "parsers.cc", + "json.cc", ], hdrs = [ - "parsers.h", + "json.h", ], include_prefix = "iceberg/rest_client", deps = [ ":oauth_token", - ":rest_client_types", + "//src/v/iceberg:json_utils", "//src/v/json", "//src/v/utils:named_type", ], @@ -48,7 +48,7 @@ redpanda_cc_library( ], include_prefix = "iceberg/rest_client", deps = [ - ":rest_client_types", + ":error", "//src/v/http", "//src/v/net", ], @@ -95,7 +95,7 @@ redpanda_cc_library( ) redpanda_cc_library( - name = "rest_catalog_client", + name = "client", srcs = [ "catalog_client.cc", ], @@ -105,9 +105,9 @@ redpanda_cc_library( include_prefix = "iceberg/rest_client", deps = [ ":credentials", + ":error", + ":json", ":oauth_token", - ":rest_client_parsers", - ":rest_client_types", ":retry_policy", "//src/v/bytes:iobuf", "//src/v/bytes:iobuf_parser", @@ -118,6 +118,7 @@ redpanda_cc_library( "//src/v/utils:named_type", "//src/v/utils:retry_chain_node", "@abseil-cpp//absl/strings", + "@rapidjson", "@seastar", ], ) diff --git a/src/v/iceberg/rest_client/catalog_client.cc b/src/v/iceberg/rest_client/catalog_client.cc index ee4e3b28d3469..6caef6511b6b8 100644 --- a/src/v/iceberg/rest_client/catalog_client.cc +++ b/src/v/iceberg/rest_client/catalog_client.cc @@ -13,13 +13,14 @@ #include "bytes/iobuf_parser.h" #include "http/request_builder.h" #include "http/utils.h" -#include "iceberg/rest_client/parsers.h" +#include "iceberg/rest_client/json.h" #include #include #include #include +#include namespace { @@ -34,6 +35,24 @@ T trim_slashes(std::optional input, typename T::type default_value) { } // namespace namespace iceberg::rest_client { +namespace { +template +auto parse_as_expected(std::string_view ctx, Func&& parse_func) { + using ret_t = std::invoke_result_t; + return [f = std::forward(parse_func), + ctx](const json::Document& document) -> expected { + try { + return f(document); + } catch (...) { + return tl::unexpected(json_parse_error{ + .context = ss::sstring(ctx), + .error = parse_error_msg{fmt::format( + "error parsing JSON - {}", std::current_exception())}, + }); + } + }; +} +} // namespace expected parse_json(iobuf&& raw_response) { iobuf_parser p{std::move(raw_response)}; @@ -86,7 +105,7 @@ catalog_client::acquire_token(retry_chain_node& rtc) { }); co_return (co_await perform_request(rtc, token_request, std::move(payload))) .and_then(parse_json) - .and_then(parse_oauth_token); + .and_then(parse_as_expected("oauth_token", parse_oauth_token)); } ss::sstring catalog_client::root_path() const { diff --git a/src/v/iceberg/rest_client/catalog_client.h b/src/v/iceberg/rest_client/catalog_client.h index dfb6fbb0853bd..6c23ad2d150b5 100644 --- a/src/v/iceberg/rest_client/catalog_client.h +++ b/src/v/iceberg/rest_client/catalog_client.h @@ -14,9 +14,9 @@ #include "http/client.h" #include "http/request_builder.h" #include "iceberg/rest_client/credentials.h" +#include "iceberg/rest_client/error.h" #include "iceberg/rest_client/oauth_token.h" #include "iceberg/rest_client/retry_policy.h" -#include "iceberg/rest_client/types.h" #include "json/document.h" #include "utils/named_type.h" #include "utils/retry_chain_node.h" diff --git a/src/v/iceberg/rest_client/types.cc b/src/v/iceberg/rest_client/error.cc similarity index 97% rename from src/v/iceberg/rest_client/types.cc rename to src/v/iceberg/rest_client/error.cc index d58db1b27c2fc..2fa221be176a4 100644 --- a/src/v/iceberg/rest_client/types.cc +++ b/src/v/iceberg/rest_client/error.cc @@ -8,7 +8,7 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ -#include "iceberg/rest_client/types.h" +#include "iceberg/rest_client/error.h" namespace { struct domain_error_printing_visitor { diff --git a/src/v/iceberg/rest_client/types.h b/src/v/iceberg/rest_client/error.h similarity index 100% rename from src/v/iceberg/rest_client/types.h rename to src/v/iceberg/rest_client/error.h diff --git a/src/v/iceberg/rest_client/json.cc b/src/v/iceberg/rest_client/json.cc new file mode 100644 index 0000000000000..9a276f8a81f81 --- /dev/null +++ b/src/v/iceberg/rest_client/json.cc @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "iceberg/rest_client/json.h" + +#include "iceberg/json_utils.h" + +namespace iceberg::rest_client { +oauth_token parse_oauth_token(const json::Document& doc) { + oauth_token ret; + ret.token = parse_required_str(doc, "access_token"); + auto expires_in = parse_optional_i32(doc, "expires_in"); + if (expires_in) { + ret.expires_at = ss::lowres_clock::now() + + std::chrono::seconds(expires_in.value()); + } else { + ret.expires_at = ss::lowres_clock::time_point::max(); + } + ret.token_type = parse_required_str(doc, "token_type"); + ret.scope = parse_optional_str(doc, "scope"); + ret.refresh_token = parse_optional_str(doc, "refresh_token"); + return ret; +} + +} // namespace iceberg::rest_client diff --git a/src/v/iceberg/rest_client/parsers.h b/src/v/iceberg/rest_client/json.h similarity index 65% rename from src/v/iceberg/rest_client/parsers.h rename to src/v/iceberg/rest_client/json.h index dc783511f009c..93a0c84d5852d 100644 --- a/src/v/iceberg/rest_client/parsers.h +++ b/src/v/iceberg/rest_client/json.h @@ -11,17 +11,11 @@ #pragma once #include "iceberg/rest_client/oauth_token.h" -#include "iceberg/rest_client/types.h" #include "json/document.h" -#include "json/schema.h" namespace iceberg::rest_client { -// Extracts a readable/loggable error from a json schema validator which has -// failed. -parse_error_msg get_schema_validation_error(const json::SchemaValidator& v); - // Parses oauth token from a JSON response sent from catalog server -expected parse_oauth_token(json::Document&& doc); +oauth_token parse_oauth_token(const json::Document& doc); } // namespace iceberg::rest_client diff --git a/src/v/iceberg/rest_client/parsers.cc b/src/v/iceberg/rest_client/parsers.cc deleted file mode 100644 index d17cfb76106b6..0000000000000 --- a/src/v/iceberg/rest_client/parsers.cc +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2024 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "iceberg/rest_client/parsers.h" - -#include "json/stringbuffer.h" -#include "json/writer.h" - -namespace { - -constexpr auto oauth_token_schema = R"JSON({ - "type": "object", - "properties": { - "access_token": { - "type": "string" - }, - "scope": { - "type": "string" - }, - "token_type": { - "type": "string" - }, - "expires_in": { - "type": "integer" - } - }, - "required": [ - "access_token", - "expires_in" - ] -})JSON"; -} - -namespace iceberg::rest_client { - -parse_error_msg get_schema_validation_error(const json::SchemaValidator& v) { - json::StringBuffer sb; - json::Writer w{sb}; - v.GetError().Accept(w); - return parse_error_msg{sb.GetString()}; -} - -expected parse_oauth_token(json::Document&& doc) { - json::Document schema; - if (schema.Parse(oauth_token_schema).HasParseError()) { - return tl::unexpected(json_parse_error{ - .context = "parse_oauth_token::invalid_schema_validator_input", - .error = parse_error_msg{GetParseError_En(schema.GetParseError())}}); - } - - json::SchemaDocument schema_doc{schema}; - - if (json::SchemaValidator v{schema_doc}; !doc.Accept(v)) { - return tl::unexpected(json_parse_error{ - .context = "parse_oauth_token::response_does_not_match_schema", - .error = get_schema_validation_error(v)}); - } - - auto expires_in_sec = std::chrono::seconds{doc["expires_in"].GetInt()}; - return oauth_token{ - .token = doc["access_token"].GetString(), - .expires_at = ss::lowres_clock::now() + expires_in_sec}; -} - -} // namespace iceberg::rest_client diff --git a/src/v/iceberg/rest_client/retry_policy.h b/src/v/iceberg/rest_client/retry_policy.h index bde9ac007cd35..e57efb42f8a86 100644 --- a/src/v/iceberg/rest_client/retry_policy.h +++ b/src/v/iceberg/rest_client/retry_policy.h @@ -11,7 +11,7 @@ #pragma once #include "http/client.h" -#include "iceberg/rest_client/types.h" +#include "iceberg/rest_client/error.h" namespace iceberg::rest_client { diff --git a/src/v/iceberg/rest_client/tests/BUILD b/src/v/iceberg/rest_client/tests/BUILD index 27e528e3c9075..6b91292ecf2f4 100644 --- a/src/v/iceberg/rest_client/tests/BUILD +++ b/src/v/iceberg/rest_client/tests/BUILD @@ -1,19 +1,5 @@ load("//bazel:test.bzl", "redpanda_cc_gtest") -redpanda_cc_gtest( - name = "rest_client_parser_tests", - timeout = "short", - srcs = [ - "parser_tests.cc", - ], - cpu = 1, - deps = [ - "//src/v/iceberg/rest_client:rest_client_parsers", - "//src/v/test_utils:gtest", - "@googletest//:gtest", - ], -) - redpanda_cc_gtest( name = "retry_policy_tests", timeout = "short", @@ -37,7 +23,7 @@ redpanda_cc_gtest( cpu = 1, deps = [ "//src/v/bytes:iobuf_parser", - "//src/v/iceberg/rest_client:rest_catalog_client", + "//src/v/iceberg/rest_client:client", "//src/v/test_utils:gtest", "@googletest//:gtest", "@seastar", diff --git a/src/v/iceberg/rest_client/tests/CMakeLists.txt b/src/v/iceberg/rest_client/tests/CMakeLists.txt index 2c62141fff9a0..b21a2a5327685 100644 --- a/src/v/iceberg/rest_client/tests/CMakeLists.txt +++ b/src/v/iceberg/rest_client/tests/CMakeLists.txt @@ -1,15 +1,3 @@ -rp_test( - UNIT_TEST - GTEST - BINARY_NAME rc_parsers - SOURCES - parser_tests.cc - LIBRARIES - v::gtest_main - v::iceberg - ARGS "-- -c1" -) - rp_test( UNIT_TEST GTEST diff --git a/src/v/iceberg/rest_client/tests/catalog_client_tests.cc b/src/v/iceberg/rest_client/tests/catalog_client_tests.cc index 9d0452f9bf546..725519f09e4f4 100644 --- a/src/v/iceberg/rest_client/tests/catalog_client_tests.cc +++ b/src/v/iceberg/rest_client/tests/catalog_client_tests.cc @@ -126,7 +126,8 @@ ss::future validate_token_request( co_return http::downloaded_response{ .status = bh::status::ok, - .body = iobuf::from(R"J({"access_token": "token", "expires_in": 1})J")}; + .body = iobuf::from( + R"J({"access_token": "token","token_type":"bearer", "expires_in": 1})J")}; } TEST(token_tests, acquire_token) { @@ -235,12 +236,12 @@ TEST(token_tests, handle_retriable_http_status) { t::Return(ss::make_ready_future( http::downloaded_response{ .status = bh::status::gateway_timeout, .body = iobuf()}))) - .WillOnce( - t::Return(ss::make_ready_future( - http::downloaded_response{ - .status = bh::status::ok, - .body = iobuf::from( - R"J({"access_token": "token", "expires_in": 1})J")}))); + .WillOnce(t::Return(ss::make_ready_future< + http:: + downloaded_response>(http::downloaded_response{ + .status = bh::status::ok, + .body = iobuf::from( + R"J({"access_token": "token","token_type": "bearer", "expires_in": 1})J")}))); }), endpoint, credentials}; diff --git a/src/v/iceberg/rest_client/tests/parser_tests.cc b/src/v/iceberg/rest_client/tests/parser_tests.cc deleted file mode 100644 index d6917b61309fd..0000000000000 --- a/src/v/iceberg/rest_client/tests/parser_tests.cc +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2024 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "iceberg/rest_client/parsers.h" - -#include -#include - -namespace r = iceberg::rest_client; -using namespace testing; - -json::Document make_doc(ss::sstring s) { - json::Document d; - d.Parse(s); - return d; -} - -TEST(oauth_token, invalid_doc_fails_schema) { - auto result = r::parse_oauth_token(make_doc("{}")); - ASSERT_FALSE(result.has_value()); - ASSERT_THAT( - result.error(), - VariantWith(Field( - &r::json_parse_error::context, - "parse_oauth_token::response_does_not_match_schema"))); -} - -TEST(oauth_token, type_mismatch_fails_schema) { - auto result = r::parse_oauth_token( - make_doc(R"J({"access_token": "", "expires_in": "42"})J")); - ASSERT_FALSE(result.has_value()); - ASSERT_THAT( - result.error(), - VariantWith(Field( - &r::json_parse_error::context, - "parse_oauth_token::response_does_not_match_schema"))); -} - -TEST(oauth_token, valid_doc_parse) { - using namespace std::chrono_literals; - auto result = r::parse_oauth_token( - make_doc(R"J({"access_token": "", "expires_in": 42})J")); - ASSERT_TRUE(result.has_value()); - const auto& token = result.value(); - ASSERT_EQ(token.token, ""); - // Assume that it doesn't take more than 2 seconds to get to this assertion - ASSERT_THAT( - token.expires_at - ss::lowres_clock::now(), AllOf(Le(42s), Ge(40s))); -} From b7b1404cb239bd23a5070f14b4cba077f7f700f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 16:46:30 +0200 Subject: [PATCH 11/14] iceberg/rc: renamed oauth_token field to match open_api specification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/catalog_client.cc | 4 ++-- src/v/iceberg/rest_client/json.cc | 2 +- src/v/iceberg/rest_client/oauth_token.h | 2 +- src/v/iceberg/rest_client/tests/catalog_client_tests.cc | 6 +++--- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/v/iceberg/rest_client/catalog_client.cc b/src/v/iceberg/rest_client/catalog_client.cc index 6caef6511b6b8..42e73a78961a1 100644 --- a/src/v/iceberg/rest_client/catalog_client.cc +++ b/src/v/iceberg/rest_client/catalog_client.cc @@ -121,10 +121,10 @@ catalog_client::ensure_token(retry_chain_node& rtc) { co_return (co_await acquire_token(rtc)) .and_then([this](auto t) -> expected { _oauth_token.emplace(t); - return t.token; + return t.access_token; }); } - co_return _oauth_token->token; + co_return _oauth_token->access_token; } ss::future> catalog_client::perform_request( diff --git a/src/v/iceberg/rest_client/json.cc b/src/v/iceberg/rest_client/json.cc index 9a276f8a81f81..4c541f3027808 100644 --- a/src/v/iceberg/rest_client/json.cc +++ b/src/v/iceberg/rest_client/json.cc @@ -15,7 +15,7 @@ namespace iceberg::rest_client { oauth_token parse_oauth_token(const json::Document& doc) { oauth_token ret; - ret.token = parse_required_str(doc, "access_token"); + ret.access_token = parse_required_str(doc, "access_token"); auto expires_in = parse_optional_i32(doc, "expires_in"); if (expires_in) { ret.expires_at = ss::lowres_clock::now() diff --git a/src/v/iceberg/rest_client/oauth_token.h b/src/v/iceberg/rest_client/oauth_token.h index d815652b47630..7c36cda5dd3ed 100644 --- a/src/v/iceberg/rest_client/oauth_token.h +++ b/src/v/iceberg/rest_client/oauth_token.h @@ -18,7 +18,7 @@ namespace iceberg::rest_client { // Oauth token returned by the catalog server, in exchange for credentials struct oauth_token { - ss::sstring token; + ss::sstring access_token; ss::sstring token_type; ss::lowres_clock::time_point expires_at; std::optional refresh_token; diff --git a/src/v/iceberg/rest_client/tests/catalog_client_tests.cc b/src/v/iceberg/rest_client/tests/catalog_client_tests.cc index 725519f09e4f4..1e8f1fff2e915 100644 --- a/src/v/iceberg/rest_client/tests/catalog_client_tests.cc +++ b/src/v/iceberg/rest_client/tests/catalog_client_tests.cc @@ -146,7 +146,7 @@ TEST(token_tests, acquire_token) { TEST(token_tests, supplied_token_used) { const r::oauth_token supplied_token{ - .token = "t", .expires_at = ss::lowres_clock::now() + 1h}; + .access_token = "t", .expires_at = ss::lowres_clock::now() + 1h}; r::catalog_client cc{ make_http_client([](mock_client& m) { EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) @@ -162,12 +162,12 @@ TEST(token_tests, supplied_token_used) { r::catalog_client_tester t{cc}; auto token = t.get_current_token().get(); ASSERT_TRUE(token.has_value()); - ASSERT_EQ(token, supplied_token.token); + ASSERT_EQ(token, supplied_token.access_token); } TEST(token_tests, supplied_token_expired) { const r::oauth_token expired_token{ - .token = "t", .expires_at = ss::lowres_clock::now()}; + .access_token = "t", .expires_at = ss::lowres_clock::now()}; r::catalog_client cc{ make_http_client([](mock_client& m) { EXPECT_CALL(m, request_and_collect_response(t::_, t::_, t::_)) From 88c194ac938a2ecf2d70aa51c616b61fc5216f11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 16:51:04 +0200 Subject: [PATCH 12/14] iceberg/rc: added strict validation of oauth_token.token_type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/json.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/v/iceberg/rest_client/json.cc b/src/v/iceberg/rest_client/json.cc index 4c541f3027808..b294359070f81 100644 --- a/src/v/iceberg/rest_client/json.cc +++ b/src/v/iceberg/rest_client/json.cc @@ -23,7 +23,17 @@ oauth_token parse_oauth_token(const json::Document& doc) { } else { ret.expires_at = ss::lowres_clock::time_point::max(); } - ret.token_type = parse_required_str(doc, "token_type"); + // token type is defined as enum with the following possible values: + // "bearer", "mac", "N_A" + auto token_type = parse_required_str(doc, "token_type"); + if (!(token_type == "bearer" || token_type == "mac" + || token_type == "N_A")) { + throw std::invalid_argument(fmt::format( + "Unexpected oauth_token.token_type value {}. It must be one of the " + "bearer, mac or N_A", + token_type)); + } + ret.token_type = std::move(token_type); ret.scope = parse_optional_str(doc, "scope"); ret.refresh_token = parse_optional_str(doc, "refresh_token"); return ret; From 7467c5609ddd1b865cc97813001e6101fe1ba237 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 22 Oct 2024 16:31:51 +0200 Subject: [PATCH 13/14] iceberg/rc: added missing shutdown method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/catalog_client.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/v/iceberg/rest_client/catalog_client.h b/src/v/iceberg/rest_client/catalog_client.h index 6c23ad2d150b5..aac761854c94b 100644 --- a/src/v/iceberg/rest_client/catalog_client.h +++ b/src/v/iceberg/rest_client/catalog_client.h @@ -88,6 +88,9 @@ class catalog_client { std::optional token = std::nullopt, std::unique_ptr retry_policy = nullptr); + // Must be called before destroying the client to prevent resource leak + ss::future<> shutdown() { return _http_client->shutdown_and_stop(); } + private: // The root url calculated from base url, prefix and api version. Given a // base url of "/b", an api version "v2" and a prefix of "x/y", the root url From 61ea3e98efaca66210fc84735fae057121990770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Wed, 23 Oct 2024 09:02:57 +0200 Subject: [PATCH 14/14] iceberg/rc: added tests for oauth_token json parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/iceberg/rest_client/tests/BUILD | 17 ++++++ .../iceberg/rest_client/tests/CMakeLists.txt | 12 +++++ .../tests/json_serialization_test.cc | 52 +++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 src/v/iceberg/rest_client/tests/json_serialization_test.cc diff --git a/src/v/iceberg/rest_client/tests/BUILD b/src/v/iceberg/rest_client/tests/BUILD index 6b91292ecf2f4..1624c262bca9b 100644 --- a/src/v/iceberg/rest_client/tests/BUILD +++ b/src/v/iceberg/rest_client/tests/BUILD @@ -29,3 +29,20 @@ redpanda_cc_gtest( "@seastar", ], ) + +redpanda_cc_gtest( + name = "json_serialization_test", + timeout = "short", + srcs = [ + "json_serialization_test.cc", + ], + cpu = 1, + deps = [ + "//src/v/bytes:iobuf_parser", + "//src/v/iceberg/rest_client:json", + "//src/v/iceberg/rest_client:oauth_token", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/iceberg/rest_client/tests/CMakeLists.txt b/src/v/iceberg/rest_client/tests/CMakeLists.txt index b21a2a5327685..197443c00fc9b 100644 --- a/src/v/iceberg/rest_client/tests/CMakeLists.txt +++ b/src/v/iceberg/rest_client/tests/CMakeLists.txt @@ -21,3 +21,15 @@ rp_test( v::iceberg ARGS "-- -c1" ) + +rp_test( + UNIT_TEST + GTEST + BINARY_NAME rc_json_serialization + SOURCES + json_serialization_test.cc + LIBRARIES + v::gtest_main + v::iceberg + ARGS "-- -c1" +) diff --git a/src/v/iceberg/rest_client/tests/json_serialization_test.cc b/src/v/iceberg/rest_client/tests/json_serialization_test.cc new file mode 100644 index 0000000000000..ceacd2897d85f --- /dev/null +++ b/src/v/iceberg/rest_client/tests/json_serialization_test.cc @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "iceberg/rest_client/json.h" + +#include +#include + +using namespace testing; + +json::Document make_doc(ss::sstring s) { + json::Document d; + d.Parse(s); + return d; +} + +TEST(oauth_token, invalid_doc_fails_schema) { + ASSERT_THROW( + iceberg::rest_client::parse_oauth_token(make_doc("{}")), + std::invalid_argument); +} + +TEST(oauth_token, type_mismatch_fails_schema) { + ASSERT_THROW( + iceberg::rest_client::parse_oauth_token( + make_doc(R"J({"access_token": "", "expires_in": "42"})J")), + std::invalid_argument); +} + +TEST(oauth_token, invalid_token_type) { + ASSERT_THROW( + iceberg::rest_client::parse_oauth_token(make_doc( + R"J({"access_token": "", "token_type" : "haha", "expires_in": 42})J")), + std::invalid_argument); +} + +TEST(oauth_token, valid_doc_parse) { + using namespace std::chrono_literals; + auto token = iceberg::rest_client::parse_oauth_token(make_doc( + R"J({"access_token": "", "token_type" : "bearer", "expires_in": 42})J")); + + // Assume that it doesn't take more than 2 seconds to get to this assertion + ASSERT_THAT( + token.expires_at - ss::lowres_clock::now(), AllOf(Le(42s), Ge(40s))); +}