Skip to content

Commit

Permalink
KIP 396 ListOffsets (#1029)
Browse files Browse the repository at this point in the history
* Single Commit

* Comments

* Minor

* lint changes

* Fix build issue due to merge

* Address comments

* Address one remaining type

* Style, doc, gitignore, checks

---------

Co-authored-by: Milind L <milindl@users.noreply.github.com>
Co-authored-by: Milind L <miluthra@confluent.io>
Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
  • Loading branch information
4 people authored Oct 23, 2023
1 parent b4a69cc commit 115e03b
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 15 deletions.
1 change: 1 addition & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ admin_incremental_alter_configs/admin_incremental_alter_configs
admin_describe_consumer_groups/admin_describe_consumer_groups
admin_list_consumer_groups/admin_list_consumer_groups
admin_list_consumer_group_offsets/admin_list_consumer_group_offsets
admin_list_offsets/admin_list_offsets
admin_describe_user_scram_credentials/admin_describe_user_scram_credentials
admin_alter_user_scram_credentials/admin_alter_user_scram_credentials
avro_generic_consumer_example/avro_generic_consumer_example
Expand Down
30 changes: 16 additions & 14 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ Examples
[admin_alter_consumer_group_offsets](admin_alter_consumer_group_offsets) - Alter Consumer Group Offsets

[admin_create_acls](admin_create_acls) - Create Access Control Lists

[admin_create_topic](admin_create_topic) - Create a topic

[admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters

[admin_delete_topics](admin_delete_topics) - Delete some topics

[admin_delete_consumer_groups](admin_delete_consumer_groups) - Delete consumer groups

[admin_delete_topics](admin_delete_topics) - Delete topics
Expand All @@ -28,18 +28,20 @@ Examples

[admin_list_consumer_group_offsets](admin_list_consumer_group_offsets) - List consumer group offsets

[admin_list_offsets](admin_list_offsets) - List partition offsets

[admin_list_consumer_groups](admin_list_consumer_groups) - List consumer groups

[avro_generic_consumer_example](avro_generic_consumer_example) - consumer with Schema Registry and Avro Generic Deserializer

[avro_generic_producer_example](avro_generic_producer_example) - producer with Schema Registry and Avro Generic Serializer

[avro_specific_consumer_example](avro_specific_consumer_example) - consumer with Schema Registry and Avro Specific Deserializer

[avro_specific_producer_example](avro_specific_producer_example) - producer with Schema Registry and Avro Specific Serializer

[consumer_example](consumer_example) - Function & callback based consumer

[consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata

[consumer_rebalance_example](consumer_rebalance_example) - Use of rebalance callback with manual commit
Expand All @@ -53,29 +55,29 @@ Examples
[idempotent_producer_example](idempotent_producer_example) - Idempotent producer

[json_consumer_example](json_consumer_example) - consumer with Schema Registry and JSON Schema Deserializer

[json_producer_example](json_producer_example) - producer with Schema Registry and JSON Schema Serializer

[legacy](legacy) - Legacy examples

[library-version](library-version) - Show the library version

[mockcluster_example](mockcluster_example) - Use a mock cluster for testing

[mockcluster_failure_example](mockcluster_failure_example) - Use a mock cluster for failure testing

[oauthbearer_consumer_example](oauthbearer_consumer_example) - Unsecured SASL/OAUTHBEARER consumer example

[oauthbearer_oidc_example](oauthbearer_oidc_example) - SASL/OAUTHBEARER with OIDC method example

[oauthbearer_producer_example](oauthbearer_producer_example) - Unsecured SASL/OAUTHBEARER producer example

[producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel

[producer_example](producer_example) - Function based producer

[protobuf_consumer_example](protobuf_consumer_example) - consumer with Schema Registry and Protocol Buffers Deserializer

[protobuf_producer_example](protobuf_producer_example) - producer with Schema Registry and Protocol Buffers Serializer

[stats_example](stats_example) - Receiving stats events
Expand Down
117 changes: 117 additions & 0 deletions examples/admin_list_offsets/admin_list_offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// List Offsets example
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <topicname> <partition> <EARLIEST/LATEST/MAX_TIMESTAMP/TIMESTAMP t1> ..\n", os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]

argsCnt := len(os.Args)
i := 2
index := 0
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for i < argsCnt {
if i+3 > argsCnt {
fmt.Printf("Expected %d arguments for partition %d, got %d\n", 3, index, argsCnt-i)
os.Exit(1)
}

topicName := os.Args[i]
partition, err := strconv.Atoi(os.Args[i+1])
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid partition: %s\n", err)
os.Exit(1)
}

tp := kafka.TopicPartition{Topic: &topicName, Partition: int32(partition)}

if os.Args[i+2] == "EARLIEST" {
topicPartitionOffsets[tp] = kafka.EarliestOffsetSpec
} else if os.Args[i+2] == "LATEST" {
topicPartitionOffsets[tp] = kafka.LatestOffsetSpec
} else if os.Args[i+2] == "MAX_TIMESTAMP" {
topicPartitionOffsets[tp] = kafka.MaxTimestampOffsetSpec
} else if os.Args[i+2] == "TIMESTAMP" {
if i+4 > argsCnt {
fmt.Printf("Expected %d arguments for partition %d, got %d\n", 4, index, argsCnt-i)
os.Exit(1)
}

timestamp, timestampErr := strconv.Atoi(os.Args[i+3])
if timestampErr != nil {
fmt.Fprintf(os.Stderr, "Invalid timestamp: %s\n", timestampErr)
os.Exit(1)
}
topicPartitionOffsets[tp] = kafka.NewOffsetSpecForTimestamp(int64(timestamp))
i = i + 1
} else {
fmt.Fprintf(os.Stderr, "Invalid OffsetSpec.\n")
os.Exit(1)
}
i = i + 3
index++
}

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

results, err := a.ListOffsets(ctx, topicPartitionOffsets,
kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
if err != nil {
fmt.Printf("Failed to List offsets: %v\n", err)
os.Exit(1)
}
// map[TopicPartition]ListOffsetsResultInfo
// Print results
for tp, info := range results.ResultsInfos {
fmt.Printf("Topic: %s Partition: %d\n", *tp.Topic, tp.Partition)
if info.Error.Code() != kafka.ErrNoError {
fmt.Printf(" ErrorCode: %d ErrorMessage: %s\n\n", info.Error.Code(), info.Error.String())
} else {
fmt.Printf(" Offset: %d Timestamp: %d\n\n", info.Offset, info.Timestamp)
}
}
}
128 changes: 128 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCr
return responses[idx];
}
static const rd_kafka_ListOffsetsResultInfo_t *
ListOffsetsResultInfo_by_idx(const rd_kafka_ListOffsetsResultInfo_t **result_infos, size_t cnt, size_t idx) {
if (idx >= cnt)
return NULL;
return result_infos[idx];
}
static const rd_kafka_error_t *
error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) {
if (idx >= cnt)
Expand Down Expand Up @@ -988,6 +995,36 @@ type AlterUserScramCredentialsResult struct {
Errors map[string]Error
}

// OffsetSpec specifies desired offsets while using ListOffsets.
type OffsetSpec int64

const (
// MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different then LatestOffsetSpec as Timestamp can be set client side.
MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP)
// EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition.
EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST)
// LatestOffsetSpec is used to describe the latest offset for the TopicPartition.
LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST)
)

// NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the timestamp.
func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec {
return OffsetSpec(timestamp)
}

// ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition.
type ListOffsetsResultInfo struct {
Offset Offset
Timestamp int64
LeaderEpoch *int32
Error Error
}

// ListOffsetsResult holds the map of TopicPartition to ListOffsetsResultInfo for a request.
type ListOffsetsResult struct {
ResultsInfos map[TopicPartition]ListOffsetsResultInfo
}

// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
// first.
// The returned result event is checked for errors its error is returned if set.
Expand Down Expand Up @@ -1380,6 +1417,30 @@ func cToDescribeUserScramCredentialsResult(
return result
}

// cToListOffsetsResult converts a C
// rd_kafka_ListOffsets_result_t to a Go ListOffsetsResult
func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOffsetsResult) {
result = ListOffsetsResult{ResultsInfos: make(map[TopicPartition]ListOffsetsResultInfo)}
var cPartitionCount C.size_t
cResultInfos := C.rd_kafka_ListOffsets_result_infos(cRes, &cPartitionCount)
for itr := 0; itr < int(cPartitionCount); itr++ {
cResultInfo := C.ListOffsetsResultInfo_by_idx(cResultInfos, cPartitionCount, C.size_t(itr))
resultInfo := ListOffsetsResultInfo{}
cPartition := C.rd_kafka_ListOffsetsResultInfo_topic_partition(cResultInfo)
Topic := C.GoString(cPartition.topic)
Partition := TopicPartition{Topic: &Topic, Partition: int32(cPartition.partition)}
resultInfo.Offset = Offset(cPartition.offset)
resultInfo.Timestamp = int64(C.rd_kafka_ListOffsetsResultInfo_timestamp(cResultInfo))
cLeaderEpoch := int32(C.rd_kafka_topic_partition_get_leader_epoch(cPartition))
if cLeaderEpoch >= 0 {
resultInfo.LeaderEpoch = &cLeaderEpoch
}
resultInfo.Error = newError(cPartition.err)
result.ResultsInfos[Partition] = resultInfo
}
return result
}

// ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array
// to a Go ConsumerGroupListing slice.
func (a *AdminClient) cToConsumerGroupListings(
Expand Down Expand Up @@ -3151,6 +3212,73 @@ func (a *AdminClient) DescribeUserScramCredentials(
return result, nil
}

// ListOffsets describe offsets for the
// specified TopicPartiton based on an OffsetSpec.
//
// Parameters:
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it holds either the OffsetSpec enum value or timestamp.
// - `options` - ListOffsetsAdminOption options.
//
// Returns a ListOffsetsResult.
// Each TopicPartition's ListOffset can have an individual error.
func (a *AdminClient) ListOffsets(
ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error) {
if len(topicPartitionOffsets) < 1 || topicPartitionOffsets == nil {
return result, newErrorFromString(ErrInvalidArg, "expected topicPartitionOffsets of size greater or equal 1.")
}

topicPartitions := C.rd_kafka_topic_partition_list_new(C.int(len(topicPartitionOffsets)))
defer C.rd_kafka_topic_partition_list_destroy(topicPartitions)

for tp, offsetValue := range topicPartitionOffsets {
cStr := C.CString(*tp.Topic)
defer C.free(unsafe.Pointer(cStr))
topicPartition := C.rd_kafka_topic_partition_list_add(topicPartitions, cStr, C.int32_t(tp.Partition))
topicPartition.offset = C.int64_t(offsetValue)
}

// Convert Go AdminOptions (if any) to C AdminOptions.
genericOptions := make([]AdminOption, len(options))
for i := range options {
genericOptions[i] = options[i]
}
cOptions, err := adminOptionsSetup(
a.handle, C.RD_KAFKA_ADMIN_OP_LISTOFFSETS, genericOptions)
if err != nil {
return result, err
}
defer C.rd_kafka_AdminOptions_destroy(cOptions)

// Create temporary queue for async operation.
cQueue := C.rd_kafka_queue_new(a.handle.rk)
defer C.rd_kafka_queue_destroy(cQueue)

// Call rd_kafka_ListOffsets (asynchronous).
C.rd_kafka_ListOffsets(
a.handle.rk,
topicPartitions,
cOptions,
cQueue)

// Wait for result, error or context timeout.
rkev, err := a.waitResult(
ctx, cQueue, C.RD_KAFKA_EVENT_LISTOFFSETS_RESULT)
if err != nil {
return result, err
}
defer C.rd_kafka_event_destroy(rkev)

cRes := C.rd_kafka_event_ListOffsets_result(rkev)

// Convert result from C to Go.
result = cToListOffsetsResult(cRes)

return result, nil
}

// AlterUserScramCredentials alters SASL/SCRAM credentials.
// The pair (user, mechanism) must be unique among upsertions and deletions.
//
Expand Down
Loading

0 comments on commit 115e03b

Please sign in to comment.