From 77b0554f37a4c9a586901e4766cf853f722242fb Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 3 Sep 2021 17:34:58 +0800 Subject: [PATCH] processor/otel: index span events as logs Produce log events for non-exception span events, capturing the span event name as the log message, and all other attributes as labels. We only index logs when data streams are enabled; they are dropped when classic indices are in use. --- beater/beater.go | 25 ++++ changelogs/head.asciidoc | 1 + docs/guide/opentelemetry-elastic.asciidoc | 7 +- docs/jaeger-reference.asciidoc | 1 - model/log.go | 27 +++++ model/modelprocessor/datastream.go | 3 + model/modelprocessor/datastream_test.go | 3 + processor/otel/exceptions_test.go | 10 +- .../span_jaeger_http.approved.json | 101 ++++++++++++++++ .../transaction_jaeger_full.approved.json | 107 ++++++++++++++++ processor/otel/traces.go | 62 +++++----- processor/otel/traces_test.go | 12 +- .../data_streams_disabled.approved.json} | 0 .../data_streams_enabled.approved.json | 114 ++++++++++++++++++ systemtest/otlp_test.go | 23 +++- testdata/jaeger/batch_0.approved.json | 93 ++++++++++++++ testdata/jaeger/batch_1.approved.json | 42 +++++++ 17 files changed, 583 insertions(+), 48 deletions(-) create mode 100644 model/log.go rename systemtest/approvals/{TestOTLPGRPCTraces.approved.json => TestOTLPGRPCTraces/data_streams_disabled.approved.json} (100%) create mode 100644 systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json diff --git a/beater/beater.go b/beater/beater.go index 703050ec47d..ce482097af2 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -46,6 +46,7 @@ import ( "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/apm-server/beater/config" @@ -356,10 +357,20 @@ func (s *serverRunner) run(listener net.Listener) error { // Send config to telemetry. recordAPMServerConfig(s.config) + publisherConfig := &publish.PublisherConfig{ Pipeline: s.config.Pipeline, Namespace: s.namespace, } + if !s.config.DataStreams.Enabled { + // Logs are only supported with data streams; + // add a beat.Processor which drops them. + dropLogsProcessor, err := newDropLogsBeatProcessor() + if err != nil { + return err + } + publisherConfig.Processor = dropLogsProcessor + } var kibanaClient kibana_client.Client if s.config.Kibana.Enabled { @@ -755,3 +766,17 @@ type transformerFunc func(context.Context) []beat.Event func (f transformerFunc) Transform(ctx context.Context) []beat.Event { return f(ctx) } + +func newDropLogsBeatProcessor() (beat.ProcessorList, error) { + return processors.New(processors.PluginConfig{ + common.MustNewConfigFrom(map[string]interface{}{ + "drop_event": map[string]interface{}{ + "when": map[string]interface{}{ + "contains": map[string]interface{}{ + "processor.event": "log", + }, + }, + }, + }), + }) +} diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index ac29a334ca8..020a6d6b5f9 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -36,6 +36,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] - The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974] - Define index sorting for internal metrics data stream {pull}6116[6116] - Add histogram dynamic_template to app metrics data stream {pull}6043[6043] +- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122] [float] ==== Deprecated diff --git a/docs/guide/opentelemetry-elastic.asciidoc b/docs/guide/opentelemetry-elastic.asciidoc index 71eef16b396..ae2538be83d 100644 --- a/docs/guide/opentelemetry-elastic.asciidoc +++ b/docs/guide/opentelemetry-elastic.asciidoc @@ -371,23 +371,20 @@ Here is an example of AWS Lambda Node.js function managed with Terraform and the * https://github.com/michaelhyatt/terraform-aws-nodejs-api-worker-otel/tree/v0.23[Sample Terraform code] [[elastic-open-telemetry-known-limitations]] -==== Limitations in 7.13 +==== Limitations [[elastic-open-telemetry-traces-limitations]] ===== OpenTelemetry traces * Traces of applications using `messaging` semantics might be wrongly displayed or not shown in the APM UI. You may only see `spans` coming from such services, but no `transaction` https://github.com/elastic/apm-server/issues/5094[#5094] -* Inability to see Stack traces in spans or, in general, arbitrary span events for applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4715[#4715] +* Inability to see Stack traces in spans * Inability in APM views to view the "Time Spent by Span Type" https://github.com/elastic/apm-server/issues/5747[#5747] * Metrics derived from traces (throughput, latency, and errors) are not accurate when traces are sampled before being ingested by Elastic Observability (ie by an OpenTelemetry Collector or OpenTelemetry APM agent or SDK) https://github.com/elastic/apm/issues/472[#472] [[elastic-open-telemetry-metrics-limitations]] ===== OpenTelemetry metrics -* OpenTelemetry histogram metrics, https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/DoubleValueRecorder.html[`DoubleValueRecorder`] -and https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/LongValueObserver.html[`LongValueRecorder`], are not yet supported https://github.com/elastic/apm-server/issues/3195[#3195] -* Inability to see JVM metrics in Elastic APM UI for Java applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4919[#4919] * Inability to see host metrics in Elastic Metrics Infrastructure view when using the OpenTelemetry Collector host metrics receiver https://github.com/elastic/apm-server/issues/5310[#5310] [[elastic-open-telemetry-logs-limitations]] diff --git a/docs/jaeger-reference.asciidoc b/docs/jaeger-reference.asciidoc index 10ed2b7fa2e..9f774a10620 100644 --- a/docs/jaeger-reference.asciidoc +++ b/docs/jaeger-reference.asciidoc @@ -77,7 +77,6 @@ There are some limitations and differences between Elastic APM and Jaeger that y * Because Jaeger has its own trace context header, and does not currently support W3C trace context headers, it is not possible to mix and match the use of Elastic's APM agents and Jaeger's clients. * Elastic APM only supports probabilistic sampling. -* We currently only support exception logging. Span logs are not supported. *Differences between APM Agents and Jaeger Clients:* diff --git a/model/log.go b/model/log.go new file mode 100644 index 00000000000..97f44c6a54f --- /dev/null +++ b/model/log.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +const ( + AppLogsDataset = "apm.app" +) + +var ( + // LogProcessor is the Processor value that should be assigned to log events. + LogProcessor = Processor{Name: "log", Event: "log"} +) diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go index df9395c859b..7add272b2ab 100644 --- a/model/modelprocessor/datastream.go +++ b/model/modelprocessor/datastream.go @@ -46,6 +46,9 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) { case model.ErrorProcessor: event.DataStream.Type = datastreams.LogsType event.DataStream.Dataset = model.ErrorsDataset + case model.LogProcessor: + event.DataStream.Type = datastreams.LogsType + event.DataStream.Dataset = model.AppLogsDataset case model.MetricsetProcessor: event.DataStream.Type = datastreams.MetricsType // Metrics that include well-defined transaction/span fields diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index 468c6fe40ed..cc11f88b439 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -43,6 +43,9 @@ func TestSetDataStream(t *testing.T) { }, { input: model.APMEvent{Processor: model.ErrorProcessor}, output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, + }, { + input: model.APMEvent{Processor: model.LogProcessor}, + output: model.DataStream{Type: "logs", Dataset: "apm.app", Namespace: "custom"}, }, { input: model.APMEvent{ Processor: model.MetricsetProcessor, diff --git a/processor/otel/exceptions_test.go b/processor/otel/exceptions_test.go index c93e6731374..6482ad0ffb3 100644 --- a/processor/otel/exceptions_test.go +++ b/processor/otel/exceptions_test.go @@ -44,6 +44,7 @@ import ( semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" "github.com/elastic/apm-server/model" + "github.com/elastic/beats/v7/libbeat/common" ) func TestEncodeSpanEventsNonExceptions(t *testing.T) { @@ -57,8 +58,10 @@ func TestEncodeSpanEventsNonExceptions(t *testing.T) { semconv.AttributeExceptionStacktrace: pdata.NewAttributeValueString("stacktrace"), }) - _, errors := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent) - require.Empty(t, errors) + _, events := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent) + require.Len(t, events, 2) + assert.Equal(t, model.LogProcessor, events[0].Processor) + assert.Equal(t, model.LogProcessor, events[1].Processor) } func TestEncodeSpanEventsJavaExceptions(t *testing.T) { @@ -114,6 +117,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Labels: common.MapStr{}, Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Parent: model.Parent{ID: transactionEvent.Transaction.ID}, @@ -164,6 +168,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Labels: common.MapStr{}, Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Parent: model.Parent{ID: transactionEvent.Transaction.ID}, @@ -323,6 +328,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { Service: service, Agent: agent, Timestamp: timestamp, + Labels: common.MapStr{}, Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Parent: model.Parent{ID: transactionEvent.Transaction.ID}, diff --git a/processor/otel/test_approved/span_jaeger_http.approved.json b/processor/otel/test_approved/span_jaeger_http.approved.json index d7944dfa5bc..e1fa3e19b86 100644 --- a/processor/otel/test_approved/span_jaeger_http.approved.json +++ b/processor/otel/test_approved/span_jaeger_http.approved.json @@ -309,6 +309,107 @@ "url": { "original": "http://foo.bar.com?a=12" } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "event": "baggage", + "isValid": false + }, + "parent": { + "id": "0000000058585858" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "original": "http://foo.bar.com?a=12" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "info" + }, + "message": "retrying connection", + "parent": { + "id": "0000000058585858" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "original": "http://foo.bar.com?a=12" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "error" + }, + "parent": { + "id": "0000000058585858" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "original": "http://foo.bar.com?a=12" + } } ] } diff --git a/processor/otel/test_approved/transaction_jaeger_full.approved.json b/processor/otel/test_approved/transaction_jaeger_full.approved.json index 21f131c53f9..dc866c795e0 100644 --- a/processor/otel/test_approved/transaction_jaeger_full.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_full.approved.json @@ -409,6 +409,113 @@ "query": "a=12", "scheme": "http" } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "event": "baggage", + "isValid": false + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown", + "version": "1.0" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "domain": "foo.bar.com", + "full": "http://foo.bar.com?a=12", + "original": "http://foo.bar.com?a=12", + "query": "a=12", + "scheme": "http" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "info" + }, + "message": "retrying connection", + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown", + "version": "1.0" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "domain": "foo.bar.com", + "full": "http://foo.bar.com?a=12", + "original": "http://foo.bar.com?a=12", + "query": "a=12", + "scheme": "http" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "error" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown", + "version": "1.0" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "domain": "foo.bar.com", + "full": "http://foo.bar.com?a=12", + "original": "http://foo.bar.com?a=12", + "query": "a=12", + "scheme": "http" + } } ] } diff --git a/processor/otel/traces.go b/processor/otel/traces.go index 3fc00f3123e..b2544801553 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -53,6 +53,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/apm-server/datastreams" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" ) @@ -234,7 +235,7 @@ func (c *Consumer) convertSpan( event.Event.Outcome = "" // don't set event.outcome for span events event.Destination = model.Destination{} // don't set destination for span events for i := 0; i < events.Len(); i++ { - convertSpanEvent(logger, events.At(i), event, timeDelta, out) + *out = append(*out, convertSpanEvent(logger, events.At(i), event, timeDelta)) } } @@ -824,24 +825,22 @@ func convertSpanEvent( spanEvent pdata.SpanEvent, parent model.APMEvent, // either span or transaction timeDelta time.Duration, - out *model.Batch, -) { - var e *model.Error +) model.APMEvent { + event := parent + event.Labels = initEventLabels(event.Labels) + event.Transaction = nil + event.Span = nil + event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) + isJaeger := strings.HasPrefix(parent.Agent.Name, "Jaeger") if isJaeger { - e = convertJaegerErrorSpanEvent(logger, spanEvent) - } else { + event.Error = convertJaegerErrorSpanEvent(logger, spanEvent, event.Labels) + } else if spanEvent.Name() == "exception" { // Translate exception span events to errors. // // If it's not Jaeger, we assume OpenTelemetry semantic semconv. - // - // TODO(axw) we don't currently support arbitrary events, we only look - // for exceptions and convert those to Elastic APM error events. - if spanEvent.Name() != "exception" { - // Per OpenTelemetry semantic conventions: - // `The name of the event MUST be "exception"` - return - } + // Per OpenTelemetry semantic conventions: + // `The name of the event MUST be "exception"` var exceptionEscaped bool var exceptionMessage, exceptionStacktrace, exceptionType string spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { @@ -854,34 +853,39 @@ func convertSpanEvent( exceptionType = v.StringVal() case "exception.escaped": exceptionEscaped = v.BoolVal() + default: + event.Labels[replaceDots(k)] = ifaceAttributeValue(v) } return true }) - if exceptionMessage == "" && exceptionType == "" { + if exceptionMessage != "" || exceptionType != "" { // Per OpenTelemetry semantic conventions: // `At least one of the following sets of attributes is required: // - exception.type // - exception.message` - return + event.Error = convertOpenTelemetryExceptionSpanEvent( + exceptionType, exceptionMessage, exceptionStacktrace, + exceptionEscaped, parent.Service.Language.Name, + ) } - e = convertOpenTelemetryExceptionSpanEvent( - exceptionType, exceptionMessage, exceptionStacktrace, - exceptionEscaped, parent.Service.Language.Name, - ) } - if e != nil { - event := parent - event.Transaction = nil - event.Span = nil + + if event.Error != nil { event.Processor = model.ErrorProcessor - event.Error = e - event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) setErrorContext(&event, parent) - *out = append(*out, event) + } else { + event.Processor = model.LogProcessor + event.DataStream.Type = datastreams.LogsType + event.Message = spanEvent.Name() + spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { + event.Labels[replaceDots(k)] = ifaceAttributeValue(v) + return true + }) } + return event } -func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *model.Error { +func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent, labels common.MapStr) *model.Error { var isError bool var exMessage, exType string logMessage := event.Name() @@ -913,6 +917,8 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *mo isError = true case "level": isError = stringval == "error" + default: + labels[replaceDots(k)] = ifaceAttributeValue(v) } return true }) diff --git a/processor/otel/traces_test.go b/processor/otel/traces_test.go index a95a015b9a2..78482c98d28 100644 --- a/processor/otel/traces_test.go +++ b/processor/otel/traces_test.go @@ -1122,7 +1122,7 @@ func testJaegerLogs() []jaegermodel.Log { }, { Timestamp: testStartTime().Add(65 * time.Nanosecond), Fields: jaegerKeyValues( - "event", "retrying connection", + "message", "retrying connection", "level", "info", ), }, { @@ -1231,7 +1231,7 @@ func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeV return events[0] } -func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...pdata.SpanEvent) (transaction model.APMEvent, errors []model.APMEvent) { +func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...pdata.SpanEvent) (transaction model.APMEvent, events []model.APMEvent) { traces, spans := newTracesSpans() traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{ semconv.AttributeTelemetrySDKLanguage: pdata.NewAttributeValueString(language), @@ -1242,12 +1242,10 @@ func transformTransactionSpanEvents(t *testing.T, language string, spanEvents .. for _, spanEvent := range spanEvents { spanEvent.CopyTo(otelSpan.Events().AppendEmpty()) } - events := transformTraces(t, traces) - require.NotEmpty(t, events) - errors = make([]model.APMEvent, len(events)-1) - copy(errors, events[1:]) - return events[0], errors + allEvents := transformTraces(t, traces) + require.NotEmpty(t, allEvents) + return allEvents[0], allEvents[1:] } func transformTraces(t *testing.T, traces pdata.Traces) model.Batch { diff --git a/systemtest/approvals/TestOTLPGRPCTraces.approved.json b/systemtest/approvals/TestOTLPGRPCTraces/data_streams_disabled.approved.json similarity index 100% rename from systemtest/approvals/TestOTLPGRPCTraces.approved.json rename to systemtest/approvals/TestOTLPGRPCTraces/data_streams_disabled.approved.json diff --git a/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json b/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json new file mode 100644 index 00000000000..ae27a4cfd3f --- /dev/null +++ b/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json @@ -0,0 +1,114 @@ +{ + "events": [ + { + "@timestamp": "1970-01-01T00:02:03.000Z", + "agent": { + "name": "opentelemetry/go", + "version": "0.19.0" + }, + "data_stream.dataset": "apm", + "data_stream.namespace": "default", + "data_stream.type": "traces", + "ecs": { + "version": "dynamic" + }, + "event": { + "agent_id_status": "missing", + "ingested": "dynamic", + "outcome": "success" + }, + "labels": { + "resource_attribute_array": [ + "a", + "b" + ], + "span_attribute_array": [ + "a", + "b", + "c" + ] + }, + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "processor": { + "event": "transaction", + "name": "transaction" + }, + "service": { + "framework": { + "name": "systemtest" + }, + "language": { + "name": "go" + }, + "name": "unknown_service_systemtest_test" + }, + "timestamp": { + "us": 123000000 + }, + "trace": { + "id": "d2acbef8b37655e48548fd9d61ad6114" + }, + "transaction": { + "duration": { + "us": 1000000 + }, + "id": "b3ee9be3b687a611", + "name": "operation_name", + "result": "Success", + "sampled": true, + "type": "custom" + } + }, + { + "@timestamp": "1970-01-01T00:02:03.001Z", + "agent": { + "name": "opentelemetry/go", + "version": "0.19.0" + }, + "data_stream.dataset": "apm.app", + "data_stream.namespace": "default", + "data_stream.type": "logs", + "ecs": { + "version": "dynamic" + }, + "labels": { + "resource_attribute_array": [ + "a", + "b" + ] + }, + "message": "a_span_event", + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "framework": { + "name": "systemtest" + }, + "language": { + "name": "go" + }, + "name": "unknown_service_systemtest_test" + }, + "trace": { + "id": "d2acbef8b37655e48548fd9d61ad6114" + } + } + ] +} diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index 66296f0a2d3..053f623a976 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -64,9 +64,11 @@ func init() { } func TestOTLPGRPCTraces(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewServer(t) + withDataStreams(t, testOTLPGRPCTraces) +} +func testOTLPGRPCTraces(t *testing.T, srv *apmservertest.Server) { + srv.Start() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -77,9 +79,19 @@ func TestOTLPGRPCTraces(t *testing.T) { ))) require.NoError(t, err) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ - estest.TermQuery{Field: "processor.event", Value: "transaction"}, - }}) + expectMin := 1 + if srv.Config.DataStreams != nil && srv.Config.DataStreams.Enabled { + expectMin++ // span events only indexed into data streams + } + + indices := "apm-*,traces-apm*,logs-apm*" + result := systemtest.Elasticsearch.ExpectMinDocs(t, expectMin, indices, estest.BoolQuery{ + Should: []interface{}{ + estest.TermQuery{Field: "processor.event", Value: "transaction"}, + estest.TermQuery{Field: "processor.event", Value: "log"}, + }, + MinimumShouldMatch: 1, + }) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) } @@ -331,6 +343,7 @@ func sendOTLPTrace(ctx context.Context, tracerProvider *sdktrace.TracerProvider) _, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime), trace.WithAttributes( attribute.Array("span_attribute_array", []string{"a", "b", "c"}), )) + span.AddEvent("a_span_event", trace.WithTimestamp(startTime.Add(time.Millisecond))) span.End(trace.WithTimestamp(endTime)) return flushTracerProvider(ctx, tracerProvider) } diff --git a/testdata/jaeger/batch_0.approved.json b/testdata/jaeger/batch_0.approved.json index e98b42be11e..b102649bc54 100644 --- a/testdata/jaeger/batch_0.approved.json +++ b/testdata/jaeger/batch_0.approved.json @@ -48,6 +48,68 @@ "type": "custom" } }, + { + "@timestamp": "2019-12-20T07:41:44.954Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "baggage", + "key": "customer", + "value": "Japanese Desserts" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "driver" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, + { + "@timestamp": "2019-12-20T07:41:44.954Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "Searching for nearby drivers", + "level": "info", + "location": "728,326" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "driver" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, { "@timestamp": "2019-12-20T07:41:45.007Z", "agent": { @@ -185,6 +247,37 @@ "sampled": true, "type": "custom" } + }, + { + "@timestamp": "2019-12-20T07:41:45.197Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "Search successful", + "level": "info", + "num_drivers": 10 + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "driver" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } } ] } diff --git a/testdata/jaeger/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json index 7dcc2db5a9d..8020fb7285b 100644 --- a/testdata/jaeger/batch_1.approved.json +++ b/testdata/jaeger/batch_1.approved.json @@ -45,6 +45,39 @@ "id": "00000000000000007be2fd98d0973be3" } }, + { + "@timestamp": "2019-12-20T07:41:44.973Z", + "agent": { + "ephemeral_id": "2e3f8db3eb77fae0", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "Found drivers", + "level": "info" + }, + "parent": { + "id": "7be2fd98d0973be3" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "redis" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, { "@timestamp": "2019-12-20T07:41:44.973Z", "agent": { @@ -111,6 +144,9 @@ "hostname": "host01", "ip": "10.0.0.13" }, + "labels": { + "driver_id": "T762465C" + }, "parent": { "id": "333295bfb438ea03" }, @@ -377,6 +413,9 @@ "hostname": "host01", "ip": "10.0.0.13" }, + "labels": { + "driver_id": "T781861C" + }, "parent": { "id": "614811d6c498bfb0" }, @@ -643,6 +682,9 @@ "hostname": "host01", "ip": "10.0.0.13" }, + "labels": { + "driver_id": "T752547C" + }, "parent": { "id": "0242ee3774d9eab1" },