From ebc05cc93ab4483e294affa4c37359dd375f4493 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 11 May 2022 18:21:13 +0100 Subject: [PATCH] fix(admin): make DeleteRecords err consistent Also prevent a few other creations of new errors where the existing KError should have been preserved for errors.Is checking. Fixes #2225 --- admin.go | 38 +++++++++++++++++++++----------------- admin_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 18 deletions(-) diff --git a/admin.go b/admin.go index 3fa7258b1..7683b44fb 100644 --- a/admin.go +++ b/admin.go @@ -525,14 +525,13 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ errs = append(errs, err) } else { if rsp.ErrorCode > 0 { - errs = append(errs, errors.New(rsp.ErrorCode.Error())) + errs = append(errs, rsp.ErrorCode) } for topic, topicErrors := range rsp.Errors { for partition, partitionError := range topicErrors { if !errors.Is(partitionError.errorCode, ErrNoError) { - errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error()) - errs = append(errs, errors.New(errStr)) + errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode)) } } } @@ -577,40 +576,45 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i if topic == "" { return ErrInvalidTopic } + errs := make([]error, 0) partitionPerBroker := make(map[*Broker][]int32) for partition := range partitionOffsets { broker, err := ca.client.Leader(topic, partition) if err != nil { - return err + errs = append(errs, err) + continue } partitionPerBroker[broker] = append(partitionPerBroker[broker], partition) } - errs := make([]error, 0) for broker, partitions := range partitionPerBroker { topics := make(map[string]*DeleteRecordsRequestTopic) recordsToDelete := make(map[int32]int64) for _, p := range partitions { recordsToDelete[p] = partitionOffsets[p] } - topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete} + topics[topic] = &DeleteRecordsRequestTopic{ + PartitionOffsets: recordsToDelete, + } request := &DeleteRecordsRequest{ Topics: topics, Timeout: ca.conf.Admin.Timeout, } - rsp, err := broker.DeleteRecords(request) if err != nil { errs = append(errs, err) - } else { - deleteRecordsResponseTopic, ok := rsp.Topics[topic] - if !ok { - errs = append(errs, ErrIncompleteResponse) - } else { - for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions { - if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) { - errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error())) - } - } + continue + } + + deleteRecordsResponseTopic, ok := rsp.Topics[topic] + if !ok { + errs = append(errs, ErrIncompleteResponse) + continue + } + + for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions { + if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) { + errs = append(errs, deleteRecordsResponsePartition.Err) + continue } } } diff --git a/admin_test.go b/admin_test.go index 268c6b60e..c21a8c9d9 100644 --- a/admin_test.go +++ b/admin_test.go @@ -593,7 +593,7 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { } } -func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { +func TestClusterAdminDeleteRecordsWithUnsupportedVersion(t *testing.T) { topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -643,6 +643,51 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { } } +func TestClusterAdminDeleteRecordsWithLeaderNotAvailable(t *testing.T) { + topicName := "my_topic" + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetLeader("my_topic", 1, -1). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + }) + + config := NewTestConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + partitionOffset := make(map[int32]int64) + partitionOffset[1] = 1000 + + err = admin.DeleteRecords(topicName, partitionOffset) + if err == nil { + t.Fatal("expected an ErrDeleteRecords") + } + + if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") { + t.Fatal(err) + } + + if !errors.Is(err, ErrDeleteRecords) { + t.Fatal(err) + } + + if !errors.Is(err, ErrLeaderNotAvailable) { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminDescribeConfig(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close()