Skip to content

Commit

Permalink
Add ingester consumer (#942)
Browse files Browse the repository at this point in the history
- Export consumer to be initialized and started in a main file
- Add Builder to pkg/kafka/consumer
  • Loading branch information
davit-y authored and vprithvi committed Jul 26, 2018
1 parent 8f7e497 commit d1c2da6
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 67 deletions.
90 changes: 54 additions & 36 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package consumer

import (
"io"
"sync"

"github.com/Shopify/sarama"
Expand All @@ -24,47 +23,72 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

type consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger
processorFactory processorFactory
// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
}

// Consumer uses sarama to consume and handle messages from kafka
type Consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger

internalConsumer consumer.Consumer
processorFactory ProcessorFactory

close chan struct{}
isClosed sync.WaitGroup

SaramaConsumer
}

// SaramaConsumer is an interface to features of Sarama that we use
type SaramaConsumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
return &Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
}, nil
}

func (c *consumer) mainLoop() {
// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.isClosed.Add(1)
c.logger.Info("Starting main loop")
go func() {
for {
select {
case pc := <-c.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
go c.mainLoop()
}

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.internalConsumer.Close()
}

func (c *Consumer) mainLoop() {
for {
select {
case pc := <-c.internalConsumer.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
}()
}
}

func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler")
defer c.isClosed.Done()
defer c.closePartition(pc)
Expand All @@ -87,13 +111,13 @@ func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
}
}

func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler")
defer c.isClosed.Done()

Expand All @@ -103,9 +127,3 @@ func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.Consumer
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
}

func (c *consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.SaramaConsumer.Close()
}
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type errMetrics struct {
errCounter metrics.Counter
}

func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return msgMetrics{
counter: f.Counter("messages", nil),
Expand All @@ -39,7 +39,7 @@ func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
}
}

func (c *consumer) newErrMetrics(partition int32) errMetrics {
func (c *Consumer) newErrMetrics(partition int32) errMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return errMetrics{errCounter: f.Counter("errors", nil)}

Expand Down
42 changes: 24 additions & 18 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,49 @@ import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/testutils"
"go.uber.org/zap"

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
)

//go:generate mockery -name SaramaConsumer
//go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer
//go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer

type consumerTest struct {
saramaConsumer *kmocks.SaramaConsumer
consumer *consumer
saramaConsumer *kmocks.Consumer
consumer *Consumer
partitionConsumer *kmocks.PartitionConsumer
}

func TestConstructor(t *testing.T) {
newConsumer, err := New(Params{})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}

func withWrappedConsumer(fn func(c *consumerTest)) {
sc := &kmocks.SaramaConsumer{}
sc := &kmocks.Consumer{}
logger, _ := zap.NewDevelopment()
metricsFactory := metrics.NewLocalFactory(0)
c := &consumerTest{
saramaConsumer: sc,
consumer: &consumer{
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
SaramaConsumer: sc,
processorFactory: processorFactory{
consumer: &Consumer{
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
internalConsumer: sc,
processorFactory: ProcessorFactory{
topic: "topic",
consumer: sc,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: &mocks.SpanProcessor{},
baseProcessor: &pmocks.SpanProcessor{},
parallelism: 1,
},
},
Expand All @@ -74,15 +81,14 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
}

func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {

withWrappedConsumer(func(c *consumerTest) {
topic := "morekuzambu"
partition := int32(316)
offset := int64(1111110111111)
metadata := "meatbag"
c.saramaConsumer.On("MarkPartitionOffset", topic, partition, offset, metadata).Return()

c.consumer.MarkPartitionOffset(topic, partition, offset, metadata)
c.saramaConsumer.MarkPartitionOffset(topic, partition, offset, metadata)

c.saramaConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, metadata)
})
Expand All @@ -102,11 +108,11 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
c.partitionConsumer.On("HighWaterMarkOffset").Return(int64(1234))
c.partitionConsumer.On("Close").Return(nil)

mp := &mocks.SpanProcessor{}
mp := &pmocks.SpanProcessor{}
mp.On("Process", &saramaMessageWrapper{msg}).Return(nil)
c.consumer.processorFactory.baseProcessor = mp

c.consumer.mainLoop()
c.consumer.Start()
time.Sleep(100 * time.Millisecond)
close(msgCh)
close(errCh)
Expand Down Expand Up @@ -149,7 +155,7 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh))
c.partitionConsumer.On("Close").Return(nil)

c.consumer.mainLoop()
c.consumer.Start()
time.Sleep(100 * time.Millisecond)
close(msgCh)
close(errCh)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 27 additions & 3 deletions cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,42 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

type processorFactory struct {
// ProcessorFactoryParams are the parameters of a ProcessorFactory
type ProcessorFactoryParams struct {
Parallelism int
Topic string
BaseProcessor processor.SpanProcessor
SaramaConsumer consumer.Consumer
Factory metrics.Factory
Logger *zap.Logger
}

// ProcessorFactory is a factory for creating startedProcessors
type ProcessorFactory struct {
topic string
consumer SaramaConsumer
consumer consumer.Consumer
metricsFactory metrics.Factory
logger *zap.Logger
baseProcessor processor.SpanProcessor
parallelism int
}

func (c *processorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
// NewProcessorFactory constructs a new ProcessorFactory
func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
return &ProcessorFactory{
topic: params.Topic,
consumer: params.SaramaConsumer,
metricsFactory: params.Factory,
logger: params.Logger,
baseProcessor: params.BaseProcessor,
parallelism: params.Parallelism,
}, nil
}

func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
c.logger.Info("Creating new processors", zap.Int32("partition", partition))

markOffset := func(offset int64) {
Expand Down
11 changes: 9 additions & 2 deletions cmd/ingester/app/consumer/processor_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
)

func Test_NewFactory(t *testing.T) {
params := ProcessorFactoryParams{}
newFactory, err := NewProcessorFactory(params)
assert.NoError(t, err)
assert.NotNil(t, newFactory)
}

func Test_new(t *testing.T) {

mockConsumer := &kmocks.SaramaConsumer{}
mockConsumer := &kmocks.Consumer{}
mockConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

topic := "coelacanth"
partition := int32(21)
offset := int64(555)

pf := processorFactory{
pf := ProcessorFactory{
topic: topic,
consumer: mockConsumer,
metricsFactory: metrics.NullFactory,
Expand Down
Loading

0 comments on commit d1c2da6

Please sign in to comment.