From fbebaf80630febe16337f9d31a8ccf9250910587 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 21 Nov 2024 03:01:14 -0500 Subject: [PATCH] Add TestPartitionReader_ShouldNotMissRecordsIfKafkaReturnsAFetchBothWithAnErrorAndSomeRecords (#9964) * Add TestPartitionReader_ShouldNotMissRecordsIfKafkaReturnsAFetchBothWithAnErrorAndSomeRecords Signed-off-by: Marco Pracucci * Updated CHANGELOG Signed-off-by: Marco Pracucci --------- Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- pkg/storage/ingest/reader_test.go | 296 ++++++++++++++++++++++-------- 2 files changed, 224 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efcc4dcd9ff..6de54604183 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,7 +80,7 @@ * [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765 * [BUGFIX] Ingester: Fix race condition in native histogram appending. #9765 * [BUGFIX] Ingester: Fix bug in concurrent fetching where a failure to list topics on startup would cause to use an invalid topic ID (0x00000000000000000000000000000000). #9883 -* [BUGFIX] Ingester: Fix data loss bug in the experimental ingest storage when a Kafka Fetch is split into multiple requests and some of them return an error. #9963 +* [BUGFIX] Ingester: Fix data loss bug in the experimental ingest storage when a Kafka Fetch is split into multiple requests and some of them return an error. #9963 #9964 * [BUGFIX] PromQL: `round` now removes the metric name again. #9879 ### Mixin diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 897d80f71d6..a000b58f3db 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -2056,82 +2056,13 @@ func TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailure t.Logf("Produced %d records", totalProducedRecords) - // - // Get topic ID. - // - - topics, err := kadm.NewClient(client).ListTopics(ctx, topicName) - require.NoError(t, err) - require.NoError(t, topics.Error()) - require.True(t, topics.Has(topicName)) - topicID := topics[topicName].ID - - t.Logf("Fetched topic ID") - - // // Fetch the raw record batches for each offset, so that it's easier to later mock the Kafka // server and control the returned batches. - // - - fetchResponseByRequestedOffset := map[int64]*kmsg.FetchResponse{} - - for offset := int64(0); offset < totalProducedRecords+1; offset++ { - // Build a Fetch request. - req := kmsg.NewFetchRequest() - req.MinBytes = 1 - req.Version = 13 - req.MaxWaitMillis = 1000 - req.MaxBytes = 1 // Request the minimum amount of bytes. - - reqTopic := kmsg.NewFetchRequestTopic() - reqTopic.Topic = topicName - reqTopic.TopicID = topicID - - reqPartition := kmsg.NewFetchRequestTopicPartition() - reqPartition.Partition = partitionID - reqPartition.FetchOffset = offset - reqPartition.PartitionMaxBytes = 1 // Request the minimum amount of bytes. - reqPartition.CurrentLeaderEpoch = 0 // Not needed here. - - reqTopic.Partitions = append(reqTopic.Partitions, reqPartition) - req.Topics = append(req.Topics, reqTopic) - - // Issue the Fetch request. - kres, err := client.Request(context.Background(), &req) - require.NoError(t, err) - - res := kres.(*kmsg.FetchResponse) - require.Equal(t, int16(0), res.ErrorCode) - require.Equal(t, 1, len(res.Topics)) - require.Equal(t, 1, len(res.Topics[0].Partitions)) - - // Parse the response, just to check how many records we got. 
- parseOptions := kgo.ProcessFetchPartitionOptions{ - KeepControlRecords: false, - Offset: offset, - IsolationLevel: kgo.ReadUncommitted(), - Topic: topicName, - Partition: partitionID, - } - - rawPartitionResp := res.Topics[0].Partitions[0] - partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, func(_ kgo.FetchBatchMetrics) {}) - - // Ensure we got a low number of records, otherwise the premise of this test is wrong - // because we want a single fetchWatch to be fulfilled in many Fetch requests. - require.LessOrEqual(t, len(partition.Records), 5) - - // Keep track of the raw response. - fetchResponseByRequestedOffset[offset] = res - } - + fetchResponseByRequestedOffset := fetchSmallestRecordsBatchForEachOffset(t, client, topicName, partitionID, 0, totalProducedRecords) t.Logf("Collected raw Fetch responses for all expected offsets") - // // Mock the Kafka server to intercept Fetch requests, return less records than requested and // inject random failures. - // - cluster.ControlKey(kmsg.Fetch.Int16(), func(kreq kmsg.Request) (kmsg.Response, error, bool) { cluster.KeepControl() @@ -2174,10 +2105,7 @@ func TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailure return nil, nil, false }) - // // Consume all the records using the PartitionReader. - // - var ( totalConsumedRecords = atomic.NewInt64(0) consumedRecordIDs = sync.Map{} @@ -2224,6 +2152,162 @@ func TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailure } } +// This test reproduces a scenario that we don't think should happen but, if it happens, we want to make sure +// that we don't lose records. The scenario is when Kafka returns a Fetch response for a *single* topic-partition +// containing *both* the error code set and some records. +func TestPartitionReader_ShouldNotMissRecordsIfKafkaReturnsAFetchBothWithAnErrorAndSomeRecords(t *testing.T) { + t.Parallel() + + const ( + topicName = "test" + partitionID = 1 + totalProducedRecords = 10000 + recordSizeBytes = initialBytesPerRecord + maxBufferedBytes = (totalProducedRecords * initialBytesPerRecord) / 100 + ) + + // We want to run all these tests with different concurrency config. + concurrencyVariants := map[string][]readerTestCfgOpt{ + "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, + "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, + "with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)}, + } + + for concurrencyName, concurrencyVariant := range concurrencyVariants { + concurrencyVariant := concurrencyVariant + + t.Run(concurrencyName, func(t *testing.T) { + t.Parallel() + + var ( + cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) + reg = prometheus.NewPedanticRegistry() + ) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + t.Cleanup(cancel) + + // Produce records. + writeClient := newKafkaProduceClient(t, clusterAddr) + for i := 0; i < totalProducedRecords; i++ { + produceRandomRecord(ctx, t, writeClient, topicName, partitionID, recordSizeBytes, fmt.Sprintf("record-%05d", i)) + } + + // Fetch the raw record batches for each offset, so that it's easier to later mock the Kafka + // server and control the returned batches. 
+ fetchResponseByRequestedOffset := fetchSmallestRecordsBatchForEachOffset(t, writeClient, topicName, partitionID, 0, totalProducedRecords)
+ t.Logf("Collected raw Fetch responses for all expected offsets")
+
+ // Mock the Kafka server to intercept Fetch requests, return fewer records than requested and
+ // randomly include error codes in some fetches.
+ cluster.ControlKey(kmsg.Fetch.Int16(), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
+ cluster.KeepControl()
+
+ req := kreq.(*kmsg.FetchRequest)
+
+ // We expect only 1 partition in the request.
+ if len(req.Topics) != 1 {
+ return nil, fmt.Errorf("expected 1 topic in the request, got %d", len(req.Topics)), true
+ }
+ if len(req.Topics[0].Partitions) != 1 {
+ return nil, fmt.Errorf("expected 1 partition in the request, got %d", len(req.Topics[0].Partitions)), true
+ }
+
+ // Look up the response among the ones we previously fetched with a small "MaxBytes".
+ res := fetchResponseByRequestedOffset[req.Topics[0].Partitions[0].FetchOffset]
+ if res == nil {
+ if req.Topics[0].Partitions[0].FetchOffset < totalProducedRecords {
+ return nil, errors.New("the offset requested has not been found among the ones we previously fetched"), true
+ }
+
+ // An offset was requested that we haven't previously produced (it could be a future offset), so we just let kfake handle it.
+ return nil, nil, false
+ }
+
+ // We expect only 1 partition in the response.
+ if len(res.Topics) != 1 {
+ return nil, fmt.Errorf("expected 1 topic in the response, got %d", len(res.Topics)), true
+ }
+ if len(res.Topics[0].Partitions) != 1 {
+ return nil, fmt.Errorf("expected 1 partition in the response, got %d", len(res.Topics[0].Partitions)), true
+ }
+
+ // Simulate a 10% error rate in the Kafka responses, mixed with records.
+ if rand.Int()%10 == 0 {
+ // Make a copy so we don't overwrite the cached version, which will be later requested again.
+ resCopy := &kmsg.FetchResponse{Version: req.Version}
+ if err := resCopy.ReadFrom(res.AppendTo(nil)); err != nil {
+ return nil, fmt.Errorf("failed to make a copy of FetchResponse: %v", err), true
+ }
+
+ resCopy.Topics[0].Partitions[0].ErrorCode = kerr.UnknownServerError.Code
+ res = resCopy
+ }
+
+ return res, nil, true
+ })
+
+ // Consume all records.
+ var (
+ totalConsumedRecords = atomic.NewInt64(0)
+ consumedRecordIDs = sync.Map{}
+ )
+
+ consumer := consumerFunc(func(_ context.Context, records []record) error {
+ for _, rec := range records {
+ totalConsumedRecords.Inc()
+
+ // Parse the record ID from the actual record data.
+ recordID, err := strconv.ParseInt(string(rec.content[7:12]), 10, 64)
+ require.NoError(t, err)
+ consumedRecordIDs.Store(recordID, struct{}{})
+ }
+
+ return nil
+ })
+
+ readerOpts := append([]readerTestCfgOpt{
+ withConsumeFromPositionAtStartup(consumeFromStart),
+ withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second),
+ withMaxBufferedBytes(maxBufferedBytes),
+ withRegistry(reg),
+ withLogger(log.NewNopLogger()),
+ }, concurrencyVariant...)
+
+ reader := createReader(t, clusterAddr, topicName, partitionID, consumer, readerOpts...)
+ require.NoError(t, reader.StartAsync(ctx))
+ t.Cleanup(func() {
+ require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
+ })
+
+ // Wait until all produced records have been consumed.
+ test.Poll(t, 60*time.Second, int64(totalProducedRecords), func() interface{} {
+ return totalConsumedRecords.Load()
+ })
+
+ // Ensure that the actual records content matches the expected one.
+ for i := int64(0); i < totalProducedRecords; i++ { + _, found := consumedRecordIDs.Load(i) + require.Truef(t, found, "Expected to find a consumed record with ID %d", i) + } + + // We expect the last consumed offset to be tracked in a metric. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` + # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. + # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge + cortex_ingest_storage_reader_last_consumed_offset{partition="1"} %d + + # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed + # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge + cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} 0 + `, totalProducedRecords-1)), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total") + }) + }) + } +} + func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { const ( topicName = "test" @@ -2846,3 +2930,69 @@ func createTestContextWithTimeout(t *testing.T, timeout time.Duration) context.C t.Cleanup(cancel) return ctx } + +func fetchSmallestRecordsBatchForEachOffset(t *testing.T, client *kgo.Client, topicName string, partitionID int32, startOffset, endOffset int64) map[int64]*kmsg.FetchResponse { + // Get topic ID. + topics, err := kadm.NewClient(client).ListTopics(context.Background(), topicName) + require.NoError(t, err) + require.NoError(t, topics.Error()) + require.True(t, topics.Has(topicName)) + topicID := topics[topicName].ID + + t.Logf("Fetched topic ID") + + // Fetch the raw record batches for each offset + fetchResponseByRequestedOffset := map[int64]*kmsg.FetchResponse{} + + for offset := startOffset; offset <= endOffset; offset++ { + // Build a Fetch request. + req := kmsg.NewFetchRequest() + req.MinBytes = 1 + req.Version = 13 + req.MaxWaitMillis = 1000 + req.MaxBytes = 1 // Request the minimum amount of bytes. + + reqTopic := kmsg.NewFetchRequestTopic() + reqTopic.Topic = topicName + reqTopic.TopicID = topicID + + reqPartition := kmsg.NewFetchRequestTopicPartition() + reqPartition.Partition = partitionID + reqPartition.FetchOffset = offset + reqPartition.PartitionMaxBytes = 1 // Request the minimum amount of bytes. + reqPartition.CurrentLeaderEpoch = 0 // Not needed here. + + reqTopic.Partitions = append(reqTopic.Partitions, reqPartition) + req.Topics = append(req.Topics, reqTopic) + + // Issue the Fetch request. + kres, err := client.Request(context.Background(), &req) + require.NoError(t, err) + + res := kres.(*kmsg.FetchResponse) + require.Equal(t, int16(0), res.ErrorCode) + require.Equal(t, 1, len(res.Topics)) + require.Equal(t, 1, len(res.Topics[0].Partitions)) + + // Parse the response, just to check how many records we got. + parseOptions := kgo.ProcessFetchPartitionOptions{ + KeepControlRecords: false, + Offset: offset, + IsolationLevel: kgo.ReadUncommitted(), + Topic: topicName, + Partition: partitionID, + } + + rawPartitionResp := res.Topics[0].Partitions[0] + partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, func(_ kgo.FetchBatchMetrics) {}) + + // Ensure we got a low number of records, otherwise the premise of this test is wrong + // because we want a single fetchWatch to be fulfilled in many Fetch requests. 
+ require.LessOrEqual(t, len(partition.Records), 5) + + // Keep track of the raw response. + fetchResponseByRequestedOffset[offset] = res + } + + return fetchResponseByRequestedOffset +}
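
The error injection in the test works only because the mocked broker serves each cached FetchResponse more than once: the copy taken before setting the error code must be deep, otherwise the injected error would persist into later retries of the same offset. Below is a minimal standalone sketch of that copy-on-write step, assuming the same franz-go kmsg/kerr packages the test imports; the package and helper name (withInjectedFetchError) are hypothetical and not part of the patch.

package example

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// withInjectedFetchError returns a deep copy of a cached Fetch response with the
// partition error code set, leaving the cached original untouched so it can be
// served again (without the error) when the client retries the same offset.
func withInjectedFetchError(cached *kmsg.FetchResponse) (*kmsg.FetchResponse, error) {
	// Round-trip through the wire format to get a deep copy: AppendTo serializes
	// the cached response and ReadFrom parses those bytes into the new struct.
	cp := &kmsg.FetchResponse{Version: cached.Version}
	if err := cp.ReadFrom(cached.AppendTo(nil)); err != nil {
		return nil, fmt.Errorf("failed to copy FetchResponse: %w", err)
	}

	// Indexing [0][0] mirrors the test, which asserts a single topic and partition
	// in the response. Keep the records but also flag the partition as failed: this
	// is the "records and error in the same response" case the reader must not lose data on.
	cp.Topics[0].Partitions[0].ErrorCode = kerr.UnknownServerError.Code
	return cp, nil
}

In the test the copy is done inline inside the ControlKey callback; extracting it as above only makes the reason for the copy explicit.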