Skip to content

Commit

Permalink
feat: add ack to kafka client (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC authored Feb 12, 2024
1 parent 111bd3b commit b4e3c34
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
119 changes: 119 additions & 0 deletions kafkaclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,125 @@ func TestAzureEventHubsCloud(t *testing.T) {
require.Contains(t, err.Error(), "SASL Authentication failed")
}

func TestConsumerACK(t *testing.T) {
// Prepare cluster - Zookeeper + 1 Kafka brokers
pool, err := dockertest.NewPool("")
require.NoError(t, err)

kafkaContainer, err := dockerKafka.Setup(pool, t,
dockerKafka.WithBrokers(1))
require.NoError(t, err)

kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Ports[0])
kafkaClient, err := New("tcp", []string{"bad-host", kafkaHost}, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
require.NoError(t, err)

var (
noOfMessages = 10
ctx, cancel = context.WithCancel(context.Background())
tc = testutil.NewWithDialer(kafkaClient.dialer, kafkaClient.network, kafkaClient.addresses...)
)

t.Cleanup(cancel)

// Check connectivity and try to create the desired topic until the brokers are up and running (max 30s)
require.NoError(t, kafkaClient.Ping(ctx))
require.Eventually(t, func() bool {
err := tc.CreateTopic(ctx, t.Name(), 1, 1) // partitions = 1, replication factor = 1
if err != nil {
t.Logf("Could not create topic: %v", err)
}
return err == nil
}, defaultTestTimeout, time.Second)

// Check that the topic has been created with the right number of partitions
var topics []testutil.TopicPartition
require.Eventually(t, func() bool {
topics, err = tc.ListTopics(ctx)
success := err == nil && len(topics) == 1
if !success {
t.Logf("List topics failure %+v: %v", topics, err)
}
return success
}, defaultTestTimeout, time.Second)
require.Equal(t, []testutil.TopicPartition{
{Topic: t.Name(), Partition: 0},
}, topics)

// Produce X messages in a single batch
producerConf := ProducerConfig{
ClientID: "producer-01",
WriteTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
Logger: newKafkaLogger(t, false),
ErrorLogger: newKafkaLogger(t, true),
}
producer, err := kafkaClient.NewProducer(producerConf)
require.NoError(t, err)
publishMessages(ctx, t, producer, noOfMessages)

consumerConf := ConsumerConfig{
GroupID: "group-01",
StartOffset: FirstOffset,
CommitInterval: time.Second, // to make the test faster instead of committing each single message
FetchBatchesMaxWait: 10 * time.Second,
Logger: newKafkaLogger(t, false),
ErrorLogger: newKafkaLogger(t, true),
}
consume := func(c *Consumer, id string, noOfMsgsToConsume int) []Message {
messages := make([]Message, 0, noOfMsgsToConsume)
for noOfMsgsToConsume > 0 {
msg, err := c.Receive(ctx)
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
t.Logf("Closing %s: %v", id, err)
return messages
}
require.NoError(t, err)
t.Logf("Got a message on %s", id)
noOfMsgsToConsume--
require.NoError(t, c.Ack(ctx, msg))
messages = append(messages, msg)
}
return messages
}
closeConsumer := func(c *Consumer, id string) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := c.Close(ctx); err != nil {
t.Logf("Error closing %s: %v", id, err)
}
}
consumer := kafkaClient.NewConsumer(t.Name(), consumerConf)
closeConsumer(consumer, "consumer") // closing consumer
// we're doing this in order to have a subscription on the topic for retention

ackCount := noOfMessages / 2
require.Greater(t, ackCount, 0)
count := 0
consumer = kafkaClient.NewConsumer(t.Name(), consumerConf) // re-creating consumer
messages := consume(consumer, "consumer", ackCount) // consuming only half messages
require.Equal(t, ackCount, len(messages))
for _, msg := range messages {
require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key))
require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value))
count++
}
closeConsumer(consumer, "consumer") // closing consumer

remainingCount := noOfMessages - ackCount
require.Greater(t, remainingCount, 0)
consumer = kafkaClient.NewConsumer(t.Name(), consumerConf) // re-creating consumer
messages = consume(consumer, "consumer", remainingCount) // consuming the rest of the messages
require.Equal(t, remainingCount, len(messages))
for _, msg := range messages {
require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key))
require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value))
count++
}
closeConsumer(consumer, "consumer") // closing consumer
require.Equal(t, noOfMessages, count)
}

func TestSSH(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
Expand Down
12 changes: 12 additions & 0 deletions kafkaclient/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,15 @@ func (c *Consumer) Receive(ctx context.Context) (Message, error) {
Timestamp: msg.Time,
}, nil
}

func (c *Consumer) Ack(ctx context.Context, msgs ...Message) error {
internalMsgs := make([]kafka.Message, 0, len(msgs))
for _, msg := range msgs {
internalMsgs = append(internalMsgs, kafka.Message{
Topic: msg.Topic,
Partition: int(msg.Partition),
Offset: msg.Offset,
})
}
return c.reader.CommitMessages(ctx, internalMsgs...)
}

0 comments on commit b4e3c34

Please sign in to comment.