Skip to content

Commit

Permalink
fix(proto): correct JoinGroup usage for wider version range
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 3, 2023
1 parent 23d4561 commit 82a6d57
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
19 changes: 18 additions & 1 deletion consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,24 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
req.Version = 1
req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
}
if c.groupInstanceId != nil {
if c.config.Version.IsAtLeast(V0_11_0_0) {
req.Version = 2
}
if c.config.Version.IsAtLeast(V0_11_0_0) {
req.Version = 2
}
if c.config.Version.IsAtLeast(V2_0_0_0) {
req.Version = 3
}
// XXX: protocol states "Starting from version 4, the client needs to issue a
// second request to join group", so not enabling this until we can
// investigate
/*
if c.config.Version.IsAtLeast(V2_2_0_0) {
req.Version = 4
}
*/
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 5
req.GroupInstanceId = c.groupInstanceId
}
Expand Down
12 changes: 9 additions & 3 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,20 @@ func (r *JoinGroupRequest) isValidVersion() bool {

func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 4, 5:
case 5:
return V2_3_0_0
case 2, 3:
case 4:
return V2_2_0_0
case 3:
return V2_0_0_0
case 2:
return V0_11_0_0
case 1:
return V0_10_1_0
case 0:
return V0_10_0_0
default:
return V0_9_0_0
return V2_3_0_0
}
}

Expand Down
10 changes: 8 additions & 2 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,20 @@ func (r *JoinGroupResponse) isValidVersion() bool {

func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 3, 4, 5:
case 5:
return V2_3_0_0
case 4:
return V2_2_0_0
case 3:
return V2_0_0_0
case 2:
return V0_11_0_0
case 1:
return V0_10_1_0
case 0:
return V0_10_0_0
default:
return V0_9_0_0
return V2_3_0_0
}
}

Expand Down

0 comments on commit 82a6d57

Please sign in to comment.