Skip to content

Commit

Permalink
Added a DeleteRecords API (#1141)
Browse files Browse the repository at this point in the history
* Added a delete records api

* Corrected semaphore errors

* Resolved all the comments

* Resolved formatting errors

* Minor Change

* Comments added

* indentation check

* Update kafka/adminapi.go

* Update kafka/adminapi.go

* Update kafka/adminapi.go

* Update tests and CHANGELOG.md per comments

* Update kafka/adminapi.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Address public API comments

* Update examples/admin_delete_records/admin_delete_records.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Update examples/admin_delete_records/admin_delete_records.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Update examples/admin_delete_records/admin_delete_records.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Update kafka/adminapi.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Update kafka/integration_test.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Update kafka/integration_test.go

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>

* Address PR comments

* Regenerate api.html

* Run gofmt on example after comment addressal via Github UI

* Small changes within comments and strings

---------

Co-authored-by: Milind L <milindl@users.noreply.github.com>
Co-authored-by: Milind L <miluthra@confluent.io>
Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
  • Loading branch information
4 people authored Jul 10, 2024
1 parent d30da91 commit 37256f4
Show file tree
Hide file tree
Showing 11 changed files with 3,714 additions and 3,180 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

This is a feature release.

* Adds an AdminAPI `DeleteRecords()` (#1141, @PratRanj07).

## Fixes

* Issues: #965
Expand All @@ -19,7 +21,6 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.

# v2.4.0


This is a feature release.

* [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol):
Expand All @@ -44,7 +45,6 @@ confluent-kafka-go is based on librdkafka v2.4.0, see the
[librdkafka v2.4.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


# v2.3.0

This is a feature release.
Expand Down
1 change: 1 addition & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ admin_describe_topics/admin_describe_topics
admin_describe_cluster/admin_describe_cluster
admin_delete_acls/admin_delete_acls
admin_delete_consumer_groups/admin_delete_consumer_groups
admin_delete_records/admin_delete_records
admin_delete_topics/admin_delete_topics
admin_describe_acls/admin_describe_acls
admin_describe_config/admin_describe_config
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Examples

[admin_delete_consumer_groups](admin_delete_consumer_groups) - Delete consumer groups

[admin_delete_records](admin_delete_records) - Delete records before a specified offset

[admin_delete_topics](admin_delete_topics) - Delete topics

[admin_describe_acls](admin_describe_acls) - Find Access Control Lists using a filter
Expand Down
109 changes: 109 additions & 0 deletions examples/admin_delete_records/admin_delete_records.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright 2024 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Delete Records before a particular offset in specified Topic Partition.

package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
args := os.Args

if len(args) < 5 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap_servers> "+
"<topic1> <partition1> <offset1> ...\n",
args[0])
os.Exit(1)
}

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": args[1],
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

var topicPartitionOffsets []kafka.TopicPartition

argsCnt := len(os.Args)
i := 2
index := 0

for i < argsCnt {
if i+3 > argsCnt {
fmt.Printf("Expected %d arguments for partition %d, got %d\n", 3, index, argsCnt-i)
os.Exit(1)
}
topicName := os.Args[i]
partition, err := strconv.ParseInt(args[i+1], 10, 32)
if err != nil {
panic(err)
}
offset, err := strconv.ParseInt(args[i+2], 10, 64)
if offset < -1 {
err = fmt.Errorf("invalid offset %d for topic %s partition %d",
offset, topicName, partition)
}
if err != nil {
panic(err)
}

topicPartitionOffsets = append(topicPartitionOffsets,
kafka.TopicPartition{
Topic: &topicName,
Partition: int32(partition),
Offset: kafka.Offset(offset),
})
i += 3
index++
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

res, err := ac.DeleteRecords(ctx, topicPartitionOffsets)
if err != nil {
fmt.Printf("Failed to delete records: %s\n", err)
os.Exit(1)
}

for _, deleteRecordsResult := range res.DeleteRecordsResults {
fmt.Printf("Delete records result for topic %s partition: %+v\n",
*deleteRecordsResult.TopicPartition.Topic,
deleteRecordsResult.TopicPartition.Partition)
err := deleteRecordsResult.TopicPartition.Error
if err != nil {
fmt.Printf("\tDelete records failed with error: %s\n", err.Error())
} else {
fmt.Printf("\tDelete records succeeded\n")
fmt.Printf("\t\tNew low-watermark: %v\n",
deleteRecordsResult.DeletedRecords.LowWatermark)
}
}
}
119 changes: 119 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,29 @@ type ListConsumerGroupsResult struct {
Errors []error
}

// DeletedRecords contains information about deleted
// records of a single partition
type DeletedRecords struct {
// Low-watermark offset after deletion
LowWatermark Offset
}

// DeleteRecordsResult represents the result of a DeleteRecords call
// for a single partition.
type DeleteRecordsResult struct {
// One of requested partitions.
// The Error field is set if any occurred for that partition.
TopicPartition TopicPartition
// Deleted records information, or nil if an error occurred.
DeletedRecords *DeletedRecords
}

// DeleteRecordsResults represents the results of a DeleteRecords call.
type DeleteRecordsResults struct {
// A slice of DeleteRecordsResult, one for each requested topic partition.
DeleteRecordsResults []DeleteRecordsResult
}

// MemberAssignment represents the assignment of a consumer group member.
type MemberAssignment struct {
// Partitions assigned to current member.
Expand Down Expand Up @@ -1505,6 +1528,24 @@ func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t
return result, nil
}

// cToDeletedRecordResult converts a C topic partitions list to a Go DeleteRecordsResult slice.
func cToDeletedRecordResult(
cparts *C.rd_kafka_topic_partition_list_t) (results []DeleteRecordsResult) {
partitions := newTopicPartitionsFromCparts(cparts)
partitionsLen := len(partitions)
results = make([]DeleteRecordsResult, partitionsLen)

for i := 0; i < partitionsLen; i++ {
results[i].TopicPartition = partitions[i]
if results[i].TopicPartition.Error == nil {
results[i].DeletedRecords = &DeletedRecords{
LowWatermark: results[i].TopicPartition.Offset}
}
}

return results
}

// ClusterID returns the cluster ID as reported in broker metadata.
//
// Note on cancellation: Although the underlying C function respects the
Expand Down Expand Up @@ -3397,6 +3438,84 @@ func (a *AdminClient) AlterUserScramCredentials(
return result, nil
}

// DeleteRecords deletes records (messages) in topic partitions older than the offsets provided.
//
// Parameters:
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `recordsToDelete` - A slice of TopicPartitions with the offset field set.
// For each partition, delete all messages up to but not including the specified offset.
// The offset could be set to kafka.OffsetEnd to delete all the messages in the partition.
// - `options` - DeleteRecordsAdminOptions options.
//
// Returns a DeleteRecordsResults, which contains a slice of
// DeleteRecordsResult, each representing the result for one topic partition.
// Individual TopicPartitions inside the DeleteRecordsResult should be checked for errors.
// If successful, the DeletedRecords within the DeleteRecordsResult will be non-nil,
// and contain the low-watermark offset (smallest available offset of all live replicas).
func (a *AdminClient) DeleteRecords(ctx context.Context,
recordsToDelete []TopicPartition,
options ...DeleteRecordsAdminOption) (result DeleteRecordsResults, err error) {
err = a.verifyClient()
if err != nil {
return result, err
}

if len(recordsToDelete) == 0 {
return result, newErrorFromString(ErrInvalidArg, "No records to delete")
}

// convert recordsToDelete to rd_kafka_DeleteRecords_t** required by implementation
cRecordsToDelete := newCPartsFromTopicPartitions(recordsToDelete)
defer C.rd_kafka_topic_partition_list_destroy(cRecordsToDelete)

cDelRecords := make([]*C.rd_kafka_DeleteRecords_t, 1)
defer C.rd_kafka_DeleteRecords_destroy_array(&cDelRecords[0], C.size_t(1))

cDelRecords[0] = C.rd_kafka_DeleteRecords_new(cRecordsToDelete)

// Convert Go AdminOptions (if any) to C AdminOptions.
genericOptions := make([]AdminOption, len(options))
for i := range options {
genericOptions[i] = options[i]
}
cOptions, err := adminOptionsSetup(
a.handle, C.RD_KAFKA_ADMIN_OP_DELETERECORDS, genericOptions)
if err != nil {
return result, err
}
defer C.rd_kafka_AdminOptions_destroy(cOptions)

// Create temporary queue for async operation.
cQueue := C.rd_kafka_queue_new(a.handle.rk)
defer C.rd_kafka_queue_destroy(cQueue)

// Call rd_kafka_DeleteRecords (asynchronous).
C.rd_kafka_DeleteRecords(
a.handle.rk,
&cDelRecords[0],
C.size_t(1),
cOptions,
cQueue)

// Wait for result, error or context timeout.
rkev, err := a.waitResult(
ctx, cQueue, C.RD_KAFKA_EVENT_DELETERECORDS_RESULT)
if err != nil {
return result, err
}
defer C.rd_kafka_event_destroy(rkev)

cRes := C.rd_kafka_event_DeleteRecords_result(rkev)
cDeleteRecordsResultList := C.rd_kafka_DeleteRecords_result_offsets(cRes)

// Convert result from C to Go.
result.DeleteRecordsResults =
cToDeletedRecordResult(cDeleteRecordsResultList)

return result, nil
}

// NewAdminClient creats a new AdminClient instance with a new underlying client instance
func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {

Expand Down
38 changes: 38 additions & 0 deletions kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,43 @@ func testAdminAPIsUserScramCredentials(what string, a *AdminClient, expDuration
}
}

func testAdminAPIsDeleteRecords(what string, a *AdminClient, expDuration time.Duration, t *testing.T) {
topic := "test"
partition := int32(0)
offset := Offset(2)
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()
topicPartitionOffset := []TopicPartition{{Topic: &topic, Partition: partition, Offset: offset}}
_, err := a.DeleteRecords(ctx, topicPartitionOffset, SetAdminRequestTimeout(time.Second))
if err == nil || ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected context deadline exceeded, got %s and %s\n",
err, ctx.Err())
}

// Invalid option value
_, err = a.DeleteRecords(ctx, topicPartitionOffset, SetAdminRequestTimeout(-1))
if err == nil || err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %v", err)
}

for _, options := range [][]DeleteRecordsAdminOption{
{},
{SetAdminRequestTimeout(time.Second)},
} {
// nil argument should fail, not being treated as empty
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
result, err := a.DeleteRecords(ctx, nil, options...)
if result.DeleteRecordsResults != nil || err == nil {
t.Fatalf("Expected DeleteRecords to fail, but got result: %v, err: %v",
result, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %v", err)
}
}
}

func testAdminAPIs(what string, a *AdminClient, t *testing.T) {
t.Logf("AdminClient API testing on %s: %s", a, what)

Expand Down Expand Up @@ -1134,6 +1171,7 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) {
testAdminAPIsListOffsets(what, a, expDuration, t)

testAdminAPIsUserScramCredentials(what, a, expDuration, t)
testAdminAPIsDeleteRecords(what, a, expDuration, t)
}

// TestAdminAPIs dry-tests most Admin APIs, no broker is needed.
Expand Down
12 changes: 12 additions & 0 deletions kafka/adminoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func (ao AdminOptionOperationTimeout) supportsDeleteTopics() {
}
func (ao AdminOptionOperationTimeout) supportsCreatePartitions() {
}
func (ao AdminOptionOperationTimeout) supportsDeleteRecords() {
}

func (ao AdminOptionOperationTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
Expand Down Expand Up @@ -143,6 +145,8 @@ func (ao AdminOptionRequestTimeout) supportsDescribeUserScramCredentials() {
}
func (ao AdminOptionRequestTimeout) supportsAlterUserScramCredentials() {
}
func (ao AdminOptionRequestTimeout) supportsDeleteRecords() {
}
func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
return nil
Expand Down Expand Up @@ -563,6 +567,14 @@ type ListOffsetsAdminOption interface {
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}

// DeleteRecordsAdminOption - see setter.
//
// See SetAdminRequestTimeout, SetAdminOperationTimeout.
type DeleteRecordsAdminOption interface {
supportsDeleteRecords()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}

// AdminOption is a generic type not to be used directly.
//
// See CreateTopicsAdminOption et.al.
Expand Down
Loading

0 comments on commit 37256f4

Please sign in to comment.