
Not able to copy/move from sub object to attributes (merge) #35020

Open
perry-mitchell opened this issue Sep 5, 2024 · 11 comments

@perry-mitchell

perry-mitchell commented Sep 5, 2024

Component(s)

pkg/stanza, receiver/filelog

What happened?

Description

In a filelog receiver, I occasionally receive a blob of JSON on attributes.log. When this field is present, I parse it and write the result to another attribute field, attributes.otlp. These logs are generally a single line of OpenTelemetry-formatted JSON that needs merging with the current log entry.

After successfully parsing into the attributes.otlp property, I try to move/merge sub-properties of that field into the top-level attributes object, and I receive an error when doing so.

Here's the log format before I add the copy/move operator:

{
    "body": "2024-09-05T04:14:15.42553326Z stdout F {\"resource\":{\"attributes\":{\"service.name\":\"my-service\",\"telemetry.sdk.language\":\"nodejs\",\"telemetry.sdk.name\":\"opentelemetry\",\"telemetry.sdk.version\":\"1.25.1\",\"service.version\":\"1.0.2-github-featotellogsstdout+0ef3f2e.20240904T112828Z\",\"process.pid\":1,\"process.executable.name\":\"node\",\"process.executable.path\":\"/usr/local/bin/node\",\"process.command_args\":[\"/usr/local/bin/node\",\"/app/applications/my-service/dist/server.cjs\"],\"process.runtime.version\":\"20.17.0\",\"process.runtime.name\":\"nodejs\",\"process.runtime.description\":\"Node.js\",\"process.command\":\"/app/applications/my-service/dist/server.cjs\",\"process.owner\":\"asuser\",\"host.name\":\"<snip>\",\"host.arch\":\"amd64\",\"cloud.provider\":\"aws\",\"cloud.platform\":\"aws_ec2\",\"cloud.account.id\":\"0123456789\",\"cloud.region\":\"eu-west-1\",\"cloud.availability_zone\":\"eu-west-1b\",\"host.id\":\"i-<snip>\",\"host.type\":\"m7i.large\"}},\"instrumentationScope\":{\"name\":\"default\"},\"timestamp\":1725509650425000,\"traceId\":\"64f390c30dc4f6a780a42dd3e56c2ad0\",\"spanId\":\"3fa4c5e63aa33dee\",\"traceFlags\":1,\"severityText\":\"INFO\",\"severityNumber\":9,\"body\":\"request\",\"attributes\":{\"requestMethod\":\"GET\",\"requestUrl\":\"/ready\",\"name\":\"my-service\"}}",
    "attributes": {
        "aceId": "c2f1a48c-1d6a-4399-a98d-f37cf254e8ab",
        "log": "{\"resource\":{\"attributes\":{\"service.name\":\"my-service\",\"telemetry.sdk.language\":\"nodejs\",\"telemetry.sdk.name\":\"opentelemetry\",\"telemetry.sdk.version\":\"1.25.1\",\"service.version\":\"1.0.2-github-featotellogsstdout+0ef3f2e.20240904T112828Z\",\"process.pid\":1,\"process.executable.name\":\"node\",\"process.executable.path\":\"/usr/local/bin/node\",\"process.command_args\":[\"/usr/local/bin/node\",\"/app/applications/my-service/dist/server.cjs\"],\"process.runtime.version\":\"20.17.0\",\"process.runtime.name\":\"nodejs\",\"process.runtime.description\":\"Node.js\",\"process.command\":\"/app/applications/my-service/dist/server.cjs\",\"process.owner\":\"asuser\",\"host.name\":\"<snip>\",\"host.arch\":\"amd64\",\"cloud.provider\":\"aws\",\"cloud.platform\":\"aws_ec2\",\"cloud.account.id\":\"0123456789\",\"cloud.region\":\"eu-west-1\",\"cloud.availability_zone\":\"eu-west-1b\",\"host.id\":\"i-<snip>\",\"host.type\":\"m7i.large\"}},\"instrumentationScope\":{\"name\":\"default\"},\"timestamp\":1725509650425000,\"traceId\":\"64f390c30dc4f6a780a42dd3e56c2ad0\",\"spanId\":\"3fa4c5e63aa33dee\",\"traceFlags\":1,\"severityText\":\"INFO\",\"severityNumber\":9,\"body\":\"request\",\"attributes\":{\"requestMethod\":\"GET\",\"requestUrl\":\"/ready\",\"name\":\"my-service\"}}",
        "log.file.path": "/var/log/pods/my-namespace-c2f1a48c-1d6a-4399-a98d-f37cf254e8ab_my-service-8666dcd8dd-lz5x4_152b0639-0c78-4a32-9126-c054444952bb/my-service/0.log",
        "log.iostream": "stdout",
        "logtag": "F",
        "otelConfigRev": "15",
        "otlp": {
            "attributes": {
                "name": "my-service",
                "requestMethod": "GET",
                "requestUrl": "/ready"
            },
            "body": "request",
            "instrumentationScope": {
                "name": "default"
            },
            "resource": {
                "attributes": {
                    "cloud.account.id": "0123456789",
                    "cloud.availability_zone": "eu-west-1b",
                    "cloud.platform": "aws_ec2",
                    "cloud.provider": "aws",
                    "cloud.region": "eu-west-1",
                    "host.arch": "amd64",
                    "host.id": "i-<snip>",
                    "host.name": "<snip>",
                    "host.type": "m7i.large",
                    "process.command": "/app/applications/my-service/dist/server.cjs",
                    "process.command_args": [
                        "/usr/local/bin/node",
                        "/app/applications/my-service/dist/server.cjs"
                    ],
                    "process.executable.name": "node",
                    "process.executable.path": "/usr/local/bin/node",
                    "process.owner": "asuser",
                    "process.pid": 1,
                    "process.runtime.description": "Node.js",
                    "process.runtime.name": "nodejs",
                    "process.runtime.version": "20.17.0",
                    "service.name": "my-service",
                    "service.version": "1.0.2-github-featotellogsstdout+0ef3f2e.20240904T112828Z",
                    "telemetry.sdk.language": "nodejs",
                    "telemetry.sdk.name": "opentelemetry",
                    "telemetry.sdk.version": "1.25.1"
                }
            },
            "severityNumber": 9,
            "severityText": "INFO",
            "spanId": "3fa4c5e63aa33dee",
            "timestamp": 1725509650425000,
            "traceFlags": 1,
            "traceId": "64f390c30dc4f6a780a42dd3e56c2ad0"
        },
        "time": "2024-09-05T04:14:15.42553326Z"
    },
    "resource": {
        "k8s.container.name": "my-service",
        "k8s.container.restart_count": "0",
        "k8s.namespace.name": "my-namespace-c2f1a48c-1d6a-4399-a98d-f37cf254e8ab",
        "k8s.pod.name": "my-service-8666dcd8dd-lz5x4",
        "k8s.pod.uid": "152b0639-0c78-4a32-9126-c054444952bb",
        "service.name": "my-service"
    }
}

Steps to Reproduce

Please pardon the formatting; I'm configuring the receiver and collector using Terraform. Below is a snippet of the operators I'm using which cause the issue:

    {
      type       = "json_parser"
      id         = "parse_single_json_line"
      "if"       = "\"log\" in keys(attributes) && attributes.log matches \"^{.*}$\""
      parse_from = "attributes.log"
      parse_to   = "attributes.otlp"
    },
    {
      type  = "remove"
      "if"  = "\"log\" in keys(attributes)"
      field = "attributes.log"
    },
    {
      type = "copy"
      from = "attributes.otlp.attributes"
      "if" = "\"otlp\" in keys(attributes)"
      to   = "attributes"
    },

Adding the last copy operator (same issue for move) breaks the process, with the following error:

[screenshot of the collector error; full text reproduced under Log output below]

To recap, the first two operators produce the log object shown above. Adding the final copy operator results in the error.

Expected Result

I'd expect the attributes.otlp.attributes object, when present, to be merged into the top-level attributes. Either that, or have the filelog receiver provide a way to merge a JSON-parsed object over the entire log object at the root.
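For concreteness, here is roughly the entry I'd expect after such a merge (abridged and illustrative only; assuming copy leaves attributes.otlp in place and its sub-keys land alongside the existing attributes):

```json
{
  "attributes": {
    "aceId": "c2f1a48c-1d6a-4399-a98d-f37cf254e8ab",
    "log.iostream": "stdout",
    "name": "my-service",
    "requestMethod": "GET",
    "requestUrl": "/ready",
    "otlp": { "...": "..." }
  }
}
```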

Actual Result

An error as shown above.

Collector version

0.101.1

Environment information

Environment

OS: Docker - Helm chart on Kubernetes

OpenTelemetry Collector configuration

(entire configuration not applicable)

Log output

Error: failed to get config: cannot unmarshal the configuration: decoding failed due to the following error(s):

error decoding 'receivers': error reading configuration for "filelog/global": decoding failed due to the following error(s):

error decoding 'operators[7]': unmarshal to copy: decoding failed due to the following error(s):

error decoding 'to': attributes cannot be referenced without subfield
2024/09/05 04:27:40 collector server run finished with error: failed to get config: cannot unmarshal the configuration: decoding failed due to the following error(s):

error decoding 'receivers': error reading configuration for "filelog/global": decoding failed due to the following error(s):

error decoding 'operators[7]': unmarshal to copy: decoding failed due to the following error(s):

error decoding 'to': attributes cannot be referenced without subfield
Stream closed EOF for ace-system-c2f1a48c-1d6a-4399-a98d-f37cf254e8ab/opentelemetry-collector-agent-tq87b (opentelemetry-collector)

Additional context

No response

@perry-mitchell perry-mitchell added bug Something isn't working needs triage New item requiring triage labels Sep 5, 2024
Contributor

github-actions bot commented Sep 5, 2024

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@djaglowski
Member

This limitation was a design decision made long ago when some guidelines were created to govern whether to allow e.g. copying to attributes. Here is a conversation about this which is quite relevant.

That said, I think what you're asking for is quite reasonable and we could reconsider the cited guideline: Field parameters should be validated at build time. Validity should not be dynamic. i.e. should not depend on the contents of a particular log entry.

It's not clear to me at this point that allowing validity to be determined dynamically is really a problem here. It's not all that different from trying to copy from an attribute that turns out not to be present.

I think the broader implications of this change look something like this:

  1. add, copy, and move operators are all allowed to specify to: attributes or to: resource.
  2. When configured with either of these "root" fields, the value must be a map with string keys.
  3. If (2) is not true, then an error occurs and the behavior follows on_error setting.

If I recall correctly, the implementation could be a bit tricky but there is at least one other operator which introduced a notion of referring to attributes or resource directly so we could borrow from that.
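To make the three rules above concrete, here is a rough sketch in Python of how that merge-with-validation could behave. This is a hypothetical illustration, not the actual stanza implementation; the helper names (`get_path`, `move_to_root`) are invented.

```python
# Hypothetical sketch of the proposed semantics -- NOT the actual stanza code.

def get_path(entry: dict, dotted: str):
    """Resolve a dotted field path such as 'attributes.otlp.attributes'."""
    node = entry
    for part in dotted.split("."):
        node = node[part]
    return node

def move_to_root(entry: dict, from_field: str, to_root: str) -> dict:
    """Move a string-keyed map at `from_field` onto the root map `to_root`.

    Mirrors rules (2) and (3): the moved value must be a map with string
    keys; anything else raises, which the operator's on_error policy
    would then handle.
    """
    value = get_path(entry, from_field)
    if not (isinstance(value, dict) and all(isinstance(k, str) for k in value)):
        raise ValueError(f"{from_field} is not a string-keyed map")
    entry.setdefault(to_root, {}).update(value)  # merge into the root map
    parent_path, _, leaf = from_field.rpartition(".")
    parent = get_path(entry, parent_path) if parent_path else entry
    del parent[leaf]  # 'move' removes the source field; 'copy' would keep it
    return entry

entry = {"attributes": {"otlp": {"attributes": {"requestMethod": "GET"}}}}
move_to_root(entry, "attributes.otlp.attributes", "attributes")
print(entry["attributes"]["requestMethod"])  # GET
```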

@perry-mitchell
Author

That's cool to hear! Thanks for the quick reply. I was wondering whether what I'm doing is completely beyond the guardrails here.

So I guess the fact that I'm parsing OTLP/JSON format logs from stdout output is mostly unsupported? Wouldn't this be a more scalable way to process logs from my application stack in general?

Moreover, since you mentioned allowing to: attributes and to: resource, would there be some way to target to: <root> in some manner? Or, if that sounds like entirely the wrong way to do it, would you have any guidance on what I'm trying to do?

I'm basically ingesting logs from a full cluster, where some apps output unstructured stdio logs, and some output OTLP JSON (with some doing both). Right now I've routed the OTLP formatted logs back over HTTP to our collector, but I'd prefer that they all end up on stdout for the aforementioned scaling reason.

@perry-mitchell
Author

The next issue I'm hitting, in the same vein, is probably related to #35049: I'm not able to manually set severity:

    {
      type  = "add"
      field = "severity_text"
      value = "ERROR"
      "if"  = "attributes[\"log.iostream\"] == \"stderr\""
    },
    {
      type  = "add"
      field = "severity_number"
      value = 17
      "if"  = "attributes[\"log.iostream\"] == \"stderr\""
    },
    {
      type  = "add"
      field = "severity_text"
      value = "INFO"
      "if"  = "attributes[\"log.iostream\"] == \"stdout\""
    },
    {
      type  = "add"
      field = "severity_number"
      value = 9
      "if"  = "attributes[\"log.iostream\"] == \"stdout\""
    }

The severity parser isn't working for me (not related to this issue), but I could have overcome that limitation had I been able to manually set these properties as above.

Of course I get similar errors:

error decoding 'operators[15]': unmarshal to add: decoding failed due to the following error(s)

error decoding 'field': unrecognized prefix                                                    

error decoding 'operators[16]': unmarshal to add: decoding failed due to the following error(s):                                                                                                         

error decoding 'field': unrecognized prefix                                                                                                                                                              

error decoding 'operators[17]': unmarshal to add: decoding failed due to the following error(s):

@djaglowski
Member

So I guess the fact that I'm parsing OTLP/JSON format logs from stdout output is mostly unsupported?

I didn't catch that that was your goal. Have you looked at using the otlpjson connector? It's for exactly this purpose.

@djaglowski
Member

    {
      type  = "add"
      field = "severity_text"
      value = "ERROR"
      "if"  = "attributes[\"log.iostream\"] == \"stderr\""
    },
    {
      type  = "add"
      field = "severity_number"
      value = 17
      "if"  = "attributes[\"log.iostream\"] == \"stderr\""
    },
    {
      type  = "add"
      field = "severity_text"
      value = "INFO"
      "if"  = "attributes[\"log.iostream\"] == \"stdout\""
    },
    {
      type  = "add"
      field = "severity_number"
      value = 9
      "if"  = "attributes[\"log.iostream\"] == \"stdout\""
    }

You should be able to use the severity parser to set severity number, but the text is intentionally preserved. You could do something like this, though:

operators:
  - type: add
    field: attributes["log.iostream"]
    value: ERROR
    if: attributes["log.iostream"] == "stderr"
  - type: add
    field: attributes["log.iostream"]
    value: INFO
    if: attributes["log.iostream"] == "stdout"
  - type: severity_parser
    parse_from: attributes["log.iostream"]

@djaglowski
Member

Seems I forgot about a feature we added which may do just what you're looking for wrt severity text: #26671
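For anyone landing here later, a hedged sketch of how that option might be combined with the log.iostream values directly, skipping the add operators entirely. The parameter names are as I understand them from the stanza severity parser docs; verify against the current docs before use:

```yaml
- type: severity_parser
  parse_from: attributes["log.iostream"]
  overwrite_text: true   # from #26671: also rewrite severity_text
  mapping:
    error: stderr        # treat the literal value "stderr" as ERROR
    info: stdout         # treat the literal value "stdout" as INFO
```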

@perry-mitchell
Author

Thanks, that severity overwrite_text option did the trick.

I didn't catch that was your goal. Have you looked at using the otlpjson connector? It's for exactly this purpose?

Well, not quite. I have a bunch of services running in a k8s cluster. Most of them I control; some I do not. The ones I don't control are going to output some random log format to stdout. That's fine, I can't control it, but I still want to process their logs, hence the filelog receiver.

The ones I do control, I want to monitor with added granularity. I want to log in OTLP format, specifically JSON, so that all of the attributes and resource labels I'm adding are retained. Due to the aforementioned scaling concerns, I'd also want the OTLP-formatted logs going to the same location as all the other logs I'm already collecting: stdout/stderr.

The problem is not getting them into the same stream; I can use the console log exporter in Node.js for that. Rather, once they're in the stdout stream, how can I properly process them using filelog? I use a regex_parser to get the JSON blob out, then a json_parser to parse it into an object, but then what? The log object I'm currently processing in the operators is a plain stdout object. I'd want to replace it with the newly parsed OTLP-JSON object, but there doesn't seem to be a way to do that.

I can't move any sub-object to root, which would be easiest. Failing that, I'd expect to be able to move a sub-object directly to attributes, and the sub resource object to resource, but as the issue originally stated, this isn't currently possible.

Ultimately, an operator like overwrite_root_from: 'attributes["my.otlp.object"]' would be easiest for me, whether overwriting or merging; that's what I'm trying to achieve.

@djaglowski
Member

I use a regex_parser to get the JSON blob out

You should look at the new container log parser. It accomplishes the same thing, and probably a lot more efficiently.

The ones I do control, I want to monitor but with added granularity. I want to log in OTLP format - specifically JSON, so that all of the attributes and resource labels I'm adding are retained.

It sounds like the otlpjson connector could be used for these logs (after they go through the container parser). The problem is separating this format from the others.

The ideal solution would be to use the routing connector to split the logs into separate processing pipelines. Then you could run only the otlpjson-formatted logs through the otlpjson connector.

The challenge is that the routing connector currently only supports routing based on resource attributes. To address this, you could annotate a resource attribute onto the appropriate logs. Your config might look something like this:

receivers:
  filelog:
    include: ...
    operators:
      - type: container
      - type: add
        field: resource["log.type"]
        value: otlpjson
        if: ... # TODO figure out how to detect otlp json format consistently

connectors: 
  routing:
    default_pipelines: [logs/nonotlpjson]
    table:
      - statement: route() where attributes["log.type"] == "otlpjson"
        pipelines: [logs/rehydrate]
  otlpjson:

exporters:
  foo: ...

service:
  pipelines:
    logs/in:
      receivers: [ filelog ]
      exporters: [ routing ]
    logs/rehydrate:
      receivers: [ routing ]
      exporters: [ otlpjson ]
    logs/rehydrated:
      receivers: [ otlpjson ]
      exporters: [ foo ]
    logs/nonotlpjson:
      receivers: [ routing ]
      exporters: [ foo ]

@perry-mitchell
Author

Ah, that's really interesting! Looks quite advanced. Though I'm still a bit confused about how I go from a parsed sub-property or JSON object on the body to having that handled by the logs/rehydrate pipeline. How does the otlpjson connector/receiver know which property to get the JSON from? Can it just be in body, like:

{
  "body": {
    "body": "test log",
    "trace_id": "..."
  },
  "resource": {},
  "attributes": {}
}

@djaglowski
Member

The otlpjson connector assumes that each log record's Body contains a full OTLP/JSON-encoded string. So to use it, you should not do any parsing on the original log except with the container parser.
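In other words (my own abridged illustration of the OTLP/JSON encoding, not taken from the connector docs), each Body would need to hold a complete payload shaped like:

```json
{
  "resourceLogs": [{
    "resource": {
      "attributes": [{"key": "service.name", "value": {"stringValue": "my-service"}}]
    },
    "scopeLogs": [{
      "scope": {"name": "default"},
      "logRecords": [{"body": {"stringValue": "request"}, "severityNumber": 9}]
    }]
  }]
}
```

So the nested body-within-body example above wouldn't work as-is; the whole OTLP structure has to still be a single JSON string in Body.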
