Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Backport: Set subscription readiness based on consumer group status (#182) #249

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/channel/consolidated/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,5 @@ func main() {
ctx = injection.WithNamespaceScope(ctx, ns)
}

// Do not run the dispatcher in leader-election mode
ctx = sharedmain.WithHADisabled(ctx)

sharedmain.MainWithContext(ctx, component, controller.NewController)
}
109 changes: 109 additions & 0 deletions pkg/channel/consolidated/kafka/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
"context"
"fmt"
"math"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"

"go.uber.org/zap"

"github.com/Shopify/sarama"
"knative.dev/pkg/logging"
)

// mutex serializes AdminClientManager.ListConsumerGroups calls. It is declared
// at package level, so it is shared by every AdminClientManager in the process.
var mutex sync.Mutex

// ClusterAdminFactory builds a new sarama.ClusterAdmin, or returns an error
// when one cannot be created.
type ClusterAdminFactory func() (sarama.ClusterAdmin, error)

// AdminClient is the subset of Kafka administrative operations used by this
// package.
type AdminClient interface {
// ListConsumerGroups returns the names of the consumer groups, or an error
// when they cannot be listed.
ListConsumerGroups() ([]string, error)
}

// AdminClientManager manages a ClusterAdmin connection and recreates it when
// needed. It exists to work around
// https://github.com/Shopify/sarama/issues/1162.
type AdminClientManager struct {
// logger receives progress and retry messages.
logger *zap.SugaredLogger
// adminFactory is called to build a replacement ClusterAdmin on retry.
adminFactory ClusterAdminFactory
// clusterAdmin is the ClusterAdmin currently used to serve requests.
clusterAdmin sarama.ClusterAdmin
}

// NewAdminClient builds an AdminClient backed by an AdminClientManager.
//
// The logger is taken from ctx. caFactory is invoked once immediately to
// create the initial ClusterAdmin and is kept so that replacements can be
// created later. If the factory fails, the error is logged and returned
// unchanged together with a nil client.
func NewAdminClient(ctx context.Context, caFactory ClusterAdminFactory) (AdminClient, error) {
	log := logging.FromContext(ctx)
	log.Info("Creating a new AdminClient")

	admin, err := caFactory()
	if err != nil {
		log.Errorw("error while creating ClusterAdmin", zap.Error(err))
		return nil, err
	}

	manager := &AdminClientManager{
		logger:       log,
		adminFactory: caFactory,
		clusterAdmin: admin,
	}
	return manager, nil
}

// ListConsumerGroups returns the names of the consumer groups reported by the
// underlying ClusterAdmin, as produced by sets.StringKeySet(...).List().
//
// When listing fails, the call is retried with exponential backoff: before
// retry r (counting from 0) the method sleeps 100ms * 2^r, and retries are
// attempted for r = 0..maxRetries. Due to a known issue in Sarama's
// ClusterAdmin (https://github.com/Shopify/sarama/issues/1162), every retry
// first creates a fresh ClusterAdmin through the factory; the current one is
// only replaced when the factory succeeds.
//
// The package-level mutex is held for the entire call, sleeps included, so
// concurrent callers are serialized.
//
// An error wrapping the last failure is returned once all retries have been
// used without success.
func (c *AdminClientManager) ListConsumerGroups() ([]string, error) {
	c.logger.Info("Attempting to list consumer group")
	mutex.Lock()
	defer mutex.Unlock()

	// With r running from 0 to maxRetries inclusive, the final delay alone is
	// 100ms * 2^13 (~13.6min) and all delays together add up to ~27min.
	const maxRetries = 13

	cgsMap, err := c.clusterAdmin.ListConsumerGroups()
	for r := 0; err != nil && r <= maxRetries; r++ {
		// There was an error; retry on the presumption that a new
		// ClusterAdmin can fix it.

		// Calculate incremental delay following this https://docs.aws.amazon.com/general/latest/gr/api-retries.html
		t := int(math.Pow(2, float64(r)) * 100)
		d := time.Duration(t) * time.Millisecond
		c.logger.Errorw("listing consumer group failed. Refreshing the ClusterAdmin and retrying.",
			zap.Error(err),
			zap.Duration("retry after", d),
			zap.Int("Retry attempt", r),
			zap.Int("Max retries", maxRetries),
		)
		time.Sleep(d)

		// Reconnect and try again. Keep the previous ClusterAdmin when the
		// factory fails so that c.clusterAdmin is never left nil, which would
		// make the next call panic.
		// NOTE(review): the replaced ClusterAdmin is not closed here — confirm
		// whether it should be.
		admin, factoryErr := c.adminFactory()
		if factoryErr != nil {
			// Skip this attempt but remember why it failed.
			err = factoryErr
			continue
		}
		c.clusterAdmin = admin
		cgsMap, err = c.clusterAdmin.ListConsumerGroups()
	}

	// Decide on the error itself rather than on the retry counter: the last
	// permitted retry may well have succeeded.
	if err != nil {
		return nil, fmt.Errorf("failed to refresh the cluster admin and retry: %w", err)
	}

	return sets.StringKeySet(cgsMap).List(), nil
}
93 changes: 93 additions & 0 deletions pkg/channel/consolidated/kafka/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
"fmt"
"sync"
"testing"
"time"

"github.com/Shopify/sarama"
pkgtesting "knative.dev/pkg/logging/testing"
)

// testCG is the name of the single consumer group the fake ClusterAdmin
// reports.
const testCG = "cg1"

// m guards FakeClusterAdmin.faulty, which the test toggles from several
// goroutines while ListConsumerGroups reads it.
var m sync.RWMutex

// FakeClusterAdmin is a test double for sarama.ClusterAdmin. It embeds the
// interface and, in this file, overrides only ListConsumerGroups.
type FakeClusterAdmin struct {
sarama.ClusterAdmin
// faulty makes ListConsumerGroups return an error while it is true.
faulty bool
}

// ListConsumerGroups reports a single consumer group named testCG, or an
// error while the fake is marked faulty. The faulty flag is read under the
// read lock of m.
func (f *FakeClusterAdmin) ListConsumerGroups() (map[string]string, error) {
	m.RLock()
	broken := f.faulty
	m.RUnlock()

	if broken {
		return nil, fmt.Errorf("Error")
	}
	return map[string]string{testCG: "cg"}, nil
}

// TestAdminClient calls ListConsumerGroups from several goroutines at once.
// Each worker lists the groups once, then switches the shared fake
// ClusterAdmin into its faulty state for two seconds and lists again while it
// is faulty, so the second call has to go through the retry path.
func TestAdminClient(t *testing.T) {
	const workers = 10

	ctx := pkgtesting.TestContextWithLogger(t)
	f := &FakeClusterAdmin{}
	ac, err := NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) {
		return f, nil
	})
	if err != nil {
		// Nothing below can run without a client, so stop here instead of
		// continuing with a nil AdminClient.
		t.Fatal("failed to obtain new client", err)
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			// Deferred so wg.Wait cannot hang if this goroutine exits early.
			defer wg.Done()

			doList(t, ac)

			// Buffered for both signals so the toggling goroutine never
			// blocks (and leaks) if this worker stops receiving.
			check := make(chan struct{}, 2)
			go func() {
				m.Lock()
				f.faulty = true
				m.Unlock()
				check <- struct{}{}
				time.Sleep(2 * time.Second)
				m.Lock()
				f.faulty = false
				m.Unlock()
				check <- struct{}{}
			}()

			// Wait until the fake is faulty, then list through the retry path.
			<-check
			doList(t, ac)
			// Wait for the fake to be healthy again before finishing.
			<-check
		}()
	}
	wg.Wait()
}

// doList lists the consumer groups through ac and checks that exactly one
// group, testCG, is returned.
//
// Failures are reported with t.Errorf followed by a return instead of
// t.Fatalf: this helper is called from goroutines other than the one running
// the test, where FailNow must not be used.
func doList(t *testing.T, ac AdminClient) {
	t.Helper()

	cgs, err := ac.ListConsumerGroups()
	if err != nil {
		t.Errorf("list consumer group: unexpected error: %v", err)
		return
	}
	if len(cgs) != 1 {
		t.Errorf("list consumer group: got %d, want %d", len(cgs), 1)
		return
	}
	if cgs[0] != testCG {
		t.Errorf("consumer group: got %s, want %s", cgs[0], testCG)
	}
}
Loading