Skip to content

Commit

Permalink
Rename structs
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 26, 2018
1 parent 63b6772 commit 10eda27
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
10 changes: 5 additions & 5 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ type Params struct {
ProcessorFactory ProcessorFactory
Factory metrics.Factory
Logger *zap.Logger
SaramaConsumer consumer.Consumer
InternalConsumer consumer.Consumer
}

// Consumer uses sarama to consume and handle messages from kafka
type Consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger

saramaConsumer consumer.Consumer
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

close chan struct{}
Expand All @@ -53,7 +53,7 @@ func New(params Params) (*Consumer, error) {
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
saramaConsumer: params.SaramaConsumer,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
}, nil
}
Expand All @@ -69,13 +69,13 @@ func (c *Consumer) Start() {
func (c *Consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.saramaConsumer.Close()
return c.internalConsumer.Close()
}

func (c *Consumer) mainLoop() {
for {
select {
case pc := <-c.saramaConsumer.Partitions():
case pc := <-c.internalConsumer.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
Expand Down
10 changes: 5 additions & 5 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
c := &consumerTest{
saramaConsumer: sc,
consumer: &Consumer{
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
saramaConsumer: sc,
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
isClosed: sync.WaitGroup{},
internalConsumer: sc,
processorFactory: ProcessorFactory{
topic: "topic",
consumer: sc,
Expand Down
10 changes: 5 additions & 5 deletions cmd/ingester/app/consumer/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

// FactoryParams are the parameters of a ProcessorFactory
type FactoryParams struct {
// ProcessorFactoryParams are the parameters of a ProcessorFactory
type ProcessorFactoryParams struct {
Parallelism int
Topic string
BaseProcessor processor.SpanProcessor
Expand All @@ -46,9 +46,9 @@ type ProcessorFactory struct {
parallelism int
}

// NewFactory constructs a new ProcessorFactory
func NewFactory(params FactoryParams) (ProcessorFactory, error) {
return ProcessorFactory{
// NewProcessorFactory constructs a new ProcessorFactory
func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
return &ProcessorFactory{
topic: params.Topic,
consumer: params.SaramaConsumer,
metricsFactory: params.Factory,
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/processor_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

func Test_NewFactory(t *testing.T) {
params := FactoryParams{}
newFactory, err := NewFactory(params)
params := ProcessorFactoryParams{}
newFactory, err := NewProcessorFactory(params)
assert.NoError(t, err)
assert.NotNil(t, newFactory)
}
Expand Down

0 comments on commit 10eda27

Please sign in to comment.