Skip to content

Commit

Permalink
consumer: add pausing / unpausing topics and partitions
Browse files Browse the repository at this point in the history
This adds a feature that currently exists in the Java client and in
librdkafka: pausing consuming.

Unlike those clients, this client allows for pausing entire topics at
once, rather than individual partitions. This client also supports
pausing individual partitions; the two concepts are layered:
pausing/resuming topics is independent of pausing/resuming partitions.

Similar to librdkafka and unlike Java, we do not purge any currently
buffered fetches containing paused topics or partitions when they are
paused.
  • Loading branch information
twmb committed Sep 2, 2021
1 parent d361b58 commit 33635e2
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 1 deletion.
109 changes: 109 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type consumer struct {

bufferedRecords int64

pausedMu sync.Mutex // grabbed when updating paused
paused atomic.Value // loaded when issuing fetches

// mu is grabbed when
// - polling fetches, for quickly draining sources / updating group uncommitted
// - calling assignPartitions (group / direct updates)
Expand Down Expand Up @@ -129,6 +132,10 @@ type consumer struct {
fakeReadyForDraining []Fetch
}

func (c *consumer) loadPaused() pausedTopics { return c.paused.Load().(pausedTopics) }
func (c *consumer) clonePaused() pausedTopics { return c.paused.Load().(pausedTopics).clone() }
func (c *consumer) storePaused(p pausedTopics) { c.paused.Store(p) }

// BufferedFetchRecords returns the number of records currently buffered from
// fetching within the client.
//
Expand All @@ -153,6 +160,7 @@ func (u *usedCursors) use(c *cursor) {

func (c *consumer) init(cl *Client) {
c.cl = cl
c.paused.Store(make(pausedTopics))
c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu)

if len(cl.cfg.topics) == 0 && len(cl.cfg.partitions) == 0 {
Expand Down Expand Up @@ -386,6 +394,107 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
return fetches
}

// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
// currently paused topics.
//
// In contrast to the canonical Java client, this function does not clear
// anything currently buffered. Buffered fetches containing paused topics are
// still returned from polling.
//
// Pausing topics is independent from pausing individual partitions with the
// PauseFetchPartitions method. If you pause partitions for a topic with
// PauseFetchPartitions, and then pause that same topic with PauseFetchTopics,
// the individually paused partitions will not be unpaused if you only call
// ResumeFetchTopics.
func (cl *Client) PauseFetchTopics(topics ...string) []string {
c := &cl.consumer
if len(topics) == 0 {
return c.loadPaused().pausedTopics()
}

c.pausedMu.Lock()
defer c.pausedMu.Unlock()

paused := c.clonePaused()
paused.addTopics(topics...)
c.storePaused(paused)
return paused.pausedTopics()
}

// PauseFetchPartitions sets the client to no longer fetch the given partitions
// and returns all currently paused partitions. Paused partitions persist until
// resumed. You can call this function with no partitions to simply receive the
// list of currently paused partitions.
//
// In contrast to the canonical Java client, this function does not clear
// anything currently buffered. Buffered fetches containing paused partitions
// are still returned from polling.
//
// Pausing individual partitions is independent from pausing topics with the
// PauseFetchTopics method. If you pause partitions for a topic with
// PauseFetchPartitions, and then pause that same topic with PauseFetchTopics,
// the individually paused partitions will not be unpaused if you only call
// ResumeFetchTopics.
func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[string][]int32 {
c := &cl.consumer
if len(topicPartitions) == 0 {
return c.loadPaused().pausedPartitions()
}

c.pausedMu.Lock()
defer c.pausedMu.Unlock()

paused := c.clonePaused()
paused.addPartitions(topicPartitions)
c.storePaused(paused)
return paused.pausedPartitions()
}

// ResumeFetchTopics resumes fetching the input topics if they were previously
// paused. Resuming topics that are not currently paused is a per-topic no-op.
// See the documentation on PauseTfetchTopics for more details.
func (cl *Client) ResumeFetchTopics(topics ...string) {
defer func() {
cl.sinksAndSourcesMu.Lock()
for _, sns := range cl.sinksAndSources {
sns.source.maybeConsume()
}
cl.sinksAndSourcesMu.Unlock()
}()

c := &cl.consumer
c.pausedMu.Lock()
defer c.pausedMu.Unlock()

paused := c.clonePaused()
paused.delTopics(topics...)
c.storePaused(paused)
}

// ResumeFetchPartitions resumes fetching the input partitions if they were
// previously paused. Resuming partitions that are not currently paused is a
// per-topic no-op. See the documentation on PauseFetchPartitions for more
// details.
func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
defer func() {
cl.sinksAndSourcesMu.Lock()
for _, sns := range cl.sinksAndSources {
sns.source.maybeConsume()
}
cl.sinksAndSourcesMu.Unlock()
}()

c := &cl.consumer
c.pausedMu.Lock()
defer c.pausedMu.Unlock()

paused := c.clonePaused()
paused.delPartitions(topicPartitions)
c.storePaused(paused)
}

// assignHow controls how assignPartitions operates.
type assignHow int8

Expand Down
4 changes: 3 additions & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,14 +439,16 @@ func (s *source) createReq() *fetchRequest {
session: s.session,
}

paused := s.cl.consumer.loadPaused()

s.cursorsMu.Lock()
defer s.cursorsMu.Unlock()

cursorIdx := s.cursorsStart
for i := 0; i < len(s.cursors); i++ {
c := s.cursors[cursorIdx]
cursorIdx = (cursorIdx + 1) % len(s.cursors)
if !c.usable() {
if !c.usable() || paused.has(c.topic, c.partition) {
continue
}
req.addCursor(c)
Expand Down
103 changes: 103 additions & 0 deletions pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,109 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
)

type pausedTopics map[string]pausedPartitions

type pausedPartitions struct {
all bool
m map[int32]struct{}
}

func (m pausedTopics) has(topic string, partition int32) (paused bool) {
if len(m) == 0 {
return false
}
pps, exists := m[topic]
if !exists {
return false
}
if pps.all {
return true
}
_, exists = pps.m[partition]
return exists
}

func (m pausedTopics) addTopics(topics ...string) {
for _, topic := range topics {
pps, exists := m[topic]
if !exists {
pps = pausedPartitions{m: make(map[int32]struct{})}
}
pps.all = true
m[topic] = pps
}
}

func (m pausedTopics) delTopics(topics ...string) {
for _, topic := range topics {
pps, exists := m[topic]
if !exists {
continue
}
pps.all = false
if !pps.all && len(pps.m) == 0 {
delete(m, topic)
}
}
}

func (m pausedTopics) addPartitions(topicPartitions map[string][]int32) {
for topic, partitions := range topicPartitions {
pps, exists := m[topic]
if !exists {
pps = pausedPartitions{m: make(map[int32]struct{})}
}
for _, partition := range partitions {
pps.m[partition] = struct{}{}
}
m[topic] = pps
}
}

func (m pausedTopics) delPartitions(topicPartitions map[string][]int32) {
for topic, partitions := range topicPartitions {
pps, exists := m[topic]
if !exists {
continue
}
for _, partition := range partitions {
delete(pps.m, partition)
}
if !pps.all && len(pps.m) == 0 {
delete(m, topic)
}
}
}

func (m pausedTopics) pausedTopics() []string {
var r []string
for topic, pps := range m {
if pps.all {
r = append(r, topic)
}
}
return r
}

func (m pausedTopics) pausedPartitions() map[string][]int32 {
r := make(map[string][]int32)
for topic, pps := range m {
ps := make([]int32, 0, len(pps.m))
for partition := range pps.m {
ps = append(ps, partition)
}
r[topic] = ps
}
return r
}

func (m pausedTopics) clone() pausedTopics {
dup := make(pausedTopics)
dup.addTopics(m.pausedTopics()...)
dup.addPartitions(m.pausedPartitions())
return dup
}

func newTopicPartitions() *topicPartitions {
parts := new(topicPartitions)
parts.v.Store(new(topicPartitionsData))
Expand Down

0 comments on commit 33635e2

Please sign in to comment.