Skip to content

Commit

Permalink
pandaproxy: add max memory check for incoming requests
Browse files Browse the repository at this point in the history
  • Loading branch information
IoannisRP committed Dec 16, 2024
1 parent bedd51b commit efe9624
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/v/pandaproxy/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
.add_header("Retry-After", "0");
}

// Mark the reply as HTTP 413 (Payload Too Large) and hand it back to the
// caller, mirroring the other set_reply_* helpers in this header.
inline ss::http::reply& set_reply_payload_too_large(ss::http::reply& rep) {
    rep.set_status(ss::http::reply::status_type::payload_too_large);
    return rep;
}

inline std::unique_ptr<ss::http::reply> reply_unavailable() {
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
set_reply_unavailable(*rep);
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ proxy::proxy(
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
, _client(client)
, _client_cache(client_cache)
, _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
, _ctx{{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
_config.pandaproxy_api.value()}
, _server(
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ service::service(
config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard.bind())
, _client(client)
, _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _ctx{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _server(
"schema_registry", // server_name
"schema_registry", // public_metric_group_name
Expand Down
6 changes: 6 additions & 0 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ struct handler_adaptor : ss::httpd::handler_base {
co_return std::move(rp.rep);
}
auto req_size = get_request_size(*rq.req);
if (req_size > _ctx.max_memory) {
set_reply_payload_too_large(*rp.rep);
rp.mime_type = _exceptional_mime_type;
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
if (_ctx.as.abort_requested()) {
set_reply_unavailable(*rp.rep);
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class server {
public:
struct context_t {
std::vector<net::unresolved_address> advertised_listeners;
size_t max_memory;
ssx::semaphore& mem_sem;
adjustable_semaphore& inflight_sem;
ss::abort_source as;
Expand Down
63 changes: 63 additions & 0 deletions tests/rptest/tests/pandaproxy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
from rptest.services import tls
from rptest.utils.utf8 import CONTROL_CHARS_MAP
from typing import Optional, List, Dict, Union
from rptest.utils.mode_checks import skip_debug_mode


def create_topic_names(count):
    """Return `count` unique topic names, each tagged with a fresh UUID."""
    return [f"pandaproxy-topic-{uuid.uuid4()}" for _ in range(count)]


# HTTP 413 (Payload Too Large) — expected when a request exceeds the
# proxy's configured memory limit.
PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE = 413

HTTP_GET_BROKERS_HEADERS = {
"Accept": "application/vnd.kafka.v2+json",
"Content-Type": "application/vnd.kafka.v2+json"
Expand Down Expand Up @@ -1284,6 +1287,66 @@ def test_invalid_topics_fetch(self):
assert sc_res.json(
)["message"] == f'Invalid parameter \'topic_name\' got \'{topic_name.translate(CONTROL_CHARS_MAP)}\''

# Memory tracking is disabled in debug builds, so skip there.
@skip_debug_mode
@cluster(num_nodes=3)
def test_topic_produce_request_too_big(self):
    """
    Create a topic and post a request larger than the total available memory.
    """

    # Constrain the node so the request can realistically exceed memory.
    self.redpanda.set_resource_settings(
        ResourceSettings(memory_mb=256, num_cpus=1))
    self.redpanda.start()

    topic = create_topic_names(1)[0]

    self.logger.info("Generating request larger than the available memory")
    record = {
        "value":
        ("TWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYWJsZSB"
         "tZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvYmplY3"
         "QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdoZW4gd"
         "GhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSByZXF1"
         "ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZSBhYmx"
         "lIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFueSByZW"
         "NvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJpbmcga"
         "XMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnRzIGFy"
         "ZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLiBNZXNzYWdlIFN0YXJ0LiBVc2luZyB"
         "hIGxvbmcgc2VudGVuY2UgdG8gYmUgYWJsZSB0byByZWFjaCB0aGUgYXZhaWxhYmxlIG1lbW9yeSBsaW1pdCB3aXRob3V0IG"
         "hhdmluZyB0byB1c2UgdG9vIG1hbnkgcmVjb3Jkcy4gRXZlcnkgcmVjb3JkIG9iamVjdCBpcyA5NiBieXRlcyArIGhlYXAuI"
         "ElmIGEgc21hbGwgdmFsdWUgc3RyaW5nIGlzIHVzZWQgcGVyIG9iamVjdCwgd2hlbiB0aGlzIGpzb24gaXMgcGFyc2VkLCB0"
         "aGUgbWVtb3J5IHJlcXVpcmVtZW50cyBhcmUgbXVjaCBtb3JlIHRoYW4gdGhlIHJlcXVlc3QgaXRzZWxmLiBNZXNzYWdlIEV"
         "uZC4gTWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYW"
         "JsZSBtZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvY"
         "mplY3QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdo"
         "ZW4gdGhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSB"
         "yZXF1ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZS"
         "BhYmxlIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFue"
         "SByZWNvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJp"
         "bmcgaXMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnR"
         "zIGFyZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLg=="
         )
    }
    # Duplicate the record many times; each copy inflates further when the
    # JSON is parsed server-side.
    payload = json.dumps({"records": [record] * 50000})

    # With 256Mb available per core, the available memory for the kafka services
    # is 90.4Mb at most. We want to ensure that this request is larger than this
    memory_limit = 90.4 * 1024 * 1024
    assert len(payload) > memory_limit, \
        f"Expected request larger than {memory_limit}b. Got {len(payload)}b, instead"

    self.logger.info(f"Creating test topic: {topic}")
    self._create_topics([topic], partitions=3)

    self.logger.info(f"Producing to topic: {topic}")
    produce_result_raw = self._produce_topic(topic, payload)
    assert produce_result_raw.status_code == PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE, \
        f"Expected '{PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE}' " \
        f"but got '{produce_result_raw.status_code}' instead"


class PandaProxySASLTest(PandaProxyEndpoints):
"""
Expand Down

0 comments on commit efe9624

Please sign in to comment.