diff --git a/beater/beater.go b/beater/beater.go index cd38d9a5e59..a425b9e74c2 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -46,6 +46,7 @@ import ( "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/apm-server/beater/config" @@ -356,10 +357,20 @@ func (s *serverRunner) run(listener net.Listener) error { // Send config to telemetry. recordAPMServerConfig(s.config) + publisherConfig := &publish.PublisherConfig{ Pipeline: s.config.Pipeline, Namespace: s.namespace, } + if !s.config.DataStreams.Enabled { + // Logs are only supported with data streams; + // add a beat.Processor which drops them. + dropLogsProcessor, err := newDropLogsBeatProcessor() + if err != nil { + return err + } + publisherConfig.Processor = dropLogsProcessor + } var kibanaClient kibana_client.Client if s.config.Kibana.Enabled { @@ -754,3 +765,17 @@ type transformerFunc func(context.Context) []beat.Event func (f transformerFunc) Transform(ctx context.Context) []beat.Event { return f(ctx) } + +func newDropLogsBeatProcessor() (beat.ProcessorList, error) { + return processors.New(processors.PluginConfig{ + common.MustNewConfigFrom(map[string]interface{}{ + "drop_event": map[string]interface{}{ + "when": map[string]interface{}{ + "contains": map[string]interface{}{ + "processor.event": "log", + }, + }, + }, + }), + }) +} diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc new file mode 100644 index 00000000000..020a6d6b5f9 --- /dev/null +++ b/changelogs/head.asciidoc @@ -0,0 +1,42 @@ +[[release-notes-head]] +== APM Server version HEAD + +https://github.com/elastic/apm-server/compare/7.13\...master[View commits] + +[float] +==== Breaking Changes +- `network.connection_type` is now `network.connection.type` {pull}5671[5671] +- `transaction.page` and `error.page` no longer recorded {pull}5872[5872] +- experimental:["This breaking change applies to the experimental tail-based sampling feature."] `apm-server.sampling.tail` now requires `apm-server.data_streams.enabled` {pull}5952[5952] +- beta:["This breaking change applies to the beta <>."] The `traces-sampled-*` data stream is now `traces-apm.sampled-*` {pull}5952[5952] +- Removed unused stacktrace/frame monitoring counters {pull}5984[5984] +- Removed unused support for top-level metricsets and metricset tags for RUMv3 {pull}6065[6065] +- Removed `apm-server.mode` configuration, and "experimental" fields {pull}6086[6086] +- `transaction.sampled` is now only set for sampled transactions {pull}6066[6066] +- Unknown metrics are dropped when `transaction.*` or `span.*` are present in a metricset {pull}6111[6111] +- Removed `metricset.period` from service_destination metrics {pull}6111[6111] + +[float] +==== Bug fixes +- Fix apm_error_grouping_name and apm_convert_destination_address {pull}5876[5876] +- corrected OTel attribute names for `net.host.connection.*` {pull}5671[5671] +- Fix response to agent config when running under Fleet with no agent config defined {pull}5917[5917] +- Fix handling of OTLP sum/gauge metrics with integer values {pull}6106[6106] + +[float] +==== Intake API Changes +- `network.connection.type` was added to stream metadata {pull}5671[5671] + +[float] +==== Added +- `service_destination` span metrics now take into account composite spans {pull}5896[5896] +- add zero-downtime config reloads via `SO_REUSEPORT` {pull}5911[5911] +- experimental support for writing data streams in standalone mode {pull}5928[5928] +- Data streams now define a default `dynamic` mapping parameter, overridable in the `@custom` template {pull}5947[5947] +- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974] +- Define index sorting for internal metrics data stream {pull}6116[6116] +- Add histogram dynamic_template to app metrics data stream {pull}6043[6043] +- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122] + +[float] +==== Deprecated diff --git a/docs/guide/opentelemetry-elastic.asciidoc b/docs/guide/opentelemetry-elastic.asciidoc index 71eef16b396..ae2538be83d 100644 --- a/docs/guide/opentelemetry-elastic.asciidoc +++ b/docs/guide/opentelemetry-elastic.asciidoc @@ -371,23 +371,20 @@ Here is an example of AWS Lambda Node.js function managed with Terraform and the * https://github.com/michaelhyatt/terraform-aws-nodejs-api-worker-otel/tree/v0.23[Sample Terraform code] [[elastic-open-telemetry-known-limitations]] -==== Limitations in 7.13 +==== Limitations [[elastic-open-telemetry-traces-limitations]] ===== OpenTelemetry traces * Traces of applications using `messaging` semantics might be wrongly displayed or not shown in the APM UI. You may only see `spans` coming from such services, but no `transaction` https://github.com/elastic/apm-server/issues/5094[#5094] -* Inability to see Stack traces in spans or, in general, arbitrary span events for applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4715[#4715] +* Inability to see Stack traces in spans * Inability in APM views to view the "Time Spent by Span Type" https://github.com/elastic/apm-server/issues/5747[#5747] * Metrics derived from traces (throughput, latency, and errors) are not accurate when traces are sampled before being ingested by Elastic Observability (ie by an OpenTelemetry Collector or OpenTelemetry APM agent or SDK) https://github.com/elastic/apm/issues/472[#472] [[elastic-open-telemetry-metrics-limitations]] ===== OpenTelemetry metrics -* OpenTelemetry histogram metrics, https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/DoubleValueRecorder.html[`DoubleValueRecorder`] -and https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/LongValueObserver.html[`LongValueRecorder`], are not yet supported https://github.com/elastic/apm-server/issues/3195[#3195] -* Inability to see JVM metrics in Elastic APM UI for Java applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4919[#4919] * Inability to see host metrics in Elastic Metrics Infrastructure view when using the OpenTelemetry Collector host metrics receiver https://github.com/elastic/apm-server/issues/5310[#5310] [[elastic-open-telemetry-logs-limitations]] diff --git a/docs/jaeger-reference.asciidoc b/docs/jaeger-reference.asciidoc index 10ed2b7fa2e..9f774a10620 100644 --- a/docs/jaeger-reference.asciidoc +++ b/docs/jaeger-reference.asciidoc @@ -77,7 +77,6 @@ There are some limitations and differences between Elastic APM and Jaeger that y * Because Jaeger has its own trace context header, and does not currently support W3C trace context headers, it is not possible to mix and match the use of Elastic's APM agents and Jaeger's clients. * Elastic APM only supports probabilistic sampling. -* We currently only support exception logging. Span logs are not supported. *Differences between APM Agents and Jaeger Clients:* diff --git a/model/log.go b/model/log.go new file mode 100644 index 00000000000..97f44c6a54f --- /dev/null +++ b/model/log.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +const ( + AppLogsDataset = "apm.app" +) + +var ( + // LogProcessor is the Processor value that should be assigned to log events. + LogProcessor = Processor{Name: "log", Event: "log"} +) diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go index df9395c859b..7add272b2ab 100644 --- a/model/modelprocessor/datastream.go +++ b/model/modelprocessor/datastream.go @@ -46,6 +46,9 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) { case model.ErrorProcessor: event.DataStream.Type = datastreams.LogsType event.DataStream.Dataset = model.ErrorsDataset + case model.LogProcessor: + event.DataStream.Type = datastreams.LogsType + event.DataStream.Dataset = model.AppLogsDataset case model.MetricsetProcessor: event.DataStream.Type = datastreams.MetricsType // Metrics that include well-defined transaction/span fields diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index 468c6fe40ed..cc11f88b439 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -43,6 +43,9 @@ func TestSetDataStream(t *testing.T) { }, { input: model.APMEvent{Processor: model.ErrorProcessor}, output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, + }, { + input: model.APMEvent{Processor: model.LogProcessor}, + output: model.DataStream{Type: "logs", Dataset: "apm.app", Namespace: "custom"}, }, { input: model.APMEvent{ Processor: model.MetricsetProcessor, diff --git a/processor/otel/exceptions_test.go b/processor/otel/exceptions_test.go index c93e6731374..6482ad0ffb3 100644 --- a/processor/otel/exceptions_test.go +++ b/processor/otel/exceptions_test.go @@ -44,6 +44,7 @@ import ( semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" "github.com/elastic/apm-server/model" + "github.com/elastic/beats/v7/libbeat/common" ) func TestEncodeSpanEventsNonExceptions(t *testing.T) { @@ -57,8 +58,10 @@ func TestEncodeSpanEventsNonExceptions(t *testing.T) { semconv.AttributeExceptionStacktrace: pdata.NewAttributeValueString("stacktrace"), }) - _, errors := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent) - require.Empty(t, errors) + _, events := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent) + require.Len(t, events, 2) + assert.Equal(t, model.LogProcessor, events[0].Processor) + assert.Equal(t, model.LogProcessor, events[1].Processor) } func TestEncodeSpanEventsJavaExceptions(t *testing.T) { @@ -114,6 +117,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Labels: common.MapStr{}, Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Parent: model.Parent{ID: transactionEvent.Transaction.ID}, @@ -164,6 +168,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Labels: common.MapStr{}, Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Parent: model.Parent{ID: transactionEvent.Transaction.ID}, @@ -323,6 +328,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { Service: service, Agent: agent, Timestamp: timestamp, + Labels: common.MapStr{}, Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Parent: model.Parent{ID: transactionEvent.Transaction.ID}, diff --git a/processor/otel/test_approved/span_jaeger_http.approved.json b/processor/otel/test_approved/span_jaeger_http.approved.json index d7944dfa5bc..e1fa3e19b86 100644 --- a/processor/otel/test_approved/span_jaeger_http.approved.json +++ b/processor/otel/test_approved/span_jaeger_http.approved.json @@ -309,6 +309,107 @@ "url": { "original": "http://foo.bar.com?a=12" } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "event": "baggage", + "isValid": false + }, + "parent": { + "id": "0000000058585858" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "original": "http://foo.bar.com?a=12" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "info" + }, + "message": "retrying connection", + "parent": { + "id": "0000000058585858" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "original": "http://foo.bar.com?a=12" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "error" + }, + "parent": { + "id": "0000000058585858" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "original": "http://foo.bar.com?a=12" + } } ] } diff --git a/processor/otel/test_approved/transaction_jaeger_full.approved.json b/processor/otel/test_approved/transaction_jaeger_full.approved.json index 21f131c53f9..dc866c795e0 100644 --- a/processor/otel/test_approved/transaction_jaeger_full.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_full.approved.json @@ -409,6 +409,113 @@ "query": "a=12", "scheme": "http" } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "event": "baggage", + "isValid": false + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown", + "version": "1.0" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "domain": "foo.bar.com", + "full": "http://foo.bar.com?a=12", + "original": "http://foo.bar.com?a=12", + "query": "a=12", + "scheme": "http" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "info" + }, + "message": "retrying connection", + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown", + "version": "1.0" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "domain": "foo.bar.com", + "full": "http://foo.bar.com?a=12", + "original": "http://foo.bar.com?a=12", + "query": "a=12", + "scheme": "http" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host-abc" + }, + "labels": { + "level": "error" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown", + "version": "1.0" + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "url": { + "domain": "foo.bar.com", + "full": "http://foo.bar.com?a=12", + "original": "http://foo.bar.com?a=12", + "query": "a=12", + "scheme": "http" + } } ] } diff --git a/processor/otel/traces.go b/processor/otel/traces.go index 3fc00f3123e..b2544801553 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -53,6 +53,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/apm-server/datastreams" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" ) @@ -234,7 +235,7 @@ func (c *Consumer) convertSpan( event.Event.Outcome = "" // don't set event.outcome for span events event.Destination = model.Destination{} // don't set destination for span events for i := 0; i < events.Len(); i++ { - convertSpanEvent(logger, events.At(i), event, timeDelta, out) + *out = append(*out, convertSpanEvent(logger, events.At(i), event, timeDelta)) } } @@ -824,24 +825,22 @@ func convertSpanEvent( spanEvent pdata.SpanEvent, parent model.APMEvent, // either span or transaction timeDelta time.Duration, - out *model.Batch, -) { - var e *model.Error +) model.APMEvent { + event := parent + event.Labels = initEventLabels(event.Labels) + event.Transaction = nil + event.Span = nil + event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) + isJaeger := strings.HasPrefix(parent.Agent.Name, "Jaeger") if isJaeger { - e = convertJaegerErrorSpanEvent(logger, spanEvent) - } else { + event.Error = convertJaegerErrorSpanEvent(logger, spanEvent, event.Labels) + } else if spanEvent.Name() == "exception" { // Translate exception span events to errors. // // If it's not Jaeger, we assume OpenTelemetry semantic semconv. - // - // TODO(axw) we don't currently support arbitrary events, we only look - // for exceptions and convert those to Elastic APM error events. - if spanEvent.Name() != "exception" { - // Per OpenTelemetry semantic conventions: - // `The name of the event MUST be "exception"` - return - } + // Per OpenTelemetry semantic conventions: + // `The name of the event MUST be "exception"` var exceptionEscaped bool var exceptionMessage, exceptionStacktrace, exceptionType string spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { @@ -854,34 +853,39 @@ func convertSpanEvent( exceptionType = v.StringVal() case "exception.escaped": exceptionEscaped = v.BoolVal() + default: + event.Labels[replaceDots(k)] = ifaceAttributeValue(v) } return true }) - if exceptionMessage == "" && exceptionType == "" { + if exceptionMessage != "" || exceptionType != "" { // Per OpenTelemetry semantic conventions: // `At least one of the following sets of attributes is required: // - exception.type // - exception.message` - return + event.Error = convertOpenTelemetryExceptionSpanEvent( + exceptionType, exceptionMessage, exceptionStacktrace, + exceptionEscaped, parent.Service.Language.Name, + ) } - e = convertOpenTelemetryExceptionSpanEvent( - exceptionType, exceptionMessage, exceptionStacktrace, - exceptionEscaped, parent.Service.Language.Name, - ) } - if e != nil { - event := parent - event.Transaction = nil - event.Span = nil + + if event.Error != nil { event.Processor = model.ErrorProcessor - event.Error = e - event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) setErrorContext(&event, parent) - *out = append(*out, event) + } else { + event.Processor = model.LogProcessor + event.DataStream.Type = datastreams.LogsType + event.Message = spanEvent.Name() + spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { + event.Labels[replaceDots(k)] = ifaceAttributeValue(v) + return true + }) } + return event } -func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *model.Error { +func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent, labels common.MapStr) *model.Error { var isError bool var exMessage, exType string logMessage := event.Name() @@ -913,6 +917,8 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *mo isError = true case "level": isError = stringval == "error" + default: + labels[replaceDots(k)] = ifaceAttributeValue(v) } return true }) diff --git a/processor/otel/traces_test.go b/processor/otel/traces_test.go index a95a015b9a2..78482c98d28 100644 --- a/processor/otel/traces_test.go +++ b/processor/otel/traces_test.go @@ -1122,7 +1122,7 @@ func testJaegerLogs() []jaegermodel.Log { }, { Timestamp: testStartTime().Add(65 * time.Nanosecond), Fields: jaegerKeyValues( - "event", "retrying connection", + "message", "retrying connection", "level", "info", ), }, { @@ -1231,7 +1231,7 @@ func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeV return events[0] } -func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...pdata.SpanEvent) (transaction model.APMEvent, errors []model.APMEvent) { +func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...pdata.SpanEvent) (transaction model.APMEvent, events []model.APMEvent) { traces, spans := newTracesSpans() traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{ semconv.AttributeTelemetrySDKLanguage: pdata.NewAttributeValueString(language), @@ -1242,12 +1242,10 @@ func transformTransactionSpanEvents(t *testing.T, language string, spanEvents .. for _, spanEvent := range spanEvents { spanEvent.CopyTo(otelSpan.Events().AppendEmpty()) } - events := transformTraces(t, traces) - require.NotEmpty(t, events) - errors = make([]model.APMEvent, len(events)-1) - copy(errors, events[1:]) - return events[0], errors + allEvents := transformTraces(t, traces) + require.NotEmpty(t, allEvents) + return allEvents[0], allEvents[1:] } func transformTraces(t *testing.T, traces pdata.Traces) model.Batch { diff --git a/systemtest/approvals/TestOTLPGRPCTraces.approved.json b/systemtest/approvals/TestOTLPGRPCTraces/data_streams_disabled.approved.json similarity index 100% rename from systemtest/approvals/TestOTLPGRPCTraces.approved.json rename to systemtest/approvals/TestOTLPGRPCTraces/data_streams_disabled.approved.json diff --git a/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json b/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json new file mode 100644 index 00000000000..ae27a4cfd3f --- /dev/null +++ b/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json @@ -0,0 +1,114 @@ +{ + "events": [ + { + "@timestamp": "1970-01-01T00:02:03.000Z", + "agent": { + "name": "opentelemetry/go", + "version": "0.19.0" + }, + "data_stream.dataset": "apm", + "data_stream.namespace": "default", + "data_stream.type": "traces", + "ecs": { + "version": "dynamic" + }, + "event": { + "agent_id_status": "missing", + "ingested": "dynamic", + "outcome": "success" + }, + "labels": { + "resource_attribute_array": [ + "a", + "b" + ], + "span_attribute_array": [ + "a", + "b", + "c" + ] + }, + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "processor": { + "event": "transaction", + "name": "transaction" + }, + "service": { + "framework": { + "name": "systemtest" + }, + "language": { + "name": "go" + }, + "name": "unknown_service_systemtest_test" + }, + "timestamp": { + "us": 123000000 + }, + "trace": { + "id": "d2acbef8b37655e48548fd9d61ad6114" + }, + "transaction": { + "duration": { + "us": 1000000 + }, + "id": "b3ee9be3b687a611", + "name": "operation_name", + "result": "Success", + "sampled": true, + "type": "custom" + } + }, + { + "@timestamp": "1970-01-01T00:02:03.001Z", + "agent": { + "name": "opentelemetry/go", + "version": "0.19.0" + }, + "data_stream.dataset": "apm.app", + "data_stream.namespace": "default", + "data_stream.type": "logs", + "ecs": { + "version": "dynamic" + }, + "labels": { + "resource_attribute_array": [ + "a", + "b" + ] + }, + "message": "a_span_event", + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "framework": { + "name": "systemtest" + }, + "language": { + "name": "go" + }, + "name": "unknown_service_systemtest_test" + }, + "trace": { + "id": "d2acbef8b37655e48548fd9d61ad6114" + } + } + ] +} diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index 66296f0a2d3..fda8c090e79 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -64,22 +64,42 @@ func init() { } func TestOTLPGRPCTraces(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewServer(t) + withDataStreams(t, testOTLPGRPCTraces) +} +func testOTLPGRPCTraces(t *testing.T, srv *apmservertest.Server) { + srv.Start() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := sendOTLPTrace(ctx, newOTLPTracerProvider(newOTLPExporter(t, srv), sdktrace.WithResource( + err := withOTLPTracer(newOTLPTracerProvider(newOTLPExporter(t, srv), sdktrace.WithResource( resource.Merge(resource.Default(), sdkresource.NewWithAttributes( attribute.Array("resource_attribute_array", []string{"a", "b"}), )), - ))) + )), func(tracer trace.Tracer) { + startTime := time.Unix(123, 456) + endTime := startTime.Add(time.Second) + _, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime), trace.WithAttributes( + attribute.Array("span_attribute_array", []string{"a", "b", "c"}), + )) + span.AddEvent("a_span_event", trace.WithTimestamp(startTime.Add(time.Millisecond))) + span.End(trace.WithTimestamp(endTime)) + }) require.NoError(t, err) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ - estest.TermQuery{Field: "processor.event", Value: "transaction"}, - }}) + expectMin := 1 + if srv.Config.DataStreams != nil && srv.Config.DataStreams.Enabled { + expectMin++ // span events only indexed into data streams + } + + indices := "apm-*,traces-apm*,logs-apm*" + result := systemtest.Elasticsearch.ExpectMinDocs(t, expectMin, indices, estest.BoolQuery{ + Should: []interface{}{ + estest.TermQuery{Field: "processor.event", Value: "transaction"}, + estest.TermQuery{Field: "processor.event", Value: "log"}, + }, + MinimumShouldMatch: 1, + }) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) } @@ -325,16 +345,23 @@ func newOTLPTracerProvider(exporter *otlp.Exporter, options ...sdktrace.TracerPr } func sendOTLPTrace(ctx context.Context, tracerProvider *sdktrace.TracerProvider) error { - tracer := tracerProvider.Tracer("systemtest") - startTime := time.Unix(123, 456) - endTime := startTime.Add(time.Second) - _, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime), trace.WithAttributes( - attribute.Array("span_attribute_array", []string{"a", "b", "c"}), - )) - span.End(trace.WithTimestamp(endTime)) + return withOTLPTracer(tracerProvider, func(tracer trace.Tracer) { + startTime := time.Unix(123, 456) + endTime := startTime.Add(time.Second) + _, span := tracer.Start(ctx, "operation_name", trace.WithTimestamp(startTime), trace.WithAttributes( + attribute.Array("span_attribute_array", []string{"a", "b", "c"}), + )) + span.End(trace.WithTimestamp(endTime)) + }) return flushTracerProvider(ctx, tracerProvider) } +func withOTLPTracer(tracerProvider *sdktrace.TracerProvider, f func(trace.Tracer)) error { + tracer := tracerProvider.Tracer("systemtest") + f(tracer) + return flushTracerProvider(context.Background(), tracerProvider) +} + func flushTracerProvider(ctx context.Context, tracerProvider *sdktrace.TracerProvider) error { if err := tracerProvider.ForceFlush(ctx); err != nil { return err diff --git a/testdata/jaeger/batch_0.approved.json b/testdata/jaeger/batch_0.approved.json index e98b42be11e..b102649bc54 100644 --- a/testdata/jaeger/batch_0.approved.json +++ b/testdata/jaeger/batch_0.approved.json @@ -48,6 +48,68 @@ "type": "custom" } }, + { + "@timestamp": "2019-12-20T07:41:44.954Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "baggage", + "key": "customer", + "value": "Japanese Desserts" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "driver" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, + { + "@timestamp": "2019-12-20T07:41:44.954Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "Searching for nearby drivers", + "level": "info", + "location": "728,326" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "driver" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, { "@timestamp": "2019-12-20T07:41:45.007Z", "agent": { @@ -185,6 +247,37 @@ "sampled": true, "type": "custom" } + }, + { + "@timestamp": "2019-12-20T07:41:45.197Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "Search successful", + "level": "info", + "num_drivers": 10 + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "driver" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } } ] } diff --git a/testdata/jaeger/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json index 7dcc2db5a9d..8020fb7285b 100644 --- a/testdata/jaeger/batch_1.approved.json +++ b/testdata/jaeger/batch_1.approved.json @@ -45,6 +45,39 @@ "id": "00000000000000007be2fd98d0973be3" } }, + { + "@timestamp": "2019-12-20T07:41:44.973Z", + "agent": { + "ephemeral_id": "2e3f8db3eb77fae0", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "data_stream.type": "logs", + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "labels": { + "event": "Found drivers", + "level": "info" + }, + "parent": { + "id": "7be2fd98d0973be3" + }, + "processor": { + "event": "log", + "name": "log" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "redis" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, { "@timestamp": "2019-12-20T07:41:44.973Z", "agent": { @@ -111,6 +144,9 @@ "hostname": "host01", "ip": "10.0.0.13" }, + "labels": { + "driver_id": "T762465C" + }, "parent": { "id": "333295bfb438ea03" }, @@ -377,6 +413,9 @@ "hostname": "host01", "ip": "10.0.0.13" }, + "labels": { + "driver_id": "T781861C" + }, "parent": { "id": "614811d6c498bfb0" }, @@ -643,6 +682,9 @@ "hostname": "host01", "ip": "10.0.0.13" }, + "labels": { + "driver_id": "T752547C" + }, "parent": { "id": "0242ee3774d9eab1" },