Skip to content

Commit

Permalink
rename struct member
Browse files Browse the repository at this point in the history
Signed-off-by: SammyOina <sammyoina@gmail.com>
  • Loading branch information
SammyOina committed May 30, 2023
1 parent 543337f commit f7d284e
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions consumers/tracing/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@ var _ consumers.AsyncConsumer = (*tracingMiddlewareAsync)(nil)
var _ consumers.BlockingConsumer = (*tracingMiddlewareBlock)(nil)

type tracingMiddlewareAsync struct {
consumerAsync consumers.AsyncConsumer
tracer opentracing.Tracer
consumer consumers.AsyncConsumer
tracer opentracing.Tracer
}
type tracingMiddlewareBlock struct {
consumerBlock consumers.BlockingConsumer
tracer opentracing.Tracer
consumer consumers.BlockingConsumer
tracer opentracing.Tracer
}

// NewAsync creates a new traced consumers.AsyncConsumer service
func NewAsync(tracer opentracing.Tracer, consumerAsync consumers.AsyncConsumer) consumers.AsyncConsumer {
return &tracingMiddlewareAsync{
consumerAsync: consumerAsync,
tracer: tracer,
consumer: consumerAsync,
tracer: tracer,
}
}

// NewBlocking creates a new traced consumers.BlockingConsumer service
func NewBlocking(tracer opentracing.Tracer, consumerBlock consumers.BlockingConsumer) consumers.BlockingConsumer {
return &tracingMiddlewareBlock{
consumerBlock: consumerBlock,
tracer: tracer,
consumer: consumerBlock,
tracer: tracer,
}
}

Expand All @@ -48,44 +48,44 @@ func (tm *tracingMiddlewareBlock) ConsumeBlocking(ctx context.Context, messages
var span opentracing.Span
switch m := messages.(type) {
case mfjson.Messages:
if len(m.Data) >= 1 {
if len(m.Data) > 0 {
firstMsg := m.Data[0]
span, ctx = createMessageSpan(ctx, tm.tracer, firstMsg.Channel, firstMsg.Subtopic, firstMsg.Publisher, consumeBlockingOP, len(m.Data))
defer span.Finish()
}
case []senml.Message:
if len(m) >= 1 {
if len(m) > 0 {
firstMsg := m[0]
span, ctx = createMessageSpan(ctx, tm.tracer, firstMsg.Channel, firstMsg.Subtopic, firstMsg.Publisher, consumeBlockingOP, len(m))
defer span.Finish()
}
}
return tm.consumerBlock.ConsumeBlocking(ctx, messages)
return tm.consumer.ConsumeBlocking(ctx, messages)
}

// ConsumeAsync traces consume operations for message/s consumed.
func (tm *tracingMiddlewareAsync) ConsumeAsync(ctx context.Context, messages interface{}) {
var span opentracing.Span
switch m := messages.(type) {
case mfjson.Messages:
if len(m.Data) >= 1 {
if len(m.Data) > 0 {
firstMsg := m.Data[0]
span, ctx = createMessageSpan(ctx, tm.tracer, firstMsg.Channel, firstMsg.Subtopic, firstMsg.Publisher, consumeAsyncOP, len(m.Data))
defer span.Finish()
}
case []senml.Message:
if len(m) >= 1 {
if len(m) > 0 {
firstMsg := m[0]
span, ctx = createMessageSpan(ctx, tm.tracer, firstMsg.Channel, firstMsg.Subtopic, firstMsg.Publisher, consumeAsyncOP, len(m))
defer span.Finish()
}
}
tm.consumerAsync.ConsumeAsync(ctx, messages)
tm.consumer.ConsumeAsync(ctx, messages)
}

// Errors traces async consume errors.
func (tm *tracingMiddlewareAsync) Errors() <-chan error {
return tm.consumerAsync.Errors()
return tm.consumer.Errors()
}

func createMessageSpan(ctx context.Context, tracer opentracing.Tracer, topic, subTopic, publisher, operation string, noMessages int) (opentracing.Span, context.Context) {
Expand Down

0 comments on commit f7d284e

Please sign in to comment.