Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gcp.source_location being set as other types #794

Merged
merged 10 commits into from
Feb 7, 2024
51 changes: 43 additions & 8 deletions exporter/collector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@
"fatal4": plog.SeverityNumberFatal4,
}

type attributeProcessingError struct {
Err error
Key string
}

func (e *attributeProcessingError) Error() string {
return fmt.Sprintf("could not process attribute %s: %s", e.Key, e.Err.Error())
}

func (e *attributeProcessingError) Unwrap() error {
return e.Err

Check warning on line 137 in exporter/collector/logs.go

View check run for this annotation

Codecov / codecov/patch

exporter/collector/logs.go#L136-L137

Added lines #L136 - L137 were not covered by tests
}

type unsupportedValueTypeError struct {
ValueType pcommon.ValueType
}

func (e *unsupportedValueTypeError) Error() string {
return fmt.Sprintf("unsupported value type %v", e.ValueType)
}

type LogsExporter struct {
obs selfObservability
loggingClient *loggingv2.Client
Expand Down Expand Up @@ -393,8 +414,12 @@

// parse LogEntrySourceLocation struct from OTel attribute
if sourceLocation, ok := attrsMap[SourceLocationAttributeKey]; ok {
sourceLocationBytes, err := bytesFromValue(sourceLocation)
if err != nil {
return nil, &attributeProcessingError{Key: SourceLocationAttributeKey, Err: err}
}
var logEntrySourceLocation logpb.LogEntrySourceLocation
err := json.Unmarshal(sourceLocation.Bytes().AsRaw(), &logEntrySourceLocation)
err = json.Unmarshal(sourceLocationBytes, &logEntrySourceLocation)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -548,14 +573,10 @@
}

func (l logMapper) parseHTTPRequest(httpRequestAttr pcommon.Value) (*logtypepb.HttpRequest, error) {
var httpBytes []byte
switch httpRequestAttr.Type() {
case pcommon.ValueTypeBytes:
httpBytes = httpRequestAttr.Bytes().AsRaw()
case pcommon.ValueTypeStr, pcommon.ValueTypeMap:
httpBytes = []byte(httpRequestAttr.AsString())
httpBytes, err := bytesFromValue(httpRequestAttr)
if err != nil {
return nil, &attributeProcessingError{Key: HTTPRequestAttributeKey, Err: err}
}

// TODO: Investigate doing this without the JSON unmarshal. Getting the attribute as a map
// instead of a slice of bytes could do, but would need a lot of type casting and checking
// assertions with it.
Expand Down Expand Up @@ -631,3 +652,17 @@
}
return buf.String()
}

func bytesFromValue(v pcommon.Value) ([]byte, error) {
braydonk marked this conversation as resolved.
Show resolved Hide resolved
var valueBytes []byte
var err error
switch v.Type() {
case pcommon.ValueTypeBytes:
valueBytes = v.Bytes().AsRaw()
case pcommon.ValueTypeMap, pcommon.ValueTypeStr:
valueBytes = []byte(v.AsString())
default:
err = &unsupportedValueTypeError{ValueType: v.Type()}
}
return valueBytes, err
}
167 changes: 162 additions & 5 deletions exporter/collector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ func TestLogMapping(t *testing.T) {
logName := "projects/fakeprojectid/logs/default-log"

testCases := []struct {
expectedError error
log func() plog.LogRecord
mr func() *monitoredrespb.MonitoredResource
config Option
name string
expectedEntries []*logpb.LogEntry
maxEntrySize int
expectError bool
maxEntrySize int
}{
{
name: "split entry size",
Expand Down Expand Up @@ -164,8 +165,8 @@ func TestLogMapping(t *testing.T) {
log := plog.NewLogRecord()
log.Body().SetEmptyMap().PutStr("message", "hello!")
log.Attributes().PutEmptyBytes(HTTPRequestAttributeKey).FromRaw([]byte(`{
"requestMethod": "GET",
"requestURL": "https://www.example.com",
"requestMethod": "GET",
"requestURL": "https://www.example.com",
"requestSize": "1",
"status": "200",
"responseSize": "1",
Expand Down Expand Up @@ -210,6 +211,28 @@ func TestLogMapping(t *testing.T) {
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with httpRequest attribute unsupported type",
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Body().SetEmptyMap().PutStr("message", "hello!")
log.Attributes().PutBool(HTTPRequestAttributeKey, true)
return log
},
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
expectedEntries: []*logpb.LogEntry{
{
LogName: logName,
Timestamp: timestamppb.New(testObservedTime),
Payload: &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: map[string]*structpb.Value{
"message": {Kind: &structpb.Value_StringValue{StringValue: "hello!"}},
}}},
},
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with timestamp",
log: func() plog.LogRecord {
Expand Down Expand Up @@ -371,8 +394,7 @@ func TestLogMapping(t *testing.T) {
},
},
{
// TODO(damemi): parse/test sourceLocation from more than just bytes values
name: "log with sourceLocation (bytes)",
name: "log with valid sourceLocation (bytes)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
Expand All @@ -396,6 +418,120 @@ func TestLogMapping(t *testing.T) {
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with invalid sourceLocation (bytes)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Attributes().PutEmptyBytes(SourceLocationAttributeKey).FromRaw(
[]byte(`{"file": 100}`),
)
return log
},
maxEntrySize: defaultMaxEntrySize,
expectError: true,
},
{
name: "log with valid sourceLocation (map)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
sourceLocationMap := log.Attributes().PutEmptyMap(SourceLocationAttributeKey)
sourceLocationMap.PutStr("file", "test.php")
sourceLocationMap.PutInt("line", 100)
sourceLocationMap.PutStr("function", "helloWorld")
return log
},
expectedEntries: []*logpb.LogEntry{
{
LogName: logName,
Timestamp: timestamppb.New(testObservedTime),
SourceLocation: &logpb.LogEntrySourceLocation{
File: "test.php",
Line: 100,
Function: "helloWorld",
},
},
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with invalid sourceLocation (map)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
sourceLocationMap := log.Attributes().PutEmptyMap(SourceLocationAttributeKey)
sourceLocationMap.PutStr("line", "100")
return log
},
maxEntrySize: defaultMaxEntrySize,
expectError: true,
},
{
name: "log with valid sourceLocation (string)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Attributes().PutStr(
SourceLocationAttributeKey,
`{"file": "test.php", "line":100, "function":"helloWorld"}`,
)
return log
},
expectedEntries: []*logpb.LogEntry{
{
LogName: logName,
Timestamp: timestamppb.New(testObservedTime),
SourceLocation: &logpb.LogEntrySourceLocation{
File: "test.php",
Line: 100,
Function: "helloWorld",
},
},
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with invalid sourceLocation (string)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Attributes().PutStr(
SourceLocationAttributeKey,
`{"file": 100}`,
)
return log
},
maxEntrySize: defaultMaxEntrySize,
expectError: true,
},
{
name: "log with unsupported sourceLocation type",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Attributes().PutBool(SourceLocationAttributeKey, true)
return log
},
maxEntrySize: defaultMaxEntrySize,
expectError: true,
expectedError: &attributeProcessingError{
Key: SourceLocationAttributeKey,
Err: &unsupportedValueTypeError{ValueType: pcommon.ValueTypeBool},
},
},
{
name: "log with traceSampled (bool)",
mr: func() *monitoredrespb.MonitoredResource {
Expand All @@ -415,6 +551,24 @@ func TestLogMapping(t *testing.T) {
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with traceSampled (unsupported type)",
mr: func() *monitoredrespb.MonitoredResource {
return nil
},
log: func() plog.LogRecord {
log := plog.NewLogRecord()
log.Attributes().PutStr(TraceSampledAttributeKey, "hi!")
return log
},
expectedEntries: []*logpb.LogEntry{
{
LogName: logName,
Timestamp: timestamppb.New(testObservedTime),
},
},
maxEntrySize: defaultMaxEntrySize,
},
{
name: "log with trace and span id",
mr: func() *monitoredrespb.MonitoredResource {
Expand Down Expand Up @@ -506,6 +660,9 @@ func TestLogMapping(t *testing.T) {

if testCase.expectError {
assert.NotNil(t, err)
if testCase.expectedError != nil {
assert.Equal(t, err.Error(), testCase.expectedError.Error())
}
} else {
assert.Nil(t, err)
assert.Equal(t, len(testCase.expectedEntries), len(entries))
Expand Down