Skip to content

Commit

Permalink
Support describeacls (#1166)
Browse files Browse the repository at this point in the history
* Support describeacls

* gofmt -s -w createacl_test.go

* make test diff smaller and fix protocl api key

* fix another protocol api key

* improve test name

* protocol fixes

* add missing patterntype

* fix createacls protocol

* fix tags and add tagged fields back in

* bump createacls version to v3

* wip

* just one filter, not a list of filters

* add missing patterntype in test

* fix patterntype location

* add prototests

* createacl_test.go -> createacls_test.go

* seperate createacls_test and describeacls_test

* fix describeaclstest

* add comment for ResourcePatternTypeFilter
  • Loading branch information
petedannemann authored Jul 28, 2023
1 parent 6193fa9 commit f4ca0b4
Show file tree
Hide file tree
Showing 7 changed files with 560 additions and 18 deletions.
11 changes: 7 additions & 4 deletions createacl_test.go → createacls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ func TestClientCreateACLs(t *testing.T) {
client, shutdown := newLocalClient()
defer shutdown()

res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
topic := makeTopic()
group := makeGroupID()

createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
ACLs: []ACLEntry{
{
Principal: "User:alice",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeTopic,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-topic-for-alice",
ResourceName: topic,
Host: "*",
},
{
Expand All @@ -32,7 +35,7 @@ func TestClientCreateACLs(t *testing.T) {
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeGroup,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-group-for-bob",
ResourceName: group,
Host: "*",
},
},
Expand All @@ -41,7 +44,7 @@ func TestClientCreateACLs(t *testing.T) {
t.Fatal(err)
}

for _, err := range res.Errors {
for _, err := range createRes.Errors {
if err != nil {
t.Error(err)
}
Expand Down
107 changes: 107 additions & 0 deletions describeacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/describeacls"
)

// DescribeACLsRequest represents a request sent to a kafka broker to describe
// existing ACLs.
type DescribeACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// Filter to filter ACLs on.
Filter ACLFilter
}

type ACLFilter struct {
ResourceTypeFilter ResourceType
ResourceNameFilter string
// ResourcePatternTypeFilter was added in v1 and is not available prior to that.
ResourcePatternTypeFilter PatternType
PrincipalFilter string
HostFilter string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// DescribeACLsResponse represents a response from a kafka broker to an ACL
// describe request.
type DescribeACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// Error that occurred while attempting to describe
// the ACLs.
Error error

// ACL resources returned from the describe request.
Resources []ACLResource
}

type ACLResource struct {
ResourceType ResourceType
ResourceName string
PatternType PatternType
ACLs []ACLDescription
}

type ACLDescription struct {
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}

func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) {
m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{
Filter: describeacls.ACLFilter{
ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter),
ResourceNameFilter: req.Filter.ResourceNameFilter,
ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter),
PrincipalFilter: req.Filter.PrincipalFilter,
HostFilter: req.Filter.HostFilter,
Operation: int8(req.Filter.Operation),
PermissionType: int8(req.Filter.PermissionType),
},
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err)
}

res := m.(*describeacls.Response)
resources := make([]ACLResource, len(res.Resources))

for resourceIdx, respResource := range res.Resources {
descriptions := make([]ACLDescription, len(respResource.ACLs))

for descriptionIdx, respDescription := range respResource.ACLs {
descriptions[descriptionIdx] = ACLDescription{
Principal: respDescription.Principal,
Host: respDescription.Host,
Operation: ACLOperationType(respDescription.Operation),
PermissionType: ACLPermissionType(respDescription.PermissionType),
}
}

resources[resourceIdx] = ACLResource{
ResourceType: ResourceType(respResource.ResourceType),
ResourceName: respResource.ResourceName,
PatternType: PatternType(respResource.PatternType),
ACLs: descriptions,
}
}

ret := &DescribeACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Resources: resources,
}

return ret, nil
}
88 changes: 88 additions & 0 deletions describeacls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
"github.com/stretchr/testify/assert"
)

func TestClientDescribeACLs(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.0.1") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

topic := makeTopic()
group := makeGroupID()

createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
ACLs: []ACLEntry{
{
Principal: "User:alice",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeTopic,
ResourcePatternType: PatternTypeLiteral,
ResourceName: topic,
Host: "*",
},
{
Principal: "User:bob",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeGroup,
ResourcePatternType: PatternTypeLiteral,
ResourceName: group,
Host: "*",
},
},
})
if err != nil {
t.Fatal(err)
}

for _, err := range createRes.Errors {
if err != nil {
t.Error(err)
}
}

describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{
Filter: ACLFilter{
ResourceTypeFilter: ResourceTypeTopic,
ResourceNameFilter: topic,
ResourcePatternTypeFilter: PatternTypeLiteral,
Operation: ACLOperationTypeRead,
PermissionType: ACLPermissionTypeAllow,
},
})
if err != nil {
t.Fatal(err)
}

expectedDescribeResp := DescribeACLsResponse{
Throttle: 0,
Error: makeError(0, ""),
Resources: []ACLResource{
{
ResourceType: ResourceTypeTopic,
ResourceName: topic,
PatternType: PatternTypeLiteral,
ACLs: []ACLDescription{
{
Principal: "User:alice",
Host: "*",
Operation: ACLOperationTypeRead,
PermissionType: ACLPermissionTypeAllow,
},
},
},
},
}

assert.Equal(t, expectedDescribeResp, *describeResp)
}
36 changes: 22 additions & 14 deletions protocol/createacls/createacls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ func init() {
type Request struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v2,tag"`
_ struct{} `kafka:"min=v2,max=v3,tag"`

Creations []RequestACLs `kafka:"min=v0,max=v2"`
Creations []RequestACLs `kafka:"min=v0,max=v3"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls }
Expand All @@ -21,29 +21,37 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
}

type RequestACLs struct {
ResourceType int8 `kafka:"min=v0,max=v2"`
ResourceName string `kafka:"min=v0,max=v2"`
ResourcePatternType int8 `kafka:"min=v0,max=v2"`
Principal string `kafka:"min=v0,max=v2"`
Host string `kafka:"min=v0,max=v2"`
Operation int8 `kafka:"min=v0,max=v2"`
PermissionType int8 `kafka:"min=v0,max=v2"`
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

ResourceType int8 `kafka:"min=v0,max=v3"`
ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
ResourcePatternType int8 `kafka:"min=v1,max=v3"`
Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
Operation int8 `kafka:"min=v0,max=v3"`
PermissionType int8 `kafka:"min=v0,max=v3"`
}

type Response struct {
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v2,tag"`
_ struct{} `kafka:"min=v2,max=v3,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v2"`
Results []ResponseACLs `kafka:"min=v0,max=v2"`
ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
Results []ResponseACLs `kafka:"min=v0,max=v3"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls }

type ResponseACLs struct {
ErrorCode int16 `kafka:"min=v0,max=v2"`
ErrorMessage string `kafka:"min=v0,max=v2,nullable"`
// We need at least one tagged field to indicate that v2+ uses "flexible"
// messages.
_ struct{} `kafka:"min=v2,max=v3,tag"`

ErrorCode int16 `kafka:"min=v0,max=v3"`
ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
}

var _ protocol.BrokerMessage = (*Request)(nil)
Loading

0 comments on commit f4ca0b4

Please sign in to comment.