Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward Step Functions Tags with logs to the backend #618

Merged
merged 25 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions aws/logs_monitoring/.dockerignore
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
*
!lambda_function.py
!enhanced_lambda_metrics.py
!logs.py
!parsing.py
!cache.py
!telemetry.py
!settings.py
!setup.py
!**.py
!template.yaml
!trace_forwarder/bin
!trace_forwarder/__init__.py
!trace_forwarder/connection.py
!trace_forwarder/bin
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,12 @@
DD_S3_BUCKET_NAME,
DD_TAGS_CACHE_TTL_SECONDS,
DD_S3_CACHE_LOCK_TTL_SECONDS,
DD_S3_CACHE_FILENAME,
DD_S3_CACHE_LOCK_FILENAME,
DD_S3_LOG_GROUP_CACHE_FILENAME,
DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,
)
from telemetry import (
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
get_forwarder_telemetry_tags,
)


JITTER_MIN = 1
JITTER_MAX = 100

Expand Down Expand Up @@ -99,6 +94,11 @@ def should_fetch_log_group_tags():
return os.environ.get("DD_FETCH_LOG_GROUP_TAGS", "false").lower() == "true"


def should_fetch_step_functions_tags():
"""Checks the env var to determine if the customer has opted-in to fetching step functions tags"""
return os.environ.get("DD_FETCH_STEP_FUNCTIONS_TAGS", "false").lower() == "true"


def get_last_modified_time(s3_file):
last_modified_str = s3_file["ResponseMetadata"]["HTTPHeaders"]["last-modified"]
last_modified_date = datetime.datetime.strptime(
Expand All @@ -108,8 +108,7 @@ def get_last_modified_time(s3_file):
return last_modified_unix_time


class LambdaTagsCache(object):

class BaseTagsCache(object):
CACHE_FILENAME = None
CACHE_LOCK_FILENAME = None

Expand Down Expand Up @@ -233,10 +232,6 @@ def build_tags_cache(self):
raise Exception("BUILD TAGS MUST BE DEFINED FOR TAGS CACHES")


######################
# Lambda Custom Tags #
######################

resource_tagging_client = boto3.client("resourcegroupstaggingapi")
GET_RESOURCES_LAMBDA_FILTER = "lambda"

Expand Down Expand Up @@ -289,159 +284,3 @@ def parse_get_resources_response_for_tags_by_arn(get_resources_page):
tags_by_arn[lowercase_function_arn] += tags

return tags_by_arn


class LambdaCustomTagsCache(LambdaTagsCache):
CACHE_FILENAME = DD_S3_CACHE_FILENAME
CACHE_LOCK_FILENAME = DD_S3_CACHE_LOCK_FILENAME

def should_fetch_tags(self):
return should_fetch_lambda_tags()

def build_tags_cache(self):
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions

Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true

Returns:
tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
"""
tags_fetch_success = False
tags_by_arn_cache = {}
get_resources_paginator = resource_tagging_client.get_paginator("get_resources")

try:
for page in get_resources_paginator.paginate(
ResourceTypeFilters=[GET_RESOURCES_LAMBDA_FILTER], ResourcesPerPage=100
):
send_forwarder_internal_metrics("get_resources_api_calls")
page_tags_by_arn = parse_get_resources_response_for_tags_by_arn(page)
tags_by_arn_cache.update(page_tags_by_arn)
tags_fetch_success = True

except ClientError as e:
logger.exception(
"Encountered a ClientError when trying to fetch tags. You may need to give "
"this Lambda's role the 'tag:GetResources' permission"
)
additional_tags = [
f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
]
send_forwarder_internal_metrics(
"client_error", additional_tags=additional_tags
)
tags_fetch_success = False

logger.debug(
"Built this tags cache from GetResources API calls: %s", tags_by_arn_cache
)

return tags_fetch_success, tags_by_arn_cache

def get(self, key):
"""Get the tags for the Lambda function from the cache

Will refetch the tags if they are out of date, or a lambda arn is encountered
which isn't in the tag list

Note: the ARNs in the cache have been lowercased, so resource_arn must be lowercased

Args:
key (str): the key we're getting tags from the cache for

Returns:
lambda_tags (str[]): the list of "key:value" Datadog tag strings
"""
if self._is_expired():
send_forwarder_internal_metrics("local_cache_expired")
logger.debug("Local cache expired, fetching cache from S3")
self._refresh()

function_tags = self.tags_by_id.get(key, [])
return function_tags


#############################
# Cloudwatch Log Group Tags #
#############################

cloudwatch_logs_client = boto3.client("logs")


def get_log_group_tags(log_group):
response = None
try:
send_forwarder_internal_metrics("list_tags_log_group_api_call")
response = cloudwatch_logs_client.list_tags_log_group(logGroupName=log_group)
except Exception as e:
logger.exception(f"Failed to get log group tags due to {e}")
formatted_tags = None
if response is not None:
formatted_tags = [
"{key}:{value}".format(
key=sanitize_aws_tag_string(k, remove_colons=True),
value=sanitize_aws_tag_string(v, remove_leading_digits=False),
)
if v
else sanitize_aws_tag_string(k, remove_colons=True)
for k, v in response["tags"].items()
]
return formatted_tags


class CloudwatchLogGroupTagsCache(LambdaTagsCache):
CACHE_FILENAME = DD_S3_LOG_GROUP_CACHE_FILENAME
CACHE_LOCK_FILENAME = DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME

def should_fetch_tags(self):
return should_fetch_log_group_tags()

def build_tags_cache(self):
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions

Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true

Returns:
tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
"""
new_tags = {}
for log_group in self.tags_by_id.keys():
log_group_tags = get_log_group_tags(log_group)
# If we didn't get back log group tags we'll use the locally cached ones if they exist
# This avoids losing tags on a failed api call
if log_group_tags is None:
log_group_tags = self.tags_by_id.get(log_group, [])
new_tags[log_group] = log_group_tags

logger.debug("All tags in Cloudwatch Log Groups refresh: {}".format(new_tags))
return True, new_tags

def get(self, log_group):
"""Get the tags for the Cloudwatch Log Group from the cache

Will refetch the tags if they are out of date, or a log group is encountered
which isn't in the tag list

Args:
key (str): the key we're getting tags from the cache for

Returns:
log_group_tags (str[]): the list of "key:value" Datadog tag strings
"""
if self._is_expired():
send_forwarder_internal_metrics("local_cache_expired")
logger.debug("Local cache expired, fetching cache from S3")
self._refresh()

log_group_tags = self.tags_by_id.get(log_group, None)
if log_group_tags is None:
# If the custom tag fetch env var is not set to true do not fetch
if not self.should_fetch_tags():
logger.debug(
"Not fetching custom tags because the env variable DD_FETCH_LAMBDA_TAGS is not set to true"
)
return []
log_group_tags = get_log_group_tags(log_group) or []
self.tags_by_id[log_group] = log_group_tags

return log_group_tags
96 changes: 96 additions & 0 deletions aws/logs_monitoring/cloudwatch_log_group_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import boto3

from base_tags_cache import (
BaseTagsCache,
logger,
sanitize_aws_tag_string,
send_forwarder_internal_metrics,
should_fetch_log_group_tags,
)
from settings import (
DD_S3_LOG_GROUP_CACHE_FILENAME,
DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,
)


class CloudwatchLogGroupTagsCache(BaseTagsCache):
CACHE_FILENAME = DD_S3_LOG_GROUP_CACHE_FILENAME
CACHE_LOCK_FILENAME = DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME

def should_fetch_tags(self):
return should_fetch_log_group_tags()

def build_tags_cache(self):
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions

Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true

Returns:
tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
"""
new_tags = {}
for log_group in self.tags_by_id.keys():
log_group_tags = get_log_group_tags(log_group)
# If we didn't get back log group tags we'll use the locally cached ones if they exist
# This avoids losing tags on a failed api call
if log_group_tags is None:
log_group_tags = self.tags_by_id.get(log_group, [])
new_tags[log_group] = log_group_tags

logger.debug("All tags in Cloudwatch Log Groups refresh: {}".format(new_tags))
return True, new_tags

def get(self, log_group):
"""Get the tags for the Cloudwatch Log Group from the cache

Will refetch the tags if they are out of date, or a log group is encountered
which isn't in the tag list

Args:
key (str): the key we're getting tags from the cache for

Returns:
log_group_tags (str[]): the list of "key:value" Datadog tag strings
"""
if self._is_expired():
send_forwarder_internal_metrics("local_cache_expired")
logger.debug("Local cache expired, fetching cache from S3")
self._refresh()

log_group_tags = self.tags_by_id.get(log_group, None)
if log_group_tags is None:
# If the custom tag fetch env var is not set to true do not fetch
if not self.should_fetch_tags():
logger.debug(
"Not fetching custom tags because the env variable DD_FETCH_LOG_GROUP_TAGS is "
"not set to true"
)
return []
log_group_tags = get_log_group_tags(log_group) or []
self.tags_by_id[log_group] = log_group_tags

return log_group_tags


def get_log_group_tags(log_group):
response = None
try:
send_forwarder_internal_metrics("list_tags_log_group_api_call")
response = cloudwatch_logs_client.list_tags_log_group(logGroupName=log_group)
except Exception as e:
logger.exception(f"Failed to get log group tags due to {e}")
formatted_tags = None
if response is not None:
formatted_tags = [
"{key}:{value}".format(
key=sanitize_aws_tag_string(k, remove_colons=True),
value=sanitize_aws_tag_string(v, remove_leading_digits=False),
)
if v
else sanitize_aws_tag_string(k, remove_colons=True)
for k, v in response["tags"].items()
]
return formatted_tags


cloudwatch_logs_client = boto3.client("logs")
4 changes: 2 additions & 2 deletions aws/logs_monitoring/enhanced_lambda_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from time import time

from cache import LambdaCustomTagsCache
from lambda_cache import LambdaTagsCache

ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"

Expand Down Expand Up @@ -86,7 +86,7 @@

# Store the cache in the global scope so that it will be reused as long as
# the log forwarder Lambda container is running
account_lambda_custom_tags_cache = LambdaCustomTagsCache()
account_lambda_custom_tags_cache = LambdaTagsCache()


class DatadogMetricPoint(object):
Expand Down
Loading