diff --git a/consumer.go b/consumer.go
index e5de258f8..df13c2996 100644
--- a/consumer.go
+++ b/consumer.go
@@ -74,6 +74,26 @@ type Consumer interface {
 	// Close shuts down the consumer. It must be called after all child
 	// PartitionConsumers have already been closed.
 	Close() error
+
+	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
+	// records from these partitions until they have been resumed using Resume()/ResumeAll().
+	// Note that this method does not affect partition subscription.
+	// In particular, it does not cause a group rebalance when automatic assignment is used.
+	Pause(topicPartitions map[string][]int32)
+
+	// Resume resumes the specified partitions which have been paused with Pause()/PauseAll().
+	// New calls to the broker will return records from these partitions if there are any to be fetched.
+	Resume(topicPartitions map[string][]int32)
+
+	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
+	// records from these partitions until they have been resumed using Resume()/ResumeAll().
+	// Note that this method does not affect partition subscription.
+	// In particular, it does not cause a group rebalance when automatic assignment is used.
+	PauseAll()
+
+	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
+	// New calls to the broker will return records from these partitions if there are any to be fetched.
+	ResumeAll()
 }
 
 type consumer struct {
@@ -245,6 +265,62 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
 	delete(c.brokerConsumers, brokerWorker.broker)
 }
 
+// Pause implements Consumer.
+func (c *consumer) Pause(topicPartitions map[string][]int32) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for topic, partitions := range topicPartitions {
+		for _, partition := range partitions {
+			if topicConsumers, ok := c.children[topic]; ok {
+				if partitionConsumer, ok := topicConsumers[partition]; ok {
+					partitionConsumer.Pause()
+				}
+			}
+		}
+	}
+}
+
+// Resume implements Consumer.
+func (c *consumer) Resume(topicPartitions map[string][]int32) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for topic, partitions := range topicPartitions {
+		for _, partition := range partitions {
+			if topicConsumers, ok := c.children[topic]; ok {
+				if partitionConsumer, ok := topicConsumers[partition]; ok {
+					partitionConsumer.Resume()
+				}
+			}
+		}
+	}
+}
+
+// PauseAll implements Consumer.
+func (c *consumer) PauseAll() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for _, partitions := range c.children {
+		for _, partitionConsumer := range partitions {
+			partitionConsumer.Pause()
+		}
+	}
+}
+
+// ResumeAll implements Consumer.
+func (c *consumer) ResumeAll() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for _, partitions := range c.children {
+		for _, partitionConsumer := range partitions {
+			partitionConsumer.Resume()
+		}
+	}
+}
+
 // PartitionConsumer
 
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
@@ -292,6 +368,20 @@ type PartitionConsumer interface {
 	// i.e. the offset that will be used for the next message that will be produced.
 	// You can use this to determine how far behind the processing is.
 	HighWaterMarkOffset() int64
+
+	// Pause suspends fetching from this partition. Future calls to the broker will not return
+	// any records from this partition until it has been resumed using Resume().
+	// Note that this method does not affect partition subscription.
+	// In particular, it does not cause a group rebalance when automatic assignment is used.
+	Pause()
+
+	// Resume resumes this partition which has been paused with Pause().
+	// New calls to the broker will return records from this partition if there are any to be fetched.
+	// If the partition was not previously paused, this method is a no-op.
+	Resume()
+
+	// IsPaused indicates whether this partition consumer is paused.
+	IsPaused() bool
 }
 
 type partitionConsumer struct {
@@ -314,6 +404,8 @@ type partitionConsumer struct {
 	fetchSize int32
 	offset    int64
 	retries   int32
+
+	paused int32
 }
 
 var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -737,6 +829,21 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
 	}
 }
 
+// Pause implements PartitionConsumer.
+func (child *partitionConsumer) Pause() {
+	atomic.StoreInt32(&child.paused, 1)
+}
+
+// Resume implements PartitionConsumer.
+func (child *partitionConsumer) Resume() {
+	atomic.StoreInt32(&child.paused, 0)
+}
+
+// IsPaused implements PartitionConsumer.
+func (child *partitionConsumer) IsPaused() bool {
+	return atomic.LoadInt32(&child.paused) == 1
+}
+
 type brokerConsumer struct {
 	consumer *consumer
 	broker   *Broker
@@ -962,7 +1069,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	}
 
 	for child := range bc.subscriptions {
-		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
+		if !child.IsPaused() {
+			request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
+		}
 	}
 
 	return bc.broker.Fetch(request)
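The doc comments above echo the Java client's pause()/resume() semantics: a paused partition is simply skipped when fetch requests are built, so its offset does not advance and no rebalance is triggered. For orientation, here is a minimal usage sketch on a standalone consumer (not part of the change); the broker address and topic are placeholders, and the import path assumes the Shopify-era module:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	// Stop adding this partition to fetch requests. Messages already
	// buffered on pc.Messages() are still delivered; only new fetches stop.
	consumer.Pause(map[string][]int32{"my_topic": {0}})
	log.Println("paused:", pc.IsPaused()) // true

	// Later, pick up where the fetcher left off.
	consumer.Resume(map[string][]int32{"my_topic": {0}})
}
```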
diff --git a/consumer_group.go b/consumer_group.go
index b603d1705..fb2393698 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -54,6 +54,26 @@ type ConsumerGroup interface {
 	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
 	// this function before the object passes out of scope, as it will otherwise leak memory.
 	Close() error
+
+	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
+	// records from these partitions until they have been resumed using Resume()/ResumeAll().
+	// Note that this method does not affect partition subscription.
+	// In particular, it does not cause a group rebalance when automatic assignment is used.
+	Pause(partitions map[string][]int32)
+
+	// Resume resumes the specified partitions which have been paused with Pause()/PauseAll().
+	// New calls to the broker will return records from these partitions if there are any to be fetched.
+	Resume(partitions map[string][]int32)
+
+	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
+	// records from these partitions until they have been resumed using Resume()/ResumeAll().
+	// Note that this method does not affect partition subscription.
+	// In particular, it does not cause a group rebalance when automatic assignment is used.
+	PauseAll()
+
+	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
+	// New calls to the broker will return records from these partitions if there are any to be fetched.
+	ResumeAll()
 }
 
 type consumerGroup struct {
@@ -188,6 +208,26 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
 	return sess.release(true)
 }
 
+// Pause implements ConsumerGroup.
+func (c *consumerGroup) Pause(partitions map[string][]int32) {
+	c.consumer.Pause(partitions)
+}
+
+// Resume implements ConsumerGroup.
+func (c *consumerGroup) Resume(partitions map[string][]int32) {
+	c.consumer.Resume(partitions)
+}
+
+// PauseAll implements ConsumerGroup.
+func (c *consumerGroup) PauseAll() {
+	c.consumer.PauseAll()
+}
+
+// ResumeAll implements ConsumerGroup.
+func (c *consumerGroup) ResumeAll() {
+	c.consumer.ResumeAll()
+}
+
 func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
 	select {
 	case <-c.closed:
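Because pausing does not move the group off its assignment, it can safely be driven from a goroutine outside the Consume loop. A sketch under assumed names (local broker, topic `sarama`, group `example`, a no-op handler); the version bump is needed because consumer groups require Kafka >= 0.10.2:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// handler is a hypothetical no-op ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0 // consumer groups need >= V0_10_2_0

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example", config)
	if err != nil {
		log.Fatalln(err)
	}
	defer group.Close()

	go func() {
		// Throttle one hot partition for a while. The group keeps its
		// assignment; only fetching is suspended, so no rebalance occurs.
		time.Sleep(10 * time.Second)
		group.Pause(map[string][]int32{"sarama": {0}})
		time.Sleep(10 * time.Second)
		group.Resume(map[string][]int32{"sarama": {0}})
	}()

	for {
		if err := group.Consume(context.Background(), []string{"sarama"}, handler{}); err != nil {
			log.Fatalln(err)
		}
	}
}
```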
diff --git a/consumer_test.go b/consumer_test.go
index 0f752c4e0..ee50d10eb 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -78,6 +78,87 @@ func TestConsumerOffsetManual(t *testing.T) {
 	broker0.Close()
 }
 
+func TestPauseResumeConsumption(t *testing.T) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+
+	const newestOffsetBroker = 1233
+	const maxOffsetBroker = newestOffsetBroker + 10
+	offsetBroker := newestOffsetBroker
+	offsetClient := offsetBroker
+
+	mockFetchResponse := NewMockFetchResponse(t, 1)
+	mockFetchResponse.SetMessage("my_topic", 0, int64(newestOffsetBroker), testMsg)
+	offsetBroker++
+
+	brokerResponses := map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, int64(newestOffsetBroker)),
+		"FetchRequest": mockFetchResponse,
+	}
+
+	broker0.SetHandlerByMap(brokerResponses)
+
+	// When
+	master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// pause the consumption
+	consumer.Pause()
+
+	// add more messages on the broker side
+	for ; offsetBroker < maxOffsetBroker; offsetBroker++ {
+		mockFetchResponse = mockFetchResponse.SetMessage("my_topic", 0, int64(offsetBroker), testMsg)
+	}
+	brokerResponses["FetchRequest"] = mockFetchResponse
+	broker0.SetHandlerByMap(brokerResponses)
+
+	keepConsuming := true
+	for keepConsuming {
+		select {
+		case message := <-consumer.Messages():
+			// only the first message is expected to be consumed
+			offsetClient++
+			assertMessageOffset(t, message, int64(newestOffsetBroker))
+		case err := <-consumer.Errors():
+			t.Fatal(err)
+		case <-time.After(time.Second):
+			// expected to time out once the consumption is paused
+			keepConsuming = false
+		}
+	}
+
+	// resume the consumption in order to consume the new messages
+	consumer.Resume()
+
+	for offsetClient < maxOffsetBroker {
+		select {
+		case message := <-consumer.Messages():
+			assertMessageOffset(t, message, int64(offsetClient))
+			offsetClient++
+		case err := <-consumer.Errors():
+			t.Fatal("Error: ", err)
+		case <-time.After(time.Second * 10):
+			t.Fatal("consumer timed out. Offset: ", offsetClient)
+		}
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
 // If `OffsetNewest` is passed as the initial offset then the first consumed
 // message indeed corresponds to the offset that broker claims to be the
 // newest in its metadata response.
diff --git a/examples/consumergroup/README.md b/examples/consumergroup/README.md
index cfe805769..eaca163f7 100644
--- a/examples/consumergroup/README.md
+++ b/examples/consumergroup/README.md
@@ -4,4 +4,6 @@ This example shows you how to use the Sarama consumer group consumer. The exampl
 
 ```bash
 $ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
-```
\ No newline at end of file
+```
+
+You can also toggle (pause/resume) the consumption by sending SIGUSR1 to the process.
\ No newline at end of file
diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go
index 53ed143f6..0edc06cc7 100644
--- a/examples/consumergroup/main.go
+++ b/examples/consumergroup/main.go
@@ -1,5 +1,6 @@
 package main
 
+// SIGUSR1 toggles the pause/resume of consumption
 import (
 	"context"
 	"flag"
@@ -48,6 +49,7 @@ func init() {
 }
 
 func main() {
+	keepRunning := true
 	log.Println("Starting a new Sarama consumer")
 
 	if verbose {
@@ -94,6 +96,7 @@ func main() {
 		log.Panicf("Error creating consumer group client: %v", err)
 	}
 
+	consumptionIsPaused := false
 	wg := &sync.WaitGroup{}
 	wg.Add(1)
 	go func() {
@@ -116,13 +119,23 @@ func main() {
 	<-consumer.ready // Await till the consumer has been set up
 	log.Println("Sarama consumer up and running!...")
 
+	sigusr1 := make(chan os.Signal, 1)
+	signal.Notify(sigusr1, syscall.SIGUSR1)
+
 	sigterm := make(chan os.Signal, 1)
 	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-	select {
-	case <-ctx.Done():
-		log.Println("terminating: context cancelled")
-	case <-sigterm:
-		log.Println("terminating: via signal")
+
+	for keepRunning {
+		select {
+		case <-ctx.Done():
+			log.Println("terminating: context cancelled")
+			keepRunning = false
+		case <-sigterm:
+			log.Println("terminating: via signal")
+			keepRunning = false
+		case <-sigusr1:
+			toggleConsumptionFlow(client, &consumptionIsPaused)
+		}
 	}
 	cancel()
 	wg.Wait()
@@ -131,6 +144,18 @@ func main() {
 	}
 }
 
+func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
+	if *isPaused {
+		client.ResumeAll()
+		log.Println("Resuming consumption")
+	} else {
+		client.PauseAll()
+		log.Println("Pausing consumption")
+	}
+
+	*isPaused = !*isPaused
+}
+
 // Consumer represents a Sarama consumer group consumer
 type Consumer struct {
 	ready chan bool
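One way to exercise the toggle without a second terminal is to have the example process signal itself. This helper is a hypothetical, Unix-only addition, not part of the change:

```go
package main

import (
	"syscall"
	"time"
)

// togglePeriodically sends SIGUSR1 to the current process every 30s;
// the signal loop in main() above turns each delivery into a
// toggleConsumptionFlow call.
func togglePeriodically() {
	for range time.Tick(30 * time.Second) {
		_ = syscall.Kill(syscall.Getpid(), syscall.SIGUSR1)
	}
}
```

From a shell, `kill -USR1 <pid>` has the same effect.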
diff --git a/mockresponses.go b/mockresponses.go
index 4c9cac809..91823db5a 100644
--- a/mockresponses.go
+++ b/mockresponses.go
@@ -3,6 +3,7 @@ package sarama
 import (
 	"fmt"
 	"strings"
+	"sync"
 )
 
 // TestReporter has methods matching go's testing.T to avoid importing
@@ -264,6 +265,7 @@ func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int
 // MockFetchResponse is a `FetchResponse` builder.
 type MockFetchResponse struct {
 	messages       map[string]map[int32]map[int64]Encoder
+	messagesLock   *sync.RWMutex
 	highWaterMarks map[string]map[int32]int64
 	t              TestReporter
 	batchSize      int
@@ -273,6 +275,7 @@ type MockFetchResponse struct {
 func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
 	return &MockFetchResponse{
 		messages:       make(map[string]map[int32]map[int64]Encoder),
+		messagesLock:   &sync.RWMutex{},
 		highWaterMarks: make(map[string]map[int32]int64),
 		t:              t,
 		batchSize:      batchSize,
@@ -285,6 +288,8 @@ func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
 }
 
 func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
+	mfr.messagesLock.Lock()
+	defer mfr.messagesLock.Unlock()
 	partitions := mfr.messages[topic]
 	if partitions == nil {
 		partitions = make(map[int32]map[int64]Encoder)
@@ -339,6 +344,8 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
 }
 
 func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
+	mfr.messagesLock.RLock()
+	defer mfr.messagesLock.RUnlock()
 	partitions := mfr.messages[topic]
 	if partitions == nil {
 		return nil
@@ -351,6 +358,8 @@ func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset i
 }
 
 func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
+	mfr.messagesLock.RLock()
+	defer mfr.messagesLock.RUnlock()
 	partitions := mfr.messages[topic]
 	if partitions == nil {
 		return 0
diff --git a/mocks/consumer.go b/mocks/consumer.go
index c10ae4765..60436401e 100644
--- a/mocks/consumer.go
+++ b/mocks/consumer.go
@@ -127,6 +127,62 @@ func (c *Consumer) Close() error {
 	return nil
 }
 
+// Pause implements Consumer.
+func (c *Consumer) Pause(topicPartitions map[string][]int32) {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	for topic, partitions := range topicPartitions {
+		for _, partition := range partitions {
+			if topicConsumers, ok := c.partitionConsumers[topic]; ok {
+				if partitionConsumer, ok := topicConsumers[partition]; ok {
+					partitionConsumer.Pause()
+				}
+			}
+		}
+	}
+}
+
+// Resume implements Consumer.
+func (c *Consumer) Resume(topicPartitions map[string][]int32) {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	for topic, partitions := range topicPartitions {
+		for _, partition := range partitions {
+			if topicConsumers, ok := c.partitionConsumers[topic]; ok {
+				if partitionConsumer, ok := topicConsumers[partition]; ok {
+					partitionConsumer.Resume()
+				}
+			}
+		}
+	}
+}
+
+// PauseAll implements Consumer.
+func (c *Consumer) PauseAll() {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	for _, partitions := range c.partitionConsumers {
+		for _, partitionConsumer := range partitions {
+			partitionConsumer.Pause()
+		}
+	}
+}
+
+// ResumeAll implements Consumer.
+func (c *Consumer) ResumeAll() {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	for _, partitions := range c.partitionConsumers {
+		for _, partitionConsumer := range partitions {
+			partitionConsumer.Resume()
+		}
+	}
+}
+
 ///////////////////////////////////////////////////
 // Expectation API
 ///////////////////////////////////////////////////
@@ -156,12 +212,13 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
 
 	if c.partitionConsumers[topic][partition] == nil {
 		c.partitionConsumers[topic][partition] = &PartitionConsumer{
-			t:         c.t,
-			topic:     topic,
-			partition: partition,
-			offset:    offset,
-			messages:  make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
-			errors:    make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
+			t:                  c.t,
+			topic:              topic,
+			partition:          partition,
+			offset:             offset,
+			messages:           make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			errors:             make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
 		}
 	}
 
@@ -178,18 +235,21 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
 // Errors and Messages channel, you should specify what values will be provided on these
 // channels using YieldMessage and YieldError.
 type PartitionConsumer struct {
-	highWaterMarkOffset     int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
-	l                       sync.Mutex
-	t                       ErrorReporter
-	topic                   string
-	partition               int32
-	offset                  int64
-	messages                chan *sarama.ConsumerMessage
-	errors                  chan *sarama.ConsumerError
-	singleClose             sync.Once
-	consumed                bool
-	errorsShouldBeDrained   bool
-	messagesShouldBeDrained bool
+	highWaterMarkOffset           int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	l                             sync.Mutex
+	t                             ErrorReporter
+	topic                         string
+	partition                     int32
+	offset                        int64
+	messages                      chan *sarama.ConsumerMessage
+	suppressedMessages            chan *sarama.ConsumerMessage
+	suppressedHighWaterMarkOffset int64
+	errors                        chan *sarama.ConsumerError
+	singleClose                   sync.Once
+	consumed                      bool
+	errorsShouldBeDrained         bool
+	messagesShouldBeDrained       bool
+	paused                        bool
 }
 
 ///////////////////////////////////////////////////
@@ -199,6 +259,7 @@ type PartitionConsumer struct {
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 func (pc *PartitionConsumer) AsyncClose() {
 	pc.singleClose.Do(func() {
+		close(pc.suppressedMessages)
 		close(pc.messages)
 		close(pc.errors)
 	})
@@ -249,6 +310,14 @@ func (pc *PartitionConsumer) Close() error {
 		}
 	}()
 
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for range pc.suppressedMessages {
+			// drain
+		}
+	}()
+
 	wg.Wait()
 	return closeErr
 }
@@ -267,6 +336,38 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
 	return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
 }
 
+// Pause implements the Pause method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) Pause() {
+	pc.l.Lock()
+	defer pc.l.Unlock()
+
+	pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset)
+
+	pc.paused = true
+}
+
+// Resume implements the Resume method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) Resume() {
+	pc.l.Lock()
+	defer pc.l.Unlock()
+
+	pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset)
+	for len(pc.suppressedMessages) > 0 {
+		msg := <-pc.suppressedMessages
+		pc.messages <- msg
+	}
+
+	pc.paused = false
+}
+
+// IsPaused implements the IsPaused method from the sarama.PartitionConsumer interface.
+func (pc *PartitionConsumer) IsPaused() bool {
+	pc.l.Lock()
+	defer pc.l.Unlock()
+
+	return pc.paused
+}
+
 ///////////////////////////////////////////////////
 // Expectation API
 ///////////////////////////////////////////////////
@@ -282,10 +383,17 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio
 	msg.Topic = pc.topic
 	msg.Partition = pc.partition
 
-	msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
-	pc.messages <- msg
+	if pc.paused {
+		msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1)
+		pc.suppressedMessages <- msg
+	} else {
+		msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
+		pc.messages <- msg
+	}
 
 	return pc
 }
diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go
index 9367d6aa1..02aa2a14b 100644
--- a/mocks/consumer_test.go
+++ b/mocks/consumer_test.go
@@ -64,6 +64,73 @@ func TestConsumerHandlesExpectations(t *testing.T) {
 	}
 }
 
+func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) {
+	consumer := NewConsumer(t, NewTestConfig())
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			t.Error(err)
+		}
+	}()
+
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).Pause()
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
+	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
+	consumer.ExpectConsumePartition("test", 1, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
+	consumer.ExpectConsumePartition("other", 0, AnyOffset).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
+
+	pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(pc_test0.Messages()) > 0 {
+		t.Error("Failed to pause consumption: message was delivered while paused")
+	}
+	test0_err := <-pc_test0.Errors()
+	if test0_err.Err != sarama.ErrOutOfBrokers {
+		t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err)
+	}
+
+	if pc_test0.HighWaterMarkOffset() != 1 {
+		t.Error("Unexpected high water mark offset:", pc_test0.HighWaterMarkOffset())
+	}
+
+	pc_test1, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	test1_msg := <-pc_test1.Messages()
+	if test1_msg.Topic != "test" || test1_msg.Partition != 1 || string(test1_msg.Value) != "hello world again" {
+		t.Error("Message was not as expected:", test1_msg)
+	}
+
+	if pc_test1.HighWaterMarkOffset() != 2 {
+		t.Error("Unexpected high water mark offset:", pc_test1.HighWaterMarkOffset())
+	}
+
+	pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetNewest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	other0_msg := <-pc_other0.Messages()
+	if other0_msg.Topic != "other" || other0_msg.Partition != 0 || string(other0_msg.Value) != "hello other" {
+		t.Error("Message was not as expected:", other0_msg)
+	}
+
+	if pc_other0.HighWaterMarkOffset() != 2 {
+		t.Error("Unexpected high water mark offset:", pc_other0.HighWaterMarkOffset())
+	}
+
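+	// resuming pc_test0 should restore its high water mark and deliver the
+	// message that was suppressed while the partition was paused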
+	pc_test0.Resume()
+	test0_msg := <-pc_test0.Messages()
+	if test0_msg.Topic != "test" || test0_msg.Partition != 0 || string(test0_msg.Value) != "hello world" {
+		t.Error("Message was not as expected:", test0_msg)
+	}
+
+	if pc_test0.HighWaterMarkOffset() != 2 {
+		t.Error("Unexpected high water mark offset:", pc_test0.HighWaterMarkOffset())
+	}
+}
+
 func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) {
 	consumer := NewConsumer(t, NewTestConfig())
 	consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)