Skip to content

Commit

Permalink
- add the ability to accept partial spans that will be upserted in pe…
Browse files Browse the repository at this point in the history
…rsistence

See jaegertracing#729 for details on the concept

Signed-off-by: Martin Foerster <martin@atroo.de>
  • Loading branch information
phal0r committed Mar 8, 2018
1 parent dbd5db7 commit 368dbd6
Show file tree
Hide file tree
Showing 21 changed files with 224 additions and 20 deletions.
15 changes: 10 additions & 5 deletions model/converter/json/fixtures/ui_01.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
}
],
"processID": "p1",
"warnings": null
"warnings": null,
"incomplete": false
},
{
"traceID": "1",
Expand Down Expand Up @@ -70,7 +71,8 @@
],
"logs": [],
"processID": "p1",
"warnings": null
"warnings": null,
"incomplete": false
},
{
"traceID": "1",
Expand All @@ -88,7 +90,8 @@
"tags": [],
"logs": [],
"processID": "p2",
"warnings": null
"warnings": null,
"incomplete": false
},
{
"traceID": "1",
Expand Down Expand Up @@ -118,7 +121,8 @@
"processID": "p2",
"warnings": [
"some span warning"
]
],
"incomplete": false
},
{
"traceID": "1",
Expand All @@ -138,7 +142,8 @@
"processID": "p2",
"warnings": [
"some span warning"
]
],
"incomplete": false
}
],
"processes": {
Expand Down
1 change: 1 addition & 0 deletions model/converter/json/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (td toDomain) spanToDomain(dbSpan *json.Span) (*model.Span, error) {
Tags: tags,
Logs: logs,
Process: process,
Incomplete: dbSpan.Incomplete,
}
return span, nil
}
Expand Down
1 change: 1 addition & 0 deletions model/converter/thrift/jaeger/fixtures/model_01.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"operationName":"get",
"startTime":"2017-01-26T16:46:31.639875-05:00",
"duration":22938000,
"type": "jaeger",
"tags":[
{
"key":"http.url",
Expand Down
3 changes: 2 additions & 1 deletion model/converter/thrift/jaeger/fixtures/model_02.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"operationName":"get",
"startTime":"2017-01-26T16:46:31.639875-05:00",
"duration":22938000,
"type": "jaeger",
"tags":[
{
"key":"peer.service",
Expand All @@ -17,4 +18,4 @@
"serviceName":"api"
}
}
]
]
2 changes: 2 additions & 0 deletions model/converter/thrift/jaeger/fixtures/model_03.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"operationName":"get",
"startTime":"2017-01-26T16:46:31.639875-05:00",
"duration":22938000,
"type": "jaeger",
"tags":[
{
"key":"http.url",
Expand Down Expand Up @@ -97,6 +98,7 @@
"operationName":"get",
"startTime":"2017-01-26T16:46:31.639875-05:00",
"duration":22938000,
"type": "jaeger",
"references": [
{
"refType": "child-of",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"operationName": "get",
"startTime": 1485467191639875,
"duration": 22938,
"type": "jaeger",
"tags": [
{
"key": "http.url",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"operationName": "get",
"startTime": 1485467191639875,
"duration": 22938,
"type": "jaeger",
"tags": [
{
"key": "peer.service",
Expand Down
2 changes: 2 additions & 0 deletions model/converter/thrift/jaeger/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func (td toDomain) transformSpan(jSpan *jaeger.Span, mProcess *model.Process) *m
Tags: tags,
Logs: td.getLogs(jSpan.Logs),
Process: mProcess,
Incomplete: jSpan.Incomplete,
Type: model.JaegerSpanType,
}
}

Expand Down
3 changes: 3 additions & 0 deletions model/converter/thrift/zipkin/fixtures/jaeger_01.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"spanID": "2",
"operationName": "test-general-conversion",
"startTime": "2017-01-26T16:46:31.639875-05:00",
"type": "zipkin",
"process": {
"serviceName": "service-x"
},
Expand Down Expand Up @@ -34,6 +35,7 @@
"spanID": "2",
"operationName": "some-operation",
"startTime": "1970-01-01T00:00:00-00:00",
"type": "zipkin",
"tags": [
{
"key": "peer.service",
Expand Down Expand Up @@ -66,6 +68,7 @@
"parentSpanID": "2",
"operationName": "some-operation",
"startTime": "1970-01-01T00:00:00-00:00",
"type": "zipkin",
"tags": [
{
"key": "peer.service",
Expand Down
1 change: 1 addition & 0 deletions model/converter/thrift/zipkin/fixtures/jaeger_02.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"spanID": "2",
"operationName": "test-process-tags",
"startTime": "1970-01-01T00:00:00-00:00",
"type": "zipkin",
"process": {
"serviceName": "service-x",
"tags": [
Expand Down
1 change: 1 addition & 0 deletions model/converter/thrift/zipkin/fixtures/jaeger_03.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"flags": 2,
"operationName": "test-custom-tags",
"startTime": "1970-01-01T00:00:00-00:00",
"type": "zipkin",
"process": {
"serviceName": "service-x"
},
Expand Down
1 change: 1 addition & 0 deletions model/converter/thrift/zipkin/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (td toDomain) transformSpan(zSpan *zipkincore.Span) []*model.Span {
Duration: model.MicrosecondsAsDuration(uint64(zSpan.GetDuration())),
Tags: tags,
Logs: td.getLogs(zSpan.Annotations),
Type: model.ZipkinSpanType,
}}

cs := td.findAnnotation(zSpan, zipkincore.CLIENT_SEND)
Expand Down
10 changes: 5 additions & 5 deletions model/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type Hashable interface {

// HashCode calcualtes a FNV-1a hash code for a Hashable object.
func HashCode(o Hashable) (uint64, error) {
h := fnv.New64a()
if err := o.Hash(h); err != nil {
return 0, err
}
return h.Sum64(), nil
h := fnv.New64a()
if err := o.Hash(h); err != nil {
return 0, err
}
return h.Sum64(), nil
}
105 changes: 105 additions & 0 deletions model/json/fixture-actual.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
{
"traceID": "abc0",
"spans": [
{
"traceID": "abc0",
"spanID": "abc0",
"operationName": "root-span",
"references": null,
"startTime": 1000,
"duration": 500,
"tags": null,
"logs": null,
"processID": "p1",
"warnings": null,
"incomplete": false
},
{
"traceID": "abc0",
"spanID": "123",
"operationName": "span1",
"references": [
{
"refType": "CHILD_OF",
"traceID": "abc0",
"spanID": "abc0"
}
],
"startTime": 1000,
"duration": 500,
"tags": [
{
"key": "error",
"type": "bool",
"value": true
},
{
"key": "int64",
"type": "int64",
"value": 123
},
{
"key": "float64",
"type": "float64",
"value": 123.567
},
{
"key": "binary",
"type": "binary",
"value": "AQ=="
}
],
"logs": [
{
"timestamp": 1400,
"fields": [
{
"key": "error",
"type": "string",
"value": "something bad happened"
}
]
}
],
"processID": "p2",
"warnings": null,
"incomplete": false
},
{
"traceID": "abc0",
"spanID": "567",
"operationName": "span2",
"references": [
{
"refType": "FOLLOWS_FROM",
"traceID": "abc0",
"spanID": "abc0"
}
],
"startTime": 1000,
"duration": 500,
"tags": null,
"logs": null,
"processID": "p2",
"warnings": null,
"incomplete": false
}
],
"processes": {
"p1": {
"serviceName": "service_1",
"tags": null
},
"p2": {
"serviceName": "service_2",
"tags": [
{
"key": "host",
"type": "string",
"value": "google.com"
}
]
}
},
"warnings": null
}
9 changes: 6 additions & 3 deletions model/json/fixture.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"tags": null,
"logs": null,
"processID": "p1",
"warnings": null
"warnings": null,
"incomplete": false
},
{
"traceID": "abc0",
Expand Down Expand Up @@ -61,7 +62,8 @@
}
],
"processID": "p2",
"warnings": null
"warnings": null,
"incomplete": false
},
{
"traceID": "abc0",
Expand All @@ -79,7 +81,8 @@
"tags": null,
"logs": null,
"processID": "p2",
"warnings": null
"warnings": null,
"incomplete": false
}
],
"processes": {
Expand Down
1 change: 1 addition & 0 deletions model/json/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Span struct {
ProcessID ProcessID `json:"processID"`
Process *Process `json:"process,omitempty"`
Warnings []string `json:"warnings"`
Incomplete bool `json:"incomplete"`
}

// Reference is a reference from one span to another
Expand Down
20 changes: 19 additions & 1 deletion model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
sampledFlag = Flags(1)
// debugFlag is the bit set in Flags in order to define a span as a debug span
debugFlag = Flags(2)

JaegerSpanType string = "jaeger"
ZipkinSpanType string = "zipkin"
)

// TraceID is a random 128bit identifier for a trace
Expand Down Expand Up @@ -58,14 +61,29 @@ type Span struct {
Logs []Log `json:"logs,omitempty"`
Process *Process `json:"process"`
Warnings []string `json:"warnings,omitempty"`
Incomplete bool `json:"incomplete"`
Type string `json:"type,omitempty"`
}

type JaegerSpanHash struct {
TraceID TraceID
SpanID SpanID
}

// Hash implements Hash from Hashable.
func (s *Span) Hash(w io.Writer) (err error) {
// gob is not the most efficient way, but it ensures we don't miss any fields.
// See BenchmarkSpanHash in span_test.go
enc := gob.NewEncoder(w)
return enc.Encode(s)

//with supporting incomplete spans the hash needs to be unique for a combination
//of spanId and traceId, so the final representation of the same span
//will overwrite the incomplete version in cassandra
if s.Type == JaegerSpanType {
return enc.Encode(JaegerSpanHash{s.TraceID, s.SpanID})
} else {
return enc.Encode(s)
}
}

// HasSpanKind returns true if the span has a `span.kind` tag set to `kind`.
Expand Down
11 changes: 8 additions & 3 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error {
if err := s.createIndex(spanIndexName, spanMapping, jsonSpan); err != nil {
return err
}
return s.writeSpan(spanIndexName, jsonSpan)
return s.writeSpan(spanIndexName, jsonSpan, span.Type)
}

func indexNames(span *model.Span) (string, string) {
Expand Down Expand Up @@ -171,10 +171,15 @@ func (s *SpanWriter) writeService(indexName string, jsonSpan *jModel.Span) error
return s.serviceWriter(indexName, jsonSpan)
}

func (s *SpanWriter) writeSpan(indexName string, jsonSpan *jModel.Span) error {
func (s *SpanWriter) writeSpan(indexName string, jsonSpan *jModel.Span, jaegerSpanType string) error {
start := time.Now()
elasticSpan := Span{Span: jsonSpan, StartTimeMillis: jsonSpan.StartTime / 1000} // Microseconds to milliseconds
_, err := s.client.Index().Index(indexName).Type(spanType).BodyJson(&elasticSpan).Do(s.ctx)
var err error
if jaegerSpanType == model.JaegerSpanType {
_, err = s.client.Index().Index(indexName).Type(spanType).Id(string(jsonSpan.TraceID) + string(jsonSpan.SpanID)).BodyJson(&elasticSpan).Do(s.ctx)
} else {
_, err = s.client.Index().Index(indexName).Type(spanType).BodyJson(&elasticSpan).Do(s.ctx)
}
s.writerMetrics.spans.Emit(err, time.Since(start))
if err != nil {
return s.logError(jsonSpan, err, "Failed to insert span", s.logger)
Expand Down
Loading

0 comments on commit 368dbd6

Please sign in to comment.