Always fall back to string payload if we can't handle structured logs (#701)

always fall back to string payload if we can't handle structured logs
dashpole authored Aug 11, 2023
1 parent 419d54a commit c915721
Showing 3 changed files with 83 additions and 57 deletions.
17 changes: 11 additions & 6 deletions exporter/collector/README.md
```diff
@@ -186,12 +186,17 @@ Additional configuration for the metric exporter:
 
 Additional configuration for the logging exporter:
 
-- `log.default_log_name` (optional): Defines a default name for log entries. If left unset, and a log entry does not have the `gcp.log_name`
-attribute set, the exporter will return an error processing that entry.
-- `log.error_reporting_type` (optional, default = false): If `true`, log records with a severity of `error` or higher will be converted to
-JSON payloads with the `@type` field set for [GCP Error Reporting](https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text).
-If the body is currently a string, it will be converted to a `message` field in the new JSON payload. If the body is already a map, the `@type`
-field will be added to the map. Other body types (such as byte) are undefined for this behavior.
+- `log.default_log_name` (optional): Defines a default name for log entries. If
+left unset, and a log entry does not have the `gcp.log_name` attribute set, the
+exporter will return an error processing that entry.
+- `log.error_reporting_type` (optional, default = false): If `true`, log records
+with a severity of `error` or higher will be converted to JSON payloads with the
+`@type` field set for [GCP Error
+Reporting](https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text).
+If the body is currently a string, it will be converted to a `message` field in
+the new JSON payload. If the body is already a map, the `@type` field will be
+added to the map. Other body types (such as byte) are undefined for this
+behavior.
 
 Example:
```
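For readers configuring these options, a minimal collector configuration sketch is shown below. It assumes the exporter is registered under the `googlecloud` key and that the `log` options nest as the README's dotted names suggest; the log name value is illustrative:

```yaml
exporters:
  googlecloud:
    log:
      # Used when a record has no gcp.log_name attribute (assumed placement).
      default_log_name: my-default-log
      # Convert error-severity records into Error Reporting JSON payloads.
      error_reporting_type: true
```

With `error_reporting_type: true`, a string body such as `boom` would become a JSON payload carrying a `message: boom` field plus the `@type` field described at the linked Error Reporting page.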
105 changes: 54 additions & 51 deletions exporter/collector/logs.go
```diff
@@ -455,66 +455,69 @@ func (l logMapper) logToSplitEntries(
 		}
 	}
 
-	if len(logRecord.Body().AsString()) == 0 {
-		return []*logpb.LogEntry{entry}, nil
-	}
-
 	// Handle map and bytes as JSON-structured logs if they are successfully converted.
 	switch logRecord.Body().Type() {
-	case pcommon.ValueTypeBytes:
-		s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
-		if err != nil {
-			return nil, err
-		}
-		entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
 	case pcommon.ValueTypeMap:
 		s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw())
-		if err != nil {
-			return nil, err
+		if err == nil {
+			entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
+			return []*logpb.LogEntry{entry}, nil
 		}
-		entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
-	case pcommon.ValueTypeStr:
-		// Calculate the size of the internal log entry so this overhead can be accounted
-		// for when determining the need to split based on payload size
-		// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
-		overheadBytes := proto.Size(entry)
-		// Split log entries with a string payload into fewer entries
-		payloadString := logRecord.Body().Str()
-		splits := int(math.Ceil(float64(len([]byte(payloadString))) / float64(l.maxEntrySize-overheadBytes)))
-		if splits <= 1 {
-			entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: payloadString}
-			return []*logpb.LogEntry{entry}, nil
-		}
-		entries := make([]*logpb.LogEntry, splits)
-		// Start by assuming all splits will be even (this may not be the case)
-		startIndex := 0
-		endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(payloadString))))
-		for i := 0; i < splits; i++ {
-			newEntry := proto.Clone(entry).(*logpb.LogEntry)
-			currentSplit := payloadString[startIndex:endIndex]
-
-			// If the current split is larger than the entry size, iterate until it is within the max
-			// (This may happen since not all characters are exactly 1 byte)
-			for len([]byte(currentSplit)) > l.maxEntrySize {
-				endIndex--
-				currentSplit = payloadString[startIndex:endIndex]
-			}
-			newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
-			newEntry.Split = &logpb.LogSplit{
-				Uid:         fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
-				Index:       int32(i),
-				TotalSplits: int32(splits),
-			}
-			entries[i] = newEntry
-
-			// Update slice indices to the next chunk
-			startIndex = endIndex
-			endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(payloadString))))
-		}
-		return entries, nil
-	default:
-		return nil, fmt.Errorf("unknown log body value %v", logRecord.Body().Type().String())
+		l.obs.log.Warn(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err))
+	case pcommon.ValueTypeBytes:
+		s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw())
+		if err == nil {
+			entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
+			return []*logpb.LogEntry{entry}, nil
+		}
+		l.obs.log.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err))
 	}
-	return []*logpb.LogEntry{entry}, nil
+	// For all other ValueTypes, export as a string payload.
+
+	// log.Body().AsString() can be expensive, and we use it several times below this, so
+	// do it once and save that as a variable.
+	logBodyString := logRecord.Body().AsString()
+	if len(logBodyString) == 0 {
+		return []*logpb.LogEntry{entry}, nil
+	}
+
+	// Calculate the size of the internal log entry so this overhead can be accounted
+	// for when determining the need to split based on payload size
+	// TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well
+	overheadBytes := proto.Size(entry)
+	// Split log entries with a string payload into fewer entries
+	splits := int(math.Ceil(float64(len([]byte(logBodyString))) / float64(l.maxEntrySize-overheadBytes)))
+	if splits <= 1 {
+		entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: logBodyString}
+		return []*logpb.LogEntry{entry}, nil
+	}
+	entries := make([]*logpb.LogEntry, splits)
+	// Start by assuming all splits will be even (this may not be the case)
+	startIndex := 0
+	endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(logBodyString))))
+	for i := 0; i < splits; i++ {
+		newEntry := proto.Clone(entry).(*logpb.LogEntry)
+		currentSplit := logBodyString[startIndex:endIndex]
+
+		// If the current split is larger than the entry size, iterate until it is within the max
+		// (This may happen since not all characters are exactly 1 byte)
+		for len([]byte(currentSplit)) > l.maxEntrySize {
+			endIndex--
+			currentSplit = logBodyString[startIndex:endIndex]
+		}
+		newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit}
+		newEntry.Split = &logpb.LogSplit{
+			Uid:         fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()),
+			Index:       int32(i),
+			TotalSplits: int32(splits),
+		}
+		entries[i] = newEntry
+
+		// Update slice indices to the next chunk
+		startIndex = endIndex
+		endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(logBodyString))))
+	}
+	return entries, nil
 }
 
 // JSON keys derived from:
```
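To make the splitting arithmetic above concrete, here is a self-contained sketch of the same chunking scheme. It is illustrative only: `splitPayload` is not part of the exporter's API, and the real code clones full `LogEntry` protos rather than returning strings.

```go
package main

import (
	"fmt"
	"math"
)

// splitPayload divides a string into near-even chunks using the same index
// math as the exporter: estimate the number of splits from the byte budget,
// then shrink a chunk's end index whenever multi-byte characters push it
// over maxEntrySize bytes.
func splitPayload(payload string, maxEntrySize, overheadBytes int) []string {
	// len(payload) on a Go string is already a byte count.
	splits := int(math.Ceil(float64(len(payload)) / float64(maxEntrySize-overheadBytes)))
	if splits <= 1 {
		return []string{payload}
	}
	chunks := make([]string, 0, splits)
	startIndex := 0
	endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(payload))))
	for i := 0; i < splits; i++ {
		current := payload[startIndex:endIndex]
		// Back off until the chunk fits within the byte limit.
		for len(current) > maxEntrySize {
			endIndex--
			current = payload[startIndex:endIndex]
		}
		chunks = append(chunks, current)
		// Advance to the next even slice boundary.
		startIndex = endIndex
		endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(payload))))
	}
	return chunks
}

func main() {
	// A 10-byte payload with a 12-byte limit and 4 bytes of overhead
	// needs ceil(10 / 8) = 2 splits.
	fmt.Println(splitPayload("abcdefghij", 12, 4)) // [abcde fghij]
}
```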
18 changes: 18 additions & 0 deletions exporter/collector/logs_test.go
```diff
@@ -139,6 +139,24 @@ func TestLogMapping(t *testing.T) {
 			},
 		},
 		maxEntrySize: defaultMaxEntrySize,
+	}, {
+		name: "log with invalid json byte body returns raw byte string",
+		log: func() plog.LogRecord {
+			log := plog.NewLogRecord()
+			log.Body().SetEmptyBytes().FromRaw([]byte(`"this is not json"`))
+			return log
+		},
+		mr: func() *monitoredrespb.MonitoredResource {
+			return nil
+		},
+		expectedEntries: []*logpb.LogEntry{
+			{
+				LogName:   logName,
+				Payload:   &logpb.LogEntry_TextPayload{TextPayload: "InRoaXMgaXMgbm90IGpzb24i"},
+				Timestamp: timestamppb.New(testObservedTime),
+			},
+		},
+		maxEntrySize: defaultMaxEntrySize,
 	},
 	{
 		name: "log with json and httpRequest, empty monitoredresource",
```
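The expected `TextPayload` in this test is the base64 rendering of the raw byte body: when the bytes are not valid JSON, the fallback path exports `Body().AsString()`, which base64-encodes bytes-typed values (as the commit's debug message notes). A quick standalone check of the expected string:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// The same 18-byte body the test sets, including the literal quotes.
	body := []byte(`"this is not json"`)
	// pcommon.Value.AsString() base64-encodes bytes-typed bodies, which is
	// why the test expects this exact TextPayload.
	fmt.Println(base64.StdEncoding.EncodeToString(body))
	// Output: InRoaXMgaXMgbm90IGpzb24i
}
```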
