Merge pull request #18745 from WillemKauf/read_histogram
[CORE-3081] kafka: add `read_distribution` histogram
WillemKauf authored Jun 7, 2024
2 parents 74f078a + 3055a0e commit 18fa688
Showing 11 changed files with 268 additions and 13 deletions.
4 changes: 4 additions & 0 deletions src/v/kafka/protocol/batch_consumer.h
@@ -31,6 +31,7 @@ class kafka_batch_serializer {
uint32_t record_count;
model::offset base_offset;
model::offset last_offset;
model::timestamp first_timestamp;
// First batch with the transactional bit set.
// We only return aborted transactions from this point on.
// This is needed for the correctness of consumption logic for
@@ -59,6 +60,7 @@ class kafka_batch_serializer {
ss::future<ss::stop_iteration> operator()(model::record_batch&& batch) {
if (unlikely(record_count_ == 0)) {
_base_offset = batch.base_offset();
_first_timestamp = batch.header().first_timestamp;
}
if (unlikely(
!_first_tx_batch_offset
@@ -78,6 +80,7 @@
.record_count = record_count_,
.base_offset = _base_offset,
.last_offset = _last_offset,
.first_timestamp = _first_timestamp,
.first_tx_batch_offset = _first_tx_batch_offset,
};
}
@@ -92,6 +95,7 @@
protocol::encoder _wr;
model::offset _base_offset;
model::offset _last_offset;
model::timestamp _first_timestamp;
std::optional<model::offset> _first_tx_batch_offset;
uint32_t record_count_ = 0;
};
66 changes: 66 additions & 0 deletions src/v/kafka/read_distribution_probe.h
@@ -0,0 +1,66 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "config/configuration.h"
#include "metrics/metrics.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics.hh>

#include <chrono>

namespace kafka {
class read_distribution_probe {
public:
using hist_t = log_hist_read_dist;

read_distribution_probe() = default;
read_distribution_probe(const read_distribution_probe&) = delete;
read_distribution_probe& operator=(const read_distribution_probe&) = delete;
read_distribution_probe(read_distribution_probe&&) = delete;
read_distribution_probe& operator=(read_distribution_probe&&) = delete;
~read_distribution_probe() = default;

template<typename dur_t>
void add_read_event_delta_from_tip(dur_t delta_from_tip) {
_read_distribution.record(delta_from_tip);
};

void setup_metrics() {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_metrics()) {
return;
}

_metrics.add_group(
prometheus_sanitize::metrics_name("kafka_fetch"),
{
sm::make_histogram(
"read_distribution",
[this] {
return _read_distribution.read_dist_histogram_logform();
},
sm::description("Read path time distribution histogram"),
{}),
},
{},
{sm::shard_label});
}

private:
hist_t _read_distribution;
metrics::internal_metric_groups _metrics;
};

} // namespace kafka
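
The probe accepts any std::chrono duration and forwards it to a minutes-based histogram (`log_hist_read_dist`, defined in log_hist.h further down), so a delta is truncated to whole minutes before it is bucketed. A minimal standalone sketch of that conversion, using plain std::chrono only (not code from this commit):

#include <chrono>
#include <iostream>

int main() {
    using namespace std::chrono;
    // A fetch that reads data written 90 seconds ago produces a 90'000 ms
    // delta; cast to minutes it truncates to 1 and lands in the first
    // (< 4 minute) bucket of the read distribution.
    milliseconds delta_from_tip{90'000};
    auto as_minutes = duration_cast<minutes>(delta_from_tip);
    std::cout << as_minutes.count() << " min\n"; // prints: 1 min
}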
19 changes: 19 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
@@ -19,6 +19,7 @@
#include "kafka/protocol/batch_consumer.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/fetch.h"
#include "kafka/read_distribution_probe.h"
#include "kafka/server/fetch_session.h"
#include "kafka/server/fwd.h"
#include "kafka/server/handlers/details/leader_epoch.h"
@@ -110,6 +111,7 @@ static ss::future<read_result> read_from_partition(
std::exception_ptr e;
std::unique_ptr<iobuf> data;
std::vector<cluster::tx::tx_range> aborted_transactions;
std::optional<std::chrono::milliseconds> delta_from_tip_ms;
try {
auto result = co_await rdr.reader.consume(
kafka_batch_serializer(), deadline ? *deadline : model::no_timeout);
@@ -120,6 +122,14 @@
part.probe().add_bytes_fetched_from_follower(data->size_bytes());
}

if (data->size_bytes() > 0) {
auto curr_timestamp = model::timestamp::now();
if (curr_timestamp >= result.first_timestamp) {
delta_from_tip_ms = std::chrono::milliseconds{
curr_timestamp() - result.first_timestamp()};
}
}

if (result.first_tx_batch_offset && result.record_count > 0) {
// Reader should live at least until this point to hold on to the
// segment locks so that prefix truncation doesn't happen.
@@ -157,6 +167,7 @@ static ss::future<read_result> read_from_partition(
start_o,
hw,
lso.value(),
delta_from_tip_ms,
std::move(aborted_transactions));
}

@@ -165,6 +176,7 @@
start_o,
hw,
lso.value(),
delta_from_tip_ms,
std::move(aborted_transactions));
}

@@ -535,6 +547,7 @@ static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
cluster::partition_manager& cluster_pm,
const replica_selector& replica_selector,
std::vector<ntp_fetch_config> ntp_fetch_configs,
read_distribution_probe& read_probe,
bool foreign_read,
std::optional<model::timeout_clock::time_point> deadline,
const size_t bytes_left,
@@ -592,6 +605,10 @@ static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
size_t total_size = 0;
for (const auto& r : results) {
total_size += r.data_size_bytes();
if (r.delta_from_tip_ms.has_value()) {
read_probe.add_read_event_delta_from_tip(
r.delta_from_tip_ms.value());
}
}
vlog(
klog.trace,
@@ -647,6 +664,7 @@ handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
mgr,
octx.rctx.server().local().get_replica_selector(),
std::move(configs),
octx.rctx.server().local().read_probe(),
foreign_read,
octx.deadline,
octx.bytes_left,
@@ -803,6 +821,7 @@ class fetch_worker {
_ctx.mgr,
_ctx.srv.get_replica_selector(),
std::move(requests),
_ctx.srv.read_probe(),
_ctx.foreign_read,
_ctx.deadline,
_ctx.bytes_left,
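
In read_from_partition above, `delta_from_tip_ms` is roughly the age of the data at the start of the fetched range: the current broker timestamp minus the first batch's `first_timestamp` (both epoch milliseconds), recorded only when the fetch returned data and the difference is non-negative. A standalone sketch of that guard, using raw int64 epoch-millisecond values in place of model::timestamp (hypothetical values, for illustration only):

#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>

// Returns now - first_batch as milliseconds, or nullopt when the batch
// timestamp is ahead of the clock, so the histogram never sees a
// negative delta.
std::optional<std::chrono::milliseconds>
delta_from_tip(int64_t now_ms, int64_t first_batch_ms) {
    if (now_ms >= first_batch_ms) {
        return std::chrono::milliseconds{now_ms - first_batch_ms};
    }
    return std::nullopt;
}

int main() {
    // A consumer reading data written one minute before the fetch.
    auto d = delta_from_tip(1717776000000, 1717775940000);
    std::cout << d->count() << " ms\n"; // prints: 60000 ms
}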
3 changes: 3 additions & 0 deletions src/v/kafka/server/handlers/fetch.h
@@ -264,11 +264,13 @@ struct read_result {
model::offset start_offset,
model::offset hw,
model::offset lso,
std::optional<std::chrono::milliseconds> delta,
std::vector<cluster::tx::tx_range> aborted_transactions)
: data(std::move(data))
, start_offset(start_offset)
, high_watermark(hw)
, last_stable_offset(lso)
, delta_from_tip_ms(delta)
, error(error_code::none)
, aborted_transactions(std::move(aborted_transactions)) {}

@@ -322,6 +324,7 @@ struct read_result {
model::offset start_offset;
model::offset high_watermark;
model::offset last_stable_offset;
std::optional<std::chrono::milliseconds> delta_from_tip_ms;
std::optional<model::node_id> preferred_replica;
error_code error;
model::partition_id partition;
2 changes: 2 additions & 0 deletions src/v/kafka/server/server.cc
@@ -177,6 +177,7 @@ server::server(
"kafka/server-mem-fetch")
, _probe(std::make_unique<class latency_probe>())
, _sasl_probe(std::make_unique<class sasl_probe>())
, _read_dist_probe(std::make_unique<read_distribution_probe>())
, _thread_worker(tw)
, _replica_selector(
std::make_unique<rack_aware_replica_selector>(_metadata_cache.local()))
@@ -193,6 +194,7 @@
_probe->setup_public_metrics();

_sasl_probe->setup_metrics(cfg->local().name);
_read_dist_probe->setup_metrics();
}

void server::setup_metrics() {
4 changes: 4 additions & 0 deletions src/v/kafka/server/server.h
@@ -16,6 +16,7 @@
#include "features/feature_table.h"
#include "kafka/latency_probe.h"
#include "kafka/protocol/types.h"
#include "kafka/read_distribution_probe.h"
#include "kafka/sasl_probe.h"
#include "kafka/server/connection_context.h"
#include "kafka/server/fetch_metadata_cache.hh"
@@ -180,6 +181,8 @@ class server final

sasl_probe& sasl_probe() { return *_sasl_probe; }

read_distribution_probe& read_probe() { return *_read_dist_probe; }

ssx::singleton_thread_worker& thread_worker() { return _thread_worker; }

const std::unique_ptr<pandaproxy::schema_registry::api>& schema_registry() {
@@ -251,6 +254,7 @@ class server final
metrics::internal_metric_groups _metrics;
std::unique_ptr<class latency_probe> _probe;
std::unique_ptr<class sasl_probe> _sasl_probe;
std::unique_ptr<read_distribution_probe> _read_dist_probe;
ssx::singleton_thread_worker& _thread_worker;
std::unique_ptr<replica_selector> _replica_selector;
const std::unique_ptr<pandaproxy::schema_registry::api>& _schema_registry;
13 changes: 0 additions & 13 deletions src/v/pandaproxy/parsing/from_chars.h
@@ -25,19 +25,6 @@

namespace pandaproxy::parse {

namespace detail {

template<typename T>
struct is_duration : std::false_type {};

template<typename Rep, typename Period>
struct is_duration<std::chrono::duration<Rep, Period>> : std::true_type {};

template<typename T>
inline constexpr bool is_duration_v = is_duration<T>::value;

} // namespace detail

// from_chars converts from a string_view using std::from_chars.
//
// Recurses through several well-known types as a convenience. E.g.:
7 changes: 7 additions & 0 deletions src/v/reflection/type_traits.h
@@ -43,6 +43,13 @@ struct is_std_array_t : std::false_type {};
template<class T, std::size_t N>
struct is_std_array_t<std::array<T, N>> : std::true_type {};

template<typename T>
struct is_duration : std::false_type {};
template<typename Rep, typename Period>
struct is_duration<std::chrono::duration<Rep, Period>> : std::true_type {};
template<typename T>
inline constexpr bool is_duration_v = is_duration<T>::value;

} // namespace detail

namespace reflection {
14 changes: 14 additions & 0 deletions src/v/utils/log_hist.cc
@@ -92,7 +92,21 @@ log_hist<duration_t, number_of_buckets, first_bucket_upper_bound>::
return seastar_histogram_logform<internal_hist_config>();
}

template<
typename duration_t,
int number_of_buckets,
uint64_t first_bucket_upper_bound>
seastar::metrics::histogram
log_hist<duration_t, number_of_buckets, first_bucket_upper_bound>::
read_dist_histogram_logform() const {
using read_distribution_config = logform_config<1l, 4ul, 16>;

return seastar_histogram_logform<read_distribution_config>();
}

// Explicit instantiation for log_hist_public
template class log_hist<std::chrono::microseconds, 18, 256ul>;
// Explicit instantiation for log_hist_internal
template class log_hist<std::chrono::microseconds, 26, 8ul>;
// Explicit instantiation for log_hist_read_dist
template class log_hist<std::chrono::minutes, 16, 4ul>;
26 changes: 26 additions & 0 deletions src/v/utils/log_hist.h
@@ -11,6 +11,8 @@

#pragma once

#include "reflection/type_traits.h"

#include <seastar/core/metrics_types.hh>
#include <seastar/core/shared_ptr.hh>

@@ -156,6 +158,14 @@ class log_hist {
_counts[i]++;
}

template<
typename dur_t,
typename = std::enable_if_t<detail::is_duration_v<dur_t>, dur_t>>
void record(dur_t dur) {
record(static_cast<uint64_t>(
std::chrono::duration_cast<duration_t>(dur).count()));
}

template<int64_t _scale, uint64_t _first_bucket_bound, int _bucket_count>
struct logform_config {
static constexpr auto bound_is_pow_2 = _first_bucket_bound >= 1
@@ -189,6 +199,15 @@ class log_hist {
*/
seastar::metrics::histogram internal_histogram_logform() const;

/*
* Generates a Prometheus histogram with 16 buckets. The first bucket has an
* upper bound of 4 - 1 and subsequent buckets have an upper bound of 2
* times the upper bound of the previous bucket.
*
* This is the histogram type used for the Kafka read distribution.
*/
seastar::metrics::histogram read_dist_histogram_logform() const;

private:
friend measurement;

@@ -215,3 +234,10 @@ using log_hist_public = log_hist<std::chrono::microseconds, 18, 256ul>;
* bounds] [8, 10], [16, 20], [32, 41], [64, 83], [128, 167], [256, 335]
*/
using log_hist_internal = log_hist<std::chrono::microseconds, 26, 8ul>;

/*
* This histogram has units of minutes instead of microseconds, and is used for
* measuring the Kafka read distribution on the scale of less than 4 minutes in
* the first bucket to greater than 91 days in the last bucket.
*/
using log_hist_read_dist = log_hist<std::chrono::minutes, 16, 4ul>;
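
A standalone sketch of the bucket layout described in the comments above, assuming upper bounds of the form 4 * 2^i - 1 minutes (the exact bounds come from seastar_histogram_logform with logform_config<1l, 4ul, 16>; this assumed form matches the "< 4 minutes" first bucket and the roughly 91-day final finite bound):

#include <cstdint>
#include <iostream>

int main() {
    // Assumed bucket upper bounds for log_hist_read_dist: 4 * 2^i - 1
    // minutes for i = 0..15. First bound is 3 min (reads less than 4 min
    // behind the tip); last finite bound is 131071 min, roughly 91 days;
    // anything older falls into the overflow bucket.
    constexpr uint64_t first_bucket_bound = 4;
    constexpr int bucket_count = 16;
    for (int i = 0; i < bucket_count; ++i) {
        uint64_t bound_min = first_bucket_bound * (uint64_t{1} << i) - 1;
        double bound_days = static_cast<double>(bound_min) / (60.0 * 24.0);
        std::cout << "bucket " << i << ": <= " << bound_min << " min (~"
                  << bound_days << " days)\n";
    }
}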