From fda19be2a9d607293e27e71c22da9e166b53ec5a Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Fri, 2 Jun 2017 11:36:16 -0400 Subject: [PATCH] Update udpTransport to only send Jaeger.thrift (#147) --- config/config.go | 6 +- jaeger_span.go => jaeger_thrift_span.go | 4 +- ...span_test.go => jaeger_thrift_span_test.go | 10 +- reporter.go | 13 +- reporter_test.go | 125 +++++++------- testutils/mock_agent.go | 43 +++-- testutils/mock_agent_test.go | 31 ++-- thrift-gen/agent/agent.go | 154 ++++++++++++++++++ thrift-gen/jaeger/ttypes.go | 97 +++++++++++ transport/transport.go => transport.go | 6 +- transport/zipkin/http.go | 8 +- transport/zipkin/http_test.go | 6 +- .../udp/transport_udp.go => transport_udp.go | 67 ++++---- ...sport_udp_test.go => transport_udp_test.go | 129 ++++++++++----- utils/udp_client.go | 15 +- thrift_span.go => zipkin_thrift_span.go | 23 ++- ...span_test.go => zipkin_thrift_span_test.go | 12 +- 17 files changed, 527 insertions(+), 222 deletions(-) rename jaeger_span.go => jaeger_thrift_span.go (97%) rename jaeger_span_test.go => jaeger_thrift_span_test.go (98%) rename transport/transport.go => transport.go (94%) rename transport/udp/transport_udp.go => transport_udp.go (61%) rename transport/udp/transport_udp_test.go => transport_udp_test.go (55%) rename thrift_span.go => zipkin_thrift_span.go (97%) rename thrift_span_test.go => zipkin_thrift_span_test.go (97%) diff --git a/config/config.go b/config/config.go index f828ec8c..2da6c936 100644 --- a/config/config.go +++ b/config/config.go @@ -31,8 +31,6 @@ import ( "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/rpcmetrics" - "github.com/uber/jaeger-client-go/transport" - "github.com/uber/jaeger-client-go/transport/udp" ) const defaultSamplingProbability = 0.001 @@ -245,6 +243,6 @@ func (rc *ReporterConfig) NewReporter( return reporter, err } -func (rc *ReporterConfig) newTransport() (transport.Transport, error) { - return udp.NewUDPTransport(rc.LocalAgentHostPort, 0) +func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { + return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) } diff --git a/jaeger_span.go b/jaeger_thrift_span.go similarity index 97% rename from jaeger_span.go rename to jaeger_thrift_span.go index 31339ee6..3971516e 100644 --- a/jaeger_span.go +++ b/jaeger_thrift_span.go @@ -29,8 +29,8 @@ import ( "github.com/uber/jaeger-client-go/utils" ) -// buildJaegerSpan builds jaeger span based on internal span. -func buildJaegerSpan(span *Span) *j.Span { +// BuildJaegerThrift builds jaeger span based on internal span. +func BuildJaegerThrift(span *Span) *j.Span { startTime := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime) duration := span.duration.Nanoseconds() / int64(time.Microsecond) jaegerSpan := &j.Span{ diff --git a/jaeger_span_test.go b/jaeger_thrift_span_test.go similarity index 98% rename from jaeger_span_test.go rename to jaeger_thrift_span_test.go index c6faf887..839a2e51 100644 --- a/jaeger_span_test.go +++ b/jaeger_thrift_span_test.go @@ -46,7 +46,7 @@ var ( someSliceString = "[a]" ) -func TestBuildJaegerSpan(t *testing.T) { +func TestBuildJaegerThrift(t *testing.T) { tracer, closer := NewTracer("DOOP", NewConstSampler(true), NewNullReporter()) @@ -60,8 +60,8 @@ func TestBuildJaegerSpan(t *testing.T) { sp2.Finish() sp1.Finish() - jaegerSpan1 := buildJaegerSpan(sp1) - jaegerSpan2 := buildJaegerSpan(sp2) + jaegerSpan1 := BuildJaegerThrift(sp1) + jaegerSpan2 := BuildJaegerThrift(sp2) assert.Equal(t, "sp1", jaegerSpan1.OperationName) assert.Equal(t, "sp2", jaegerSpan2.OperationName) assert.EqualValues(t, 0, jaegerSpan1.ParentSpanId) @@ -235,7 +235,7 @@ func TestBuildLogs(t *testing.T) { } else if test.field != (log.Field{}) { sp.LogFields(test.field) } - jaegerSpan := buildJaegerSpan(sp.(*Span)) + jaegerSpan := BuildJaegerThrift(sp.(*Span)) if test.disableSampling { assert.Equal(t, 0, len(jaegerSpan.Logs), testName) continue @@ -307,7 +307,7 @@ func TestJaegerSpanBaggageLogs(t *testing.T) { ext.SpanKindRPCServer.Set(sp) sp.Finish() - jaegerSpan := buildJaegerSpan(sp) + jaegerSpan := BuildJaegerThrift(sp) require.Len(t, jaegerSpan.Logs, 1) fields := jaegerSpan.Logs[0].Fields require.Len(t, fields, 3) diff --git a/reporter.go b/reporter.go index bf95ecea..d77ece99 100644 --- a/reporter.go +++ b/reporter.go @@ -28,8 +28,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go/log" - "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" - "github.com/uber/jaeger-client-go/transport" ) // Reporter is called by the tracer when a span is completed to report the span to the tracing collector. @@ -166,15 +164,15 @@ const ( type remoteReporter struct { reporterOptions - sender transport.Transport - queue chan *zipkincore.Span + sender Transport + queue chan *Span queueLength int64 // signed because metric's gauge is signed queueDrained sync.WaitGroup flushSignal chan *sync.WaitGroup } // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender -func NewRemoteReporter(sender transport.Transport, opts ...ReporterOption) Reporter { +func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter { options := reporterOptions{} for _, option := range opts { option(&options) @@ -195,7 +193,7 @@ func NewRemoteReporter(sender transport.Transport, opts ...ReporterOption) Repor reporterOptions: options, sender: sender, flushSignal: make(chan *sync.WaitGroup), - queue: make(chan *zipkincore.Span, options.queueSize), + queue: make(chan *Span, options.queueSize), } go reporter.processQueue() return reporter @@ -204,9 +202,8 @@ func NewRemoteReporter(sender transport.Transport, opts ...ReporterOption) Repor // Report implements Report() method of Reporter. // It passes the span to a background go-routine for submission to Jaeger. func (r *remoteReporter) Report(span *Span) { - thriftSpan := buildThriftSpan(span) select { - case r.queue <- thriftSpan: + case r.queue <- span: atomic.AddInt64(&r.queueLength, 1) default: r.metrics.ReporterDropped.Inc(1) diff --git a/reporter_test.go b/reporter_test.go index 3ffc6d8b..cae28495 100644 --- a/reporter_test.go +++ b/reporter_test.go @@ -21,7 +21,6 @@ package jaeger import ( - "fmt" "io" "sort" "strings" @@ -34,13 +33,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/uber/jaeger-client-go/testutils" - z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" "github.com/uber/jaeger-lib/metrics" mTestutils "github.com/uber/jaeger-lib/metrics/testutils" - "github.com/uber/jaeger-client-go/transport" - "github.com/uber/jaeger-client-go/transport/udp" + "github.com/uber/jaeger-client-go/testutils" + j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" ) type reporterSuite struct { @@ -92,7 +89,7 @@ func (s *reporterSuite) flushReporter() { wg.Wait() } -func (s *reporterSuite) TestRootSpanAnnotations() { +func (s *reporterSuite) TestRootSpanTags() { s.metricsFactory.Clear() sp := s.tracer.StartSpan("get_name") ext.SpanKindRPCServer.Set(sp) @@ -100,11 +97,10 @@ func (s *reporterSuite) TestRootSpanAnnotations() { sp.Finish() s.flushReporter() s.Equal(1, len(s.collector.Spans())) - zSpan := s.collector.Spans()[0] - s.NotNil(findAnnotation(zSpan, "sr"), "expecting sr annotation") - s.NotNil(findAnnotation(zSpan, "ss"), "expecting ss annotation") - s.NotNil(findBinaryAnnotation(zSpan, "ca"), "expecting ca annotation") - s.NotNil(findBinaryAnnotation(zSpan, JaegerClientVersionTagKey), "expecting client version tag") + span := s.collector.Spans()[0] + s.Len(span.tags, 6) + s.EqualValues("server", span.tags[2].value, "span.kind should be server") + s.NotNil(findDomainTag(span, JaegerClientVersionTagKey), "expecting client version tag") mTestutils.AssertCounterMetrics(s.T(), s.metricsFactory, mTestutils.ExpectedMetric{ @@ -115,7 +111,7 @@ func (s *reporterSuite) TestRootSpanAnnotations() { ) } -func (s *reporterSuite) TestClientSpanAnnotations() { +func (s *reporterSuite) TestClientSpan() { s.metricsFactory.Clear() sp := s.tracer.StartSpan("get_name") ext.SpanKindRPCServer.Set(sp) @@ -127,13 +123,10 @@ func (s *reporterSuite) TestClientSpanAnnotations() { sp.Finish() s.flushReporter() s.Equal(2, len(s.collector.Spans())) - zSpan := s.collector.Spans()[0] // child span is reported first - s.EqualValues(zSpan.ID, sp2.(*Span).context.spanID) - s.Equal(2, len(zSpan.Annotations), "expecting two annotations, cs and cr") - s.Equal(1, len(zSpan.BinaryAnnotations), "expecting one binary annotation sa") - s.NotNil(findAnnotation(zSpan, "cs"), "expecting cs annotation") - s.NotNil(findAnnotation(zSpan, "cr"), "expecting cr annotation") - s.NotNil(findBinaryAnnotation(zSpan, "sa"), "expecting sa annotation") + span := s.collector.Spans()[0] // child span is reported first + s.EqualValues(span.context.spanID, sp2.(*Span).context.spanID) + s.Len(span.tags, 2) + s.EqualValues("client", span.tags[0].value, "span.kind should be client") mTestutils.AssertCounterMetrics(s.T(), s.metricsFactory, mTestutils.ExpectedMetric{ @@ -151,7 +144,7 @@ func (s *reporterSuite) TestTagsAndEvents() { expected := []string{"long", "ping", "awake", "awake", "one", "two", "three", "bite me", JaegerClientVersionTagKey, TracerHostnameTagKey, SamplerParamTagKey, SamplerTypeTagKey, - "lc", "does not compute"} + "does not compute"} sp.SetTag("long", strings.Repeat("x", 300)) sp.SetTag("ping", "pong") sp.SetTag("awake", true) @@ -164,39 +157,19 @@ func (s *reporterSuite) TestTagsAndEvents() { sp.Finish() s.flushReporter() s.Equal(1, len(s.collector.Spans())) - zSpan := s.collector.Spans()[0] - s.Equal(2, len(zSpan.Annotations), "expecting two annotations for events") - s.Equal(len(expected), len(zSpan.BinaryAnnotations), - "expecting %d binary annotations", len(expected)) - binAnnos := []string{} - for _, a := range zSpan.BinaryAnnotations { - binAnnos = append(binAnnos, string(a.Key)) + span := s.collector.Spans()[0] + s.Equal(2, len(span.logs), "expecting two logs") + s.Equal(len(expected), len(span.tags), + "expecting %d tags", len(expected)) + tags := []string{} + for _, tag := range span.tags { + tags = append(tags, string(tag.key)) } sort.Strings(expected) - sort.Strings(binAnnos) - s.Equal(expected, binAnnos, "expecting %d binary annotations", len(expected)) - - s.NotNil(findAnnotation(zSpan, "hello"), "expecting 'hello' annotation: %+v", zSpan.Annotations) - - longEvent := false - for _, a := range zSpan.Annotations { - if strings.HasPrefix(a.Value, "long event") { - longEvent = true - s.EqualValues(256, len(a.Value)) - } - } - s.True(longEvent, "Must have truncated and saved long event name") - - for i := range expected { - s.NotNil(findBinaryAnnotation(zSpan, expected[i]), "expecting annotation '%s'", expected[i]) - } - doesNotCompute := findBinaryAnnotation(zSpan, "does not compute") - s.NotNil(doesNotCompute) - doesNotComputeStr := fmt.Sprintf("%+v", sp) - s.Equal(doesNotComputeStr, string(doesNotCompute.Value)) + sort.Strings(tags) + s.Equal(expected, tags, "expecting %d tags", len(expected)) - longStr := findBinaryAnnotation(zSpan, "long") - s.EqualValues(256, len(longStr.Value), "long tag valur must be truncated") + s.NotNil(findDomainLog(span, "hello"), "expecting 'hello' log: %+v", span.logs) } func TestUDPReporter(t *testing.T) { @@ -205,18 +178,18 @@ func TestUDPReporter(t *testing.T) { defer agent.Close() testRemoteReporter(t, - func(m *Metrics) (transport.Transport, error) { - return udp.NewUDPTransport(agent.SpanServerAddr(), 0) + func(m *Metrics) (Transport, error) { + return NewUDPTransport(agent.SpanServerAddr(), 0) }, - func() []*z.Span { - return agent.GetZipkinSpans() + func() []*j.Batch { + return agent.GetJaegerBatches() }) } func testRemoteReporter( t *testing.T, - factory func(m *Metrics) (transport.Transport, error), - getSpans func() []*z.Span, + factory func(m *Metrics) (Transport, error), + getBatches func() []*j.Batch, ) { metricsFactory := metrics.NewLocalFactory(0) metrics := NewMetrics(metricsFactory, nil) @@ -239,12 +212,14 @@ func testRemoteReporter( // however, in case of UDP reporter it's fire and forget, so we need to wait a bit time.Sleep(5 * time.Millisecond) - spans := getSpans() - require.Equal(t, 1, len(spans)) - assert.Equal(t, "leela", spans[0].Name) - sa := findBinaryAnnotation(spans[0], z.SERVER_ADDR) - require.NotNil(t, sa) - assert.Equal(t, "downstream", sa.Host.ServiceName) + batches := getBatches() + require.Equal(t, 1, len(batches)) + require.Equal(t, 1, len(batches[0].Spans)) + assert.Equal(t, "leela", batches[0].Spans[0].OperationName) + assert.Equal(t, "reporter-test-service", batches[0].Process.ServiceName) + tag := findJaegerTag("peer.service", batches[0].Spans[0].Tags) + assert.NotNil(t, tag) + assert.Equal(t, "downstream", *tag.VStr) mTestutils.AssertCounterMetrics(t, metricsFactory, []mTestutils.ExpectedMetric{ {Name: "jaeger.reporter-spans", Tags: map[string]string{"state": "success"}, Value: 1}, @@ -262,11 +237,11 @@ func (s *reporterSuite) TestMemoryReporterReport() { } type fakeSender struct { - spans []*z.Span + spans []*Span mutex sync.Mutex } -func (s *fakeSender) Append(span *z.Span) (int, error) { +func (s *fakeSender) Append(span *Span) (int, error) { s.mutex.Lock() defer s.mutex.Unlock() s.spans = append(s.spans, span) @@ -279,10 +254,28 @@ func (s *fakeSender) Flush() (int, error) { func (s *fakeSender) Close() error { return nil } -func (s *fakeSender) Spans() []*z.Span { +func (s *fakeSender) Spans() []*Span { s.mutex.Lock() defer s.mutex.Unlock() - res := make([]*z.Span, len(s.spans)) + res := make([]*Span, len(s.spans)) copy(res, s.spans) return res } + +func findDomainLog(span *Span, key string) *opentracing.LogRecord { + for _, log := range span.logs { + if log.Fields[0].Value().(string) == key { + return &log + } + } + return nil +} + +func findDomainTag(span *Span, key string) *Tag { + for _, tag := range span.tags { + if tag.key == key { + return &tag + } + } + return nil +} diff --git a/testutils/mock_agent.go b/testutils/mock_agent.go index 5e952748..c3be9236 100644 --- a/testutils/mock_agent.go +++ b/testutils/mock_agent.go @@ -22,18 +22,20 @@ package testutils import ( "encoding/json" + "errors" "fmt" "net/http" "net/http/httptest" "sync" "sync/atomic" + "github.com/apache/thrift/lib/go/thrift" + "github.com/uber/jaeger-client-go/thrift-gen/agent" + "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/thrift-gen/sampling" "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" "github.com/uber/jaeger-client-go/utils" - - "github.com/apache/thrift/lib/go/thrift" ) // StartMockAgent runs a mock representation of jaeger-agent. @@ -72,12 +74,12 @@ func (s *MockAgent) Close() { // MockAgent is a mock representation of Jaeger Agent. // It receives spans over UDP, and has an HTTP endpoint for sampling strategies. type MockAgent struct { - transport *TUDPTransport - zipkinSpans []*zipkincore.Span - mutex sync.Mutex - serving uint32 - samplingMgr *samplingManager - samplingSrv *httptest.Server + transport *TUDPTransport + jaegerBatches []*jaeger.Batch + mutex sync.Mutex + serving uint32 + samplingMgr *samplingManager + samplingSrv *httptest.Server } // SpanServerAddr returns the UDP host:port where MockAgent listens for spans @@ -115,9 +117,14 @@ func (s *MockAgent) serve(started *sync.WaitGroup) { // EmitZipkinBatch implements EmitZipkinBatch() of TChanSamplingManagerServer func (s *MockAgent) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { + return errors.New("Not implemented") +} + +// EmitBatch implements EmitBatch() of TChanSamplingManagerServer +func (s *MockAgent) EmitBatch(batch *jaeger.Batch) (err error) { s.mutex.Lock() defer s.mutex.Unlock() - s.zipkinSpans = append(s.zipkinSpans, spans...) + s.jaegerBatches = append(s.jaegerBatches, batch) return err } @@ -131,21 +138,21 @@ func (s *MockAgent) AddSamplingStrategy(service string, strategy *sampling.Sampl s.samplingMgr.AddSamplingStrategy(service, strategy) } -// GetZipkinSpans returns accumulated Zipkin spans -func (s *MockAgent) GetZipkinSpans() []*zipkincore.Span { +// GetJaegerBatches returns accumulated Jaeger batches +func (s *MockAgent) GetJaegerBatches() []*jaeger.Batch { s.mutex.Lock() defer s.mutex.Unlock() - n := len(s.zipkinSpans) - spans := make([]*zipkincore.Span, n, n) - copy(spans, s.zipkinSpans) - return spans + n := len(s.jaegerBatches) + batches := make([]*jaeger.Batch, n, n) + copy(batches, s.jaegerBatches) + return batches } -// ResetZipkinSpans discards accumulated Zipkin spans -func (s *MockAgent) ResetZipkinSpans() { +// ResetJaegerBatches discards accumulated Jaeger batches +func (s *MockAgent) ResetJaegerBatches() { s.mutex.Lock() defer s.mutex.Unlock() - s.zipkinSpans = nil + s.jaegerBatches = nil } type samplingHandler struct { diff --git a/testutils/mock_agent_test.go b/testutils/mock_agent_test.go index 8277ee5d..d3865ea2 100644 --- a/testutils/mock_agent_test.go +++ b/testutils/mock_agent_test.go @@ -25,12 +25,12 @@ import ( "testing" "time" - "github.com/uber/jaeger-client-go/thrift-gen/sampling" - "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" - "github.com/uber/jaeger-client-go/utils" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/uber/jaeger-client-go/thrift-gen/jaeger" + "github.com/uber/jaeger-client-go/thrift-gen/sampling" + "github.com/uber/jaeger-client-go/utils" ) func TestMockAgentSpanServer(t *testing.T) { @@ -42,28 +42,31 @@ func TestMockAgentSpanServer(t *testing.T) { require.NoError(t, err) for i := 1; i < 5; i++ { - spans := make([]*zipkincore.Span, i, i) + batch := &jaeger.Batch{Process: &jaeger.Process{ServiceName: "svc"}} + spans := make([]*jaeger.Span, i, i) for j := 0; j < i; j++ { - spans[j] = zipkincore.NewSpan() - spans[j].Name = fmt.Sprintf("span-%d", j) + spans[j] = jaeger.NewSpan() + spans[j].OperationName = fmt.Sprintf("span-%d", j) } + batch.Spans = spans - err = client.EmitZipkinBatch(spans) + err = client.EmitBatch(batch) assert.NoError(t, err) for k := 0; k < 100; k++ { time.Sleep(time.Millisecond) - spans = mockAgent.GetZipkinSpans() - if len(spans) == i { + batches := mockAgent.GetJaegerBatches() + if len(batches) > 0 && len(batches[0].Spans) == i { break } } - spans = mockAgent.GetZipkinSpans() - require.Equal(t, i, len(spans)) + batches := mockAgent.GetJaegerBatches() + require.NotEmpty(t, len(batches)) + require.Equal(t, i, len(batches[0].Spans)) for j := 0; j < i; j++ { - assert.Equal(t, fmt.Sprintf("span-%d", j), spans[j].Name) + assert.Equal(t, fmt.Sprintf("span-%d", j), batches[0].Spans[j].OperationName) } - mockAgent.ResetZipkinSpans() + mockAgent.ResetJaegerBatches() } } diff --git a/thrift-gen/agent/agent.go b/thrift-gen/agent/agent.go index 0e927f5d..a192cb6f 100644 --- a/thrift-gen/agent/agent.go +++ b/thrift-gen/agent/agent.go @@ -7,6 +7,7 @@ import ( "bytes" "fmt" "github.com/apache/thrift/lib/go/thrift" + "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" ) @@ -21,6 +22,9 @@ type Agent interface { // Parameters: // - Spans EmitZipkinBatch(spans []*zipkincore.Span) (err error) + // Parameters: + // - Batch + EmitBatch(batch *jaeger.Batch) (err error) } type AgentClient struct { @@ -80,6 +84,37 @@ func (p *AgentClient) sendEmitZipkinBatch(spans []*zipkincore.Span) (err error) return oprot.Flush() } +// Parameters: +// - Batch +func (p *AgentClient) EmitBatch(batch *jaeger.Batch) (err error) { + if err = p.sendEmitBatch(batch); err != nil { + return + } + return +} + +func (p *AgentClient) sendEmitBatch(batch *jaeger.Batch) (err error) { + oprot := p.OutputProtocol + if oprot == nil { + oprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.OutputProtocol = oprot + } + p.SeqId++ + if err = oprot.WriteMessageBegin("emitBatch", thrift.ONEWAY, p.SeqId); err != nil { + return + } + args := AgentEmitBatchArgs{ + Batch: batch, + } + if err = args.Write(oprot); err != nil { + return + } + if err = oprot.WriteMessageEnd(); err != nil { + return + } + return oprot.Flush() +} + type AgentProcessor struct { processorMap map[string]thrift.TProcessorFunction handler Agent @@ -102,6 +137,7 @@ func NewAgentProcessor(handler Agent) *AgentProcessor { self0 := &AgentProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} self0.processorMap["emitZipkinBatch"] = &agentProcessorEmitZipkinBatch{handler: handler} + self0.processorMap["emitBatch"] = &agentProcessorEmitBatch{handler: handler} return self0 } @@ -143,6 +179,25 @@ func (p *agentProcessorEmitZipkinBatch) Process(seqId int32, iprot, oprot thrift return true, nil } +type agentProcessorEmitBatch struct { + handler Agent +} + +func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := AgentEmitBatchArgs{} + if err = args.Read(iprot); err != nil { + iprot.ReadMessageEnd() + return false, err + } + + iprot.ReadMessageEnd() + var err2 error + if err2 = p.handler.EmitBatch(args.Batch); err2 != nil { + return true, err2 + } + return true, nil +} + // HELPER FUNCTIONS AND STRUCTURES // Attributes: @@ -254,3 +309,102 @@ func (p *AgentEmitZipkinBatchArgs) String() string { } return fmt.Sprintf("AgentEmitZipkinBatchArgs(%+v)", *p) } + +// Attributes: +// - Batch +type AgentEmitBatchArgs struct { + Batch *jaeger.Batch `thrift:"batch,1" json:"batch"` +} + +func NewAgentEmitBatchArgs() *AgentEmitBatchArgs { + return &AgentEmitBatchArgs{} +} + +var AgentEmitBatchArgs_Batch_DEFAULT *jaeger.Batch + +func (p *AgentEmitBatchArgs) GetBatch() *jaeger.Batch { + if !p.IsSetBatch() { + return AgentEmitBatchArgs_Batch_DEFAULT + } + return p.Batch +} +func (p *AgentEmitBatchArgs) IsSetBatch() bool { + return p.Batch != nil +} + +func (p *AgentEmitBatchArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.readField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *AgentEmitBatchArgs) readField1(iprot thrift.TProtocol) error { + p.Batch = &jaeger.Batch{} + if err := p.Batch.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Batch), err) + } + return nil +} + +func (p *AgentEmitBatchArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("emitBatch_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *AgentEmitBatchArgs) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("batch", thrift.STRUCT, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:batch: ", p), err) + } + if err := p.Batch.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Batch), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:batch: ", p), err) + } + return err +} + +func (p *AgentEmitBatchArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("AgentEmitBatchArgs(%+v)", *p) +} diff --git a/thrift-gen/jaeger/ttypes.go b/thrift-gen/jaeger/ttypes.go index 754c8679..b5ddfa64 100644 --- a/thrift-gen/jaeger/ttypes.go +++ b/thrift-gen/jaeger/ttypes.go @@ -1739,3 +1739,100 @@ func (p *Batch) String() string { } return fmt.Sprintf("Batch(%+v)", *p) } + +// Attributes: +// - Ok +type BatchSubmitResponse struct { + Ok bool `thrift:"ok,1,required" json:"ok"` +} + +func NewBatchSubmitResponse() *BatchSubmitResponse { + return &BatchSubmitResponse{} +} + +func (p *BatchSubmitResponse) GetOk() bool { + return p.Ok +} +func (p *BatchSubmitResponse) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetOk bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.readField1(iprot); err != nil { + return err + } + issetOk = true + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetOk { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field Ok is not set")) + } + return nil +} + +func (p *BatchSubmitResponse) readField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBool(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.Ok = v + } + return nil +} + +func (p *BatchSubmitResponse) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("BatchSubmitResponse"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *BatchSubmitResponse) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("ok", thrift.BOOL, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:ok: ", p), err) + } + if err := oprot.WriteBool(bool(p.Ok)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.ok (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:ok: ", p), err) + } + return err +} + +func (p *BatchSubmitResponse) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("BatchSubmitResponse(%+v)", *p) +} diff --git a/transport/transport.go b/transport.go similarity index 94% rename from transport/transport.go rename to transport.go index 7811619d..f02c6f3b 100644 --- a/transport/transport.go +++ b/transport.go @@ -18,12 +18,10 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package transport +package jaeger import ( "io" - - "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" ) // Transport abstracts the method of sending spans out of process. @@ -35,7 +33,7 @@ type Transport interface { // size, the transport should call Flush() and return the number of spans // flushed, otherwise return 0. If error is returned, the returned number // of spans is treated as failed span, and reported to metrics accordingly. - Append(span *zipkincore.Span) (int, error) + Append(span *Span) (int, error) // Flush submits the internal buffer to the remote server. It returns the // number of spans flushed. If error is returned, the returned number of diff --git a/transport/zipkin/http.go b/transport/zipkin/http.go index 625adbea..f161ad35 100644 --- a/transport/zipkin/http.go +++ b/transport/zipkin/http.go @@ -35,7 +35,6 @@ import ( "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" - "github.com/uber/jaeger-client-go/transport" ) // Default timeout for http request in seconds @@ -74,7 +73,7 @@ func HTTPBatchSize(n int) HTTPOption { // NewHTTPTransport returns a new HTTP-backend transport. url should be an http // url to handle post request, typically something like: // http://hostname:9411/api/v1/spans -func NewHTTPTransport(url string, options ...HTTPOption) (transport.Transport, error) { +func NewHTTPTransport(url string, options ...HTTPOption) (*HTTPTransport, error) { c := &HTTPTransport{ logger: log.NullLogger, url: url, @@ -90,8 +89,9 @@ func NewHTTPTransport(url string, options ...HTTPOption) (transport.Transport, e } // Append implements Transport. -func (c *HTTPTransport) Append(s *zipkincore.Span) (int, error) { - c.batch = append(c.batch, s) +func (c *HTTPTransport) Append(span *jaeger.Span) (int, error) { + zSpan := jaeger.BuildZipkinThrift(span) + c.batch = append(c.batch, zSpan) if len(c.batch) >= c.batchSize { return c.Flush() } diff --git a/transport/zipkin/http_test.go b/transport/zipkin/http_test.go index 85f62045..c5aa4d93 100644 --- a/transport/zipkin/http_test.go +++ b/transport/zipkin/http_test.go @@ -88,9 +88,9 @@ func TestHTTPOptions(t *testing.T) { HTTPTimeout(123*time.Millisecond), ) require.NoError(t, err) - assert.Equal(t, log.StdLogger, sender.(*HTTPTransport).logger) - assert.Equal(t, 123, sender.(*HTTPTransport).batchSize) - assert.Equal(t, 123*time.Millisecond, sender.(*HTTPTransport).client.Timeout) + assert.Equal(t, log.StdLogger, sender.logger) + assert.Equal(t, 123, sender.batchSize) + assert.Equal(t, 123*time.Millisecond, sender.client.Timeout) } type httpServer struct { diff --git a/transport/udp/transport_udp.go b/transport_udp.go similarity index 61% rename from transport/udp/transport_udp.go rename to transport_udp.go index f43dc045..c5265f24 100644 --- a/transport/udp/transport_udp.go +++ b/transport_udp.go @@ -18,41 +18,43 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package udp +package jaeger import ( "errors" "github.com/apache/thrift/lib/go/thrift" - "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" - "github.com/uber/jaeger-client-go/transport" + j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/utils" ) // Empirically obtained constant for how many bytes in the message are used for envelope. -// The total datagram size is sizeof(Span) * numSpans + emitSpanBatchOverhead <= maxPacketSize -// There is a unit test `TestEmitSpanBatchOverhead` that validates this number. +// The total datagram size is: +// sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize +// There is a unit test `TestEmitBatchOverhead` that validates this number. // Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans // in the batch, because the length of the list is encoded as varint32, as well as SeqId. -const emitSpanBatchOverhead = 30 +const emitBatchOverhead = 30 -const defaultUDPSpanServerHostPort = "localhost:5775" +const defaultUDPSpanServerHostPort = "localhost:6831" var errSpanTooLarge = errors.New("Span is too large") type udpSender struct { - client *utils.AgentClientUDP - maxPacketSize int // max size of datagram in bytes - maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram - byteBufferSize int // current number of span bytes accumulated in the buffer - spanBuffer []*zipkincore.Span // spans buffered before a flush - thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span - thriftProtocol thrift.TProtocol + client *utils.AgentClientUDP + maxPacketSize int // max size of datagram in bytes + maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram + byteBufferSize int // current number of span bytes accumulated in the buffer + spanBuffer []*j.Span // spans buffered before a flush + thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span + thriftProtocol thrift.TProtocol + process *j.Process + processByteSize int } // NewUDPTransport creates a reporter that submits spans to jaeger-agent -func NewUDPTransport(hostPort string, maxPacketSize int) (transport.Transport, error) { +func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { if len(hostPort) == 0 { hostPort = defaultUDPSpanServerHostPort } @@ -73,33 +75,36 @@ func NewUDPTransport(hostPort string, maxPacketSize int) (transport.Transport, e sender := &udpSender{ client: client, - maxSpanBytes: maxPacketSize - emitSpanBatchOverhead, + maxSpanBytes: maxPacketSize - emitBatchOverhead, thriftBuffer: thriftBuffer, thriftProtocol: thriftProtocol} return sender, nil } -func (s *udpSender) calcSpanSize(span *zipkincore.Span) (int, error) { +func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int { s.thriftBuffer.Reset() - if err := span.Write(s.thriftProtocol); err != nil { - return 0, err - } - return s.thriftBuffer.Len(), nil + thriftStruct.Write(s.thriftProtocol) + return s.thriftBuffer.Len() } -func (s *udpSender) Append(span *zipkincore.Span) (int, error) { - spanSize, err := s.calcSpanSize(span) - if err != nil { - // should not be getting this error from in-memory transport - ¯\_(ツ)_/¯ - return 1, err +func (s *udpSender) Append(span *Span) (int, error) { + if s.process == nil { + s.process = &j.Process{ + ServiceName: span.tracer.serviceName, + Tags: buildTags(span.tracer.tags), + } + s.processByteSize = s.calcSizeOfSerializedThrift(s.process) + s.byteBufferSize += s.processByteSize } + jSpan := BuildJaegerThrift(span) + spanSize := s.calcSizeOfSerializedThrift(jSpan) if spanSize > s.maxSpanBytes { return 1, errSpanTooLarge } s.byteBufferSize += spanSize if s.byteBufferSize <= s.maxSpanBytes { - s.spanBuffer = append(s.spanBuffer, span) + s.spanBuffer = append(s.spanBuffer, jSpan) if s.byteBufferSize < s.maxSpanBytes { return 0, nil } @@ -107,8 +112,8 @@ func (s *udpSender) Append(span *zipkincore.Span) (int, error) { } // the latest span did not fit in the buffer n, err := s.Flush() - s.spanBuffer = append(s.spanBuffer, span) - s.byteBufferSize = spanSize + s.spanBuffer = append(s.spanBuffer, jSpan) + s.byteBufferSize = spanSize + s.processByteSize return n, err } @@ -117,7 +122,7 @@ func (s *udpSender) Flush() (int, error) { if n == 0 { return 0, nil } - err := s.client.EmitZipkinBatch(s.spanBuffer) + err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer}) s.resetBuffers() return n, err @@ -132,5 +137,5 @@ func (s *udpSender) resetBuffers() { s.spanBuffer[i] = nil } s.spanBuffer = s.spanBuffer[:0] - s.byteBufferSize = 0 + s.byteBufferSize = s.processByteSize } diff --git a/transport/udp/transport_udp_test.go b/transport_udp_test.go similarity index 55% rename from transport/udp/transport_udp_test.go rename to transport_udp_test.go index 48a06f19..047064c3 100644 --- a/transport/udp/transport_udp_test.go +++ b/transport_udp_test.go @@ -18,51 +18,77 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package udp +package jaeger import ( "testing" "time" - "github.com/uber/jaeger-client-go/testutils" - "github.com/uber/jaeger-client-go/thrift-gen/agent" - "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" - "github.com/apache/thrift/lib/go/thrift" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/uber/jaeger-client-go/testutils" + "github.com/uber/jaeger-client-go/thrift-gen/agent" + j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" +) + +var ( + testTracer, _ = NewTracer("svcName", NewConstSampler(false), NewNullReporter()) + jaegerTracer = testTracer.(*tracer) ) -func getThriftSpanByteLength(t *testing.T, span *zipkincore.Span) int { +func getThriftSpanByteLength(t *testing.T, span *Span) int { + jSpan := BuildJaegerThrift(span) transport := thrift.NewTMemoryBufferLen(1000) protocolFactory := thrift.NewTCompactProtocolFactory() - err := span.Write(protocolFactory.GetProtocol(transport)) + err := jSpan.Write(protocolFactory.GetProtocol(transport)) require.NoError(t, err) return transport.Len() } -func TestEmitSpanBatchOverhead(t *testing.T) { +func getThriftProcessByteLengthFromTracer(t *testing.T, tracer *tracer) int { + process := &j.Process{ + ServiceName: tracer.serviceName, + Tags: buildTags(tracer.tags), + } + return getThriftProcessByteLength(t, process) +} + +func getThriftProcessByteLength(t *testing.T, process *j.Process) int { + transport := thrift.NewTMemoryBufferLen(1000) + protocolFactory := thrift.NewTCompactProtocolFactory() + err := process.Write(protocolFactory.GetProtocol(transport)) + require.NoError(t, err) + return transport.Len() +} +func TestEmitBatchOverhead(t *testing.T) { transport := thrift.NewTMemoryBufferLen(1000) protocolFactory := thrift.NewTCompactProtocolFactory() client := agent.NewAgentClientFactory(transport, protocolFactory) - span := &zipkincore.Span{Name: "test-span"} + span := &Span{operationName: "test-span", tracer: jaegerTracer} spanSize := getThriftSpanByteLength(t, span) tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF} for i, n := range tests { transport.Reset() - batch := make([]*zipkincore.Span, n) - for j := 0; j < n; j++ { - batch[j] = span + batch := make([]*j.Span, n) + processTags := make([]*j.Tag, n) + for x := 0; x < n; x++ { + batch[x] = BuildJaegerThrift(span) + processTags[x] = &j.Tag{} } + process := &j.Process{ServiceName: "svcName", Tags: processTags} client.SeqId = -2 // this causes the longest encoding of varint32 as 5 bytes - err := client.EmitZipkinBatch(batch) + err := client.EmitBatch(&j.Batch{Process: process, Spans: batch}) + processSize := getThriftProcessByteLength(t, process) require.NoError(t, err) - overhead := transport.Len() - n*spanSize - assert.True(t, overhead <= emitSpanBatchOverhead, - "test %d, n=%d, expected overhead %d <= %d", i, n, overhead, emitSpanBatchOverhead) + overhead := transport.Len() - n*spanSize - processSize + assert.True(t, overhead <= emitBatchOverhead, + "test %d, n=%d, expected overhead %d <= %d", i, n, overhead, emitBatchOverhead) + t.Logf("span count: %d, overhead: %d", n, overhead) } } @@ -71,10 +97,11 @@ func TestUDPSenderFlush(t *testing.T) { require.NoError(t, err) defer agent.Close() - span := &zipkincore.Span{Name: "test-span"} + span := &Span{operationName: "test-span", tracer: jaegerTracer} spanSize := getThriftSpanByteLength(t, span) + processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer) - sender, err := NewUDPTransport(agent.SpanServerAddr(), 5*spanSize+emitSpanBatchOverhead) + sender, err := NewUDPTransport(agent.SpanServerAddr(), 5*spanSize+processSize+emitBatchOverhead) require.NoError(t, err) udpSender := sender.(*udpSender) @@ -89,25 +116,26 @@ func TestUDPSenderFlush(t *testing.T) { assert.Equal(t, 0, n, "span should be in buffer, not flushed") buffer := udpSender.spanBuffer require.Equal(t, 1, len(buffer), "span should be in buffer, not flushed") - assert.Equal(t, span, buffer[0], "span should be in buffer, not flushed") + assert.Equal(t, BuildJaegerThrift(span), buffer[0], "span should be in buffer, not flushed") n, err = sender.Flush() require.NoError(t, err) assert.Equal(t, 1, n) assert.Equal(t, 0, len(udpSender.spanBuffer), "buffer should become empty") - assert.Equal(t, 0, udpSender.byteBufferSize, "buffer size counter should go to 0") + assert.Equal(t, processSize, udpSender.byteBufferSize, "buffer size counter should be equal to the processSize") assert.Nil(t, buffer[0], "buffer should not keep reference to the span") for i := 0; i < 10000; i++ { - spans := agent.GetZipkinSpans() - if len(spans) > 0 { + batches := agent.GetJaegerBatches() + if len(batches) > 0 { break } time.Sleep(1 * time.Millisecond) } - spans := agent.GetZipkinSpans() - require.Equal(t, 1, len(spans), "agent should have received the span") - assert.Equal(t, span.Name, spans[0].Name) + batches := agent.GetJaegerBatches() + require.Equal(t, 1, len(batches), "agent should have received the batch") + require.Equal(t, 1, len(batches[0].Spans)) + assert.Equal(t, span.operationName, batches[0].Spans[0].OperationName) } func TestUDPSenderAppend(t *testing.T) { @@ -115,28 +143,31 @@ func TestUDPSenderAppend(t *testing.T) { require.NoError(t, err) defer agent.Close() - span := &zipkincore.Span{Name: "test-span"} + span := &Span{operationName: "test-span", tracer: jaegerTracer} spanSize := getThriftSpanByteLength(t, span) + processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer) tests := []struct { - bufferSizeOffset int - expectFlush bool - expectSpansFlushed int - manualFlush bool - expectSpansFlushed2 int - description string + bufferSizeOffset int + expectFlush bool + expectSpansFlushed int + expectBatchesFlushed int + manualFlush bool + expectSpansFlushed2 int + expectBatchesFlushed2 int + description string }{ - {1, false, 0, true, 5, "in test: buffer bigger than 5 spans"}, - {0, true, 5, false, 0, "in test: buffer fits exactly 5 spans"}, - {-1, true, 4, true, 1, "in test: buffer smaller than 5 spans"}, + {1, false, 0, 0, true, 5, 1, "in test: buffer bigger than 5 spans"}, + {0, true, 5, 1, false, 0, 0, "in test: buffer fits exactly 5 spans"}, + {-1, true, 4, 1, true, 1, 1, "in test: buffer smaller than 5 spans"}, } for _, test := range tests { - bufferSize := 5*spanSize + test.bufferSizeOffset + emitSpanBatchOverhead + bufferSize := 5*spanSize + test.bufferSizeOffset + processSize + emitBatchOverhead sender, err := NewUDPTransport(agent.SpanServerAddr(), bufferSize) require.NoError(t, err, test.description) - agent.ResetZipkinSpans() + agent.ResetJaegerBatches() for i := 0; i < 5; i++ { n, err := sender.Append(span) require.NoError(t, err, test.description) @@ -149,23 +180,33 @@ func TestUDPSenderAppend(t *testing.T) { if test.expectFlush { time.Sleep(5 * time.Millisecond) } - spans := agent.GetZipkinSpans() + batches := agent.GetJaegerBatches() + require.Equal(t, test.expectBatchesFlushed, len(batches), test.description) + var spans []*j.Span + if test.expectBatchesFlushed > 0 { + spans = batches[0].Spans + } require.Equal(t, test.expectSpansFlushed, len(spans), test.description) for i := 0; i < test.expectSpansFlushed; i++ { - assert.Equal(t, span.Name, spans[i].Name, test.description) + assert.Equal(t, span.operationName, spans[i].OperationName, test.description) } if test.manualFlush { - agent.ResetZipkinSpans() + agent.ResetJaegerBatches() n, err := sender.Flush() require.NoError(t, err, test.description) assert.Equal(t, test.expectSpansFlushed2, n, test.description) time.Sleep(5 * time.Millisecond) - spans := agent.GetZipkinSpans() + batches = agent.GetJaegerBatches() + require.Equal(t, test.expectBatchesFlushed2, len(batches), test.description) + spans = []*j.Span{} + if test.expectBatchesFlushed2 > 0 { + spans = batches[0].Spans + } require.Equal(t, test.expectSpansFlushed2, len(spans), test.description) for i := 0; i < test.expectSpansFlushed2; i++ { - assert.Equal(t, span.Name, spans[i].Name, test.description) + assert.Equal(t, span.operationName, spans[i].OperationName, test.description) } } @@ -177,10 +218,10 @@ func TestUDPSenderHugeSpan(t *testing.T) { require.NoError(t, err) defer agent.Close() - span := &zipkincore.Span{Name: "test-span"} + span := &Span{operationName: "test-span", tracer: jaegerTracer} spanSize := getThriftSpanByteLength(t, span) - sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitSpanBatchOverhead) + sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead) require.NoError(t, err) n, err := sender.Append(span) diff --git a/utils/udp_client.go b/utils/udp_client.go index 5fa89575..b1de37be 100644 --- a/utils/udp_client.go +++ b/utils/udp_client.go @@ -21,14 +21,16 @@ package utils import ( + "errors" "fmt" "io" "net" + "github.com/apache/thrift/lib/go/thrift" + "github.com/uber/jaeger-client-go/thrift-gen/agent" + "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/thrift-gen/zipkincore" - - "github.com/apache/thrift/lib/go/thrift" ) // UDPPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent @@ -78,14 +80,19 @@ func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, err // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface func (a *AgentClientUDP) EmitZipkinBatch(spans []*zipkincore.Span) error { + return errors.New("Not implemented") +} + +// EmitBatch implements EmitBatch() of Agent interface +func (a *AgentClientUDP) EmitBatch(batch *jaeger.Batch) error { a.thriftBuffer.Reset() a.client.SeqId = 0 // we have no need for distinct SeqIds for our one-way UDP messages - if err := a.client.EmitZipkinBatch(spans); err != nil { + if err := a.client.EmitBatch(batch); err != nil { return err } if a.thriftBuffer.Len() > a.maxPacketSize { return fmt.Errorf("Data does not fit within one UDP packet; size %d, max %d, spans %d", - a.thriftBuffer.Len(), a.maxPacketSize, len(spans)) + a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans)) } _, err := a.connUDP.Write(a.thriftBuffer.Bytes()) return err diff --git a/thrift_span.go b/zipkin_thrift_span.go similarity index 97% rename from thrift_span.go rename to zipkin_thrift_span.go index 5cecb188..980bd85b 100644 --- a/thrift_span.go +++ b/zipkin_thrift_span.go @@ -40,8 +40,15 @@ const ( allowPackedNumbers = false ) -// buildThriftSpan builds thrift span based on internal span. -func buildThriftSpan(s *Span) *z.Span { +var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){ + string(ext.SpanKind): setSpanKind, + string(ext.PeerHostIPv4): setPeerIPv4, + string(ext.PeerPort): setPeerPort, + string(ext.PeerService): setPeerService, +} + +// BuildZipkinThrift builds thrift span based on internal span. +func BuildZipkinThrift(s *Span) *z.Span { span := &zipkinSpan{Span: s} span.handleSpecialTags() parentID := int64(span.context.parentID) @@ -141,6 +148,11 @@ func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryA annotations = append(annotations, local) } for _, tag := range span.tags { + // "Special tags" are already handled by this point, we'd be double reporting the + // tags if we don't skip here + if _, ok := specialTagHandlers[tag.key]; ok { + continue + } if anno := buildBinaryAnnotation(tag.key, tag.value, nil); anno != nil { annotations = append(annotations, anno) } @@ -233,13 +245,6 @@ type zipkinSpan struct { spanKind string } -var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){ - string(ext.SpanKind): setSpanKind, - string(ext.PeerHostIPv4): setPeerIPv4, - string(ext.PeerPort): setPeerPort, - string(ext.PeerService): setPeerService, -} - func (s *zipkinSpan) handleSpecialTags() { s.Lock() defer s.Unlock() diff --git a/thrift_span_test.go b/zipkin_thrift_span_test.go similarity index 97% rename from thrift_span_test.go rename to zipkin_thrift_span_test.go index 431ff4c8..2aa8e3b4 100644 --- a/thrift_span_test.go +++ b/zipkin_thrift_span_test.go @@ -61,7 +61,7 @@ func TestThriftFirstInProcessSpan(t *testing.T) { } else { check = assert.Nil } - thriftSpan := buildThriftSpan(test.span) + thriftSpan := BuildZipkinThrift(test.span) version := findBinaryAnnotation(thriftSpan, JaegerClientVersionTagKey) hostname := findBinaryAnnotation(thriftSpan, TracerHostnameTagKey) check(t, version) @@ -79,7 +79,7 @@ func TestThriftForceSampled(t *testing.T) { ext.SamplingPriority.Set(sp, 1) assert.True(t, sp.context.IsSampled()) assert.True(t, sp.context.IsDebug()) - thriftSpan := buildThriftSpan(sp) + thriftSpan := BuildZipkinThrift(sp) assert.True(t, thriftSpan.Debug) } @@ -227,7 +227,7 @@ func TestThriftSpanLogs(t *testing.T) { } else if len(test.fields) > 0 { sp.LogFields(test.fields...) } - thriftSpan := buildThriftSpan(sp.(*Span)) + thriftSpan := BuildZipkinThrift(sp.(*Span)) if test.disableSampling { assert.Equal(t, 0, len(thriftSpan.Annotations), testName) continue @@ -260,7 +260,7 @@ func TestThriftLocalComponentSpan(t *testing.T) { ext.Component.Set(sp, "c1") } sp.Finish() - thriftSpan := buildThriftSpan(sp) + thriftSpan := BuildZipkinThrift(sp) anno := findBinaryAnnotation(thriftSpan, "lc") assert.NotNil(t, anno) @@ -281,7 +281,7 @@ func TestSpecialTags(t *testing.T) { ext.PeerHostIPv4.Set(sp, 2130706433) sp.Finish() - thriftSpan := buildThriftSpan(sp) + thriftSpan := BuildZipkinThrift(sp) // Special tags should not be copied over to binary annotations assert.Nil(t, findBinaryAnnotation(thriftSpan, "span.kind")) assert.Nil(t, findBinaryAnnotation(thriftSpan, "peer.service")) @@ -310,7 +310,7 @@ func TestBaggageLogs(t *testing.T) { ext.SpanKindRPCServer.Set(sp) sp.Finish() - thriftSpan := buildThriftSpan(sp) + thriftSpan := BuildZipkinThrift(sp) assert.NotNil(t, findAnnotation(thriftSpan, `{"event":"baggage","key":"auth.token","value":"token"}`)) }