Skip to content

Commit

Permalink
Configure consumer with viper
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 19, 2018
1 parent b48b81e commit 2f2d3a2
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 15 deletions.
20 changes: 5 additions & 15 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,11 @@ type saramaConsumer interface {
io.Closer
}


// Config stores the configuration for a Consumer
type Config struct {
Topic string `yaml:"topic"`
GroupID string `yaml:"group_id"`
Brokers []string `yaml:"brokers"`
Parallelism int `yaml:"parallelism"`
}

// Params are the parameters of a Consumer
type Params struct {
Config Config
Options Options
Processor processor.SpanProcessor
Factory metrics.Factory `name:"service_metrics"`
Factory metrics.Factory
Logger *zap.Logger
}

Expand All @@ -65,7 +56,7 @@ type Consumer struct {
func New(params Params) (Consumer, error) {
saramaConfig := sc.NewConfig()
saramaConfig.Group.Mode = sc.ConsumerModePartitions
saramaConsumer, err := sc.NewConsumer(params.Config.Brokers, params.Config.GroupID, []string{params.Config.Topic}, saramaConfig)
saramaConsumer, err := sc.NewConsumer(params.Options.Brokers, params.Options.GroupID, []string{params.Options.Topic}, saramaConfig)
if err != nil {
return Consumer{}, err
}
Expand All @@ -76,17 +67,16 @@ func New(params Params) (Consumer, error) {
isClosed: sync.WaitGroup{},
saramaConsumer: saramaConsumer,
processorFactory: processorFactory{
topic: params.Config.Topic,
topic: params.Options.Topic,
consumer: saramaConsumer,
metricsFactory: params.Factory,
logger: params.Logger,
baseProcessor: params.Processor,
parallelism: params.Config.Parallelism,
parallelism: params.Options.Parallelism,
},
}, nil
}


// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.isClosed.Add(1)
Expand Down
58 changes: 58 additions & 0 deletions cmd/ingester/app/consumer/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package consumer

import (
"flag"
"strings"

"github.com/spf13/viper"
"strconv"
)

const (
configPrefix = "ingester-consumer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixGroupID = ".group-id"
suffixParallelism = ".parallelism"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-ingester-spans"
defaultGroupID = "jaeger-ingester"
defaultParallelism = 1000
)

// Options stores the configuration options for a Kafka consumer
type Options struct {
Topic string
GroupID string
Brokers []string
Parallelism int
}

// AddFlags adds flags for Options
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixBrokers,
defaultBroker,
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
configPrefix+suffixTopic,
defaultTopic,
"The name of the kafka topic to consume from")
flagSet.String(
configPrefix+suffixGroupID,
defaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
configPrefix+suffixParallelism,
strconv.Itoa(defaultParallelism),
"The number of messages to process in parallel")
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.Brokers = strings.Split(v.GetString(configPrefix+suffixBrokers), ",")
opt.Topic = v.GetString(configPrefix + suffixTopic)
opt.GroupID = v.GetString(configPrefix + suffixGroupID)
opt.Parallelism = v.GetInt(configPrefix + suffixParallelism)
}
50 changes: 50 additions & 0 deletions cmd/ingester/app/consumer/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/stretchr/testify/assert"
)

func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--ingester-consumer.topic=topic1",
"--ingester-consumer.brokers=127.0.0.1:9092,0.0.0:1234",
"--ingester-consumer.group-id=group1",
"--ingester-consumer.parallelism=5"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Brokers)
assert.Equal(t, "group1", opts.GroupID)
assert.Equal(t, 5, opts.Parallelism)
}

func TestFlagDefaults(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{})
opts.InitFromViper(v)

assert.Equal(t, defaultTopic, opts.Topic)
assert.Equal(t, []string{defaultBroker}, opts.Brokers)
assert.Equal(t, defaultGroupID, opts.GroupID)
assert.Equal(t, defaultParallelism, opts.Parallelism)
}

0 comments on commit 2f2d3a2

Please sign in to comment.