Skip to content

Commit

Permalink
Merge pull request #6 from DataDog/golamkawsar/add-kafka-write-metric…
Browse files Browse the repository at this point in the history
…s-to-patch

[Nicky] [Sarama] Added several new metrics to see breakdown of Sarama latency numbers
  • Loading branch information
golam-kawsar authored Jun 13, 2023
2 parents b33682e + 028dd78 commit f75a462
Showing 1 changed file with 73 additions and 8 deletions.
81 changes: 73 additions & 8 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,15 @@ type Broker struct {
brokerRequestsInFlight metrics.Counter
brokerThrottleTime metrics.Histogram

kerberosAuthenticator GSSAPIKerberosAuth
kerberosAuthenticator GSSAPIKerberosAuth
connLockWaitTime metrics.Histogram
brokerConnLockWaitTime metrics.Histogram
connWriteTime metrics.Histogram
brokerConnWriteTime metrics.Histogram
responsePromiseWriteTime metrics.Histogram
brokerResponsePromiseWriteTime metrics.Histogram
connReadTime metrics.Histogram
brokerConnReadTime metrics.Histogram
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand Down Expand Up @@ -196,6 +204,10 @@ func (b *Broker) Open(conf *Config) error {
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
b.connLockWaitTime = getOrRegisterHistogram("conn-lock-wait-time-in-ms", conf.MetricRegistry)
b.connWriteTime = getOrRegisterHistogram("conn-write-time-in-ms", conf.MetricRegistry)
b.responsePromiseWriteTime = getOrRegisterHistogram("response-promise-write-time-in-ms", conf.MetricRegistry)
b.connReadTime = getOrRegisterHistogram("conn-read-time-in-ms", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 && !metrics.UseNilMetrics {
Expand Down Expand Up @@ -807,6 +819,8 @@ func (b *Broker) write(buf []byte) (n int, err error) {
}

func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
connWaitStart := time.Now()

b.lock.Lock()
defer b.lock.Unlock()

Expand All @@ -821,6 +835,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersi
return nil, ErrUnsupportedVersion
}

b.updateConnLockWaitTimeMetric(time.Since(connWaitStart))

req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
if err != nil {
Expand All @@ -832,6 +848,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersi
b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
b.updateConnWriteTimeMetric(time.Since(requestTime))
if err != nil {
b.addRequestInFlightMetrics(-1)
return nil, err
Expand All @@ -844,9 +861,13 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersi
return nil, nil
}

responsePromiseWriteStart := time.Now()

promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
b.responses <- promise

b.updateResponsePromiseWriteTimeMetric(time.Since(responsePromiseWriteStart))

return &promise, nil
}

Expand Down Expand Up @@ -949,8 +970,12 @@ func (b *Broker) responseReceiver() {
headerLength := getHeaderLength(response.headerVersion)
header := make([]byte, headerLength)

readStartTime := time.Now()
bytesReadHeader, err := b.readFull(header)
b.updateConnReadTimeMetric(time.Since(readStartTime))

requestLatency := time.Since(response.requestTime)

if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
Expand Down Expand Up @@ -1083,14 +1108,14 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
// In SASL Plain, Kafka expects the auth header to be in the following format
// Message format (from https://tools.ietf.org/html/rfc4616):
//
// message = [authzid] UTF8NUL authcid UTF8NUL passwd
// authcid = 1*SAFE ; MUST accept up to 255 octets
// authzid = 1*SAFE ; MUST accept up to 255 octets
// passwd = 1*SAFE ; MUST accept up to 255 octets
// UTF8NUL = %x00 ; UTF-8 encoded NUL character
// message = [authzid] UTF8NUL authcid UTF8NUL passwd
// authcid = 1*SAFE ; MUST accept up to 255 octets
// authzid = 1*SAFE ; MUST accept up to 255 octets
// passwd = 1*SAFE ; MUST accept up to 255 octets
// UTF8NUL = %x00 ; UTF-8 encoded NUL character
//
// SAFE = UTF1 / UTF2 / UTF3 / UTF4
// ;; any UTF-8 encoded Unicode character except NUL
// SAFE = UTF1 / UTF2 / UTF3 / UTF4
// ;; any UTF-8 encoded Unicode character except NUL
//
// With SASL v0 handshake and auth then:
// When credentials are valid, Kafka returns a 4 byte array of null characters.
Expand Down Expand Up @@ -1520,6 +1545,42 @@ func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Dura
b.addRequestInFlightMetrics(-1)
}

// updateConnLockWaitTimeMetric records waitTime, truncated to whole
// milliseconds, in the connection-lock wait-time histogram. The per-broker
// histogram is updated as well whenever it has been set (it is left nil when
// per-broker metrics are not registered).
func (b *Broker) updateConnLockWaitTimeMetric(waitTime time.Duration) {
	elapsedMs := waitTime.Milliseconds()

	b.connLockWaitTime.Update(elapsedMs)
	if perBroker := b.brokerConnLockWaitTime; perBroker != nil {
		perBroker.Update(elapsedMs)
	}
}

// updateConnWriteTimeMetric records writeTime, truncated to whole
// milliseconds, in the connection write-time histogram. The per-broker
// histogram is updated as well whenever it has been set (it is left nil when
// per-broker metrics are not registered).
func (b *Broker) updateConnWriteTimeMetric(writeTime time.Duration) {
	elapsedMs := writeTime.Milliseconds()

	b.connWriteTime.Update(elapsedMs)
	if perBroker := b.brokerConnWriteTime; perBroker != nil {
		perBroker.Update(elapsedMs)
	}
}

// updateResponsePromiseWriteTimeMetric records writeTime, truncated to whole
// milliseconds, in the response-promise write-time histogram. The per-broker
// histogram is updated as well whenever it has been set (it is left nil when
// per-broker metrics are not registered).
func (b *Broker) updateResponsePromiseWriteTimeMetric(writeTime time.Duration) {
	elapsedMs := writeTime.Milliseconds()

	b.responsePromiseWriteTime.Update(elapsedMs)
	if perBroker := b.brokerResponsePromiseWriteTime; perBroker != nil {
		perBroker.Update(elapsedMs)
	}
}

// updateConnReadTimeMetric records readTime, truncated to whole
// milliseconds, in the connection read-time histogram. The per-broker
// histogram is updated as well whenever it has been set (it is left nil when
// per-broker metrics are not registered).
func (b *Broker) updateConnReadTimeMetric(readTime time.Duration) {
	elapsedMs := readTime.Milliseconds()

	b.connReadTime.Update(elapsedMs)
	if perBroker := b.brokerConnReadTime; perBroker != nil {
		perBroker.Update(elapsedMs)
	}
}

func (b *Broker) addRequestInFlightMetrics(i int64) {
b.requestsInFlight.Inc(i)
if b.brokerRequestsInFlight != nil {
Expand Down Expand Up @@ -1555,6 +1616,10 @@ func (b *Broker) registerMetrics() {
b.brokerResponseSize = b.registerHistogram("response-size")
b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
b.brokerThrottleTime = b.registerHistogram("throttle-time-in-ms")
b.brokerConnLockWaitTime = b.registerHistogram("conn-lock-wait-time-in-ms")
b.brokerConnWriteTime = b.registerHistogram("conn-write-time-in-ms")
b.brokerResponsePromiseWriteTime = b.registerHistogram("response-promise-write-time-in-ms")
b.brokerConnReadTime = b.registerHistogram("conn-read-time-in-ms")
}

func (b *Broker) unregisterMetrics() {
Expand Down

0 comments on commit f75a462

Please sign in to comment.