Skip to content

Commit

Permalink
Merge pull request redpanda-data#23848 from dotnwat/kafka-server-app
Browse files Browse the repository at this point in the history
redpanda: hide kafka server implementation from application.cc
  • Loading branch information
dotnwat authored Oct 21, 2024
2 parents 0d717e8 + 8007461 commit d3c8f39
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/v/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ v_cc_library(
server/group_metadata.cc
server/group_tx_tracker_stm.cc
server/logger.cc
server/app.cc
DEPS
Seastar::seastar
v::bytes
Expand Down
29 changes: 29 additions & 0 deletions src/v/kafka/server/BUILD
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
load("//bazel:build.bzl", "redpanda_cc_library")

redpanda_cc_library(
name = "qdc_monitor_config",
hdrs = [
"queue_depth_monitor_config.h",
],
include_prefix = "kafka/server",
visibility = ["//visibility:public"],
)

redpanda_cc_library(
name = "server",
srcs = [
Expand Down Expand Up @@ -148,6 +157,7 @@ redpanda_cc_library(
include_prefix = "kafka/server",
visibility = ["//visibility:public"],
deps = [
":qdc_monitor_config",
"//src/v/base",
"//src/v/bytes",
"//src/v/bytes:iobuf",
Expand Down Expand Up @@ -319,3 +329,22 @@ redpanda_cc_library(
"//src/v/kafka/protocol",
],
)

redpanda_cc_library(
name = "app",
srcs = [
"app.cc",
],
hdrs = [
"app.h",
],
implementation_deps = [
":server",
],
include_prefix = "kafka/server",
visibility = ["//src/v/redpanda:__pkg__"],
deps = [
":qdc_monitor_config",
"@seastar",
],
)
91 changes: 91 additions & 0 deletions src/v/kafka/server/app.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "kafka/server/app.h"

#include "kafka/server/server.h"

#include <memory>

namespace kafka {

seastar::future<> server_app::init(
seastar::sharded<net::server_configuration>* conf,
seastar::smp_service_group smp,
seastar::scheduling_group sched,
seastar::sharded<cluster::metadata_cache>& mdc,
seastar::sharded<cluster::topics_frontend>& tf,
seastar::sharded<cluster::config_frontend>& cf,
seastar::sharded<features::feature_table>& ft,
seastar::sharded<cluster::client_quota::frontend>& cqf,
seastar::sharded<cluster::client_quota::store>& cqs,
seastar::sharded<quota_manager>& qm,
seastar::sharded<snc_quota_manager>& snc_mgr,
seastar::sharded<kafka::group_router>& gr,
seastar::sharded<kafka::usage_manager>& um,
seastar::sharded<cluster::shard_table>& st,
seastar::sharded<cluster::partition_manager>& pm,
seastar::sharded<cluster::id_allocator_frontend>& idaf,
seastar::sharded<security::credential_store>& cs,
seastar::sharded<security::authorizer>& auth,
seastar::sharded<security::audit::audit_log_manager>& audit,
seastar::sharded<security::oidc::service>& oidc,
seastar::sharded<cluster::security_frontend>& sec,
seastar::sharded<cluster::controller_api>& ctrl,
seastar::sharded<cluster::tx_gateway_frontend>& tx,
std::optional<qdc_monitor_config> qdc,
ssx::singleton_thread_worker& worker,
const std::unique_ptr<pandaproxy::schema_registry::api>& pp) {
return _server.start(
conf,
smp,
sched,
std::ref(mdc),
std::ref(tf),
std::ref(cf),
std::ref(ft),
std::ref(cqf),
std::ref(cqs),
std::ref(qm),
std::ref(snc_mgr),
std::ref(gr),
std::ref(um),
std::ref(st),
std::ref(pm),
std::ref(idaf),
std::ref(cs),
std::ref(auth),
std::ref(audit),
std::ref(oidc),
std::ref(sec),
std::ref(ctrl),
std::ref(tx),
qdc,
std::ref(worker),
std::ref(pp));
}

server_app::~server_app() = default;

seastar::future<> server_app::start() {
return _server.invoke_on_all(&net::server::start);
}

seastar::future<> server_app::shutdown_input() {
return _server.invoke_on_all(&net::server::shutdown_input);
}

seastar::future<> server_app::wait_for_shutdown() {
return _server.invoke_on_all(&net::server::wait_for_shutdown);
}

seastar::future<> server_app::stop() { return _server.stop(); }

} // namespace kafka
120 changes: 120 additions & 0 deletions src/v/kafka/server/app.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "kafka/server/queue_depth_monitor_config.h"

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>

#include <memory>

namespace cluster {
class metadata_cache;
class topics_frontend;
class config_frontend;
class shard_table;
class partition_manager;
class id_allocator_frontend;
class security_frontend;
class controller_api;
class tx_gateway_frontend;
namespace client_quota {
class frontend;
class store;
} // namespace client_quota
} // namespace cluster

namespace security {
class credential_store;
class authorizer;
namespace audit {
class audit_log_manager;
}
namespace oidc {
class service;
}
} // namespace security

namespace pandaproxy::schema_registry {
class api;
}

namespace ssx {
class singleton_thread_worker;
}

namespace net {
struct server_configuration;
}

namespace features {
class feature_table;
}

namespace kafka {

class server;
class quota_manager;
class snc_quota_manager;
class group_router;
class usage_manager;

class server_app {
public:
server_app() = default;
server_app(const server_app&) = delete;
server_app& operator=(const server_app&) = delete;
server_app(server_app&&) noexcept = delete;
server_app& operator=(server_app&&) noexcept = delete;
~server_app();

seastar::future<> init(
seastar::sharded<net::server_configuration>*,
seastar::smp_service_group,
seastar::scheduling_group,
seastar::sharded<cluster::metadata_cache>&,
seastar::sharded<cluster::topics_frontend>&,
seastar::sharded<cluster::config_frontend>&,
seastar::sharded<features::feature_table>&,
seastar::sharded<cluster::client_quota::frontend>&,
seastar::sharded<cluster::client_quota::store>&,
seastar::sharded<quota_manager>&,
seastar::sharded<snc_quota_manager>&,
seastar::sharded<kafka::group_router>&,
seastar::sharded<kafka::usage_manager>&,
seastar::sharded<cluster::shard_table>&,
seastar::sharded<cluster::partition_manager>&,
seastar::sharded<cluster::id_allocator_frontend>&,
seastar::sharded<security::credential_store>&,
seastar::sharded<security::authorizer>&,
seastar::sharded<security::audit::audit_log_manager>&,
seastar::sharded<security::oidc::service>&,
seastar::sharded<cluster::security_frontend>&,
seastar::sharded<cluster::controller_api>&,
seastar::sharded<cluster::tx_gateway_frontend>&,
std::optional<qdc_monitor_config>,
ssx::singleton_thread_worker&,
const std::unique_ptr<pandaproxy::schema_registry::api>&);

seastar::future<> start();
seastar::future<> shutdown_input();
seastar::future<> wait_for_shutdown();
seastar::future<> stop();

seastar::sharded<server>& ref() { return _server; }
server& local() { return _server.local(); }

private:
seastar::sharded<server> _server;
};

} // namespace kafka
15 changes: 2 additions & 13 deletions src/v/kafka/server/queue_depth_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,12 @@
#pragma once
#include "base/vlog.h"
#include "kafka/server/logger.h"
#include "kafka/server/queue_depth_monitor_config.h"
#include "utils/queue_depth_control.h"

namespace kafka {

struct qdc_monitor {
struct config {
double latency_alpha;
std::chrono::milliseconds max_latency;
size_t window_count;
std::chrono::milliseconds window_size;
double depth_alpha;
size_t idle_depth;
size_t min_depth;
size_t max_depth;
std::chrono::milliseconds depth_update_freq;
};

exponential_moving_average<std::chrono::steady_clock::duration> ema;
queue_depth_control qdc;
ss::timer<ss::lowres_clock> timer;
Expand All @@ -38,7 +27,7 @@ struct qdc_monitor {
* isn't really a perfect value here. its purpose is to bootstrap the
* algorithm and is irrelevant after a full time window has elapsed.
*/
explicit qdc_monitor(const config& cfg)
explicit qdc_monitor(const qdc_monitor_config& cfg)
: ema(cfg.latency_alpha, cfg.max_latency / 2, cfg.window_count)
, qdc(
cfg.max_latency,
Expand Down
29 changes: 29 additions & 0 deletions src/v/kafka/server/queue_depth_monitor_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include <chrono>

namespace kafka {

struct qdc_monitor_config {
double latency_alpha;
std::chrono::milliseconds max_latency;
size_t window_count;
std::chrono::milliseconds window_size;
double depth_alpha;
size_t idle_depth;
size_t min_depth;
size_t max_depth;
std::chrono::milliseconds depth_update_freq;
};

} // namespace kafka
2 changes: 1 addition & 1 deletion src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ server::server(
ss::sharded<cluster::security_frontend>& sec_fe,
ss::sharded<cluster::controller_api>& controller_api,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
std::optional<qdc_monitor::config> qdc_config,
std::optional<qdc_monitor_config> qdc_config,
ssx::singleton_thread_worker& tw,
const std::unique_ptr<pandaproxy::schema_registry::api>& sr) noexcept
: net::server(cfg, klog)
Expand Down
3 changes: 2 additions & 1 deletion src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "kafka/server/handlers/handler_probe.h"
#include "kafka/server/latency_probe.h"
#include "kafka/server/queue_depth_monitor.h"
#include "kafka/server/queue_depth_monitor_config.h"
#include "kafka/server/read_distribution_probe.h"
#include "kafka/server/sasl_probe.h"
#include "metrics/metrics.h"
Expand Down Expand Up @@ -75,7 +76,7 @@ class server final
ss::sharded<cluster::security_frontend>&,
ss::sharded<cluster::controller_api>&,
ss::sharded<cluster::tx_gateway_frontend>&,
std::optional<qdc_monitor::config>,
std::optional<qdc_monitor_config>,
ssx::singleton_thread_worker&,
const std::unique_ptr<pandaproxy::schema_registry::api>&) noexcept;

Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ redpanda_cc_library(
"//src/v/finjector",
"//src/v/kafka/client",
"//src/v/kafka/server",
"//src/v/kafka/server:app",
"//src/v/kafka/server:qdc_monitor_config",
"//src/v/metrics",
"//src/v/migrations",
"//src/v/model",
Expand Down
Loading

0 comments on commit d3c8f39

Please sign in to comment.