feat(EKS Fargate): Add multiline support to EKS Fargate #3059

Merged 2 commits on May 22, 2023.
1 change: 1 addition & 0 deletions .changelog/3059.added.txt
@@ -0,0 +1 @@
feat(EKS Fargate): Add multiline support to EKS Fargate
@@ -26,24 +26,59 @@ processors:
       send_batch_max_size: 2000
       send_batch_size: 1000
       timeout: 1s
-    transform/cloudwatch_logs:
+    transform/set_source_identifier:
       error_mode: ignore
       log_statements:
         - context: log
           statements:
-            - set(attributes["k8s.container.name"], resource.attributes["cloudwatch.log.stream"])
-            - set(attributes["k8s.pod.name"], resource.attributes["cloudwatch.log.stream"])
-            - set(attributes["k8s.namespace.name"], resource.attributes["cloudwatch.log.stream"])
-            - replace_pattern(attributes["k8s.pod.name"], "^.*kube\\.var\\.log\\.containers\\.([0-9a-zA-Z\\-]+)\\_.*", "$$1")
-            - replace_pattern(attributes["k8s.container.name"], "^.*kube\\.var\\.log\\.containers\\.[0-9a-zA-Z\\-]+\\_[a-zA-Z\\-]*\\_([a-zA-Z]*).*", "$$1")
-            - replace_pattern(attributes["k8s.namespace.name"], "^.*kube\\.var\\.log\\.containers\\.[0-9a-zA-Z\\-]+\\_([a-zA-Z\\-]*)_.*", "$$1")
-    transform/parse:
+            - set(attributes["cloudwatch.log.stream"], resource.attributes["cloudwatch.log.stream"])
+    groupbyattrs/stream:
+      keys:
+        - cloudwatch.log.stream
+    ## need to reset the source identifier after grouping
@sumo-drosiek (Contributor), May 19, 2023:
Why do we need to reset it? We can use the resource attribute in logstransform's recombine operator. As it stands, we set it for use in logstransform and then remove it, which seems like unnecessary overhead.

Contributor:
Yes, I agree. Initially we didn't think about that, but that makes a lot of sense.

Contributor Author:
I think we need to set cloudwatch.log.stream as a record attribute so that we can use it to group all the log records under a resource. If we don't set it as a record attribute, then what would I group by?

This is from the groupbyattrs processor documentation:

> If the log record and metric data point has at least one of the specified attributes key, it will be moved to a Resource with the same value for these attributes
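The grouping behavior that quoted passage describes can be sketched in a few lines of Python (an illustration only; the attribute name matches this PR, but the record shape is simplified):

```python
from collections import defaultdict

def group_by_stream(records):
    """Toy model of groupbyattrs: regroup log records under one
    resource per distinct "cloudwatch.log.stream" record attribute."""
    resources = defaultdict(list)
    for record in records:
        stream = record["attributes"].get("cloudwatch.log.stream")
        resources[stream].append(record)
    return dict(resources)

records = [
    {"attributes": {"cloudwatch.log.stream": "stream-a"}, "body": "one"},
    {"attributes": {"cloudwatch.log.stream": "stream-b"}, "body": "two"},
    {"attributes": {"cloudwatch.log.stream": "stream-a"}, "body": "three"},
]
grouped = group_by_stream(records)
print({k: [r["body"] for r in v] for k, v in grouped.items()})
# {'stream-a': ['one', 'three'], 'stream-b': ['two']}
```

After this step a recombine operator can work per stream instead of seeing all streams interleaved under one resource.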

Contributor:
I guess he's referring to setting this after grouping.

Contributor Author:
Yes, that makes sense; made the change.

@rnishtala-sumo (Contributor Author), May 19, 2023:
@sumo-drosiek @aboguszewski-sumo actually I get the following warning when I try to use the log stream resource attribute as the source identifier for the recombine operator

2023-05-19T17:00:15.352Z    warn    recombine/recombine.go:235    entry does not contain the source_identifier, so it may be pooled with other sources    {"kind": "processor", "name": "logstransform/cloudwatch", "pipeline": "logs/collector/otelcloudwatch", "operator_id": "merge-cri-lines", "operator_type": "recombine"}

I'll revert to copying the log stream attribute to record level to prevent unexpected issues.

@rnishtala-sumo (Contributor Author), May 19, 2023:
Confirmed that using a log stream record attribute as the source identifier clears the above warning. To avoid a scenario where we combine logs from multiple sources (as the warning indicates), I'd like to keep the original changes.

@sumo-drosiek (Contributor), May 22, 2023:
The following is working for me:

configuration:

receivers:
  filelog:
    include:
      - ./tmp/logs/multiline.json  
    start_at: beginning
exporters:
  logging:
    verbosity: detailed
processors:
  logstransform/containers_parse_json:
    operators:
      - if: body matches "^{[\\s\\S]+"
        parse_from: body
        parse_to: body
        type: json_parser
      - type: add
        field: resource["cloudwatch.log.stream"]
        value: resource_attribute
  logstransform:
    operators:
      - id: merge-multiline-logs
        combine_field: body.log
        combine_with: "\n"
        is_first_entry: body.log matches "^a"
        source_identifier: resource["cloudwatch.log.stream"]
        # source_identifier: attributes["log.file.name"]
        type: recombine
service:
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - logstransform/containers_parse_json
        - logstransform
      exporters:
        - logging

file (./tmp/logs/multiline.json):

{"log": "abc"}
{"log": "def"}
{"log": "ghi"}
{"log": "asdc"}

@rnishtala-sumo (Contributor Author), May 22, 2023:
Changed the source identifier to resource["cloudwatch.log.stream"] and that seems to work, with no warnings. Thanks!
https://stagdata.long.sumologic.net/ui/#/search/create?id=RDmXCQ1yhB5eBla9BPzgBCvMFYprpbG7EY0DAT7M

+    transform/reset_source_identifier:
+      error_mode: ignore
+      log_statements:
+        - context: log
+          statements:
+            - set(attributes["cloudwatch.log.stream"], resource.attributes["cloudwatch.log.stream"])
+    transform/parsejson:
+      error_mode: ignore
+      log_statements:
+        - context: log
+          statements:
+            - set(body, ParseJSON(body)) where IsMatch(body, "^{") == true
+            - merge_maps(attributes, body, "insert")
+            - set(body, "") where IsMatch(body, "^{") == true
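A rough Python equivalent of the transform/parsejson statements, for readers unfamiliar with OTTL (a sketch only; the real processor mutates the record in place, and merge_maps with "insert" only adds keys that are not already present):

```python
import json

def parse_json_body(body, attributes):
    """Sketch of: parse a JSON body, merge its keys into the record
    attributes without overwriting existing ones, then blank the body."""
    if body.startswith("{"):                   # IsMatch(body, "^{")
        for key, value in json.loads(body).items():
            attributes.setdefault(key, value)  # merge_maps(..., "insert")
        body = ""                              # set(body, "")
    return body, attributes

body, attrs = parse_json_body('{"log": "hello\\n", "logtag": "F"}', {"logtag": "P"})
# body is now "", attrs keeps the pre-existing logtag and gains "log"
```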
+    transform/metadata:
+      error_mode: ignore
+      log_statements:
+        - context: log
+          statements:
+            - set(attributes["k8s.container.name"], resource.attributes["cloudwatch.log.stream"])
+            - set(attributes["k8s.pod.name"], resource.attributes["cloudwatch.log.stream"])
+            - set(attributes["k8s.namespace.name"], resource.attributes["cloudwatch.log.stream"])
+            - replace_pattern(attributes["k8s.pod.name"], "^.*kube\\.var\\.log\\.containers\\.([0-9a-zA-Z\\-]+)\\_.*", "$$1")
+            - replace_pattern(attributes["k8s.container.name"], "^.*kube\\.var\\.log\\.containers\\.[0-9a-zA-Z\\-]+\\_[a-zA-Z\\-]*\\_([a-zA-Z]*).*", "$$1")
+            - replace_pattern(attributes["k8s.namespace.name"], "^.*kube\\.var\\.log\\.containers\\.[0-9a-zA-Z\\-]+\\_([a-zA-Z\\-]*)_.*", "$$1")
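As a sanity check on the replace_pattern statements above, here is an illustrative Python equivalent. The sample stream name is an assumption about the Fargate log-router naming scheme, and the chart's doubled `$$1` is the escaped form of the capture group `$1` (i.e. `\1` in Python's re.sub):

```python
import re

# Hypothetical CloudWatch log stream name (assumed format for illustration).
stream = "from-fluent-bit-kube.var.log.containers.my-pod-123_sumologic_otelcol-abc123.log"

# Same patterns as the OTTL statements above; "$$1" becomes r"\1" here.
pod = re.sub(r"^.*kube\.var\.log\.containers\.([0-9a-zA-Z\-]+)\_.*", r"\1", stream)
namespace = re.sub(r"^.*kube\.var\.log\.containers\.[0-9a-zA-Z\-]+\_([a-zA-Z\-]*)_.*", r"\1", stream)
container = re.sub(r"^.*kube\.var\.log\.containers\.[0-9a-zA-Z\-]+\_[a-zA-Z\-]*\_([a-zA-Z]*).*", r"\1", stream)

print(pod, namespace, container)  # my-pod-123 sumologic otelcol
```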
+    logstransform/cloudwatch:
+      operators:
+        - id: merge-cri-lines
+          combine_field: attributes.log
+          combine_with: ""
+          is_last_entry: attributes.logtag == "F"
+          output: "merge-multiline-logs"
+          overwrite_with: newest
+          source_identifier: attributes["cloudwatch.log.stream"]
+          type: recombine
+        - id: merge-multiline-logs
+          combine_field: attributes.log
+          combine_with: "\n"
+          is_first_entry: attributes.log matches {{ .Values.sumologic.logs.multiline.first_line_regex | quote }}
+          source_identifier: attributes["cloudwatch.log.stream"]
+          type: recombine
+        - field: attributes["cloudwatch.log.stream"]
+          type: remove
     receivers:
       awscloudwatch:
         region: {{ .Values.sumologic.logs.collector.otelcloudwatch.region }}
@@ -63,8 +98,12 @@ service:
         receivers:
           - awscloudwatch
         processors:
-          - transform/parse
-          - transform/cloudwatch_logs
+          - transform/set_source_identifier
+          - groupbyattrs/stream
Comment on lines +92 to +93
Contributor:
Looks hacky 😅

Contributor Author:
Yeah, we're grouping all the logs by the CloudWatch log stream. We also plan to raise an upstream issue for the CloudWatch receiver so that logs are placed (correctly) under a log stream resource.

+          - transform/reset_source_identifier
+          - transform/parsejson
+          - logstransform/cloudwatch
+          - transform/metadata
           - batch
         exporters:
           - otlphttp
9 changes: 3 additions & 6 deletions docs/fargate.md
@@ -1,11 +1,8 @@
 # Fargate
 
-**NOTE: This is the alpha release of EKS Fargate.**
+**Release Note: Generally Available (GA) Release of EKS Fargate.**
 
-The following are some limitations of deploying this helm chart on EKS fargate
-
-- Only supports EKS version 1.24 and above
-- Does not support multiline logs
+Supports EKS version 1.24 and above
 
 The following documentation assumes that you are using eksctl to manage Fargate cluster. Code snippets are using environment variables in
 order to make them as generic and reusable.
@@ -780,7 +777,7 @@ sumologic:
     fluent-bit-cloudwatch:
       ## The log stream prefix, can also be specified as
       ## names: []
-      prefixes: [from-fluent-bit]
+      names: [from-fluent-bit]
 ```

where `my-role` is the name of the role created while setting up [authentication](#authenticate-with-cloudwatch)