Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always fall back to string payload if we can't handle structured logs #701

Merged
merged 3 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions exporter/collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,17 @@ Additional configuration for the metric exporter:

Additional configuration for the logging exporter:

- `log.default_log_name` (optional): Defines a default name for log entries. If left unset, and a log entry does not have the `gcp.log_name`
attribute set, the exporter will return an error processing that entry.
- `log.error_reporting_type` (option, default = false): If `true`, log records with a severity of `error` or higher will be converted to
JSON payloads with the `@type` field set for [GCP Error Reporting](https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text).
If the body is currently a string, it will be converted to a `message` field in the new JSON payload. If the body is already a map, the `@type`
field will be added to the map. Other body types (such as byte) are undefined for this behavior.
- `log.default_log_name` (optional): Defines a default name for log entries. If
left unset, and a log entry does not have the `gcp.log_name` attribute set, the
exporter will return an error processing that entry.
- `log.error_reporting_type` (optional, default = false): If `true`, log records
with a severity of `error` or higher will be converted to JSON payloads with the
`@type` field set for [GCP Error
Reporting](https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text).
If the body is currently a string, it will be converted to a `message` field in
the new JSON payload. If the body is already a map, the `@type` field will be
added to the map. Other body types (such as byte) are undefined for this
behavior.

Example:

Expand Down
105 changes: 54 additions & 51 deletions exporter/collector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,66 +455,69 @@
}
}

if len(logRecord.Body().AsString()) == 0 {
return []*logpb.LogEntry{entry}, nil
}

// Handle map and bytes as JSON-structured logs if they are successfully converted.
switch logRecord.Body().Type() {
case pcommon.ValueTypeBytes:
s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
if err != nil {
return nil, err
}
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
case pcommon.ValueTypeMap:
s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw())
if err != nil {
return nil, err
if err == nil {
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
return []*logpb.LogEntry{entry}, nil
}
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
case pcommon.ValueTypeStr:
// Calculate the size of the internal log entry so this overhead can be accounted
// for when determining the need to split based on payload size
// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
overheadBytes := proto.Size(entry)
// Split log entries with a string payload into fewer entries
payloadString := logRecord.Body().Str()
splits := int(math.Ceil(float64(len([]byte(payloadString))) / float64(l.maxEntrySize-overheadBytes)))
if splits <= 1 {
entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: payloadString}
l.obs.log.Warn(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))

Check warning on line 466 in exporter/collector/logs.go

View check run for this annotation

Codecov / codecov/patch

exporter/collector/logs.go#L466

Added line #L466 was not covered by tests
case pcommon.ValueTypeBytes:
s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
if err == nil {
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}

Check warning on line 470 in exporter/collector/logs.go

View check run for this annotation

Codecov / codecov/patch

exporter/collector/logs.go#L470

Added line #L470 was not covered by tests
return []*logpb.LogEntry{entry}, nil
}
entries := make([]*logpb.LogEntry, splits)
// Start by assuming all splits will be even (this may not be the case)
startIndex := 0
endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(payloadString))))
for i := 0; i < splits; i++ {
newEntry := proto.Clone(entry).(*logpb.LogEntry)
currentSplit := payloadString[startIndex:endIndex]

// If the current split is larger than the entry size, iterate until it is within the max
// (This may happen since not all characters are exactly 1 byte)
for len([]byte(currentSplit)) > l.maxEntrySize {
endIndex--
currentSplit = payloadString[startIndex:endIndex]
}
newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
newEntry.Split = &logpb.LogSplit{
Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
Index: int32(i),
TotalSplits: int32(splits),
}
entries[i] = newEntry
l.obs.log.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
}
// For all other ValueTypes, export as a string payload.

// Update slice indices to the next chunk
startIndex = endIndex
endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(payloadString))))
// log.Body().AsString() can be expensive, and we use it several times below this, so
// do it once and save that as a variable.
logBodyString := logRecord.Body().AsString()
if len(logBodyString) == 0 {
return []*logpb.LogEntry{entry}, nil
}

// Calculate the size of the internal log entry so this overhead can be accounted
// for when determining the need to split based on payload size
// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
overheadBytes := proto.Size(entry)
// Split log entries with a string payload into fewer entries
splits := int(math.Ceil(float64(len([]byte(logBodyString))) / float64(l.maxEntrySize-overheadBytes)))
if splits <= 1 {
entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: logBodyString}
return []*logpb.LogEntry{entry}, nil
}
entries := make([]*logpb.LogEntry, splits)
// Start by assuming all splits will be even (this may not be the case)
startIndex := 0
endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(logBodyString))))
for i := 0; i < splits; i++ {
newEntry := proto.Clone(entry).(*logpb.LogEntry)
currentSplit := logBodyString[startIndex:endIndex]

// If the current split is larger than the entry size, iterate until it is within the max
// (This may happen since not all characters are exactly 1 byte)
for len([]byte(currentSplit)) > l.maxEntrySize {
endIndex--
currentSplit = logBodyString[startIndex:endIndex]

Check warning on line 506 in exporter/collector/logs.go

View check run for this annotation

Codecov / codecov/patch

exporter/collector/logs.go#L505-L506

Added lines #L505 - L506 were not covered by tests
}
return entries, nil
default:
return nil, fmt.Errorf("unknown log body value %v", logRecord.Body().Type().String())
newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
newEntry.Split = &logpb.LogSplit{
Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
Index: int32(i),
TotalSplits: int32(splits),
}
entries[i] = newEntry

// Update slice indices to the next chunk
startIndex = endIndex
endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(logBodyString))))
}
return []*logpb.LogEntry{entry}, nil
return entries, nil
}

// JSON keys derived from:
Expand Down
18 changes: 18 additions & 0 deletions exporter/collector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ func TestLogMapping(t *testing.T) {
},
},
maxEntrySize: defaultMaxEntrySize,
}, {
name: "log with invalid json byte body returns raw byte string",
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Body().SetEmptyBytes().FromRaw([]byte(`"this is not json"`))
return log
},
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
expectedEntries: []*logpb.LogEntry{
{
LogName: logName,
Payload: &logpb.LogEntry_TextPayload{TextPayload: "InRoaXMgaXMgbm90IGpzb24i"},
Timestamp: timestamppb.New(testObservedTime),
},
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with json and httpRequest, empty monitoredresource",
Expand Down