diff --git a/exporter/collector/README.md b/exporter/collector/README.md index 40d20073e..ee6aa5463 100644 --- a/exporter/collector/README.md +++ b/exporter/collector/README.md @@ -186,12 +186,17 @@ Additional configuration for the metric exporter: Addition configuration for the logging exporter: -- `log.default_log_name` (optional): Defines a default name for log entries. If left unset, and a log entry does not have the `gcp.log_name` -attribute set, the exporter will return an error processing that entry. -- `log.error_reporting_type` (option, default = false): If `true`, log records with a severity of `error` or higher will be converted to -JSON payloads with the `@type` field set for [GCP Error Reporting](https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text). -If the body is currently a string, it will be converted to a `message` field in the new JSON payload. If the body is already a map, the `@type` -field will be added to the map. Other body types (such as byte) are undefined for this behavior. +- `log.default_log_name` (optional): Defines a default name for log entries. If +left unset, and a log entry does not have the `gcp.log_name` attribute set, the +exporter will return an error processing that entry. +- `log.error_reporting_type` (option, default = false): If `true`, log records +with a severity of `error` or higher will be converted to JSON payloads with the +`@type` field set for [GCP Error +Reporting](https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text). +If the body is currently a string, it will be converted to a `message` field in +the new JSON payload. If the body is already a map, the `@type` field will be +added to the map. Other body types (such as byte) are undefined for this +behavior. Example: diff --git a/exporter/collector/logs.go b/exporter/collector/logs.go index a4e8d848c..a472ea8ce 100644 --- a/exporter/collector/logs.go +++ b/exporter/collector/logs.go @@ -455,66 +455,69 @@ func (l logMapper) logToSplitEntries( } } - if len(logRecord.Body().AsString()) == 0 { - return []*logpb.LogEntry{entry}, nil - } - + // Handle map and bytes as JSON-structured logs if they are successfully converted. switch logRecord.Body().Type() { - case pcommon.ValueTypeBytes: - s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw()) - if err != nil { - return nil, err - } - entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} case pcommon.ValueTypeMap: s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw()) - if err != nil { - return nil, err + if err == nil { + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} + return []*logpb.LogEntry{entry}, nil } - entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} - case pcommon.ValueTypeStr: - // Calculate the size of the internal log entry so this overhead can be accounted - // for when determining the need to split based on payload size - // TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well - overheadBytes := proto.Size(entry) - // Split log entries with a string payload into fewer entries - payloadString := logRecord.Body().Str() - splits := int(math.Ceil(float64(len([]byte(payloadString))) / float64(l.maxEntrySize-overheadBytes))) - if splits <= 1 { - entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: payloadString} + l.obs.log.Warn(fmt.Sprintf("map body cannot be converted to a json payload, exporting as raw string: %+v", err)) + case pcommon.ValueTypeBytes: + s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw()) + if err == nil { + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} return []*logpb.LogEntry{entry}, nil } - entries := make([]*logpb.LogEntry, splits) - // Start by assuming all splits will be even (this may not be the case) - startIndex := 0 - endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(payloadString)))) - for i := 0; i < splits; i++ { - newEntry := proto.Clone(entry).(*logpb.LogEntry) - currentSplit := payloadString[startIndex:endIndex] - - // If the current split is larger than the entry size, iterate until it is within the max - // (This may happen since not all characters are exactly 1 byte) - for len([]byte(currentSplit)) > l.maxEntrySize { - endIndex-- - currentSplit = payloadString[startIndex:endIndex] - } - newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit} - newEntry.Split = &logpb.LogSplit{ - Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()), - Index: int32(i), - TotalSplits: int32(splits), - } - entries[i] = newEntry + l.obs.log.Debug(fmt.Sprintf("bytes body cannot be converted to a json payload, exporting as base64 string: %+v", err)) + } + // For all other ValueTypes, export as a string payload. - // Update slice indices to the next chunk - startIndex = endIndex - endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(payloadString)))) + // log.Body().AsString() can be expensive, and we use it several times below this, so + // do it once and save that as a variable. + logBodyString := logRecord.Body().AsString() + if len(logBodyString) == 0 { + return []*logpb.LogEntry{entry}, nil + } + + // Calculate the size of the internal log entry so this overhead can be accounted + // for when determining the need to split based on payload size + // TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well + overheadBytes := proto.Size(entry) + // Split log entries with a string payload into fewer entries + splits := int(math.Ceil(float64(len([]byte(logBodyString))) / float64(l.maxEntrySize-overheadBytes))) + if splits <= 1 { + entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: logBodyString} + return []*logpb.LogEntry{entry}, nil + } + entries := make([]*logpb.LogEntry, splits) + // Start by assuming all splits will be even (this may not be the case) + startIndex := 0 + endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(logBodyString)))) + for i := 0; i < splits; i++ { + newEntry := proto.Clone(entry).(*logpb.LogEntry) + currentSplit := logBodyString[startIndex:endIndex] + + // If the current split is larger than the entry size, iterate until it is within the max + // (This may happen since not all characters are exactly 1 byte) + for len([]byte(currentSplit)) > l.maxEntrySize { + endIndex-- + currentSplit = logBodyString[startIndex:endIndex] } - return entries, nil - default: - return nil, fmt.Errorf("unknown log body value %v", logRecord.Body().Type().String()) + newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit} + newEntry.Split = &logpb.LogSplit{ + Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()), + Index: int32(i), + TotalSplits: int32(splits), + } + entries[i] = newEntry + + // Update slice indices to the next chunk + startIndex = endIndex + endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(logBodyString)))) } - return []*logpb.LogEntry{entry}, nil + return entries, nil } // JSON keys derived from: diff --git a/exporter/collector/logs_test.go b/exporter/collector/logs_test.go index fbc4735fc..65951c5b7 100644 --- a/exporter/collector/logs_test.go +++ b/exporter/collector/logs_test.go @@ -139,6 +139,24 @@ func TestLogMapping(t *testing.T) { }, }, maxEntrySize: defaultMaxEntrySize, + }, { + name: "log with invalid json byte body returns raw byte string", + log: func() plog.LogRecord { + log := plog.NewLogRecord() + log.Body().SetEmptyBytes().FromRaw([]byte(`"this is not json"`)) + return log + }, + mr: func() *monitoredrespb.MonitoredResource { + return nil + }, + expectedEntries: []*logpb.LogEntry{ + { + LogName: logName, + Payload: &logpb.LogEntry_TextPayload{TextPayload: "InRoaXMgaXMgbm90IGpzb24i"}, + Timestamp: timestamppb.New(testObservedTime), + }, + }, + maxEntrySize: defaultMaxEntrySize, }, { name: "log with json and httpRequest, empty monitoredresource",