diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 903edc0d..f93f3cfb 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -92,6 +92,9 @@ type consumer struct { bufferedRecords int64 + pausedMu sync.Mutex // grabbed when updating paused + paused atomic.Value // loaded when issuing fetches + // mu is grabbed when // - polling fetches, for quickly draining sources / updating group uncommitted // - calling assignPartitions (group / direct updates) @@ -129,6 +132,10 @@ type consumer struct { fakeReadyForDraining []Fetch } +func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) } +func (c *consumer) clonePaused() pausedTopics { return c.paused.Load().(pausedTopics).clone() } +func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) } + // BufferedFetchRecords returns the number of records currently buffered from // fetching within the client. // @@ -153,6 +160,7 @@ func (u *usedCursors) use(c *cursor) { func (c *consumer) init(cl *Client) { c.cl = cl + c.paused.Store(make(pausedTopics)) c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu) if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 { @@ -386,6 +394,107 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { return fetches } +// PauseFetchTopics sets the client to no longer fetch the given topics and +// returns all currently paused topics. Paused topics persist until resumed. +// You can call this function with no topics to simply receive the list of +// currently paused topics. +// +// In contrast to the canonical Java client, this function does not clear +// anything currently buffered. Buffered fetches containing paused topics are +// still returned from polling. +// +// Pausing topics is independent from pausing individual partitions with the +// PauseFetchPartitions method. 
If you pause partitions for a topic with
+// PauseFetchPartitions, and then pause that same topic with PauseFetchTopics,
+// the individually paused partitions will not be unpaused if you only call
+// ResumeFetchTopics.
+func (cl *Client) PauseFetchTopics(topics ...string) []string {
+	c := &cl.consumer
+	if len(topics) == 0 {
+		return c.loadPaused().pausedTopics()
+	}
+
+	c.pausedMu.Lock()
+	defer c.pausedMu.Unlock()
+
+	paused := c.clonePaused()
+	paused.addTopics(topics...)
+	c.storePaused(paused)
+	return paused.pausedTopics()
+}
+
+// PauseFetchPartitions sets the client to no longer fetch the given partitions
+// and returns all currently paused partitions. Paused partitions persist until
+// resumed. You can call this function with no partitions to simply receive the
+// list of currently paused partitions.
+//
+// In contrast to the canonical Java client, this function does not clear
+// anything currently buffered. Buffered fetches containing paused partitions
+// are still returned from polling.
+//
+// Pausing individual partitions is independent from pausing topics with the
+// PauseFetchTopics method. If you pause partitions for a topic with
+// PauseFetchPartitions, and then pause that same topic with PauseFetchTopics,
+// the individually paused partitions will not be unpaused if you only call
+// ResumeFetchTopics.
+func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[string][]int32 {
+	c := &cl.consumer
+	if len(topicPartitions) == 0 {
+		return c.loadPaused().pausedPartitions()
+	}
+
+	c.pausedMu.Lock()
+	defer c.pausedMu.Unlock()
+
+	paused := c.clonePaused()
+	paused.addPartitions(topicPartitions)
+	c.storePaused(paused)
+	return paused.pausedPartitions()
+}
+
+// ResumeFetchTopics resumes fetching the input topics if they were previously
+// paused. Resuming topics that are not currently paused is a per-topic no-op.
+// See the documentation on PauseFetchTopics for more details.
+func (cl *Client) ResumeFetchTopics(topics ...string) {
+	defer func() {
+		cl.sinksAndSourcesMu.Lock()
+		for _, sns := range cl.sinksAndSources {
+			sns.source.maybeConsume()
+		}
+		cl.sinksAndSourcesMu.Unlock()
+	}()
+
+	c := &cl.consumer
+	c.pausedMu.Lock()
+	defer c.pausedMu.Unlock()
+
+	paused := c.clonePaused()
+	paused.delTopics(topics...)
+	c.storePaused(paused)
+}
+
+// ResumeFetchPartitions resumes fetching the input partitions if they were
+// previously paused. Resuming partitions that are not currently paused is a
+// per-partition no-op. See the documentation on PauseFetchPartitions for more
+// details.
+func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
+	defer func() {
+		cl.sinksAndSourcesMu.Lock()
+		for _, sns := range cl.sinksAndSources {
+			sns.source.maybeConsume()
+		}
+		cl.sinksAndSourcesMu.Unlock()
+	}()
+
+	c := &cl.consumer
+	c.pausedMu.Lock()
+	defer c.pausedMu.Unlock()
+
+	paused := c.clonePaused()
+	paused.delPartitions(topicPartitions)
+	c.storePaused(paused)
+}
+
 // assignHow controls how assignPartitions operates.
 
 type assignHow int8
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 295870bd..e9144c9a 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -439,6 +439,8 @@ func (s *source) createReq() *fetchRequest {
 		session: s.session,
 	}
 
+	paused := s.cl.consumer.loadPaused()
+
 	s.cursorsMu.Lock()
 	defer s.cursorsMu.Unlock()
 
@@ -446,7 +448,7 @@ func (s *source) createReq() *fetchRequest {
 	for i := 0; i < len(s.cursors); i++ {
 		c := s.cursors[cursorIdx]
 		cursorIdx = (cursorIdx + 1) % len(s.cursors)
-		if !c.usable() {
+		if !c.usable() || paused.has(c.topic, c.partition) {
 			continue
 		}
 		req.addCursor(c)
diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go
index bdc47918..4ffccaf9 100644
--- a/pkg/kgo/topics_and_partitions.go
+++ b/pkg/kgo/topics_and_partitions.go
@@ -7,6 +7,110 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 )
 
+type pausedTopics map[string]pausedPartitions
+
+type pausedPartitions struct {
+	all bool
+	m   map[int32]struct{}
+}
+
+func (m pausedTopics) has(topic string, partition int32) (paused bool) {
+	if len(m) == 0 {
+		return false
+	}
+	pps, exists := m[topic]
+	if !exists {
+		return false
+	}
+	if pps.all {
+		return true
+	}
+	_, exists = pps.m[partition]
+	return exists
+}
+
+func (m pausedTopics) addTopics(topics ...string) {
+	for _, topic := range topics {
+		pps, exists := m[topic]
+		if !exists {
+			pps = pausedPartitions{m: make(map[int32]struct{})}
+		}
+		pps.all = true
+		m[topic] = pps
+	}
+}
+
+func (m pausedTopics) delTopics(topics ...string) {
+	for _, topic := range topics {
+		pps, exists := m[topic]
+		if !exists {
+			continue
+		}
+		pps.all = false
+		m[topic] = pps
+		if !pps.all && len(pps.m) == 0 {
+			delete(m, topic)
+		}
+	}
+}
+
+func (m pausedTopics) addPartitions(topicPartitions map[string][]int32) {
+	for topic, partitions := range topicPartitions {
+		pps, exists := m[topic]
+		if !exists {
+			pps = pausedPartitions{m: make(map[int32]struct{})}
+		}
+		for _, partition := range partitions {
+			pps.m[partition] = struct{}{}
+		}
+		m[topic] = pps
+	}
+}
+
+func (m pausedTopics) delPartitions(topicPartitions map[string][]int32) {
+	for topic, partitions := range topicPartitions {
+		pps, exists := m[topic]
+		if !exists {
+			continue
+		}
+		for _, partition := range partitions {
+			delete(pps.m, partition)
+		}
+		if !pps.all && len(pps.m) == 0 {
+			delete(m, topic)
+		}
+	}
+}
+
+func (m pausedTopics) pausedTopics() []string {
+	var r []string
+	for topic, pps := range m {
+		if pps.all {
+			r = append(r, topic)
+		}
+	}
+	return r
+}
+
+func (m pausedTopics) pausedPartitions() map[string][]int32 {
+	r := make(map[string][]int32)
+	for topic, pps := range m {
+		ps := make([]int32, 0, len(pps.m))
+		for partition := range pps.m {
+			ps = append(ps, partition)
+		}
+		r[topic] = ps
+	}
+	return r
+}
+
+func (m pausedTopics) clone() pausedTopics {
+	dup := make(pausedTopics)
+	dup.addTopics(m.pausedTopics()...)
+	dup.addPartitions(m.pausedPartitions())
+	return dup
+}
+
 func newTopicPartitions() *topicPartitions {
 	parts := new(topicPartitions)
 	parts.v.Store(new(topicPartitionsData))