Skip to content

Commit

Permalink
Add kafka ConsumerBuilder
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 20, 2018
1 parent 346630d commit 3ddb0ce
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 46 deletions.
26 changes: 9 additions & 17 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package consumer

import (
"io"
"sync"

"github.com/Shopify/sarama"
Expand All @@ -24,21 +23,16 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
)

// SaramaConsumer is an interface to features of Sarama that are necessary for the consumer
type SaramaConsumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
}

// Params are the parameters of a Consumer
type Params struct {
Options Options
Processor processor.SpanProcessor
Factory metrics.Factory
Logger *zap.Logger
config.ConsumerBuilder
}

// Consumer uses sarama to consume messages from kafka and handle
Expand All @@ -50,23 +44,21 @@ type Consumer struct {
close chan struct{}
isClosed sync.WaitGroup

SaramaConsumer
config.Consumer
}

// New is a constructor for a Consumer
func New(params Params) (Consumer, error) {
saramaConfig := sc.NewConfig()
saramaConfig.Group.Mode = sc.ConsumerModePartitions
saramaConsumer, err := sc.NewConsumer(params.Options.Brokers, params.Options.GroupID, []string{params.Options.Topic}, saramaConfig)
func New(params Params) (*Consumer, error) {
saramaConsumer, err := params.ConsumerBuilder.NewConsumer()
if err != nil {
return Consumer{}, err
return nil, err
}
return Consumer{
return &Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
SaramaConsumer: saramaConsumer,
Consumer: saramaConsumer,
processorFactory: processorFactory{
topic: params.Options.Topic,
consumer: saramaConsumer,
Expand All @@ -89,7 +81,7 @@ func (c *Consumer) Start() {
func (c *Consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.SaramaConsumer.Close()
return c.Consumer.Close()
}

func (c *Consumer) mainLoop() {
Expand Down
50 changes: 42 additions & 8 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,61 @@ import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/testutils"
"go.uber.org/zap"

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
)

//go:generate mockery -name SaramaConsumer
//go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer
//go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer

type consumerTest struct {
saramaConsumer *kmocks.SaramaConsumer
saramaConsumer *kmocks.Consumer
consumer *Consumer
partitionConsumer *kmocks.PartitionConsumer
}

type mockConsumerConfiguration struct {
config.ConsumerConfiguration
err error
}

func (m *mockConsumerConfiguration) NewConsumer() (config.Consumer, error) {
return &kmocks.Consumer{}, m.err
}

func TestConstructor(t *testing.T) {
params := Params{
Options: Options{
Parallelism: 1,
ConsumerConfiguration: config.ConsumerConfiguration{
Brokers: []string{"someBroker"},
Topic: "someTopic",
GroupID: "someGroup",
},
},
}
params.ConsumerBuilder = &mockConsumerConfiguration{}
consumer, err := New(params)
assert.NoError(t, err)
assert.NotNil(t, consumer)
assert.NotNil(t, consumer.processorFactory)

params.ConsumerBuilder = &mockConsumerConfiguration{
err: errors.New("consumerBuilder error"),
}
_, err = New(params)
assert.Error(t, err, "consumerBuilder error")
}

func withWrappedConsumer(fn func(c *consumerTest)) {
sc := &kmocks.SaramaConsumer{}
sc := &kmocks.Consumer{}
logger, _ := zap.NewDevelopment()
metricsFactory := metrics.NewLocalFactory(0)
c := &consumerTest{
Expand All @@ -51,13 +86,13 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
SaramaConsumer: sc,
Consumer: sc,
processorFactory: processorFactory{
topic: "topic",
consumer: sc,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: &mocks.SpanProcessor{},
baseProcessor: &pmocks.SpanProcessor{},
parallelism: 1,
},
},
Expand All @@ -74,7 +109,6 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
}

func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {

withWrappedConsumer(func(c *consumerTest) {
topic := "morekuzambu"
partition := int32(316)
Expand Down Expand Up @@ -102,7 +136,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
c.partitionConsumer.On("HighWaterMarkOffset").Return(int64(1234))
c.partitionConsumer.On("Close").Return(nil)

mp := &mocks.SpanProcessor{}
mp := &pmocks.SpanProcessor{}
mp.On("Process", &saramaMessageWrapper{msg}).Return(nil)
c.consumer.processorFactory.baseProcessor = mp

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions cmd/ingester/app/consumer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strings"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/config"
)

const (
Expand All @@ -37,14 +39,12 @@ const (

// Options stores the configuration options for a Kafka consumer
type Options struct {
Topic string
GroupID string
Brokers []string
config.ConsumerConfiguration
Parallelism int
}

// AddFlags adds flags for Options
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixBrokers,
defaultBroker,
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--ingester-consumer.topic=topic1",
"--ingester-consumer.brokers=127.0.0.1:9092,0.0.0:1234",
Expand All @@ -40,7 +40,7 @@ func TestOptionsWithFlags(t *testing.T) {

func TestFlagDefaults(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
opts.InitFromViper(v)

Expand Down
3 changes: 2 additions & 1 deletion cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
)

type processorFactory struct {
topic string
consumer SaramaConsumer
consumer config.Consumer
metricsFactory metrics.Factory
logger *zap.Logger
baseProcessor processor.SpanProcessor
Expand Down
2 changes: 1 addition & 1 deletion cmd/ingester/app/consumer/processor_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func Test_new(t *testing.T) {

mockConsumer := &kmocks.SaramaConsumer{}
mockConsumer := &kmocks.Consumer{}
mockConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

topic := "coelacanth"
Expand Down
38 changes: 34 additions & 4 deletions pkg/kafka/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,52 @@
package config

import (
"io"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
)

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
// Consumer is an interface to features of Sarama that are necessary for the consumer
type Consumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
}

// ProducerBuilder builds a new kafka producer
type ProducerBuilder interface {
NewProducer() (sarama.AsyncProducer, error)
}

// ConsumerBuilder builds a new kafka producer
type ConsumerBuilder interface {
NewConsumer() (Consumer, error)
}

// ProducerConfiguration describes the configuration properties needed to create a Kafka producer
type ProducerConfiguration struct {
Brokers []string
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
func (c *ProducerConfiguration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}

// ConsumerConfiguration describes the configuration properties needed to create a Kafka consumer
type ConsumerConfiguration struct {
Brokers []string
Topic string
GroupID string
Consumer
}

// NewConsumer creates a new kafka consumer
func (c *ConsumerConfiguration) NewConsumer() (Consumer, error) {
saramaConfig := sc.NewConfig()
saramaConfig.Group.Mode = sc.ConsumerModePartitions
return sc.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
2 changes: 1 addition & 1 deletion plugin/storage/kafka/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
var _ storage.Factory = new(Factory)

type mockProducerBuilder struct {
kafkaConfig.Configuration
kafkaConfig.ProducerConfiguration
err error
t *testing.T
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (

// Options stores the configuration options for Kafka
type Options struct {
config config.Configuration
config config.ProducerConfiguration
topic string
encoding string
}
Expand All @@ -64,7 +64,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.config = config.Configuration{
opt.config = config.ProducerConfiguration{
Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
Expand Down

0 comments on commit 3ddb0ce

Please sign in to comment.