Add TestPartitionReader_ShouldNotMissRecordsIfKafkaReturnsAFetchBothWithAnErrorAndSomeRecords (#9964)

* Add TestPartitionReader_ShouldNotMissRecordsIfKafkaReturnsAFetchBothWithAnErrorAndSomeRecords

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated CHANGELOG

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Nov 21, 2024
1 parent 693efbc commit fbebaf8
Showing 2 changed files with 224 additions and 74 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -80,7 +80,7 @@
* [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765
* [BUGFIX] Ingester: Fix race condition in native histogram appending. #9765
* [BUGFIX] Ingester: Fix bug in concurrent fetching where a failure to list topics on startup would cause an invalid topic ID (0x00000000000000000000000000000000) to be used. #9883
* [BUGFIX] Ingester: Fix data loss bug in the experimental ingest storage when a Kafka Fetch is split into multiple requests and some of them return an error. #9963
* [BUGFIX] Ingester: Fix data loss bug in the experimental ingest storage when a Kafka Fetch is split into multiple requests and some of them return an error. #9963 #9964
* [BUGFIX] PromQL: `round` now removes the metric name again. #9879

### Mixin
296 changes: 223 additions & 73 deletions pkg/storage/ingest/reader_test.go
@@ -2056,82 +2056,13 @@ func TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailure

t.Logf("Produced %d records", totalProducedRecords)

//
// Get topic ID.
//

topics, err := kadm.NewClient(client).ListTopics(ctx, topicName)
require.NoError(t, err)
require.NoError(t, topics.Error())
require.True(t, topics.Has(topicName))
topicID := topics[topicName].ID

t.Logf("Fetched topic ID")

//
// Fetch the raw record batches for each offset, so that it's easier to later mock the Kafka
// server and control the returned batches.
//

fetchResponseByRequestedOffset := map[int64]*kmsg.FetchResponse{}

for offset := int64(0); offset < totalProducedRecords+1; offset++ {
// Build a Fetch request.
req := kmsg.NewFetchRequest()
req.MinBytes = 1
req.Version = 13
req.MaxWaitMillis = 1000
req.MaxBytes = 1 // Request the minimum amount of bytes.

reqTopic := kmsg.NewFetchRequestTopic()
reqTopic.Topic = topicName
reqTopic.TopicID = topicID

reqPartition := kmsg.NewFetchRequestTopicPartition()
reqPartition.Partition = partitionID
reqPartition.FetchOffset = offset
reqPartition.PartitionMaxBytes = 1 // Request the minimum amount of bytes.
reqPartition.CurrentLeaderEpoch = 0 // Not needed here.

reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
req.Topics = append(req.Topics, reqTopic)

// Issue the Fetch request.
kres, err := client.Request(context.Background(), &req)
require.NoError(t, err)

res := kres.(*kmsg.FetchResponse)
require.Equal(t, int16(0), res.ErrorCode)
require.Equal(t, 1, len(res.Topics))
require.Equal(t, 1, len(res.Topics[0].Partitions))

// Parse the response, just to check how many records we got.
parseOptions := kgo.ProcessFetchPartitionOptions{
KeepControlRecords: false,
Offset: offset,
IsolationLevel: kgo.ReadUncommitted(),
Topic: topicName,
Partition: partitionID,
}

rawPartitionResp := res.Topics[0].Partitions[0]
partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, func(_ kgo.FetchBatchMetrics) {})

// Ensure we got a low number of records, otherwise the premise of this test is wrong
// because we want a single fetchWant to be fulfilled by many Fetch requests.
require.LessOrEqual(t, len(partition.Records), 5)

// Keep track of the raw response.
fetchResponseByRequestedOffset[offset] = res
}

fetchResponseByRequestedOffset := fetchSmallestRecordsBatchForEachOffset(t, client, topicName, partitionID, 0, totalProducedRecords)
t.Logf("Collected raw Fetch responses for all expected offsets")

//
// Mock the Kafka server to intercept Fetch requests, return less records than requested and
// inject random failures.
//

cluster.ControlKey(kmsg.Fetch.Int16(), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()

@@ -2174,10 +2105,7 @@ func TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailure
return nil, nil, false
})

//
// Consume all the records using the PartitionReader.
//

var (
totalConsumedRecords = atomic.NewInt64(0)
consumedRecordIDs = sync.Map{}
@@ -2224,6 +2152,162 @@ func TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailure
}
}

// This test reproduces a scenario that we don't think should happen but, if it happens, we want to make sure
// that we don't lose records. The scenario is when Kafka returns a Fetch response for a *single* topic-partition
// containing *both* the error code set and some records.
func TestPartitionReader_ShouldNotMissRecordsIfKafkaReturnsAFetchBothWithAnErrorAndSomeRecords(t *testing.T) {
t.Parallel()

const (
topicName = "test"
partitionID = 1
totalProducedRecords = 10000
recordSizeBytes = initialBytesPerRecord
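// Cap the reader's buffer at roughly 1% of the total produced bytes, so that records have to be fetched across many small Fetch requests.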
maxBufferedBytes = (totalProducedRecords * initialBytesPerRecord) / 100
)

// We want to run all these tests with different concurrency configurations.
concurrencyVariants := map[string][]readerTestCfgOpt{
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)},
}

for concurrencyName, concurrencyVariant := range concurrencyVariants {
concurrencyVariant := concurrencyVariant

t.Run(concurrencyName, func(t *testing.T) {
t.Parallel()

var (
cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
reg = prometheus.NewPedanticRegistry()
)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)

// Produce records.
writeClient := newKafkaProduceClient(t, clusterAddr)
for i := 0; i < totalProducedRecords; i++ {
produceRandomRecord(ctx, t, writeClient, topicName, partitionID, recordSizeBytes, fmt.Sprintf("record-%05d", i))
}

// Fetch the raw record batches for each offset, so that it's easier to later mock the Kafka
// server and control the returned batches.
fetchResponseByRequestedOffset := fetchSmallestRecordsBatchForEachOffset(t, writeClient, topicName, partitionID, 0, totalProducedRecords)
t.Logf("Collected raw Fetch responses for all expected offsets")

// Mock the Kafka server to intercept Fetch requests, return fewer records than requested and
// randomly include error codes in some fetches.
cluster.ControlKey(kmsg.Fetch.Int16(), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()

req := kreq.(*kmsg.FetchRequest)

// We expect only 1 partition in the request.
if len(req.Topics) != 1 {
return nil, fmt.Errorf("expected 1 topic in the request, got %d", len(req.Topics)), true
}
if len(req.Topics[0].Partitions) != 1 {
return nil, fmt.Errorf("expected 1 partition in the request, got %d", len(req.Topics[0].Partitions)), true
}

// Look up the response among the ones we previously fetched with a small "MaxBytes".
res := fetchResponseByRequestedOffset[req.Topics[0].Partitions[0].FetchOffset]
if res == nil {
if req.Topics[0].Partitions[0].FetchOffset < totalProducedRecords {
return nil, errors.New("the offset requested has not been found among the ones we previously fetched"), true
}

// The requested offset is not among the ones we previously produced (it could be a future offset), so we just let kfake handle it.
return nil, nil, false
}

// We expect only 1 partition in the response.
if len(res.Topics) != 1 {
return nil, fmt.Errorf("expected 1 topic in the response, got %d", len(res.Topics)), true
}
if len(res.Topics[0].Partitions) != 1 {
return nil, fmt.Errorf("expected 1 partition in the response, got %d", len(res.Topics[0].Partitions)), true
}

// Simulate a 10% error rate in the Kafka responses, mixed with records.
if rand.Int()%10 == 0 {
// Make a copy so we don't overwrite the cached version, which will be later requested again.
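// Round-tripping the response through its wire format (AppendTo + ReadFrom) gives us a deep copy.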
resCopy := &kmsg.FetchResponse{Version: req.Version}
if err := resCopy.ReadFrom(res.AppendTo(nil)); err != nil {
return nil, fmt.Errorf("failed to make a copy of FetchResponse: %v", err), true
}

resCopy.Topics[0].Partitions[0].ErrorCode = kerr.UnknownServerError.Code
res = resCopy
}

return res, nil, true
})

// Consume all records.
var (
totalConsumedRecords = atomic.NewInt64(0)
consumedRecordIDs = sync.Map{}
)

consumer := consumerFunc(func(_ context.Context, records []record) error {
for _, rec := range records {
totalConsumedRecords.Inc()

// Parse the record ID from the actual record data.
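// The records were produced with content prefixed by "record-%05d" (see produceRandomRecord above), so bytes [7:12] hold the zero-padded record ID.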
recordID, err := strconv.ParseInt(string(rec.content[7:12]), 10, 64)
require.NoError(t, err)
consumedRecordIDs.Store(recordID, struct{}{})
}

return nil
})

readerOpts := append([]readerTestCfgOpt{
withConsumeFromPositionAtStartup(consumeFromStart),
withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second),
withMaxBufferedBytes(maxBufferedBytes),
withRegistry(reg),
withLogger(log.NewNopLogger()),
}, concurrencyVariant...)

reader := createReader(t, clusterAddr, topicName, partitionID, consumer, readerOpts...)
require.NoError(t, reader.StartAsync(ctx))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
})

// Wait until all produced records have been consumed.
test.Poll(t, 60*time.Second, int64(totalProducedRecords), func() interface{} {
return totalConsumedRecords.Load()
})

// Ensure that the consumed records' content matches the expected one.
for i := int64(0); i < totalProducedRecords; i++ {
_, found := consumedRecordIDs.Load(i)
require.Truef(t, found, "Expected to find a consumed record with ID %d", i)
}

// We expect the last consumed offset to be tracked in a metric.
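// Offsets are 0-based, so after consuming all records the last consumed offset is totalProducedRecords-1.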
test.Poll(t, time.Second, nil, func() interface{} {
return promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} %d
# HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed
# TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge
cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} 0
`, totalProducedRecords-1)), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total")
})
})
}
}

func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) {
const (
topicName = "test"
@@ -2846,3 +2930,69 @@ func createTestContextWithTimeout(t *testing.T, timeout time.Duration) context.C
t.Cleanup(cancel)
return ctx
}

func fetchSmallestRecordsBatchForEachOffset(t *testing.T, client *kgo.Client, topicName string, partitionID int32, startOffset, endOffset int64) map[int64]*kmsg.FetchResponse {
// Get topic ID.
topics, err := kadm.NewClient(client).ListTopics(context.Background(), topicName)
require.NoError(t, err)
require.NoError(t, topics.Error())
require.True(t, topics.Has(topicName))
topicID := topics[topicName].ID

t.Logf("Fetched topic ID")

// Fetch the raw record batches for each offset
fetchResponseByRequestedOffset := map[int64]*kmsg.FetchResponse{}

for offset := startOffset; offset <= endOffset; offset++ {
// Build a Fetch request.
req := kmsg.NewFetchRequest()
req.MinBytes = 1
req.Version = 13
req.MaxWaitMillis = 1000
req.MaxBytes = 1 // Request the minimum amount of bytes.
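// Kafka still returns at least the first record batch even if it's larger than MaxBytes, so each Fetch yields the smallest possible batch for the requested offset.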

reqTopic := kmsg.NewFetchRequestTopic()
reqTopic.Topic = topicName
reqTopic.TopicID = topicID

reqPartition := kmsg.NewFetchRequestTopicPartition()
reqPartition.Partition = partitionID
reqPartition.FetchOffset = offset
reqPartition.PartitionMaxBytes = 1 // Request the minimum amount of bytes.
reqPartition.CurrentLeaderEpoch = 0 // Not needed here.

reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
req.Topics = append(req.Topics, reqTopic)

// Issue the Fetch request.
kres, err := client.Request(context.Background(), &req)
require.NoError(t, err)

res := kres.(*kmsg.FetchResponse)
require.Equal(t, int16(0), res.ErrorCode)
require.Equal(t, 1, len(res.Topics))
require.Equal(t, 1, len(res.Topics[0].Partitions))

// Parse the response, just to check how many records we got.
parseOptions := kgo.ProcessFetchPartitionOptions{
KeepControlRecords: false,
Offset: offset,
IsolationLevel: kgo.ReadUncommitted(),
Topic: topicName,
Partition: partitionID,
}

rawPartitionResp := res.Topics[0].Partitions[0]
partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, func(_ kgo.FetchBatchMetrics) {})

// Ensure we got a low number of records, otherwise the premise of this test is wrong
// because we want a single fetchWant to be fulfilled by many Fetch requests.
require.LessOrEqual(t, len(partition.Records), 5)

// Keep track of the raw response.
fetchResponseByRequestedOffset[offset] = res
}

return fetchResponseByRequestedOffset
}
