diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 08fba483ec9b..3ad586f3e596 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -901,12 +901,11 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, if len(stream.Stream.Entries) == 0 { return nil } - /* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - }*/ - partitionID := int32(0) + partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) + if err != nil { + d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() + return fmt.Errorf("failed to find active partition for stream: %w", err) + } startTime := time.Now()