feat: add ack to kafka client #327
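This PR adds a Consumer.Ack method to kafkaclient, committing message offsets through the underlying kafka-go reader, together with an integration test (TestConsumerACK) verifying that acknowledged messages are not re-delivered when the consumer is closed and re-created.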

Merged · 3 commits · Feb 12, 2024
Changes from 2 commits
123 changes: 123 additions & 0 deletions kafkaclient/client_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strings"
@@ -697,6 +698,128 @@ func TestAzureEventHubsCloud(t *testing.T) {
require.Contains(t, err.Error(), "SASL Authentication failed")
}

func TestConsumerACK(t *testing.T) {
// Prepare cluster - Zookeeper + 2 Kafka brokers
// We need more than one broker, or we'll be stuck with a "GROUP_COORDINATOR_NOT_AVAILABLE" error
pool, err := dockertest.NewPool("")
require.NoError(t, err)

kafkaContainer, err := dockerKafka.Setup(pool, t,
dockerKafka.WithBrokers(2))
require.NoError(t, err)

addresses := make([]string, 0, len(kafkaContainer.Ports))
for i := 0; i < len(kafkaContainer.Ports); i++ {
addresses = append(addresses, net.JoinHostPort("localhost", kafkaContainer.Ports[i]))
}
kafkaClient, err := New("tcp", addresses, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

var (
noOfMessages = 10
ctx, cancel = context.WithCancel(context.Background())
tc = testutil.NewWithDialer(kafkaClient.dialer, kafkaClient.network, kafkaClient.addresses...)
)

t.Cleanup(cancel)

// Check connectivity and try to create the desired topic until the brokers are up and running (max 30s)
require.NoError(t, kafkaClient.Ping(ctx))
require.Eventually(t, func() bool {
err := tc.CreateTopic(ctx, t.Name(), 1, 1) // partitions = 1, replication factor = 1
if err != nil {
t.Logf("Could not create topic: %v", err)
}
return err == nil
}, defaultTestTimeout, time.Second)

// Check that the topic has been created with the right number of partitions
var topics []testutil.TopicPartition
require.Eventually(t, func() bool {
topics, err = tc.ListTopics(ctx)
success := err == nil && len(topics) == 1
if !success {
t.Logf("List topics failure %+v: %v", topics, err)
}
return success
}, defaultTestTimeout, time.Second)
require.Equal(t, []testutil.TopicPartition{
{Topic: t.Name(), Partition: 0},
}, topics)

// Produce X messages in a single batch
producerConf := ProducerConfig{
ClientID: "producer-01",
WriteTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
Logger: newKafkaLogger(t, false),
ErrorLogger: newKafkaLogger(t, true),
}
producer, err := kafkaClient.NewProducer(producerConf)
require.NoError(t, err)
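// publishMessages is a helper defined elsewhere in client_test.go; judging by
// the assertions below, it produces noOfMessages messages with keys
// "key-0".."key-9" and matching values "value-0".."value-9".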
publishMessages(ctx, t, producer, noOfMessages)

consumerConf := ConsumerConfig{
GroupID: "group-01",
StartOffset: FirstOffset,
CommitInterval: time.Second, // commit periodically instead of after every single message, to speed up the test
FetchBatchesMaxWait: 10 * time.Second,
Logger: newKafkaLogger(t, false),
ErrorLogger: newKafkaLogger(t, true),
}
consume := func(c *Consumer, id string, noOfMsgsToConsume int) []Message {
messages := make([]Message, 0, noOfMsgsToConsume)
for noOfMsgsToConsume > 0 {
msg, err := c.Receive(ctx)
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
t.Logf("Closing %s: %v", id, err)
return messages
}
require.NoError(t, err)
t.Logf("Got a message on %s", id)
noOfMsgsToConsume--
require.NoError(t, c.Ack(ctx, msg))
messages = append(messages, msg)
}
return messages
}
closeConsumer := func(c *Consumer, id string) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := c.Close(ctx); err != nil {
t.Logf("Error closing %s: %v", id, err)
}
}
consumer := kafkaClient.NewConsumer(t.Name(), consumerConf)
closeConsumer(consumer, "consumer") // closing consumer

ackCount := noOfMessages / 2
require.Greater(t, ackCount, 0)
count := 0
consumer = kafkaClient.NewConsumer(t.Name(), consumerConf) // re-creating consumer
messages := consume(consumer, "consumer", ackCount) // consume only the first half of the messages
require.Equal(t, ackCount, len(messages))
for _, msg := range messages {
require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key))
require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value))
count++
}
closeConsumer(consumer, "consumer") // closing consumer

remainingCount := noOfMessages - ackCount
require.Greater(t, remainingCount, 0)
consumer = kafkaClient.NewConsumer(t.Name(), consumerConf) // re-creating consumer
messages = consume(consumer, "consumer", remainingCount) // consuming the rest of the messages
require.Equal(t, remainingCount, len(messages))
for _, msg := range messages {
require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key))
require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value))
count++
}
closeConsumer(consumer, "consumer") // closing consumer
require.Equal(t, noOfMessages, count)
}

func TestSSH(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
12 changes: 12 additions & 0 deletions kafkaclient/consumer.go
@@ -103,3 +103,15 @@ func (c *Consumer) Receive(ctx context.Context) (Message, error) {
Timestamp: msg.Time,
}, nil
}

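// Ack commits the offsets of the given messages to the consumer group so
// that they are not delivered again after a restart or rebalance. With a
// non-zero CommitInterval the commit may be flushed asynchronously.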
func (c *Consumer) Ack(ctx context.Context, msgs ...Message) error {
internalMsgs := make([]kafka.Message, 0, len(msgs))
for _, msg := range msgs {
internalMsgs = append(internalMsgs, kafka.Message{
Topic: msg.Topic,
Partition: int(msg.Partition),
Offset: msg.Offset,
})
}
return c.reader.CommitMessages(ctx, internalMsgs...)
}
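
For illustration, here is a minimal sketch of how the new Ack method could be used (not part of this PR; it assumes code living in the kafkaclient package, and process stands in for application logic):

func consumeWithAck(ctx context.Context, c *Consumer, process func(Message) error) error {
	for {
		msg, err := c.Receive(ctx)
		if err != nil {
			return err // e.g. io.EOF after Close, or a cancelled context
		}
		if err := process(msg); err != nil {
			return err // not acked, so the message may be delivered again
		}
		// Ack records the offset of the processed message. With a non-zero
		// CommitInterval the commit is flushed asynchronously, so a crash
		// within that window can still cause re-delivery (at-least-once).
		if err := c.Ack(ctx, msg); err != nil {
			return err
		}
	}
}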