Skip to content

Commit

Permalink
Merge pull request #2538 from IBM/dnwe/is-valid-version
Browse files Browse the repository at this point in the history
feat: add isValidVersion to protocol types
  • Loading branch information
dnwe authored Aug 3, 2023
2 parents 02c5de3 + a9126ad commit ce1ac25
Show file tree
Hide file tree
Showing 86 changed files with 918 additions and 129 deletions.
4 changes: 4 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
Expand Down
14 changes: 12 additions & 2 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
Version int16
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
}
Expand Down Expand Up @@ -52,15 +53,24 @@ func (c *CreateAclsResponse) key() int16 {
}

func (c *CreateAclsResponse) version() int16 {
return 0
return c.Version
}

func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 1
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch c.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *CreateAclsResponse) throttleTime() time.Duration {
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (d *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
11 changes: 10 additions & 1 deletion acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,17 @@ func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *DeleteAclsResponse) throttleTime() time.Duration {
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
16 changes: 14 additions & 2 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
Version int16
TransactionalID string
ProducerID int64
ProducerEpoch int16
Expand Down Expand Up @@ -45,13 +46,24 @@ func (a *AddOffsetsToTxnRequest) key() int16 {
}

func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
return a.Version
}

func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
16 changes: 14 additions & 2 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
Version int16
ThrottleTime time.Duration
Err KError
}
Expand Down Expand Up @@ -37,15 +38,26 @@ func (a *AddOffsetsToTxnResponse) key() int16 {
}

func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
return a.Version
}

func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *AddOffsetsToTxnResponse) throttleTime() time.Duration {
Expand Down
16 changes: 14 additions & 2 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AddPartitionsToTxnRequest is a add partition request
type AddPartitionsToTxnRequest struct {
Version int16
TransactionalID string
ProducerID int64
ProducerEpoch int16
Expand Down Expand Up @@ -69,13 +70,24 @@ func (a *AddPartitionsToTxnRequest) key() int16 {
}

func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
return a.Version
}

func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
16 changes: 14 additions & 2 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// AddPartitionsToTxnResponse is a partition errors to transaction type
type AddPartitionsToTxnResponse struct {
Version int16
ThrottleTime time.Duration
Errors map[string][]*PartitionError
}
Expand Down Expand Up @@ -76,15 +77,26 @@ func (a *AddPartitionsToTxnResponse) key() int16 {
}

func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
return a.Version
}

func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) isValidVersion() bool {
return a.Version >= 0 && a.Version <= 2
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch a.Version {
case 2:
return V2_7_0_0
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *AddPartitionsToTxnResponse) throttleTime() time.Duration {
Expand Down
13 changes: 12 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
Topics: topics,
Timeout: ca.conf.Admin.Timeout,
}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}
rsp, err := broker.DeleteRecords(request)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -1061,7 +1064,11 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
request := &DescribeLogDirsRequest{}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 1
}
response, err := b.DescribeLogDirs(request)
if err != nil {
errChan <- err
return
Expand Down Expand Up @@ -1208,6 +1215,10 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie
}

func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
}

controller, err := ca.client.Coordinator(groupId)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion alter_client_quotas_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sarama
// validate_only => BOOLEAN

type AlterClientQuotasRequest struct {
Version int16
Entries []AlterClientQuotasEntry // The quota configuration entries to alter.
ValidateOnly bool // Whether the alteration should be validated, but not performed.
}
Expand Down Expand Up @@ -182,13 +183,17 @@ func (a *AlterClientQuotasRequest) key() int16 {
}

func (a *AlterClientQuotasRequest) version() int16 {
return 0
return a.Version
}

func (a *AlterClientQuotasRequest) headerVersion() int16 {
return 1
}

func (a *AlterClientQuotasRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterClientQuotasRequest) requiredVersion() KafkaVersion {
return V2_6_0_0
}
7 changes: 6 additions & 1 deletion alter_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// entity_name => NULLABLE_STRING

type AlterClientQuotasResponse struct {
Version int16
ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
Entries []AlterClientQuotasEntryResponse // The quota configuration entries altered.
}
Expand Down Expand Up @@ -133,13 +134,17 @@ func (a *AlterClientQuotasResponse) key() int16 {
}

func (a *AlterClientQuotasResponse) version() int16 {
return 0
return a.Version
}

func (a *AlterClientQuotasResponse) headerVersion() int16 {
return 0
}

func (a *AlterClientQuotasResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}
Expand Down
7 changes: 6 additions & 1 deletion alter_configs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AlterConfigsRequest is an alter config request type
type AlterConfigsRequest struct {
Version int16
Resources []*AlterConfigsResource
ValidateOnly bool
}
Expand Down Expand Up @@ -114,13 +115,17 @@ func (a *AlterConfigsRequest) key() int16 {
}

func (a *AlterConfigsRequest) version() int16 {
return 0
return a.Version
}

func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *AlterConfigsRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
9 changes: 7 additions & 2 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// AlterConfigsResponse is a response type for alter config
type AlterConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Resources []*AlterConfigsResourceResponse
}
Expand Down Expand Up @@ -100,17 +101,21 @@ func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) e
}

func (a *AlterConfigsResponse) key() int16 {
return 32
return 33
}

func (a *AlterConfigsResponse) version() int16 {
return 0
return a.Version
}

func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}

func (a *AlterConfigsResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions alter_partition_reassignments_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 {
return 2
}

func (r *AlterPartitionReassignmentsRequest) isValidVersion() bool {
return r.Version == 0
}

func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
}
Expand Down
Loading

0 comments on commit ce1ac25

Please sign in to comment.