Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] pandaproxy: add max memory check for incoming requests #24605

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,79 @@ void probe::setup_public_metrics() {
.aggregate(aggregate_labels)});
}

// Build a metrics probe bound to a server's runtime context and register
// its metric groups right away. `ctx` must outlive the probe; the metric
// callbacks installed by setup_metrics() read it on every scrape.
server_probe::server_probe(
  server::context_t& ctx, const ss::sstring& group_name)
  : _ctx(ctx)
  , _group_name(group_name) {
    // The metric group members are default-constructed; registration of
    // the actual gauges happens here.
    setup_metrics();
}

// Register the probe's gauges with both the internal and the public metric
// registries (each can be disabled independently via cluster config).
//
// Three gauges are produced per registry:
//   - inflight_requests_usage_ratio: fraction of the in-flight request
//     slots currently consumed.
//   - inflight_requests_memory_usage_ratio: fraction of the request-memory
//     budget currently consumed.
//   - queued_requests_memory_blocked: number of requests waiting on the
//     memory semaphore.
void server_probe::setup_metrics() {
    namespace sm = ss::metrics;

    // Templated on the metric-definition type because the internal and
    // public registries use different (but structurally identical)
    // definition types; this builds the same three gauges for either.
    auto setup_common = [this]<typename MetricDef>() {
        // Consumed-capacity ratio in [0, 1]. `current` is the semaphore's
        // *available* units, so consumed = max - current.
        // Sentinel returns (negative values are impossible for a real
        // ratio, so they are distinguishable on a dashboard):
        //   -1.0 => max is zero (ratio undefined, avoids divide-by-zero)
        //   -2.0 => current > max (inconsistent inputs)
        const auto usage = [](const size_t current, const size_t max) {
            constexpr double divide_by_zero = -1.;
            constexpr double invalid_values = -2.;
            if (max == 0) {
                return divide_by_zero;
            }
            if (current > max) {
                return invalid_values;
            }
            const auto max_d = static_cast<double>(max);
            const auto current_d = static_cast<double>(current);
            return (max_d - current_d) / max_d;
        };

        std::vector<MetricDef> defs;
        defs.reserve(3);
        // Ratio of in-flight request slots in use.
        defs.emplace_back(
          sm::make_gauge(
            "inflight_requests_usage_ratio",
            [this, usage] {
                return usage(_ctx.inflight_sem.current(), _ctx.max_inflight);
            },
            sm::description(ssx::sformat(
              "Usage ratio of in-flight requests in the {}", _group_name)))
            .aggregate({}));
        // Ratio of the request-memory budget in use.
        defs.emplace_back(
          sm::make_gauge(
            "inflight_requests_memory_usage_ratio",
            [this, usage] {
                return usage(_ctx.mem_sem.current(), _ctx.max_memory);
            },
            sm::description(ssx::sformat(
              "Memory usage ratio of in-flight requests in the {}",
              _group_name)))
            .aggregate({}));
        // Requests currently parked waiting for memory units.
        defs.emplace_back(
          sm::make_gauge(
            "queued_requests_memory_blocked",
            [this] { return _ctx.mem_sem.waiters(); },
            sm::description(ssx::sformat(
              "Number of requests queued in {}, due to memory limitations",
              _group_name)))
            .aggregate({}));
        return defs;
    };

    // Internal registry wants the impl-level definition type.
    if (!config::shard_local_cfg().disable_metrics()) {
        _metrics.add_group(
          _group_name,
          setup_common
            .template operator()<ss::metrics::impl::metric_definition_impl>(),
          {},
          {});
    }

    // Public registry uses the public definition type.
    if (!config::shard_local_cfg().disable_public_metrics()) {
        _public_metrics.add_group(
          _group_name,
          setup_common.template operator()<ss::metrics::metric_definition>());
    }
}

} // namespace pandaproxy
19 changes: 19 additions & 0 deletions src/v/pandaproxy/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "metrics/metrics.h"
#include "pandaproxy/server.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>
Expand Down Expand Up @@ -82,4 +83,22 @@ class probe {
metrics::public_metric_groups _public_metrics;
};

// Per-server metrics probe: exposes gauges for in-flight request slot
// usage, in-flight request memory usage, and requests blocked waiting on
// memory, for a single pandaproxy / schema-registry server instance.
//
// Non-copyable and non-movable: the registered metric callbacks capture
// `this`, so the probe must remain at a fixed address for its lifetime.
class server_probe {
public:
    // Registers the metric groups immediately. `ctx` is held by reference
    // and must outlive the probe.
    server_probe(server::context_t& ctx, const ss::sstring& group_name);
    server_probe(const server_probe&) = delete;
    server_probe& operator=(const server_probe&) = delete;
    server_probe(server_probe&&) = delete;
    server_probe& operator=(server_probe&&) = delete;
    ~server_probe() = default;

private:
    void setup_metrics();

    server::context_t& _ctx;
    // Owned copy rather than a const reference member: a reference would
    // dangle if the caller's string did not outlive the probe.
    ss::sstring _group_name;
    metrics::internal_metric_groups _metrics;
    metrics::public_metric_groups _public_metrics;
};

} // namespace pandaproxy
4 changes: 4 additions & 0 deletions src/v/pandaproxy/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
.add_header("Retry-After", "0");
}

// Mark `rep` as HTTP 413 (Payload Too Large) and hand it back so the call
// can be chained like the other set_reply_* helpers in this header.
inline ss::http::reply& set_reply_payload_too_large(ss::http::reply& rep) {
    rep.set_status(ss::http::reply::status_type::payload_too_large);
    return rep;
}

inline std::unique_ptr<ss::http::reply> reply_unavailable() {
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
set_reply_unavailable(*rep);
Expand Down
9 changes: 6 additions & 3 deletions src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ proxy::proxy(
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
, _client(client)
, _client_cache(client_cache)
, _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
, _ctx{{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this},
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
_config.pandaproxy_api.value()}
, _server(
Expand All @@ -126,8 +126,11 @@ proxy::proxy(
json::serialization_format::application_json)
, _ensure_started{[this]() { return do_start(); }}
, _controller(controller) {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
_inflight_config_binding.watch([this]() {
const size_t capacity = _inflight_config_binding();
_inflight_sem.set_capacity(capacity);
_ctx.max_inflight = capacity;
});
}

ss::future<> proxy::start() {
Expand Down
9 changes: 6 additions & 3 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ service::service(
config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard.bind())
, _client(client)
, _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _ctx{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this}
, _server(
"schema_registry", // server_name
"schema_registry", // public_metric_group_name
Expand All @@ -556,8 +556,11 @@ service::service(
config::always_true(),
config::shard_local_cfg().superusers.bind(),
controller.get()} {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
_inflight_config_binding.watch([this]() {
const size_t capacity = _inflight_config_binding();
_inflight_sem.set_capacity(capacity);
_ctx.max_inflight = capacity;
});
}

ss::future<> service::start() {
Expand Down
24 changes: 20 additions & 4 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <charconv>
#include <exception>
#include <memory>

namespace pandaproxy {

Expand Down Expand Up @@ -104,6 +105,12 @@ struct handler_adaptor : ss::httpd::handler_base {
co_return std::move(rp.rep);
}
auto req_size = get_request_size(*rq.req);
if (req_size > _ctx.max_memory) {
set_reply_payload_too_large(*rp.rep);
rp.mime_type = _exceptional_mime_type;
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
if (_ctx.as.abort_requested()) {
set_reply_unavailable(*rp.rep);
Expand Down Expand Up @@ -150,7 +157,8 @@ server::server(
, _api20(std::move(api20))
, _has_routes(false)
, _ctx(ctx)
, _exceptional_mime_type(exceptional_mime_type) {
, _exceptional_mime_type(exceptional_mime_type)
, _probe{} {
_api20.set_api_doc(_server._routes);
_api20.register_api_file(_server._routes, header);
_api20.add_definitions_file(_server._routes, definitions);
Expand Down Expand Up @@ -195,6 +203,9 @@ ss::future<> server::start(
const std::vector<model::broker_endpoint>& advertised) {
_server._routes.register_exeption_handler(
exception_replier{ss::sstring{name(_exceptional_mime_type)}});

_probe = std::make_unique<server_probe>(_ctx, _public_metrics_group_name);

_ctx.advertised_listeners.reserve(endpoints.size());
for (auto& server_endpoint : endpoints) {
auto addr = co_await net::resolve_dns(server_endpoint.address);
Expand Down Expand Up @@ -234,13 +245,18 @@ ss::future<> server::start(
}
co_await _server.listen(addr, cred);
}

co_return;
}

// Shut down the server: close the pending-request gate, then — regardless
// of how the close resolved — signal abort to in-flight handlers, tear
// down the metrics probe, and finally stop the underlying HTTP server.
ss::future<> server::stop() {
    return _pending_reqs.close().finally([this]() {
        // Wake anything blocked on semaphores tied to this context.
        _ctx.as.request_abort();
        // Destroy the probe before the server so metric callbacks cannot
        // observe a partially-stopped server. reset() is the idiomatic
        // form of reset(nullptr).
        _probe.reset();
        return _server.stop();
    });
}

server::~server() noexcept = default;

} // namespace pandaproxy
10 changes: 8 additions & 2 deletions src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/http/api_docs.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/httpd.hh>
Expand All @@ -40,6 +41,8 @@

namespace pandaproxy {

class server_probe;

// Map a user name to the shard that owns its state: hash the name, then
// spread the digest uniformly across all shards with jump consistent
// hashing so the mapping is stable for a given shard count.
inline ss::shard_id user_shard(const ss::sstring& name) {
    const auto digest = xxhash_64(name.data(), name.length());
    return jump_consistent_hash(digest, ss::smp::count);
}
Expand Down Expand Up @@ -69,7 +72,9 @@ class server {
public:
struct context_t {
std::vector<net::unresolved_address> advertised_listeners;
size_t max_memory;
ssx::semaphore& mem_sem;
size_t max_inflight;
adjustable_semaphore& inflight_sem;
ss::abort_source as;
ss::smp_service_group smp_sg;
Expand Down Expand Up @@ -103,9 +108,9 @@ class server {
};

server() = delete;
~server() = default;
~server() noexcept;
server(const server&) = delete;
server(server&&) noexcept = default;
server(server&&) noexcept = delete;
server& operator=(const server&) = delete;
server& operator=(server&&) = delete;

Expand Down Expand Up @@ -135,6 +140,7 @@ class server {
bool _has_routes;
context_t& _ctx;
json::serialization_format _exceptional_mime_type;
std::unique_ptr<server_probe> _probe;
};

template<typename service_t>
Expand Down
63 changes: 63 additions & 0 deletions tests/rptest/tests/pandaproxy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
from rptest.services import tls
from rptest.utils.utf8 import CONTROL_CHARS_MAP
from typing import Optional, List, Dict, Union
from rptest.utils.mode_checks import skip_debug_mode


def create_topic_names(count):
return list(f"pandaproxy-topic-{uuid.uuid4()}" for _ in range(count))


PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE = 413

HTTP_GET_BROKERS_HEADERS = {
"Accept": "application/vnd.kafka.v2+json",
"Content-Type": "application/vnd.kafka.v2+json"
Expand Down Expand Up @@ -1284,6 +1287,66 @@ def test_invalid_topics_fetch(self):
assert sc_res.json(
)["message"] == f'Invalid parameter \'topic_name\' got \'{topic_name.translate(CONTROL_CHARS_MAP)}\''

    # Memory tracking is disabled in debug builds, so the max-memory check
    # this test exercises cannot trigger there.
    @skip_debug_mode
    @cluster(num_nodes=3)
    def test_topic_produce_request_too_big(self):
        """
        Create a topic and post a request larger than the total available memory.

        The proxy is expected to reject the request up front with HTTP 413
        (Payload Too Large) rather than attempting to buffer it.
        """

        # Restart the cluster with a deliberately small memory footprint so
        # the request built below is guaranteed to exceed the per-shard
        # memory budget.
        self.redpanda.set_resource_settings(
            ResourceSettings(memory_mb=256, num_cpus=1))
        self.redpanda.start()

        name = create_topic_names(1)[0]

        self.logger.info("Generating request larger than the available memory")
        # A single record with a long base64 value; repeated many times
        # below to inflate the JSON body past the memory limit.
        value = {
            "value":
            ("TWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYWJsZSB"
             "tZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvYmplY3"
             "QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdoZW4gd"
             "GhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSByZXF1"
             "ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZSBhYmx"
             "lIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFueSByZW"
             "NvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJpbmcga"
             "XMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnRzIGFy"
             "ZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLiBNZXNzYWdlIFN0YXJ0LiBVc2luZyB"
             "hIGxvbmcgc2VudGVuY2UgdG8gYmUgYWJsZSB0byByZWFjaCB0aGUgYXZhaWxhYmxlIG1lbW9yeSBsaW1pdCB3aXRob3V0IG"
             "hhdmluZyB0byB1c2UgdG9vIG1hbnkgcmVjb3Jkcy4gRXZlcnkgcmVjb3JkIG9iamVjdCBpcyA5NiBieXRlcyArIGhlYXAuI"
             "ElmIGEgc21hbGwgdmFsdWUgc3RyaW5nIGlzIHVzZWQgcGVyIG9iamVjdCwgd2hlbiB0aGlzIGpzb24gaXMgcGFyc2VkLCB0"
             "aGUgbWVtb3J5IHJlcXVpcmVtZW50cyBhcmUgbXVjaCBtb3JlIHRoYW4gdGhlIHJlcXVlc3QgaXRzZWxmLiBNZXNzYWdlIEV"
             "uZC4gTWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYW"
             "JsZSBtZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvY"
             "mplY3QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdo"
             "ZW4gdGhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSB"
             "yZXF1ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZS"
             "BhYmxlIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFue"
             "SByZWNvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJp"
             "bmcgaXMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnR"
             "zIGFyZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLg=="
             )
        }
        # 50k copies of the record push the serialized JSON well past the
        # limit asserted below.
        values = [value for _ in range(50000)]
        data = {"records": values}
        data_json = json.dumps(data)

        # With 256Mb available per core, the available memory for the kafka services
        # is 90.4Mb at most. We want to ensure that this request is larger than this
        memory_limit = 90.4 * 1024 * 1024
        assert len(data_json) > memory_limit, \
            f"Expected request larger than {memory_limit}b. Got {len(data_json)}b, instead"

        self.logger.info(f"Creating test topic: {name}")
        self._create_topics([name], partitions=3)

        self.logger.info(f"Producing to topic: {name}")
        # The proxy must reject the oversized request with 413 rather than
        # queueing it against the memory semaphore.
        produce_result_raw = self._produce_topic(name, data_json)
        assert produce_result_raw.status_code == PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE, \
            f"Expected '{PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE}' " \
            f"but got '{produce_result_raw.status_code}' instead"


class PandaProxySASLTest(PandaProxyEndpoints):
"""
Expand Down
Loading