From a5a9b835afebdc927d62dbbc1687482a99fb59eb Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 18 Dec 2017 22:26:28 +0100 Subject: [PATCH 1/2] refactor ConsumerMetadataRequest/Response to FindCoordinatorRequest/Response --- broker.go | 12 +++ client.go | 11 +-- consumer_metadata_request.go | 13 +++- consumer_metadata_request_test.go | 10 ++- consumer_metadata_response.go | 65 +++++----------- find_coordinator_request.go | 61 +++++++++++++++ find_coordinator_request_test.go | 33 ++++++++ find_coordinator_response.go | 121 ++++++++++++++++++++++++++++++ find_coordinator_response_test.go | 49 ++++++++++++ request.go | 2 +- 10 files changed, 317 insertions(+), 60 deletions(-) create mode 100644 find_coordinator_request.go create mode 100644 find_coordinator_request_test.go create mode 100644 find_coordinator_response.go create mode 100644 find_coordinator_response_test.go diff --git a/broker.go b/broker.go index 923b07faf..df171cb73 100644 --- a/broker.go +++ b/broker.go @@ -230,6 +230,18 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume return response, nil } +func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) { + response := new(FindCoordinatorResponse) + + err := b.sendAndReceive(request, response) + + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) { response := new(OffsetResponse) diff --git a/client.go b/client.go index 3dbfc4b06..937000a7b 100644 --- a/client.go +++ b/client.go @@ -735,8 +735,8 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker { return nil } -func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) { - retry := func(err error) (*ConsumerMetadataResponse, error) { +func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) { + retry := func(err error) (*FindCoordinatorResponse, error) { if attemptsRemaining > 0 { Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining) time.Sleep(client.conf.Metadata.Retry.Backoff) @@ -748,10 +748,11 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin for broker := client.any(); broker != nil; broker = client.any() { Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr()) - request := new(ConsumerMetadataRequest) - request.ConsumerGroup = consumerGroup + request := new(FindCoordinatorRequest) + request.CoordinatorKey = consumerGroup + request.CoordinatorType = CoordinatorGroup - response, err := broker.GetConsumerMetadata(request) + response, err := broker.FindCoordinator(request) if err != nil { Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err) diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index 483be3354..4de45e7bf 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -5,12 +5,19 @@ type ConsumerMetadataRequest struct { } func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error { - return pe.putString(r.ConsumerGroup) + tmp := new(FindCoordinatorRequest) + tmp.CoordinatorKey = r.ConsumerGroup + tmp.CoordinatorType = CoordinatorGroup + return tmp.encode(pe) } func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) { - r.ConsumerGroup, err = pd.getString() - return err + tmp := new(FindCoordinatorRequest) + if err := tmp.decode(pd, version); err != nil { + return err + } + r.ConsumerGroup = tmp.CoordinatorKey + return nil } func (r *ConsumerMetadataRequest) key() int16 { diff --git a/consumer_metadata_request_test.go b/consumer_metadata_request_test.go index 4509631a0..24e5f0a43 100644 --- a/consumer_metadata_request_test.go +++ b/consumer_metadata_request_test.go @@ -1,6 +1,8 @@ package sarama -import "testing" +import ( + "testing" +) var ( consumerMetadataRequestEmpty = []byte{ @@ -12,8 +14,10 @@ var ( func TestConsumerMetadataRequest(t *testing.T) { request := new(ConsumerMetadataRequest) - testRequest(t, "empty string", request, consumerMetadataRequestEmpty) + testEncodable(t, "empty string", request, consumerMetadataRequestEmpty) + testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0) request.ConsumerGroup = "foobar" - testRequest(t, "with string", request, consumerMetadataRequestString) + testEncodable(t, "with string", request, consumerMetadataRequestString) + testVersionDecodable(t, "with string", request, consumerMetadataRequestString, 0) } diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 6b9632bba..722035205 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -1,10 +1,5 @@ package sarama -import ( - "net" - "strconv" -) - type ConsumerMetadataResponse struct { Err KError Coordinator *Broker @@ -14,61 +9,35 @@ type ConsumerMetadataResponse struct { } func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) { - tmp, err := pd.getInt16() - if err != nil { - return err - } - r.Err = KError(tmp) + tmp := new(FindCoordinatorResponse) - coordinator := new(Broker) - if err := coordinator.decode(pd); err != nil { + if err := tmp.decode(pd, version); err != nil { return err } - if coordinator.addr == ":0" { - return nil - } - r.Coordinator = coordinator - // this can all go away in 2.0, but we have to fill in deprecated fields to maintain - // backwards compatibility - host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - r.CoordinatorID = r.Coordinator.ID() - r.CoordinatorHost = host - r.CoordinatorPort = int32(port) + r.Err = tmp.Err + r.Coordinator = tmp.Coordinator + r.CoordinatorID = tmp.CoordinatorID + r.CoordinatorHost = tmp.CoordinatorHost + r.CoordinatorPort = tmp.CoordinatorPort return nil } func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { - pe.putInt16(int16(r.Err)) - if r.Coordinator != nil { - host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - pe.putInt32(r.Coordinator.ID()) - if err := pe.putString(host); err != nil { - return err - } - pe.putInt32(int32(port)) - return nil + tmp := &FindCoordinatorResponse{ + Version: 0, + Err: r.Err, + Coordinator: r.Coordinator, + CoordinatorID: r.CoordinatorID, + CoordinatorHost: r.CoordinatorHost, + CoordinatorPort: r.CoordinatorPort, } - pe.putInt32(r.CoordinatorID) - if err := pe.putString(r.CoordinatorHost); err != nil { + + if err := tmp.encode(pe); err != nil { return err } - pe.putInt32(r.CoordinatorPort) + return nil } diff --git a/find_coordinator_request.go b/find_coordinator_request.go new file mode 100644 index 000000000..0ab5cb5ff --- /dev/null +++ b/find_coordinator_request.go @@ -0,0 +1,61 @@ +package sarama + +type CoordinatorType int8 + +const ( + CoordinatorGroup CoordinatorType = 0 + CoordinatorTransaction CoordinatorType = 1 +) + +type FindCoordinatorRequest struct { + Version int16 + CoordinatorKey string + CoordinatorType CoordinatorType +} + +func (f *FindCoordinatorRequest) encode(pe packetEncoder) error { + if err := pe.putString(f.CoordinatorKey); err != nil { + return err + } + + if f.Version >= 1 { + pe.putInt8(int8(f.CoordinatorType)) + } + + return nil +} + +func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) { + if f.CoordinatorKey, err = pd.getString(); err != nil { + return err + } + + if version >= 1 { + f.Version = version + coordinatorType, err := pd.getInt8() + if err != nil { + return err + } + + f.CoordinatorType = CoordinatorType(coordinatorType) + } + + return nil +} + +func (f *FindCoordinatorRequest) key() int16 { + return 10 +} + +func (f *FindCoordinatorRequest) version() int16 { + return f.Version +} + +func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion { + switch f.Version { + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } +} diff --git a/find_coordinator_request_test.go b/find_coordinator_request_test.go new file mode 100644 index 000000000..7e889b074 --- /dev/null +++ b/find_coordinator_request_test.go @@ -0,0 +1,33 @@ +package sarama + +import "testing" + +var ( + findCoordinatorRequestConsumerGroup = []byte{ + 0, 5, 'g', 'r', 'o', 'u', 'p', + 0, + } + + findCoordinatorRequestTransaction = []byte{ + 0, 13, 't', 'r', 'a', 'n', 's', 'a', 'c', 't', 'i', 'o', 'n', 'i', 'd', + 1, + } +) + +func TestFindCoordinatorRequest(t *testing.T) { + req := &FindCoordinatorRequest{ + Version: 1, + CoordinatorKey: "group", + CoordinatorType: CoordinatorGroup, + } + + testRequest(t, "version 1 - group", req, findCoordinatorRequestConsumerGroup) + + req = &FindCoordinatorRequest{ + Version: 1, + CoordinatorKey: "transactionid", + CoordinatorType: CoordinatorTransaction, + } + + testRequest(t, "version 1 - transaction", req, findCoordinatorRequestTransaction) +} diff --git a/find_coordinator_response.go b/find_coordinator_response.go new file mode 100644 index 000000000..2bd6b88f9 --- /dev/null +++ b/find_coordinator_response.go @@ -0,0 +1,121 @@ +package sarama + +import ( + "net" + "strconv" + "time" +) + +type FindCoordinatorResponse struct { + Version int16 + ThrottleTime time.Duration + Err KError + ErrMsg *string + Coordinator *Broker + CoordinatorID int32 // deprecated: use Coordinator.ID() + CoordinatorHost string // deprecated: use Coordinator.Addr() + CoordinatorPort int32 // deprecated: use Coordinator.Addr() +} + +func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) { + if version >= 1 { + f.Version = version + + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + } + + tmp, err := pd.getInt16() + if err != nil { + return err + } + f.Err = KError(tmp) + + if version >= 1 { + if f.ErrMsg, err = pd.getNullableString(); err != nil { + return err + } + } + + coordinator := new(Broker) + if err := coordinator.decode(pd); err != nil { + return err + } + if coordinator.addr == ":0" { + return nil + } + f.Coordinator = coordinator + + // this can all go away in 2.0, but we have to fill in deprecated fields to maintain + // backwards compatibility + host, portstr, err := net.SplitHostPort(f.Coordinator.Addr()) + if err != nil { + return err + } + port, err := strconv.ParseInt(portstr, 10, 32) + if err != nil { + return err + } + f.CoordinatorID = f.Coordinator.ID() + f.CoordinatorHost = host + f.CoordinatorPort = int32(port) + + return nil +} + +func (f *FindCoordinatorResponse) encode(pe packetEncoder) error { + if f.Version >= 1 { + pe.putInt32(int32(f.ThrottleTime / time.Millisecond)) + } + + pe.putInt16(int16(f.Err)) + + if f.Version >= 1 { + if err := pe.putNullableString(f.ErrMsg); err != nil { + return err + } + } + + if f.Coordinator != nil { + host, portstr, err := net.SplitHostPort(f.Coordinator.Addr()) + if err != nil { + return err + } + port, err := strconv.ParseInt(portstr, 10, 32) + if err != nil { + return err + } + pe.putInt32(f.Coordinator.ID()) + if err := pe.putString(host); err != nil { + return err + } + pe.putInt32(int32(port)) + return nil + } + pe.putInt32(f.CoordinatorID) + if err := pe.putString(f.CoordinatorHost); err != nil { + return err + } + pe.putInt32(f.CoordinatorPort) + return nil +} + +func (f *FindCoordinatorResponse) key() int16 { + return 10 +} + +func (f *FindCoordinatorResponse) version() int16 { + return f.Version +} + +func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion { + switch f.Version { + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } +} diff --git a/find_coordinator_response_test.go b/find_coordinator_response_test.go new file mode 100644 index 000000000..6bc5d3cf5 --- /dev/null +++ b/find_coordinator_response_test.go @@ -0,0 +1,49 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + findCoordinatorResponse = []byte{ + 0, 0, 0, 100, + 0, 0, + 255, 255, // empty ErrMsg + 0, 0, 0, 1, + 0, 4, 'h', 'o', 's', 't', + 0, 0, 35, 132, + } + + findCoordinatorResponseError = []byte{ + 0, 0, 0, 100, + 0, 15, + 0, 3, 'm', 's', 'g', + 0, 0, 0, 1, + 0, 4, 'h', 'o', 's', 't', + 0, 0, 35, 132, + } +) + +func TestFindCoordinatorResponse(t *testing.T) { + broker := NewBroker("host:9092") + broker.id = 1 + resp := &FindCoordinatorResponse{ + Version: 1, + ThrottleTime: 100 * time.Millisecond, + Err: ErrNoError, + ErrMsg: nil, + CoordinatorID: 1, + CoordinatorHost: "host", + CoordinatorPort: 9092, + Coordinator: broker, + } + + testResponse(t, "version 1 - no error", resp, findCoordinatorResponse) + + msg := "msg" + resp.Err = ErrConsumerCoordinatorNotAvailable + resp.ErrMsg = &msg + + testResponse(t, "version 1 - error", resp, findCoordinatorResponseError) +} diff --git a/request.go b/request.go index 9c37ca78b..fe3488728 100644 --- a/request.go +++ b/request.go @@ -97,7 +97,7 @@ func allocateBody(key, version int16) protocolBody { case 9: return &OffsetFetchRequest{} case 10: - return &ConsumerMetadataRequest{} + return &FindCoordinatorRequest{} case 11: return &JoinGroupRequest{} case 12: From d0340489e8be655e6f978bf93d1d35f1efc62f14 Mon Sep 17 00:00:00 2001 From: Robin Date: Thu, 25 Jan 2018 10:27:38 +0100 Subject: [PATCH 2/2] remove deprecated fields --- consumer_metadata_response.go | 41 ++++++++++++++++++------ consumer_metadata_response_test.go | 13 ++++++-- find_coordinator_response.go | 50 +++++------------------------- find_coordinator_response_test.go | 13 +++----- 4 files changed, 55 insertions(+), 62 deletions(-) diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 722035205..442cbde7a 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -1,5 +1,10 @@ package sarama +import ( + "net" + "strconv" +) + type ConsumerMetadataResponse struct { Err KError Coordinator *Broker @@ -16,22 +21,40 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err } r.Err = tmp.Err + r.Coordinator = tmp.Coordinator - r.CoordinatorID = tmp.CoordinatorID - r.CoordinatorHost = tmp.CoordinatorHost - r.CoordinatorPort = tmp.CoordinatorPort + if tmp.Coordinator == nil { + return nil + } + + // this can all go away in 2.0, but we have to fill in deprecated fields to maintain + // backwards compatibility + host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) + if err != nil { + return err + } + port, err := strconv.ParseInt(portstr, 10, 32) + if err != nil { + return err + } + r.CoordinatorID = r.Coordinator.ID() + r.CoordinatorHost = host + r.CoordinatorPort = int32(port) return nil } func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { + if r.Coordinator == nil { + r.Coordinator = new(Broker) + r.Coordinator.id = r.CoordinatorID + r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort))) + } + tmp := &FindCoordinatorResponse{ - Version: 0, - Err: r.Err, - Coordinator: r.Coordinator, - CoordinatorID: r.CoordinatorID, - CoordinatorHost: r.CoordinatorHost, - CoordinatorPort: r.CoordinatorPort, + Version: 0, + Err: r.Err, + Coordinator: r.Coordinator, } if err := tmp.encode(pe); err != nil { diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index b748784d7..8482f6ff1 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -17,8 +17,17 @@ var ( ) func TestConsumerMetadataResponseError(t *testing.T) { - response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress} - testResponse(t, "error", &response, consumerMetadataResponseError) + response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress} + testEncodable(t, "", response, consumerMetadataResponseError) + + decodedResp := &ConsumerMetadataResponse{} + if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil { + t.Error("could not decode: ", err) + } + + if decodedResp.Err != ErrOffsetsLoadInProgress { + t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress) + } } func TestConsumerMetadataResponseSuccess(t *testing.T) { diff --git a/find_coordinator_response.go b/find_coordinator_response.go index 2bd6b88f9..f2d178f7c 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -1,20 +1,15 @@ package sarama import ( - "net" - "strconv" "time" ) type FindCoordinatorResponse struct { - Version int16 - ThrottleTime time.Duration - Err KError - ErrMsg *string - Coordinator *Broker - CoordinatorID int32 // deprecated: use Coordinator.ID() - CoordinatorHost string // deprecated: use Coordinator.Addr() - CoordinatorPort int32 // deprecated: use Coordinator.Addr() + Version int16 + ThrottleTime time.Duration + Err KError + ErrMsg *string + Coordinator *Broker } func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) { @@ -49,20 +44,6 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e } f.Coordinator = coordinator - // this can all go away in 2.0, but we have to fill in deprecated fields to maintain - // backwards compatibility - host, portstr, err := net.SplitHostPort(f.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - f.CoordinatorID = f.Coordinator.ID() - f.CoordinatorHost = host - f.CoordinatorPort = int32(port) - return nil } @@ -79,27 +60,10 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error { } } - if f.Coordinator != nil { - host, portstr, err := net.SplitHostPort(f.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - pe.putInt32(f.Coordinator.ID()) - if err := pe.putString(host); err != nil { - return err - } - pe.putInt32(int32(port)) - return nil - } - pe.putInt32(f.CoordinatorID) - if err := pe.putString(f.CoordinatorHost); err != nil { + if err := f.Coordinator.encode(pe); err != nil { return err } - pe.putInt32(f.CoordinatorPort) + return nil } diff --git a/find_coordinator_response_test.go b/find_coordinator_response_test.go index 6bc5d3cf5..39cec6469 100644 --- a/find_coordinator_response_test.go +++ b/find_coordinator_response_test.go @@ -29,14 +29,11 @@ func TestFindCoordinatorResponse(t *testing.T) { broker := NewBroker("host:9092") broker.id = 1 resp := &FindCoordinatorResponse{ - Version: 1, - ThrottleTime: 100 * time.Millisecond, - Err: ErrNoError, - ErrMsg: nil, - CoordinatorID: 1, - CoordinatorHost: "host", - CoordinatorPort: 9092, - Coordinator: broker, + Version: 1, + ThrottleTime: 100 * time.Millisecond, + Err: ErrNoError, + ErrMsg: nil, + Coordinator: broker, } testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)