Skip to content

Commit

Permalink
Merge pull request #666 from twmb/kadm
Browse files Browse the repository at this point in the history
kadm: add DeleteTopic,DeleteGroup,Error
  • Loading branch information
twmb authored Jan 22, 2024
2 parents c1a3c3d + 0b89158 commit a2d69ce
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 18 deletions.
61 changes: 61 additions & 0 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ func (rs DescribedGroups) On(group string, fn func(*DescribedGroup) error) (Desc
return DescribedGroup{}, kerr.GroupIDNotFound
}

// Error iterates over all groups and returns the first error encountered, if
// any.
func (ds DescribedGroups) Error() error {
for _, d := range ds {
if d.Err != nil {
return d.Err
}
}
return nil
}

// Topics returns a sorted list of all group names.
func (ds DescribedGroups) Names() []string {
all := make([]string, 0, len(ds))
Expand Down Expand Up @@ -385,6 +396,32 @@ func (rs DeleteGroupResponses) On(group string, fn func(*DeleteGroupResponse) er
return DeleteGroupResponse{}, kerr.GroupIDNotFound
}

// Error iterates over all groups and returns the first error encountered, if
// any.
func (rs DeleteGroupResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// DeleteGroup deletes the specified group. This is similar to DeleteGroups,
// but returns the kerr.ErrorForCode(response.ErrorCode) if the request/response
// is successful.
func (cl *Client) DeleteGroup(ctx context.Context, group string) (DeleteGroupResponse, error) {
rs, err := cl.DeleteGroups(ctx, group)
if err != nil {
return DeleteGroupResponse{}, err
}
g, exists := rs[group]
if !exists {
return DeleteGroupResponse{}, errors.New("requested group was not part of the delete group response")
}
return g, g.Err
}

// DeleteGroups deletes all groups specified.
//
// The purpose of this request is to allow operators a way to delete groups
Expand Down Expand Up @@ -984,6 +1021,17 @@ func (rs FetchOffsetsResponses) On(group string, fn func(*FetchOffsetsResponse)
return FetchOffsetsResponse{}, kerr.GroupIDNotFound
}

// Error iterates over all responses and returns the first error encountered,
// if any.
func (rs FetchOffsetsResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// FetchManyOffsets issues a fetch offsets requests for each group specified.
//
// This function is a batch version of FetchOffsets. FetchOffsets and
Expand Down Expand Up @@ -1092,6 +1140,19 @@ func (ds DeleteOffsetsResponses) EachError(fn func(string, int32, error)) {
}
}

// Error iterates over all responses and returns the first error encountered,
// if any.
func (ds DeleteOffsetsResponses) Error() error {
for _, ps := range ds {
for _, err := range ps {
if err != nil {
return err
}
}
}
return nil
}

// DeleteOffsets deletes offsets for the given group.
//
// Originally, offset commits were persisted in Kafka for some retention time.
Expand Down
12 changes: 12 additions & 0 deletions pkg/kadm/partas.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func (rs AlterPartitionAssignmentsResponses) Each(fn func(AlterPartitionAssignme
}
}

// Error returns the first error in the responses, if any.
func (rs AlterPartitionAssignmentsResponses) Error() error {
for _, ps := range rs {
for _, r := range ps {
if r.Err != nil {
return r.Err
}
}
}
return nil
}

// AlterPartitionAssignments alters partition assignments for the requested
// partitions, returning an error if the response could not be issued or if
// you do not have permissions.
Expand Down
83 changes: 65 additions & 18 deletions pkg/kadm/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,21 @@ func (rs CreateTopicResponses) On(topic string, fn func(*CreateTopicResponse) er
return CreateTopicResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs CreateTopicResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// CreateTopic issues a create topics request with the given partitions,
// replication factor, and (optional) configs for the given topic name. Under
// the hood, this uses the default 15s request timeout and lets Kafka choose
// where to place partitions. This function exists to complement CreateTopics,
// making the single-topic creation case easier to handle.
//
// Version 4 of the underlying create topic request was introduced in Kafka 2.4
// and brought client support for creation defaults. If talking to a 2.4+
// cluster, you can use -1 for partitions and replicationFactor to use broker
// defaults.
//
// This package includes a StringPtr function to aid in building config values.
//
// If the topic could not be created this function will return an error. An
// error may be returned due to authorization failure, a failed network
// request, a missing controller or other issues. If the request was successful
// but the CreateTopicResponse.Err is non-nil, this returns the error, so you
// do not need to additionally check the Err field.
// replication factor, and (optional) configs for the given topic name.
// This is similar to CreateTopics, but returns the kerr.ErrorForCode(response.ErrorCode)
// if the request/response is successful.
func (cl *Client) CreateTopic(
ctx context.Context,
partitions int32,
Expand Down Expand Up @@ -277,8 +274,34 @@ func (rs DeleteTopicResponses) On(topic string, fn func(*DeleteTopicResponse) er
return DeleteTopicResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs DeleteTopicResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// DeleteTopic issues a delete topic request for the given topic name with a
// (by default) 15s timeout. This is similar to DeleteTopics, but returns the
// kerr.ErrorForCode(response.ErrorCode) if the request/response is successful.
func (cl *Client) DeleteTopic(ctx context.Context, topic string) (DeleteTopicResponse, error) {
rs, err := cl.DeleteTopics(ctx, topic)
if err != nil {
return DeleteTopicResponse{}, err
}
r, exists := rs[topic]
if !exists {
return DeleteTopicResponse{}, errors.New("requested topic was not part of delete topic response")
}
return r, r.Err
}

// DeleteTopics issues a delete topics request for the given topic names with a
// 15s timeout.
// (by default) 15s timeout.
//
// This does not return an error on authorization failures, instead,
// authorization failures are included in the responses. This only returns an
Expand Down Expand Up @@ -402,6 +425,19 @@ func (rs DeleteRecordsResponses) On(topic string, partition int32, fn func(*Dele
return DeleteRecordsResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs DeleteRecordsResponses) Error() error {
for _, ps := range rs {
for _, r := range ps {
if r.Err != nil {
return r.Err
}
}
}
return nil
}

// DeleteRecords issues a delete records request for the given offsets. Per
// offset, only the Offset field needs to be set.
//
Expand Down Expand Up @@ -498,6 +534,17 @@ func (rs CreatePartitionsResponses) On(topic string, fn func(*CreatePartitionsRe
return CreatePartitionsResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs CreatePartitionsResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// CreatePartitions issues a create partitions request for the given topics,
// adding "add" partitions to each topic. This request lets Kafka choose where
// the new partitions should be.
Expand Down

0 comments on commit a2d69ce

Please sign in to comment.