Skip to content

Commit

Permalink
processor/otel: index span events as logs
Browse files Browse the repository at this point in the history
Produce log events for non-exception span events,
capturing the span event name as the log message,
and all other attributes as labels.

We only index logs when data streams are enabled;
they are dropped when classic indices are in use.
  • Loading branch information
axw committed Sep 6, 2021
1 parent 43e491f commit 77b0554
Show file tree
Hide file tree
Showing 17 changed files with 583 additions and 48 deletions.
25 changes: 25 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/apm-server/beater/config"
Expand Down Expand Up @@ -356,10 +357,20 @@ func (s *serverRunner) run(listener net.Listener) error {

// Send config to telemetry.
recordAPMServerConfig(s.config)

publisherConfig := &publish.PublisherConfig{
Pipeline: s.config.Pipeline,
Namespace: s.namespace,
}
if !s.config.DataStreams.Enabled {
// Logs are only supported with data streams;
// add a beat.Processor which drops them.
dropLogsProcessor, err := newDropLogsBeatProcessor()
if err != nil {
return err
}
publisherConfig.Processor = dropLogsProcessor
}

var kibanaClient kibana_client.Client
if s.config.Kibana.Enabled {
Expand Down Expand Up @@ -755,3 +766,17 @@ type transformerFunc func(context.Context) []beat.Event
func (f transformerFunc) Transform(ctx context.Context) []beat.Event {
return f(ctx)
}

func newDropLogsBeatProcessor() (beat.ProcessorList, error) {
return processors.New(processors.PluginConfig{
common.MustNewConfigFrom(map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"processor.event": "log",
},
},
},
}),
})
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974]
- Define index sorting for internal metrics data stream {pull}6116[6116]
- Add histogram dynamic_template to app metrics data stream {pull}6043[6043]
- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122]

[float]
==== Deprecated
7 changes: 2 additions & 5 deletions docs/guide/opentelemetry-elastic.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -371,23 +371,20 @@ Here is an example of AWS Lambda Node.js function managed with Terraform and the
* https://github.com/michaelhyatt/terraform-aws-nodejs-api-worker-otel/tree/v0.23[Sample Terraform code]

[[elastic-open-telemetry-known-limitations]]
==== Limitations in 7.13
==== Limitations


[[elastic-open-telemetry-traces-limitations]]
===== OpenTelemetry traces

* Traces of applications using `messaging` semantics might be wrongly displayed or not shown in the APM UI. You may only see `spans` coming from such services, but no `transaction` https://github.com/elastic/apm-server/issues/5094[#5094]
* Inability to see Stack traces in spans or, in general, arbitrary span events for applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4715[#4715]
* Inability to see Stack traces in spans
* Inability in APM views to view the "Time Spent by Span Type" https://github.com/elastic/apm-server/issues/5747[#5747]
* Metrics derived from traces (throughput, latency, and errors) are not accurate when traces are sampled before being ingested by Elastic Observability (ie by an OpenTelemetry Collector or OpenTelemetry APM agent or SDK) https://github.com/elastic/apm/issues/472[#472]

[[elastic-open-telemetry-metrics-limitations]]
===== OpenTelemetry metrics

* OpenTelemetry histogram metrics, https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/DoubleValueRecorder.html[`DoubleValueRecorder`]
and https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/LongValueObserver.html[`LongValueRecorder`], are not yet supported https://github.com/elastic/apm-server/issues/3195[#3195]
* Inability to see JVM metrics in Elastic APM UI for Java applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4919[#4919]
* Inability to see host metrics in Elastic Metrics Infrastructure view when using the OpenTelemetry Collector host metrics receiver https://github.com/elastic/apm-server/issues/5310[#5310]

[[elastic-open-telemetry-logs-limitations]]
Expand Down
1 change: 0 additions & 1 deletion docs/jaeger-reference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ There are some limitations and differences between Elastic APM and Jaeger that y
* Because Jaeger has its own trace context header, and does not currently support W3C trace context headers,
it is not possible to mix and match the use of Elastic's APM agents and Jaeger's clients.
* Elastic APM only supports probabilistic sampling.
* We currently only support exception logging. Span logs are not supported.

*Differences between APM Agents and Jaeger Clients:*

Expand Down
27 changes: 27 additions & 0 deletions model/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package model

const (
AppLogsDataset = "apm.app"
)

var (
// LogProcessor is the Processor value that should be assigned to log events.
LogProcessor = Processor{Name: "log", Event: "log"}
)
3 changes: 3 additions & 0 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) {
case model.ErrorProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.ErrorsDataset
case model.LogProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.AppLogsDataset
case model.MetricsetProcessor:
event.DataStream.Type = datastreams.MetricsType
// Metrics that include well-defined transaction/span fields
Expand Down
3 changes: 3 additions & 0 deletions model/modelprocessor/datastream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func TestSetDataStream(t *testing.T) {
}, {
input: model.APMEvent{Processor: model.ErrorProcessor},
output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"},
}, {
input: model.APMEvent{Processor: model.LogProcessor},
output: model.DataStream{Type: "logs", Dataset: "apm.app", Namespace: "custom"},
}, {
input: model.APMEvent{
Processor: model.MetricsetProcessor,
Expand Down
10 changes: 8 additions & 2 deletions processor/otel/exceptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0"

"github.com/elastic/apm-server/model"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestEncodeSpanEventsNonExceptions(t *testing.T) {
Expand All @@ -57,8 +58,10 @@ func TestEncodeSpanEventsNonExceptions(t *testing.T) {
semconv.AttributeExceptionStacktrace: pdata.NewAttributeValueString("stacktrace"),
})

_, errors := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent)
require.Empty(t, errors)
_, events := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent)
require.Len(t, events, 2)
assert.Equal(t, model.LogProcessor, events[0].Processor)
assert.Equal(t, model.LogProcessor, events[1].Processor)
}

func TestEncodeSpanEventsJavaExceptions(t *testing.T) {
Expand Down Expand Up @@ -114,6 +117,7 @@ Caused by: LowLevelException
Service: service,
Agent: agent,
Timestamp: timestamp,
Labels: common.MapStr{},
Processor: model.ErrorProcessor,
Trace: transactionEvent.Trace,
Parent: model.Parent{ID: transactionEvent.Transaction.ID},
Expand Down Expand Up @@ -164,6 +168,7 @@ Caused by: LowLevelException
Service: service,
Agent: agent,
Timestamp: timestamp,
Labels: common.MapStr{},
Processor: model.ErrorProcessor,
Trace: transactionEvent.Trace,
Parent: model.Parent{ID: transactionEvent.Transaction.ID},
Expand Down Expand Up @@ -323,6 +328,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) {
Service: service,
Agent: agent,
Timestamp: timestamp,
Labels: common.MapStr{},
Processor: model.ErrorProcessor,
Trace: transactionEvent.Trace,
Parent: model.Parent{ID: transactionEvent.Transaction.ID},
Expand Down
101 changes: 101 additions & 0 deletions processor/otel/test_approved/span_jaeger_http.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,107 @@
"url": {
"original": "http://foo.bar.com?a=12"
}
},
{
"@timestamp": "2019-12-16T12:46:58.000Z",
"agent": {
"name": "Jaeger",
"version": "unknown"
},
"data_stream.type": "logs",
"host": {
"hostname": "host-abc"
},
"labels": {
"event": "baggage",
"isValid": false
},
"parent": {
"id": "0000000058585858"
},
"processor": {
"event": "log",
"name": "log"
},
"service": {
"language": {
"name": "unknown"
},
"name": "unknown"
},
"trace": {
"id": "00000000000000000000000046467830"
},
"url": {
"original": "http://foo.bar.com?a=12"
}
},
{
"@timestamp": "2019-12-16T12:46:58.000Z",
"agent": {
"name": "Jaeger",
"version": "unknown"
},
"data_stream.type": "logs",
"host": {
"hostname": "host-abc"
},
"labels": {
"level": "info"
},
"message": "retrying connection",
"parent": {
"id": "0000000058585858"
},
"processor": {
"event": "log",
"name": "log"
},
"service": {
"language": {
"name": "unknown"
},
"name": "unknown"
},
"trace": {
"id": "00000000000000000000000046467830"
},
"url": {
"original": "http://foo.bar.com?a=12"
}
},
{
"@timestamp": "2019-12-16T12:46:58.000Z",
"agent": {
"name": "Jaeger",
"version": "unknown"
},
"data_stream.type": "logs",
"host": {
"hostname": "host-abc"
},
"labels": {
"level": "error"
},
"parent": {
"id": "0000000058585858"
},
"processor": {
"event": "log",
"name": "log"
},
"service": {
"language": {
"name": "unknown"
},
"name": "unknown"
},
"trace": {
"id": "00000000000000000000000046467830"
},
"url": {
"original": "http://foo.bar.com?a=12"
}
}
]
}
Loading

0 comments on commit 77b0554

Please sign in to comment.