Commit

feat: add ack to kafka client
BonapartePC committed Feb 12, 2024
1 parent 111bd3b commit a9b2f9d
Showing 2 changed files with 145 additions and 0 deletions.
133 changes: 133 additions & 0 deletions kafkaclient/client_test.go
@@ -697,6 +697,139 @@ func TestAzureEventHubsCloud(t *testing.T) {
	require.Contains(t, err.Error(), "SASL Authentication failed")
}

func TestConsumerACK(t *testing.T) {
	// Prepare cluster - Zookeeper + 1 Kafka broker
	pool, err := dockertest.NewPool("")
	require.NoError(t, err)

	kafkaContainer, err := dockerKafka.Setup(pool, t,
		dockerKafka.WithBrokers(1))
	require.NoError(t, err)

	addresses := make([]string, 0, len(kafkaContainer.Ports))
	for i := 0; i < len(kafkaContainer.Ports); i++ {
		addresses = append(addresses, fmt.Sprintf("localhost:%s", kafkaContainer.Ports[i]))
	}
	c, err := New("tcp", addresses, Config{ClientID: "some-client", DialTimeout: 5 * time.Second})
	require.NoError(t, err)

	var (
		noOfMessages = 10
		ctx, cancel  = context.WithCancel(context.Background())
		tc           = testutil.NewWithDialer(c.dialer, c.network, c.addresses...)
	)

	t.Cleanup(cancel)

	// Check connectivity and try to create the desired topic until the broker is up and running (max 30s)
	require.NoError(t, c.Ping(ctx))
	require.Eventually(t, func() bool {
		err := tc.CreateTopic(ctx, t.Name(), 1, 1) // partitions = 1, replication factor = 1
		if err != nil {
			t.Logf("Could not create topic: %v", err)
		}
		return err == nil
	}, defaultTestTimeout, time.Second)

	// Check that the topic has been created with the right number of partitions
	var topics []testutil.TopicPartition
	require.Eventually(t, func() bool {
		topics, err = tc.ListTopics(ctx)
		success := err == nil && len(topics) == 1
		if !success {
			t.Logf("List topics failure %+v: %v", topics, err)
		}
		return success
	}, defaultTestTimeout, time.Second)
	require.Equal(t, []testutil.TopicPartition{
		{Topic: t.Name(), Partition: 0},
	}, topics)

	// Produce X messages in a single batch
	producerConf := ProducerConfig{
		ClientID:     "producer-01",
		WriteTimeout: 5 * time.Second,
		ReadTimeout:  5 * time.Second,
		Logger:       newKafkaLogger(t, false),
		ErrorLogger:  newKafkaLogger(t, true),
	}
	p, err := c.NewProducer(producerConf)
	require.NoError(t, err)
	publishMessages(ctx, t, p, noOfMessages)

	// Starting consumers with group-01 and FirstOffset
	var (
		// The ticker is used so that the test won't end as long as we keep getting messages since the consumers
		// will reset the ticker each time they receive a message
		tickerMu    sync.Mutex
		tickerReset = 10 * time.Second
		ticker      = time.NewTicker(30 * time.Second)
	)
	consumerConf := ConsumerConfig{
		GroupID:             "group-01",
		StartOffset:         FirstOffset,
		CommitInterval:      time.Second, // to make the test faster instead of committing every single message
		FetchBatchesMaxWait: 10 * time.Second,
		Logger:              newKafkaLogger(t, false),
		ErrorLogger:         newKafkaLogger(t, true),
	}
	consume := func(c *Consumer, id string, count int32) []Message {
		messages := make([]Message, 0, count)
		for count > 0 {
			msg, err := c.Receive(ctx)
			if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
				t.Logf("Closing %s: %v", id, err)
				return messages
			}
			require.NoError(t, err)
			t.Logf("Got a message on %s", id)
			tickerMu.Lock()
			ticker.Reset(tickerReset)
			tickerMu.Unlock()
			count--
			require.NoError(t, c.Ack(ctx, msg))
			messages = append(messages, msg)
		}
		return messages
	}
	closeConsumer := func(c *Consumer, id string) {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := c.Close(ctx); err != nil {
			t.Logf("Error closing %s: %v", id, err)
		}
	}

	consumer := c.NewConsumer(t.Name(), consumerConf)
	closeConsumer(consumer, "consumer") // closing consumer

	publishMessages(ctx, t, p, noOfMessages) // publishing messages

	ackCount := int32(noOfMessages) / 2
	require.Greater(t, ackCount, int32(0))
	count := 0
	consumer = c.NewConsumer(t.Name(), consumerConf)    // re-creating consumer
	messages := consume(consumer, "consumer", ackCount) // consuming only half of the messages
	for _, msg := range messages {
		require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key))
		require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value))
		count++
	}

	closeConsumer(consumer, "consumer") // closing consumer

	remainingCount := int32(noOfMessages) - ackCount
	require.Greater(t, remainingCount, int32(0))
	consumer = c.NewConsumer(t.Name(), consumerConf)          // re-creating consumer
	messages = consume(consumer, "consumer", remainingCount)  // consuming the rest of the messages
	for _, msg := range messages {
		require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key))
		require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value))
		count++
	}
}

func TestSSH(t *testing.T) {
	pool, err := dockertest.NewPool("")
	require.NoError(t, err)
12 changes: 12 additions & 0 deletions kafkaclient/consumer.go
@@ -103,3 +103,15 @@ func (c *Consumer) Receive(ctx context.Context) (Message, error) {
		Timestamp: msg.Time,
	}, nil
}

// Ack acknowledges the given messages by committing their offsets for the
// consumer group, so they are not delivered again to this group.
func (c *Consumer) Ack(ctx context.Context, msgs ...Message) error {
	internalMsgs := make([]kafka.Message, 0, len(msgs))
	for _, msg := range msgs {
		internalMsgs = append(internalMsgs, kafka.Message{
			Topic:     msg.Topic,
			Partition: int(msg.Partition),
			Offset:    msg.Offset,
		})
	}
	return c.reader.CommitMessages(ctx, internalMsgs...)
}
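
For reference, a minimal sketch of how Receive and the new Ack are meant to be used together, assuming code in the same package as the client; the broker address, topic, group name, and the handleMessage helper are placeholders, not part of this commit:

// Hypothetical usage sketch, not part of this commit: broker address, topic,
// group name, and handleMessage are placeholders.
func exampleConsumeWithAck(ctx context.Context) error {
	c, err := New("tcp", []string{"localhost:9092"}, Config{
		ClientID:    "example-client",
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}

	consumer := c.NewConsumer("example-topic", ConsumerConfig{
		GroupID:        "example-group",
		StartOffset:    FirstOffset,
		CommitInterval: time.Second, // offsets are flushed periodically rather than per message
	})
	defer func() { _ = consumer.Close(context.Background()) }()

	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			return err // e.g. io.EOF or context.Canceled on shutdown
		}

		if err := handleMessage(msg); err != nil { // application-specific processing
			return err
		}

		// Ack only after the message has been processed, so that unacknowledged
		// messages are delivered again when the consumer group restarts.
		if err := consumer.Ack(ctx, msg); err != nil {
			return err
		}
	}
}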
