Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester Consumer #942

Merged
merged 12 commits into from
Jul 26, 2018
90 changes: 54 additions & 36 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package consumer

import (
"io"
"sync"

"github.com/Shopify/sarama"
Expand All @@ -24,47 +23,72 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

type consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger
processorFactory processorFactory
// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
}

// Consumer uses sarama to consume and handle messages from kafka
type Consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger

internalConsumer consumer.Consumer
processorFactory ProcessorFactory

close chan struct{}
isClosed sync.WaitGroup

SaramaConsumer
}

// SaramaConsumer is an interface to features of Sarama that we use
type SaramaConsumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
return &Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
}, nil
}

func (c *consumer) mainLoop() {
// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.isClosed.Add(1)
c.logger.Info("Starting main loop")
go func() {
for {
select {
case pc := <-c.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
go c.mainLoop()
}

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for moving this?

Copy link
Contributor Author

@davit-y davit-y Jul 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the producer PR you recommended this folder structure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I mean moving Close from L106-L110 to here.

close(c.close)
c.isClosed.Wait()
return c.internalConsumer.Close()
}

func (c *Consumer) mainLoop() {
for {
select {
case pc := <-c.internalConsumer.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
}()
}
}

func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler")
defer c.isClosed.Done()
defer c.closePartition(pc)
Expand All @@ -87,13 +111,13 @@ func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
}
}

func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler")
defer c.isClosed.Done()

Expand All @@ -103,9 +127,3 @@ func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.Consumer
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
}

func (c *consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.SaramaConsumer.Close()
}
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type errMetrics struct {
errCounter metrics.Counter
}

func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return msgMetrics{
counter: f.Counter("messages", nil),
Expand All @@ -39,7 +39,7 @@ func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
}
}

func (c *consumer) newErrMetrics(partition int32) errMetrics {
func (c *Consumer) newErrMetrics(partition int32) errMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return errMetrics{errCounter: f.Counter("errors", nil)}

Expand Down
42 changes: 24 additions & 18 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,49 @@ import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/testutils"
"go.uber.org/zap"

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
)

//go:generate mockery -name SaramaConsumer
//go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer
//go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer

type consumerTest struct {
saramaConsumer *kmocks.SaramaConsumer
consumer *consumer
saramaConsumer *kmocks.Consumer
consumer *Consumer
partitionConsumer *kmocks.PartitionConsumer
}

func TestConstructor(t *testing.T) {
newConsumer, err := New(Params{})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}

func withWrappedConsumer(fn func(c *consumerTest)) {
sc := &kmocks.SaramaConsumer{}
sc := &kmocks.Consumer{}
logger, _ := zap.NewDevelopment()
metricsFactory := metrics.NewLocalFactory(0)
c := &consumerTest{
saramaConsumer: sc,
consumer: &consumer{
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
SaramaConsumer: sc,
processorFactory: processorFactory{
consumer: &Consumer{
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
internalConsumer: sc,
processorFactory: ProcessorFactory{
topic: "topic",
consumer: sc,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: &mocks.SpanProcessor{},
baseProcessor: &pmocks.SpanProcessor{},
parallelism: 1,
},
},
Expand All @@ -74,15 +81,14 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
}

func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {

withWrappedConsumer(func(c *consumerTest) {
topic := "morekuzambu"
partition := int32(316)
offset := int64(1111110111111)
metadata := "meatbag"
c.saramaConsumer.On("MarkPartitionOffset", topic, partition, offset, metadata).Return()

c.consumer.MarkPartitionOffset(topic, partition, offset, metadata)
c.saramaConsumer.MarkPartitionOffset(topic, partition, offset, metadata)

c.saramaConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, metadata)
})
Expand All @@ -102,11 +108,11 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
c.partitionConsumer.On("HighWaterMarkOffset").Return(int64(1234))
c.partitionConsumer.On("Close").Return(nil)

mp := &mocks.SpanProcessor{}
mp := &pmocks.SpanProcessor{}
mp.On("Process", &saramaMessageWrapper{msg}).Return(nil)
c.consumer.processorFactory.baseProcessor = mp

c.consumer.mainLoop()
c.consumer.Start()
time.Sleep(100 * time.Millisecond)
close(msgCh)
close(errCh)
Expand Down Expand Up @@ -149,7 +155,7 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh))
c.partitionConsumer.On("Close").Return(nil)

c.consumer.mainLoop()
c.consumer.Start()
time.Sleep(100 * time.Millisecond)
close(msgCh)
close(errCh)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 27 additions & 3 deletions cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,42 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

type processorFactory struct {
// ProcessorFactoryParams are the parameters of a ProcessorFactory
type ProcessorFactoryParams struct {
Parallelism int
Topic string
BaseProcessor processor.SpanProcessor
SaramaConsumer consumer.Consumer
Factory metrics.Factory
Logger *zap.Logger
}

// ProcessorFactory is a factory for creating startedProcessors
type ProcessorFactory struct {
topic string
consumer SaramaConsumer
consumer consumer.Consumer
metricsFactory metrics.Factory
logger *zap.Logger
baseProcessor processor.SpanProcessor
parallelism int
}

func (c *processorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
// NewProcessorFactory constructs a new ProcessorFactory
func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
return &ProcessorFactory{
topic: params.Topic,
consumer: params.SaramaConsumer,
metricsFactory: params.Factory,
logger: params.Logger,
baseProcessor: params.BaseProcessor,
parallelism: params.Parallelism,
}, nil
}

func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
c.logger.Info("Creating new processors", zap.Int32("partition", partition))

markOffset := func(offset int64) {
Expand Down
11 changes: 9 additions & 2 deletions cmd/ingester/app/consumer/processor_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
)

func Test_NewFactory(t *testing.T) {
params := ProcessorFactoryParams{}
newFactory, err := NewProcessorFactory(params)
assert.NoError(t, err)
assert.NotNil(t, newFactory)
}

func Test_new(t *testing.T) {

mockConsumer := &kmocks.SaramaConsumer{}
mockConsumer := &kmocks.Consumer{}
mockConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

topic := "coelacanth"
partition := int32(21)
offset := int64(555)

pf := processorFactory{
pf := ProcessorFactory{
topic: topic,
consumer: mockConsumer,
metricsFactory: metrics.NullFactory,
Expand Down
Loading