Skip to content

Commit

Permalink
Truncate and de-duplicate log attribute values (#5230)
Browse files Browse the repository at this point in the history
* Truncate and de-duplicate log attr values

* Fix test in otlploghttp

* Remove duplicate decl of assertKV
  • Loading branch information
MrAlias authored Apr 29, 2024
1 parent 9794825 commit 1e357c7
Show file tree
Hide file tree
Showing 3 changed files with 434 additions and 49 deletions.
102 changes: 57 additions & 45 deletions exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

api "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/log/logtest"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -60,51 +62,61 @@ var (
flagsB = byte(0)

records = func() []log.Record {
r0 := new(log.Record)
r0.SetTimestamp(ts)
r0.SetObservedTimestamp(obs)
r0.SetSeverity(sevA)
r0.SetSeverityText("A")
r0.SetBody(bodyA)
r0.SetAttributes(alice)
r0.SetTraceID(trace.TraceID(traceIDA))
r0.SetSpanID(trace.SpanID(spanIDA))
r0.SetTraceFlags(trace.TraceFlags(flagsA))

r1 := new(log.Record)
r1.SetTimestamp(ts)
r1.SetObservedTimestamp(obs)
r1.SetSeverity(sevA)
r1.SetSeverityText("A")
r1.SetBody(bodyA)
r1.SetAttributes(bob)
r1.SetTraceID(trace.TraceID(traceIDA))
r1.SetSpanID(trace.SpanID(spanIDA))
r1.SetTraceFlags(trace.TraceFlags(flagsA))

r2 := new(log.Record)
r2.SetTimestamp(ts)
r2.SetObservedTimestamp(obs)
r2.SetSeverity(sevB)
r2.SetSeverityText("B")
r2.SetBody(bodyB)
r2.SetAttributes(alice)
r2.SetTraceID(trace.TraceID(traceIDB))
r2.SetSpanID(trace.SpanID(spanIDB))
r2.SetTraceFlags(trace.TraceFlags(flagsB))

r3 := new(log.Record)
r3.SetTimestamp(ts)
r3.SetObservedTimestamp(obs)
r3.SetSeverity(sevB)
r3.SetSeverityText("B")
r3.SetBody(bodyB)
r3.SetAttributes(bob)
r3.SetTraceID(trace.TraceID(traceIDB))
r3.SetSpanID(trace.SpanID(spanIDB))
r3.SetTraceFlags(trace.TraceFlags(flagsB))

return []log.Record{*r0, *r1, *r2, *r3}
var out []log.Record

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Severity: sevA,
SeverityText: "A",
Body: bodyA,
Attributes: []api.KeyValue{alice},
TraceID: trace.TraceID(traceIDA),
SpanID: trace.SpanID(spanIDA),
TraceFlags: trace.TraceFlags(flagsA),
Resource: resource.Empty(), // TODO(#5228): populate and test.
}.NewRecord())

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Severity: sevA,
SeverityText: "A",
Body: bodyA,
Attributes: []api.KeyValue{bob},
TraceID: trace.TraceID(traceIDA),
SpanID: trace.SpanID(spanIDA),
TraceFlags: trace.TraceFlags(flagsA),
Resource: resource.Empty(), // TODO(#5228): populate and test.
}.NewRecord())

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Severity: sevB,
SeverityText: "B",
Body: bodyB,
Attributes: []api.KeyValue{alice},
TraceID: trace.TraceID(traceIDB),
SpanID: trace.SpanID(spanIDB),
TraceFlags: trace.TraceFlags(flagsB),
Resource: resource.Empty(), // TODO(#5228): populate and test.
}.NewRecord())

out = append(out, logtest.RecordFactory{
Timestamp: ts,
ObservedTimestamp: obs,
Severity: sevB,
SeverityText: "B",
Body: bodyB,
Attributes: []api.KeyValue{bob},
TraceID: trace.TraceID(traceIDB),
SpanID: trace.SpanID(spanIDB),
TraceFlags: trace.TraceFlags(flagsB),
Resource: resource.Empty(), // TODO(#5228): populate and test.
}.NewRecord())

return out
}()

pbLogRecords = []*lpb.LogRecord{
Expand Down
83 changes: 79 additions & 4 deletions sdk/log/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"slices"
"strings"
"sync"
"time"
"unicode/utf8"

"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
Expand Down Expand Up @@ -198,8 +200,6 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) {
}
} else {
// Unique attribute.
// TODO: apply truncation to string and []string values.
// TODO: deduplicate map values.
unique = append(unique, a)
uIndex[a.Key] = len(unique) - 1
}
Expand Down Expand Up @@ -246,10 +246,13 @@ func (r *Record) addAttrs(attrs []log.KeyValue) {
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
r.front[r.nFront] = a
r.front[r.nFront] = r.applyAttrLimits(a)
r.nFront++
}

for j, a := range attrs[i:] {
attrs[i+j] = r.applyAttrLimits(a)
}
r.back = slices.Grow(r.back, len(attrs[i:]))
r.back = append(r.back, attrs[i:]...)
}
Expand All @@ -268,11 +271,14 @@ func (r *Record) SetAttributes(attrs ...log.KeyValue) {
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
r.front[r.nFront] = a
r.front[r.nFront] = r.applyAttrLimits(a)
r.nFront++
}

r.back = slices.Clone(attrs[i:])
for i, a := range r.back {
r.back[i] = r.applyAttrLimits(a)
}
}

// head returns the first n values of kvs along with the number of elements
Expand Down Expand Up @@ -367,3 +373,72 @@ func (r *Record) Clone() Record {
res.back = slices.Clone(r.back)
return res
}

func (r Record) applyAttrLimits(attr log.KeyValue) log.KeyValue {
attr.Value = r.applyValueLimits(attr.Value)
return attr
}

func (r Record) applyValueLimits(val log.Value) log.Value {
switch val.Kind() {
case log.KindString:
s := val.AsString()
if len(s) > r.attributeValueLengthLimit {
val = log.StringValue(truncate(s, r.attributeValueLengthLimit))
}
case log.KindSlice:
sl := val.AsSlice()
for i := range sl {
sl[i] = r.applyValueLimits(sl[i])
}
val = log.SliceValue(sl...)
case log.KindMap:
// Deduplicate then truncate. Do not do at the same time to avoid
// wasted truncation operations.
kvs, dropped := dedup(val.AsMap())
r.dropped += dropped
for i := range kvs {
kvs[i] = r.applyAttrLimits(kvs[i])
}
val = log.MapValue(kvs...)
}
return val
}

// truncate returns a copy of str truncated to have a length of at most n
// characters. If the length of str is less than n, str itself is returned.
//
// The truncate of str ensures that no valid UTF-8 code point is split. The
// copy returned will be less than n if a characters straddles the length
// limit.
//
// No truncation is performed if n is less than zero.
func truncate(str string, n int) string {
if n < 0 {
return str
}

// cut returns a copy of the s truncated to not exceed a length of n. If
// invalid UTF-8 is encountered, s is returned with false. Otherwise, the
// truncated copy will be returned with true.
cut := func(s string) (string, bool) {
var i int
for i = 0; i < n; {
r, size := utf8.DecodeRuneInString(s[i:])
if r == utf8.RuneError {
return s, false
}
if i+size > n {
break
}
i += size
}
return s[:i], true
}

cp, ok := cut(str)
if !ok {
cp, _ = cut(strings.ToValidUTF8(str, ""))
}
return cp
}
Loading

0 comments on commit 1e357c7

Please sign in to comment.