diff --git a/partitioner.go b/partitioner.go index a66e11ea3..9da871391 100644 --- a/partitioner.go +++ b/partitioner.go @@ -61,9 +61,9 @@ func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption { } // WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty -func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption { +func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption { return func(hp *hashPartitioner) { - hp.random = hp + hp.random = randomHP } } diff --git a/partitioner_test.go b/partitioner_test.go index 9c0161122..06a4ad725 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -207,6 +207,36 @@ func TestManualPartitioner(t *testing.T) { } } +func TestWithCustomFallbackPartitioner(t *testing.T) { + topic := "mytopic" + + partitioner := NewCustomPartitioner( + // override default random partitioner with round robin + WithCustomFallbackPartitioner(NewRoundRobinPartitioner(topic)), + )(topic) + + // Should use round robin implementation if there is no key + var i int32 + for i = 0; i < 50; i++ { + choice, err := partitioner.Partition(&ProducerMessage{Key: nil}, 7) + if err != nil { + t.Error(partitioner, err) + } + if choice != i%7 { + t.Error("Returned partition", choice, "expecting", i%7) + } + } + + // should use hash partitioner if key is specified + buf := make([]byte, 256) + for i := 0; i < 50; i++ { + if _, err := rand.Read(buf); err != nil { + t.Error(err) + } + assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50) + } +} + // By default, Sarama uses the message's key to consistently assign a partition to // a message using hashing. If no key is set, a random partition will be chosen. // This example shows how you can partition messages randomly, even when a key is set,