Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Varun committed Mar 29, 2019
2 parents fcf2bf2 + 8f15a58 commit bb3bd3c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ We will gladly accept bug fixes, or additions to this library. Please fork this
- If you plan to work on something major, please open an issue to discuss the design first.
- Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
- Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
- Run [go vet](https://godoc.org/golang.org/x/tools/cmd/vet) to detect any suspicious constructs in your code that could be bugs.
- Run [go vet](https://golang.org/cmd/vet/) to detect any suspicious constructs in your code that could be bugs.
- Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
- You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
- Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.
Expand Down
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
language: go
go:
- 1.10.x
- 1.11.x
- 1.12.x

Expand All @@ -12,8 +11,8 @@ env:
- KAFKA_HOSTNAME=localhost
- DEBUG=true
matrix:
- KAFKA_VERSION=2.0.1 KAFKA_SCALA_VERSION=2.12
- KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
- KAFKA_VERSION=2.2.0 KAFKA_SCALA_VERSION=2.12

before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
Expand Down
4 changes: 2 additions & 2 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
// --------------------------------------------------------------------

// BalanceStrategy is used to balance topics and partitions
// across memebers of a consumer group
// across members of a consumer group
type BalanceStrategy interface {
// Name uniquely identifies the strategy.
Name() string
Expand Down Expand Up @@ -78,7 +78,7 @@ type balanceStrategy struct {
// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string { return s.name }

// Balance implements BalanceStrategy.
// Plan implements BalanceStrategy.
func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
// Build members by topic map
mbt := make(map[string][]string)
Expand Down
57 changes: 37 additions & 20 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Broker struct {
responses chan responsePromise
done chan bool

registeredMetrics []string

incomingByteRate metrics.Meter
requestRate metrics.Meter
requestSize metrics.Histogram
Expand Down Expand Up @@ -178,13 +180,7 @@ func (b *Broker) Open(conf *Config) error {
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
b.registerMetrics()
}

if conf.Net.SASL.Enable {
Expand Down Expand Up @@ -246,12 +242,7 @@ func (b *Broker) Close() error {
b.done = nil
b.responses = nil

if b.id >= 0 {
b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
}
b.unregisterMetrics()

if err == nil {
Logger.Printf("Closed connection to broker %s\n", b.addr)
Expand Down Expand Up @@ -1068,7 +1059,7 @@ func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (i

func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
buf := make([]byte, responseLengthSize+correlationIDSize)
bytesRead, err := io.ReadFull(b.conn, buf)
_, err := io.ReadFull(b.conn, buf)
if err != nil {
return nil, err
}
Expand All @@ -1084,8 +1075,7 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
}

buf = make([]byte, header.length-correlationIDSize)
c, err := io.ReadFull(b.conn, buf)
bytesRead += c
_, err = io.ReadFull(b.conn, buf)
if err != nil {
return nil, err
}
Expand All @@ -1094,11 +1084,9 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
if err := versionedDecode(buf, res, 0); err != nil {
return nil, err
}

if res.Err != ErrNoError {
return nil, res.Err
}

return res.SaslAuthBytes, nil
}

Expand Down Expand Up @@ -1156,7 +1144,8 @@ func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlati
}

func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
buf := make([]byte, 8)

buf := make([]byte, responseLengthSize+correlationIDSize)

bytesRead, err := io.ReadFull(b.conn, buf)
if err != nil {
Expand All @@ -1174,7 +1163,7 @@ func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int,
return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
}

buf = make([]byte, header.length-4)
buf = make([]byte, header.length-correlationIDSize)

c, err := io.ReadFull(b.conn, buf)
bytesRead += c
Expand Down Expand Up @@ -1247,3 +1236,31 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
}

}

func (b *Broker) registerMetrics() {
b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
b.brokerRequestRate = b.registerMeter("request-rate")
b.brokerRequestSize = b.registerHistogram("request-size")
b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
b.brokerResponseRate = b.registerMeter("response-rate")
b.brokerResponseSize = b.registerHistogram("response-size")
}

func (b *Broker) unregisterMetrics() {
for _, name := range b.registeredMetrics {
b.conf.MetricRegistry.Unregister(name)
}
}

func (b *Broker) registerMeter(name string) metrics.Meter {
nameForBroker := getMetricNameForBroker(name, b)
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
}

func (b *Broker) registerHistogram(name string) metrics.Histogram {
nameForBroker := getMetricNameForBroker(name, b)
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}
8 changes: 0 additions & 8 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ func getMetricNameForBroker(name string, broker *Broker) string {
return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
}

func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
}

func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
}

func getMetricNameForTopic(name string, topic string) string {
// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
// cf. KAFKA-1902 and KAFKA-2337
Expand Down

0 comments on commit bb3bd3c

Please sign in to comment.