Skip to content

Commit

Permalink
Take SaramaConsumer as parameter to Consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 25, 2018
1 parent d551944 commit 53b46cf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 42 deletions.
20 changes: 8 additions & 12 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,29 @@ type Params struct {
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
consumer.Builder
SaramaConsumer consumer.Consumer
}

// Consumer uses sarama to consume and handle messages from kafka
type Consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger
metricsFactory metrics.Factory
logger *zap.Logger

saramaConsumer consumer.Consumer
processorFactory ProcessorFactory

close chan struct{}
isClosed sync.WaitGroup

consumer.Consumer
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
saramaConsumer, err := params.NewConsumer()
if err != nil {
return nil, err
}
return &Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
Consumer: saramaConsumer,
saramaConsumer: params.SaramaConsumer,
processorFactory: params.ProcessorFactory,
}, nil
}
Expand All @@ -73,13 +69,13 @@ func (c *Consumer) Start() {
func (c *Consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.Consumer.Close()
return c.saramaConsumer.Close()
}

func (c *Consumer) mainLoop() {
for {
select {
case pc := <-c.Partitions():
case pc := <-c.saramaConsumer.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
Expand Down
24 changes: 3 additions & 21 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
"github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
)

//go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer
Expand All @@ -42,27 +41,10 @@ type consumerTest struct {
partitionConsumer *kmocks.PartitionConsumer
}

type mockConsumerConfiguration struct {
consumer.Configuration
err error
}

func (m *mockConsumerConfiguration) NewConsumer() (consumer.Consumer, error) {
return &kmocks.Consumer{}, m.err
}

func TestConstructor(t *testing.T) {
params := Params{}
params.Builder = &mockConsumerConfiguration{}
newConsumer, err := New(params)
newConsumer, err := New(Params{})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)

params.Builder = &mockConsumerConfiguration{
err: errors.New("consumerBuilder error"),
}
_, err = New(params)
assert.Error(t, err, "consumerBuilder error")
}

func withWrappedConsumer(fn func(c *consumerTest)) {
Expand All @@ -76,7 +58,7 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
Consumer: sc,
saramaConsumer: sc,
processorFactory: ProcessorFactory{
topic: "topic",
consumer: sc,
Expand Down Expand Up @@ -106,7 +88,7 @@ func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {
metadata := "meatbag"
c.saramaConsumer.On("MarkPartitionOffset", topic, partition, offset, metadata).Return()

c.consumer.MarkPartitionOffset(topic, partition, offset, metadata)
c.saramaConsumer.MarkPartitionOffset(topic, partition, offset, metadata)

c.saramaConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, metadata)
})
Expand Down
18 changes: 9 additions & 9 deletions cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import (

// FactoryParams are the parameters of a ProcessorFactory
type FactoryParams struct {
Parallelism int
Topic string
BaseProcessor processor.SpanProcessor
Consumer *Consumer
Factory metrics.Factory
Logger *zap.Logger
Parallelism int
Topic string
BaseProcessor processor.SpanProcessor
SaramaConsumer consumer.Consumer
Factory metrics.Factory
Logger *zap.Logger
}

// ProcessorFactory is a factory for creating startedProcessors
Expand All @@ -47,10 +47,10 @@ type ProcessorFactory struct {
}

// NewFactory constructs a new ProcessorFactory
func NewFactory(params FactoryParams) (*ProcessorFactory, error) {
return &ProcessorFactory{
func NewFactory(params FactoryParams) (ProcessorFactory, error) {
return ProcessorFactory{
topic: params.Topic,
consumer: params.Consumer,
consumer: params.SaramaConsumer,
metricsFactory: params.Factory,
logger: params.Logger,
baseProcessor: params.BaseProcessor,
Expand Down

0 comments on commit 53b46cf

Please sign in to comment.