diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc index 43f146198f931..f3144f3e18dce 100644 --- a/src/v/pandaproxy/probe.cc +++ b/src/v/pandaproxy/probe.cc @@ -131,4 +131,79 @@ void probe::setup_metrics() { } } +server_probe::server_probe( + server::context_t& ctx, const ss::sstring& group_name) + : _ctx(ctx) + , _group_name(group_name) + , _metrics() + , _public_metrics() { + setup_metrics(); +} + +void server_probe::setup_metrics() { + namespace sm = ss::metrics; + + auto setup_common = [this]<typename MetricDef>() { + const auto usage = [](const size_t current, const size_t max) { + constexpr double divide_by_zero = -1.; + constexpr double invalid_values = -2.; + if (max == 0) { + return divide_by_zero; + } + if (current > max) { + return invalid_values; + } + const auto max_d = static_cast<double>(max); + const auto current_d = static_cast<double>(current); + return (max_d - current_d) / max_d; + }; + + std::vector<MetricDef> defs; + defs.reserve(3); + defs.emplace_back( + sm::make_gauge( + "inflight_requests_usage_ratio", + [this, usage] { + return usage(_ctx.inflight_sem.current(), _ctx.max_inflight); + }, + sm::description(ssx::sformat( + "Usage ratio of in-flight requests in the {}", _group_name))) + .aggregate({})); + defs.emplace_back( + sm::make_gauge( + "inflight_requests_memory_usage_ratio", + [this, usage] { + return usage(_ctx.mem_sem.current(), _ctx.max_memory); + }, + sm::description(ssx::sformat( + "Memory usage ratio of in-flight requests in the {}", + _group_name))) + .aggregate({})); + defs.emplace_back( + sm::make_gauge( + "queued_requests_memory_blocked", + [this] { return _ctx.mem_sem.waiters(); }, + sm::description(ssx::sformat( + "Number of requests queued in {}, due to memory limitations", + _group_name))) + .aggregate({})); + return defs; + }; + + if (!config::shard_local_cfg().disable_metrics()) { + _metrics.add_group( + _group_name, + setup_common + .template operator()<sm::impl::metric_definition_impl>(), + {}, + {}); + } + + if (!config::shard_local_cfg().disable_public_metrics()) { +
_public_metrics.add_group( + _group_name, + setup_common.template operator()<sm::metric_definition>()); +} +} + } // namespace pandaproxy diff --git a/src/v/pandaproxy/probe.h b/src/v/pandaproxy/probe.h index 9074dbccb4e8f..146f6d7b36daf 100644 --- a/src/v/pandaproxy/probe.h +++ b/src/v/pandaproxy/probe.h @@ -12,6 +12,7 @@ #pragma once #include "metrics/metrics.h" +#include "pandaproxy/server.h" #include "utils/log_hist.h" #include @@ -81,4 +82,22 @@ class probe { metrics::public_metric_groups _public_metrics; }; +class server_probe { +public: + server_probe(server::context_t& ctx, const ss::sstring& group_name); + server_probe(const server_probe&) = delete; + server_probe& operator=(const server_probe&) = delete; + server_probe(server_probe&&) = delete; + server_probe& operator=(server_probe&&) = delete; + ~server_probe() = default; + +private: + void setup_metrics(); + + server::context_t& _ctx; + const ss::sstring& _group_name; + metrics::internal_metric_groups _metrics; + metrics::public_metric_groups _public_metrics; +}; + } // namespace pandaproxy diff --git a/src/v/pandaproxy/reply.h b/src/v/pandaproxy/reply.h index a69703286cf10..d9fe375af42a0 100644 --- a/src/v/pandaproxy/reply.h +++ b/src/v/pandaproxy/reply.h @@ -65,6 +65,10 @@ inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) { .add_header("Retry-After", "0"); } +inline ss::http::reply& set_reply_payload_too_large(ss::http::reply& rep) { + return rep.set_status(ss::http::reply::status_type::payload_too_large); +} + inline std::unique_ptr<ss::http::reply> reply_unavailable() { auto rep = std::make_unique<ss::http::reply>(ss::http::reply{}); set_reply_unavailable(*rep); diff --git a/src/v/pandaproxy/rest/proxy.cc b/src/v/pandaproxy/rest/proxy.cc index 811aee79bd58c..7eb5eca96f088 100644 --- a/src/v/pandaproxy/rest/proxy.cc +++ b/src/v/pandaproxy/rest/proxy.cc @@ -113,7 +113,7 @@ proxy::proxy( , _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind()) , _client(client) , 
_client_cache(client_cache) - , _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}, + , _ctx{{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this}, {config::always_true(), config::shard_local_cfg().superusers.bind(), controller}, _config.pandaproxy_api.value()} , _server( @@ -126,8 +126,11 @@ proxy::proxy( json::serialization_format::application_json) , _ensure_started{[this]() { return do_start(); }} , _controller(controller) { - _inflight_config_binding.watch( - [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); + _inflight_config_binding.watch([this]() { + const size_t capacity = _inflight_config_binding(); + _inflight_sem.set_capacity(capacity); + _ctx.max_inflight = capacity; + }); } ss::future<> proxy::start() { diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index 9e99cf4080747..056f38a73386a 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -614,7 +614,7 @@ service::service( config::shard_local_cfg() .max_in_flight_schema_registry_requests_per_shard.bind()) , _client(client) - , _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this} + , _ctx{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this} , _server( "schema_registry", // server_name "schema_registry", // public_metric_group_name @@ -632,8 +632,11 @@ service::service( config::always_true(), config::shard_local_cfg().superusers.bind(), controller.get()} { - _inflight_config_binding.watch( - [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); + _inflight_config_binding.watch([this]() { + const size_t capacity = _inflight_config_binding(); + _inflight_sem.set_capacity(capacity); + _ctx.max_inflight = capacity; + }); } ss::future<> service::start() { diff --git a/src/v/pandaproxy/server.cc b/src/v/pandaproxy/server.cc index 22507fdc51353..09b3072829bd6 100644 --- 
a/src/v/pandaproxy/server.cc +++ b/src/v/pandaproxy/server.cc @@ -28,6 +28,7 @@ #include #include +#include <memory> namespace pandaproxy { @@ -104,6 +105,12 @@ struct handler_adaptor : ss::httpd::handler_base { co_return std::move(rp.rep); } auto req_size = get_request_size(*rq.req); + if (req_size > _ctx.max_memory) { + set_reply_payload_too_large(*rp.rep); + rp.mime_type = _exceptional_mime_type; + set_and_measure_response(rp); + co_return std::move(rp.rep); + } auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size); if (_ctx.as.abort_requested()) { set_reply_unavailable(*rp.rep); @@ -150,7 +157,8 @@ server::server( , _api20(std::move(api20)) , _has_routes(false) , _ctx(ctx) - , _exceptional_mime_type(exceptional_mime_type) { + , _exceptional_mime_type(exceptional_mime_type) + , _probe{} { _api20.set_api_doc(_server._routes); _api20.register_api_file(_server._routes, header); _api20.add_definitions_file(_server._routes, definitions); @@ -195,6 +203,9 @@ ss::future<> server::start( const std::vector& advertised) { _server._routes.register_exeption_handler( exception_replier{ss::sstring{name(_exceptional_mime_type)}}); + + _probe = std::make_unique<server_probe>(_ctx, _public_metrics_group_name); + _ctx.advertised_listeners.reserve(endpoints.size()); for (auto& server_endpoint : endpoints) { auto addr = co_await net::resolve_dns(server_endpoint.address); @@ -234,13 +245,18 @@ } co_await _server.listen(addr, cred); } + co_return; } ss::future<> server::stop() { - return _pending_reqs.close() - .finally([this]() { return _ctx.as.request_abort(); }) - .finally([this]() mutable { return _server.stop(); }); + return _pending_reqs.close().finally([this]() { + _ctx.as.request_abort(); + _probe.reset(nullptr); + return _server.stop(); + }); } +server::~server() noexcept = default; + } // namespace pandaproxy diff --git a/src/v/pandaproxy/server.h b/src/v/pandaproxy/server.h index 813eb0a40f513..e854b0a8e86af 100644 --- a/src/v/pandaproxy/server.h +++ 
b/src/v/pandaproxy/server.h @@ -26,6 +26,7 @@ #include #include #include +#include <memory> #include #include #include @@ -40,6 +41,8 @@ namespace pandaproxy { +class server_probe; + inline ss::shard_id user_shard(const ss::sstring& name) { auto hash = xxhash_64(name.data(), name.length()); return jump_consistent_hash(hash, ss::smp::count); } @@ -69,7 +72,9 @@ class server { public: struct context_t { std::vector advertised_listeners; + size_t max_memory; ssx::semaphore& mem_sem; + size_t max_inflight; adjustable_semaphore& inflight_sem; ss::abort_source as; ss::smp_service_group smp_sg; @@ -103,9 +108,9 @@ }; server() = delete; - ~server() = default; + ~server() noexcept; server(const server&) = delete; - server(server&&) noexcept = default; + server(server&&) noexcept = delete; server& operator=(const server&) = delete; server& operator=(server&&) = delete; @@ -135,6 +140,7 @@ bool _has_routes; context_t& _ctx; json::serialization_format _exceptional_mime_type; + std::unique_ptr<server_probe> _probe; }; template diff --git a/tests/rptest/tests/pandaproxy_test.py b/tests/rptest/tests/pandaproxy_test.py index dd1895ae1ece2..017a17e43a483 100644 --- a/tests/rptest/tests/pandaproxy_test.py +++ b/tests/rptest/tests/pandaproxy_test.py @@ -35,12 +35,15 @@ from rptest.services import tls from rptest.utils.utf8 import CONTROL_CHARS_MAP from typing import Optional, List, Dict, Union +from rptest.utils.mode_checks import skip_debug_mode def create_topic_names(count): return list(f"pandaproxy-topic-{uuid.uuid4()}" for _ in range(count)) +PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE = 413 + HTTP_GET_BROKERS_HEADERS = { "Accept": "application/vnd.kafka.v2+json", "Content-Type": "application/vnd.kafka.v2+json" } @@ -1284,6 +1287,66 @@ def test_invalid_topics_fetch(self): assert sc_res.json( )["message"] == f'Invalid parameter \'topic_name\' got \'{topic_name.translate(CONTROL_CHARS_MAP)}\'' + #Memory tracking is disabled in debug + @skip_debug_mode + @cluster(num_nodes=3) + def 
test_topic_produce_request_too_big(self): + """ + Create a topic and post a request larger than the total available memory. + """ + + self.redpanda.set_resource_settings( + ResourceSettings(memory_mb=256, num_cpus=1)) + self.redpanda.start() + + name = create_topic_names(1)[0] + + self.logger.info("Generating request larger than the available memory") + value = { + "value": + ("TWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYWJsZSB" + "tZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvYmplY3" + "QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdoZW4gd" + "GhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSByZXF1" + "ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZSBhYmx" + "lIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFueSByZW" + "NvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJpbmcga" + "XMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnRzIGFy" + "ZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLiBNZXNzYWdlIFN0YXJ0LiBVc2luZyB" + "hIGxvbmcgc2VudGVuY2UgdG8gYmUgYWJsZSB0byByZWFjaCB0aGUgYXZhaWxhYmxlIG1lbW9yeSBsaW1pdCB3aXRob3V0IG" + "hhdmluZyB0byB1c2UgdG9vIG1hbnkgcmVjb3Jkcy4gRXZlcnkgcmVjb3JkIG9iamVjdCBpcyA5NiBieXRlcyArIGhlYXAuI" + "ElmIGEgc21hbGwgdmFsdWUgc3RyaW5nIGlzIHVzZWQgcGVyIG9iamVjdCwgd2hlbiB0aGlzIGpzb24gaXMgcGFyc2VkLCB0" + "aGUgbWVtb3J5IHJlcXVpcmVtZW50cyBhcmUgbXVjaCBtb3JlIHRoYW4gdGhlIHJlcXVlc3QgaXRzZWxmLiBNZXNzYWdlIEV" + "uZC4gTWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYW" + "JsZSBtZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvY" + "mplY3QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdo" + 
"ZW4gdGhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSB" + "yZXF1ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZS" + "BhYmxlIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFue" + "SByZWNvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJp" + "bmcgaXMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnR" + "zIGFyZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLg==" + ) + } + values = [value for _ in range(50000)] + data = {"records": values} + data_json = json.dumps(data) + + # With 256Mb available per core, the available memory for the kafka services + # is 90.4Mb at most. We want to ensure that this request is larger than this + memory_limit = 90.4 * 1024 * 1024 + assert len(data_json) > memory_limit, \ + f"Expected request larger than {memory_limit}b. Got {len(data_json)}b, instead" + + self.logger.info(f"Creating test topic: {name}") + self._create_topics([name], partitions=3) + + self.logger.info(f"Producing to topic: {name}") + produce_result_raw = self._produce_topic(name, data_json) + assert produce_result_raw.status_code == PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE, \ + f"Expected '{PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE}' " \ + f"but got '{produce_result_raw.status_code}' instead" + class PandaProxySASLTest(PandaProxyEndpoints): """