diff --git a/kafkaclient/client_test.go b/kafkaclient/client_test.go index eccd2b6c..0ba22d93 100644 --- a/kafkaclient/client_test.go +++ b/kafkaclient/client_test.go @@ -697,6 +697,139 @@ func TestAzureEventHubsCloud(t *testing.T) { require.Contains(t, err.Error(), "SASL Authentication failed") } +func TestConsumerACK(t *testing.T) { + // Prepare cluster - Zookeeper + 3 Kafka brokers + // We need more than one broker, or we'll be stuck with a "GROUP_COORDINATOR_NOT_AVAILABLE" error + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + kafkaContainer, err := dockerKafka.Setup(pool, t, + dockerKafka.WithBrokers(1)) + require.NoError(t, err) + + addresses := make([]string, 0, len(kafkaContainer.Ports)) + for i := 0; i < len(kafkaContainer.Ports); i++ { + addresses = append(addresses, fmt.Sprintf("localhost:%s", kafkaContainer.Ports[i])) + } + c, err := New("tcp", addresses, Config{ClientID: "some-client", DialTimeout: 5 * time.Second}) + require.NoError(t, err) + + var ( + noOfMessages = 10 + ctx, cancel = context.WithCancel(context.Background()) + tc = testutil.NewWithDialer(c.dialer, c.network, c.addresses...) + ) + + t.Cleanup(cancel) + + // Check connectivity and try to create the desired topic until the brokers are up and running (max 30s) + require.NoError(t, c.Ping(ctx)) + require.Eventually(t, func() bool { + err := tc.CreateTopic(ctx, t.Name(), 1, 1) // partitions = 1, replication factor = 1 + if err != nil { + t.Logf("Could not create topic: %v", err) + } + return err == nil + }, defaultTestTimeout, time.Second) + + // Check that the topic has been created with the right number of partitions + var topics []testutil.TopicPartition + require.Eventually(t, func() bool { + topics, err = tc.ListTopics(ctx) + success := err == nil && len(topics) == 1 + if !success { + t.Logf("List topics failure %+v: %v", topics, err) + } + return success + }, defaultTestTimeout, time.Second) + require.Equal(t, []testutil.TopicPartition{ + {Topic: t.Name(), Partition: 0}, + }, topics) + + // Produce X messages in a single batch + producerConf := ProducerConfig{ + ClientID: "producer-01", + WriteTimeout: 5 * time.Second, + ReadTimeout: 5 * time.Second, + Logger: newKafkaLogger(t, false), + ErrorLogger: newKafkaLogger(t, true), + } + p, err := c.NewProducer(producerConf) + require.NoError(t, err) + publishMessages(ctx, t, p, noOfMessages) + + // Starting consumers with group-01 and FirstOffset + var ( + // The ticker is used so that the test won't end as long as we keep getting messages since the consumers + // will reset the ticker each time they receive a message + tickerMu sync.Mutex + tickerReset = 10 * time.Second + ticker = time.NewTicker(30 * time.Second) + ) + consumerConf := ConsumerConfig{ + GroupID: "group-01", + StartOffset: FirstOffset, + CommitInterval: time.Second, // to make the test faster instead of committing each single message + FetchBatchesMaxWait: 10 * time.Second, + Logger: newKafkaLogger(t, false), + ErrorLogger: newKafkaLogger(t, true), + } + consume := func(c *Consumer, id string, count int32) []Message { + messages := make([]Message, 0, count) + for count > 0 { + msg, err := c.Receive(ctx) + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + t.Logf("Closing %s: %v", id, err) + return messages + } + require.NoError(t, err) + t.Logf("Got a message on %s", id) + tickerMu.Lock() + ticker.Reset(tickerReset) + tickerMu.Unlock() + count-- + require.NoError(t, c.Ack(ctx, msg)) + messages = append(messages, msg) + } + return messages + } + closeConsumer := func(c *Consumer, id string) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := c.Close(ctx); err != nil { + t.Logf("Error closing %s: %v", id, err) + } + } + + consumer := c.NewConsumer(t.Name(), consumerConf) + closeConsumer(consumer, "consumer") // closing consumer + + publishMessages(ctx, t, p, noOfMessages) // publishing messages + + ackCount := int32(noOfMessages) / 2 + require.Greater(t, ackCount, int32(0)) + count := 0 + consumer = c.NewConsumer(t.Name(), consumerConf) // re-creating consumer + messages := consume(consumer, "consumer", ackCount) // consuming only half messages + for _, msg := range messages { + require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key)) + require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value)) + count++ + } + + closeConsumer(consumer, "consumer") // closing consumer + + remainingCount := int32(noOfMessages) - ackCount + require.Greater(t, remainingCount, int32(0)) + consumer = c.NewConsumer(t.Name(), consumerConf) // re-creating consumer + messages = consume(consumer, "consumer", remainingCount) // consuming the rest of the messages + for _, msg := range messages { + require.Equal(t, fmt.Sprintf("key-%d", count), string(msg.Key)) + require.Equal(t, fmt.Sprintf("value-%d", count), string(msg.Value)) + count++ + } +} + func TestSSH(t *testing.T) { pool, err := dockertest.NewPool("") require.NoError(t, err) diff --git a/kafkaclient/consumer.go b/kafkaclient/consumer.go index 44652168..80c46766 100644 --- a/kafkaclient/consumer.go +++ b/kafkaclient/consumer.go @@ -103,3 +103,15 @@ func (c *Consumer) Receive(ctx context.Context) (Message, error) { Timestamp: msg.Time, }, nil } + +func (c *Consumer) Ack(ctx context.Context, msgs ...Message) error { + internalMsgs := make([]kafka.Message, 0, len(msgs)) + for _, msg := range msgs { + internalMsgs = append(internalMsgs, kafka.Message{ + Topic: msg.Topic, + Partition: int(msg.Partition), + Offset: msg.Offset, + }) + } + return c.reader.CommitMessages(ctx, internalMsgs...) +}