Skip to content

Commit

Permalink
ProcessPipeline tracing and some tags
Browse files Browse the repository at this point in the history
- Wrap ProcessPipeline
- Add the following tags:
   - peer.address
   - span.kind
  • Loading branch information
ledor473 authored and smacker committed Apr 11, 2019
1 parent c01c8f7 commit 1bbe121
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 66 deletions.
65 changes: 50 additions & 15 deletions otredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,69 @@ package otredis

import (
"context"
"strings"

"github.com/go-redis/redis"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// WrapRedisClient adds opentracing measurements for commands and returns cloned client
func WrapRedisClient(ctx context.Context, c *redis.Client) *redis.Client {
func WrapRedisClient(ctx context.Context, client *redis.Client) *redis.Client {
if ctx == nil {
return c
return client
}
parentSpan := opentracing.SpanFromContext(ctx)
if parentSpan == nil {
return c
return client
}

// clone using context
copy := c.WithContext(c.Context())
copy.WrapProcess(func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
tr := parentSpan.Tracer()
sp := tr.StartSpan("redis", opentracing.ChildOf(parentSpan.Context()))
ext.DBType.Set(sp, "redis")
sp.SetTag("db.method", cmd.Name())
defer sp.Finish()
ctxClient := client.WithContext(ctx)
opts := ctxClient.Options()
ctxClient.WrapProcess(process(parentSpan, opts))
ctxClient.WrapProcessPipeline(processPipeline(parentSpan, opts))
return ctxClient
}

func process(parentSpan opentracing.Span, opts *redis.Options) func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
dbMethod := formatCommandAsDbMethod(cmd)
doSpan(parentSpan, opts, "redis-cmd", dbMethod)
return oldProcess(cmd)
}
})
return copy
}
}

func processPipeline(parentSpan opentracing.Span, opts *redis.Options) func(oldProcess func(cmds []redis.Cmder) error) func(cmds []redis.Cmder) error {
return func(oldProcess func(cmds []redis.Cmder) error) func(cmds []redis.Cmder) error {
return func(cmds []redis.Cmder) error {
dbMethod := formatCommandsAsDbMethods(cmds)
doSpan(parentSpan, opts, "redis-pipeline-cmd", dbMethod)
return oldProcess(cmds)
}
}
}

func formatCommandAsDbMethod(cmd redis.Cmder) string {
return cmd.Name()
}

func formatCommandsAsDbMethods(cmds []redis.Cmder) string {
cmdsAsDbMethods := make([]string, len(cmds))
for i, cmd := range cmds {
dbMethod := formatCommandAsDbMethod(cmd)
cmdsAsDbMethods[i] = dbMethod
}
return strings.Join(cmdsAsDbMethods, " -> ")
}

func doSpan(parentSpan opentracing.Span, opts *redis.Options, operationName, dbMethod string) {
tracer := parentSpan.Tracer()
span := tracer.StartSpan(operationName, opentracing.ChildOf(parentSpan.Context()))
defer span.Finish()
ext.DBType.Set(span, "redis")
ext.PeerAddress.Set(span, opts.Addr)
ext.SpanKind.Set(span, ext.SpanKindEnum("client"))
span.SetTag("db.method", dbMethod)
}
166 changes: 115 additions & 51 deletions otredis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (

"github.com/alicebob/miniredis"
"github.com/go-redis/redis"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
)

var redisAddr string
var client *redis.Client
var tracer *mocktracer.MockTracer

Expand All @@ -21,98 +24,159 @@ func init() {

func TestMain(m *testing.M) {
// in-memory redis
s, err := miniredis.Run()
miniRedis, err := miniredis.Run()
if err != nil {
panic(err)
}
defer s.Close()
defer miniRedis.Close()

redisAddr = miniRedis.Addr()

client = redis.NewClient(&redis.Options{
Addr: s.Addr(),
Addr: redisAddr,
})

os.Exit(m.Run())
}

// SET

func TestSet(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)

span, ctx := opentracing.StartSpanFromContext(ctx, "test-params")
span, ctx := opentracing.StartSpanFromContext(ctx, "test-set")
ctxClient := WrapRedisClient(ctx, client)
callSet(t, ctxClient, "with span")
callSet(assert, ctxClient, "with span")
span.Finish()

spans := tracer.FinishedSpans()
if len(spans) != 2 {
t.Fatalf("should be 2 finished spans but there are %d: %v", len(spans), spans)
}
assert.Len(spans, 2, "the number of finished spans is invalid")

redisSpan := spans[0]
if redisSpan.OperationName != "redis" {
t.Errorf("first span operation should be redis but it's '%s'", redisSpan.OperationName)
}
assert.Equal("redis-cmd", redisSpan.OperationName)

testTags(t, redisSpan, map[string]string{
"db.type": "redis",
"db.method": "set",
})
expectedTags := buildExpectedTags("set")
assertTags(assert, redisSpan, expectedTags)

tracer.Reset()
}

func TestGet(t *testing.T) {
func TestSetPipeline(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)

span, ctx := opentracing.StartSpanFromContext(ctx, "test-params")
span, ctx := opentracing.StartSpanFromContext(ctx, "test-set-pipeline")
ctxClient := WrapRedisClient(ctx, client)
callGet(t, ctxClient)
setPipelineParams := make(map[string]string)
setPipelineParams["foo"] = "with span on foo pipeline"
setPipelineParams["bar"] = "with span on bar pipeline"
callSetPipeline(assert, ctxClient, setPipelineParams)
span.Finish()

spans := tracer.FinishedSpans()
if len(spans) != 2 {
t.Fatalf("should be 2 finished spans but there are %d: %v", len(spans), spans)
}
assert.Len(spans, 2, "the number of finished spans is invalid")

redisSpan := spans[0]
if redisSpan.OperationName != "redis" {
t.Errorf("first span operation should be redis but it's '%s'", redisSpan.OperationName)
}
assert.Equal("redis-pipeline-cmd", redisSpan.OperationName)

testTags(t, redisSpan, map[string]string{
"db.type": "redis",
"db.method": "get",
})
expectedTags := buildExpectedTags("set -> set")
assertTags(assert, redisSpan, expectedTags)

tracer.Reset()
}

func callSet(t *testing.T, c *redis.Client, value string) {
_, err := c.Set("foo", value, 0).Result()
if err != nil {
t.Fatalf("Redis returned error: %v", err)
}
func callSet(assert *assert.Assertions, client *redis.Client, value string) {
_, err := client.Set("foo", value, 0).Result()
assert.Nil(err, "Redis returned error: %v", err)
}

func callGet(t *testing.T, c *redis.Client) {
_, err := c.Get("foo").Result()
if err != nil {
t.Fatalf("Redis returned error: %v", err)
func callSetPipeline(assert *assert.Assertions, client *redis.Client, setPipelineParams map[string]string) {
pipeline := client.Pipeline()
for key, value := range setPipelineParams {
pipeline.Set(key, value, 0)
}
_, err := pipeline.Exec()
assert.Nil(err, "Redis returned error: %v", err)
}

func testTags(t *testing.T, redisSpan *mocktracer.MockSpan, expectedTags map[string]string) {
redisTags := redisSpan.Tags()
if len(redisTags) != len(expectedTags) {
t.Errorf("redis span should have %d tags but it has %d", len(expectedTags), len(redisTags))
// GET

func TestGet(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)

span, ctx := opentracing.StartSpanFromContext(ctx, "test-get")
ctxClient := WrapRedisClient(ctx, client)
callGet(assert, ctxClient)
span.Finish()

spans := tracer.FinishedSpans()
assert.Len(spans, 2, "the number of finished spans is invalid")

redisSpan := spans[0]
assert.Equal("redis-cmd", redisSpan.OperationName)

expectedTags := buildExpectedTags("get")
assertTags(assert, redisSpan, expectedTags)

tracer.Reset()
}

func TestGetPipeline(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)

span, ctx := opentracing.StartSpanFromContext(ctx, "test-get-pipeline")
ctxClient := WrapRedisClient(ctx, client)
getPipelineParams := []string{"foo", "bar"}
callGetPipeline(assert, ctxClient, getPipelineParams)
span.Finish()

spans := tracer.FinishedSpans()
assert.Len(spans, 2, "the number of finished spans is invalid")

redisSpan := spans[0]
assert.Equal("redis-pipeline-cmd", redisSpan.OperationName)

expectedTags := buildExpectedTags("get -> get")
assertTags(assert, redisSpan, expectedTags)

tracer.Reset()
}

func callGet(assert *assert.Assertions, client *redis.Client) {
_, err := client.Get("foo").Result()
assert.Nil(err, "Redis returned error: %v", err)
}

func callGetPipeline(assert *assert.Assertions, client *redis.Client, getPipelineParams []string) {
pipeline := client.Pipeline()
for _, key := range getPipelineParams {
pipeline.Get(key)
}
_, err := pipeline.Exec()
assert.Nil(err, "Redis returned error: %v", err)
}

// MISC

func buildExpectedTags(expectedDbMethod string) map[string]interface{} {
expectedTags := make(map[string]interface{})
expectedTags["db.type"] = "redis"
expectedTags["db.method"] = expectedDbMethod
expectedTags["peer.address"] = redisAddr
expectedTags["span.kind"] = ext.SpanKindEnum("client")
return expectedTags
}

for name, expected := range expectedTags {
value, ok := redisTags[name]
if !ok {
t.Errorf("redis span doesn't have tag '%s'", name)
continue
}
if value != expected {
t.Errorf("redis span tag '%s' should have value '%s' but it has '%s'", name, expected, value)
}
func assertTags(assert *assert.Assertions, redisSpan *mocktracer.MockSpan, expectedTags map[string]interface{}) map[string]interface{} {
actualTags := redisSpan.Tags()
assert.Len(actualTags, len(expectedTags), "redis span tags number is invalid")
for expectedTagKey, expectedTagValue := range expectedTags {
actualTagValue, ok := actualTags[expectedTagKey]
assert.True(ok, "redis span doesn't have tag '%s'", expectedTagKey)
assert.Equal(expectedTagValue, actualTagValue, "redis span tag '%s' is invalid", expectedTagKey)
}
return actualTags
}

0 comments on commit 1bbe121

Please sign in to comment.