Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-546: Add Client Quota APIs #1119

Merged
merged 25 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2956635
start on createquotas protocl
petedannemann Apr 14, 2023
89c534e
fix file name
petedannemann Apr 14, 2023
c7dd87c
add descibeclientquotas protocol
petedannemann Apr 14, 2023
05dd93c
fix api keys
petedannemann Apr 14, 2023
65cefed
add protocol support for float64s
petedannemann Apr 18, 2023
f697fff
cleanup and testing
petedannemann Apr 19, 2023
21f84b0
add public api
petedannemann Apr 19, 2023
13ba79e
doc strings
petedannemann Apr 19, 2023
7450047
describeclientquotas protocol cleanup and tests
petedannemann Apr 19, 2023
af7dc5d
fix describeclientquotas and testing
petedannemann Apr 19, 2023
441539d
gofmt
petedannemann Apr 19, 2023
edbbbe6
grammar
petedannemann Apr 19, 2023
15c6979
gofmt
petedannemann Apr 19, 2023
8b47e27
move expected value closer to assertion
petedannemann May 1, 2023
47269fe
Revert "move expected value closer to assertion"
petedannemann May 1, 2023
db01413
move expected value closer to assertion
petedannemann May 1, 2023
66ec903
move expected value closer to assertion
petedannemann May 1, 2023
f8535d3
wrap errors with makeError
petedannemann May 5, 2023
032aac5
add tagged field for v1 flexible messages
petedannemann May 5, 2023
80b8082
add compact annotations for v1 protocol
petedannemann May 5, 2023
35535cf
remove unnecessary kafka tags in describeclientquotas.go
petedannemann May 5, 2023
5b5b590
add more tagged fields
petedannemann May 31, 2023
259b170
Revert "add more tagged fields"
petedannemann May 31, 2023
10b8107
add tags to describeclientquotas types
petedannemann Jun 7, 2023
b5875a6
add tags to alterclientquotas types
petedannemann Jun 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions alterclientquotas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alterclientquotas"
)

// AlterClientQuotasRequest represents a request sent to a kafka broker to
// alter client quotas.
type AlterClientQuotasRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of client quotas entries to alter.
Entries []AlterClientQuotaEntry

// Whether the alteration should be validated, but not performed.
ValidateOnly bool
}

type AlterClientQuotaEntry struct {
// The quota entities to alter.
Entities []AlterClientQuotaEntity

// An individual quota configuration entry to alter.
Ops []AlterClientQuotaOps
}

type AlterClientQuotaEntity struct {
// The quota entity type.
EntityType string

// The name of the quota entity, or null if the default.
EntityName string
}

type AlterClientQuotaOps struct {
// The quota configuration key.
Key string

// The quota configuration value to set, otherwise ignored if the value is to be removed.
Value float64

// Whether the quota configuration value should be removed, otherwise set.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise set.

Is there a word missing at the end there ("false" maybe)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these doc strings (including this one) are just quotes from the API docs. This verbiage seems common in the Kafka API docs so I think it is ok. I can change it if you feel otherwise

Remove bool
}

type AlterClientQuotaResponseQuotas struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update.
Error error

// The altered quota entities.
Entities []AlterClientQuotaEntity
}

// AlterClientQuotasResponse represents a response from a kafka broker to an alter client
// quotas request.
type AlterClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of altered client quotas responses.
Entries []AlterClientQuotaResponseQuotas
}

// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
entries := make([]alterclientquotas.Entry, len(req.Entries))

for entryIdx, entry := range req.Entries {
entities := make([]alterclientquotas.Entity, len(entry.Entities))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the pattern used elsewhere where we choose not to reuse the types defined in the protocol package and create new types in the kafka package

for entityIdx, entity := range entry.Entities {
entities[entityIdx] = alterclientquotas.Entity{
EntityType: entity.EntityType,
EntityName: entity.EntityName,
}
}

ops := make([]alterclientquotas.Ops, len(entry.Ops))
for opsIdx, op := range entry.Ops {
ops[opsIdx] = alterclientquotas.Ops{
Key: op.Key,
Value: op.Value,
Remove: op.Remove,
}
}

entries[entryIdx] = alterclientquotas.Entry{
Entities: entities,
Ops: ops,
}
}

m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
Entries: entries,
ValidateOnly: req.ValidateOnly,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
}

res := m.(*alterclientquotas.Response)
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))

for responseEntryIdx, responseEntry := range res.Results {
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
for responseEntityIdx, responseEntity := range responseEntry.Entities {
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
EntityType: responseEntity.EntityType,
EntityName: responseEntity.EntityName,
}
}

responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage),
Entities: responseEntities,
}
}
ret := &AlterClientQuotasResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Entries: responseEntries,
}

return ret, nil
}
104 changes: 104 additions & 0 deletions alterclientquotas_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
"github.com/stretchr/testify/assert"
)

func TestClientAlterClientQuotas(t *testing.T) {
// Added in Version 2.6.0 https://issues.apache.org/jira/browse/KAFKA-7740
if !ktesting.KafkaIsAtLeast("2.6.0") {
return
}

const (
entityType = "client-id"
entityName = "my-client-id"
key = "producer_byte_rate"
value = 500000.0
)

client, shutdown := newLocalClient()
defer shutdown()

alterResp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{
Entries: []AlterClientQuotaEntry{
{
Entities: []AlterClientQuotaEntity{
{
EntityType: entityType,
EntityName: entityName,
},
},
Ops: []AlterClientQuotaOps{
{
Key: key,
Value: value,
Remove: false,
},
},
},
},
})

if err != nil {
t.Fatal(err)
}

expectedAlterResp := AlterClientQuotasResponse{
Throttle: 0,
Entries: []AlterClientQuotaResponseQuotas{
{
Error: makeError(0, ""),
Entities: []AlterClientQuotaEntity{
{
EntityName: entityName,
EntityType: entityType,
},
},
},
},
}

assert.Equal(t, expectedAlterResp, *alterResp)

describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
Components: []DescribeClientQuotasRequestComponent{
{
EntityType: entityType,
MatchType: 0,
Match: entityName,
},
},
})

if err != nil {
t.Fatal(err)
}

expectedDescribeResp := DescribeClientQuotasResponse{
Throttle: 0,
Error: makeError(0, ""),
Entries: []DescribeClientQuotasResponseQuotas{
{
Entities: []DescribeClientQuotasEntity{
{
EntityType: entityType,
EntityName: entityName,
},
},
Values: []DescribeClientQuotasValue{
{
Key: key,
Value: value,
},
},
},
},
}

assert.Equal(t, expectedDescribeResp, *describeResp)
}
126 changes: 126 additions & 0 deletions describeclientquotas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/describeclientquotas"
)

// DescribeClientQuotasRequest represents a request sent to a kafka broker to
// describe client quotas.
type DescribeClientQuotasRequest struct {
// Address of the kafka broker to send the request to
Addr net.Addr

// List of quota components to describe.
Components []DescribeClientQuotasRequestComponent

// Whether the match is strict, i.e. should exclude entities with
// unspecified entity types.
Strict bool
}

type DescribeClientQuotasRequestComponent struct {
// The entity type that the filter component applies to.
EntityType string

// How to match the entity (0 = exact name, 1 = default name,
// 2 = any specified name).
MatchType int8

// The string to match against, or null if unused for the match type.
Match string
}

// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request.
type DescribeClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update.
Error error

// List of describe client quota responses.
Entries []DescribeClientQuotasResponseQuotas
}

type DescribeClientQuotasEntity struct {
// The quota entity type.
EntityType string

// The name of the quota entity, or null if the default.
EntityName string
}

type DescribeClientQuotasValue struct {
// The quota configuration key.
Key string

// The quota configuration value.
Value float64
}

type DescribeClientQuotasResponseQuotas struct {
// List of client quota entities and their descriptions.
Entities []DescribeClientQuotasEntity

// The client quota configuration values.
Values []DescribeClientQuotasValue
}

// DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns
// the response.
func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
components := make([]describeclientquotas.Component, len(req.Components))

for componentIdx, component := range req.Components {
components[componentIdx] = describeclientquotas.Component{
EntityType: component.EntityType,
MatchType: component.MatchType,
Match: component.Match,
}
}

m, err := c.roundTrip(ctx, req.Addr, &describeclientquotas.Request{
Components: components,
Strict: req.Strict,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeClientQuotas: %w", err)
}

res := m.(*describeclientquotas.Response)
responseEntries := make([]DescribeClientQuotasResponseQuotas, len(res.Entries))

for responseEntryIdx, responseEntry := range res.Entries {
responseEntities := make([]DescribeClientQuotasEntity, len(responseEntry.Entities))
for responseEntityIdx, responseEntity := range responseEntry.Entities {
responseEntities[responseEntityIdx] = DescribeClientQuotasEntity{
EntityType: responseEntity.EntityType,
EntityName: responseEntity.EntityName,
}
}

responseValues := make([]DescribeClientQuotasValue, len(responseEntry.Values))
for responseValueIdx, responseValue := range responseEntry.Values {
responseValues[responseValueIdx] = DescribeClientQuotasValue{
Key: responseValue.Key,
Value: responseValue.Value,
}
}
responseEntries[responseEntryIdx] = DescribeClientQuotasResponseQuotas{
Entities: responseEntities,
Values: responseValues,
}
}
ret := &DescribeClientQuotasResponse{
Throttle: time.Duration(res.ThrottleTimeMs),
Entries: responseEntries,
}

return ret, nil
}
Loading