Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DeleteGroups function to Client #1095

Merged
merged 5 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions deletegroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/deletegroups"
)

// DeleteGroupsRequest represents a request sent to a kafka broker to delete
// consumer groups.
type DeleteGroupsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// Identifiers of groups to delete.
GroupIDs []string
}

// DeleteGroupsResponse represents a response from a kafka broker to a consumer group
// deletion request.
type DeleteGroupsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// Mapping of group ids to errors that occurred while attempting to delete those groups.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Errors map[string]error
}

// DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group
// of the request. All deleted groups must be managed by the same group coordinator.
func (c *Client) DeleteGroups(
ctx context.Context,
req *DeleteGroupsRequest,
) (*DeleteGroupsResponse, error) {
m, err := c.roundTrip(ctx, req.Addr, &deletegroups.Request{
GroupIDs: req.GroupIDs,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DeleteGroups: %w", err)
}

r := m.(*deletegroups.Response)

ret := &DeleteGroupsResponse{
Throttle: makeDuration(r.ThrottleTimeMs),
Errors: make(map[string]error, len(r.Responses)),
}

for _, t := range r.Responses {
ret.Errors[t.GroupID] = makeError(t.ErrorCode, "")
}

return ret, nil
}
80 changes: 80 additions & 0 deletions deletegroups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package kafka

import (
"context"
"errors"
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientDeleteGroups(t *testing.T) {
if !ktesting.KafkaIsAtLeast("1.1.0") {
t.Skip("Skipping test because kafka version is not high enough.")
}

client, shutdown := newLocalClient()
defer shutdown()

topic := makeTopic()
createTopic(t, topic, 1)

groupID := makeGroupID()

group, err := NewConsumerGroup(ConsumerGroupConfig{
ID: groupID,
Topics: []string{topic},
Brokers: []string{"localhost:9092"},
HeartbeatInterval: 2 * time.Second,
RebalanceTimeout: 2 * time.Second,
RetentionTime: time.Hour,
Logger: &testKafkaLogger{T: t},
})
if err != nil {
t.Fatal(err)
}
defer group.Close()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

gen, err := group.Next(ctx)
if gen == nil {
t.Fatalf("expected generation 1 not to be nil")
}
if err != nil {
t.Fatalf("expected no error, but got %+v", err)
}

// delete not empty group
res, err := client.DeleteGroups(ctx, &DeleteGroupsRequest{
GroupIDs: []string{groupID},
})

if err != nil {
t.Fatal(err)
}

if !errors.Is(res.Errors[groupID], NonEmptyGroup) {
t.Fatalf("expected NonEmptyGroup error, but got %+v", res.Errors[groupID])
}

err = group.Close()
if err != nil {
t.Fatal(err)
}

// delete empty group
res, err = client.DeleteGroups(ctx, &DeleteGroupsRequest{
GroupIDs: []string{groupID},
})

if err != nil {
t.Fatal(err)
}

if err = res.Errors[groupID]; err != nil {
t.Error(err)
}
}
45 changes: 45 additions & 0 deletions protocol/deletegroups/deletegroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package deletegroups

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v2,max=v2,tag"`

GroupIDs []string `kafka:"min=v0,max=v2"`
}

func (r *Request) Group() string {
// use first group to determine group coordinator
if len(r.GroupIDs) > 0 {
return r.GroupIDs[0]
}
return ""
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteGroups }

var (
_ protocol.GroupMessage = (*Request)(nil)
)

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v2,max=v2,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v2"`
Responses []ResponseGroup `kafka:"min=v0,max=v2"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteGroups }

type ResponseGroup struct {
GroupID string `kafka:"min=v0,max=v2"`
ErrorCode int16 `kafka:"min=v0,max=v2"`
}
33 changes: 33 additions & 0 deletions protocol/deletegroups/deletegroups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package deletegroups_test

import (
"testing"

"github.com/segmentio/kafka-go/protocol/deletegroups"
"github.com/segmentio/kafka-go/protocol/prototest"
)

func TestDeleteGroupsRequest(t *testing.T) {
for _, version := range []int16{0, 1, 2} {
prototest.TestRequest(t, version, &deletegroups.Request{
GroupIDs: []string{"group1", "group2"},
})
}
}

func TestDeleteGroupsResponse(t *testing.T) {
for _, version := range []int16{0, 1, 2} {
prototest.TestResponse(t, version, &deletegroups.Response{
Responses: []deletegroups.ResponseGroup{
{
GroupID: "group1",
ErrorCode: 0,
},
{
GroupID: "group2",
ErrorCode: 1,
},
},
})
}
}