Handle invalid integration pipelines #254

Merged · 2 commits · Jun 17, 2024
18 changes: 17 additions & 1 deletion datadog_sync/model/logs_pipelines.py
@@ -21,13 +21,14 @@ class LogsPipelines(BaseResource):
     resource_config = ResourceConfig(
         concurrent=False,
         base_path="/api/v1/logs/config/pipelines",
-        excluded_attributes=["id", "type"],
+        excluded_attributes=["id", "type", "__datadog_sync_invalid"],
     )
     # Additional LogsPipelines specific attributes
     destination_integration_pipelines: Dict[str, Dict] = dict()
     logs_intake_subdomain = "http-intake.logs"
     logs_intake_path = "/api/v2/logs"
     logs_intg_pipeline_source_re = r"source:((?P<source>\S+)$|\((?P<source_or>\S+) OR.*\))"
+    invalid_integration_pipelines = {"cron", "ufw"}
 
     async def get_resources(self, client: CustomClient) -> List[Dict]:
         resp = await client.get(self.resource_config.base_path)
@@ -61,6 +62,13 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
             return _id, resp
 
         if resource["name"] not in self.destination_integration_pipelines:
+            if resource["name"] in self.invalid_integration_pipelines:
+                # We do not create invalid integration pipelines but rather insert
+                # the pipeline in local state with additional metadata to indicate
+                # that it is invalid.
+                # Upstream resources will selectively drop references to the resource based on the field.
+                resource["__datadog_sync_invalid"] = True
+                return _id, resource
             # Extract the source from the query
             source = self.extract_source_from_query(resource.get("filter", {}).get("query"))
             if not source:
@@ -100,6 +108,14 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
         return _id, self.resource_config.destination_resources[_id]
 
     async def update_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
+        current_destination_resource = self.resource_config.destination_resources[_id]
+        if current_destination_resource.get("__datadog_sync_invalid"):
+            # We do not update invalid integration pipelines.
+            # We only update the local state with the new payload to avoid diffs.
+            current_destination_resource.update(resource)
+            current_destination_resource["__datadog_sync_invalid"] = True
+            return _id, current_destination_resource
+
         destination_client = self.config.destination_client
         resp = await destination_client.put(
             self.resource_config.base_path + f"/{self.resource_config.destination_resources[_id]['id']}",
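
Taken together, the two guarded paths above never touch the destination API for an invalid integration pipeline: the payload is only written to local state with the __datadog_sync_invalid marker set, and excluded_attributes keeps that marker out of diff comparisons on later syncs. A minimal standalone sketch of that flow follows; sync_pipeline and the module-level constants are hypothetical illustrations, not the repo's actual API.

from typing import Dict, Tuple

# Hypothetical module-level mirrors of the class attributes in the diff above.
INVALID_INTEGRATION_PIPELINES = {"cron", "ufw"}
INVALID_MARKER = "__datadog_sync_invalid"

def sync_pipeline(_id: str, resource: Dict, local_state: Dict) -> Tuple[str, Dict]:
    """Create-or-update a pipeline, skipping the destination API for invalid ones."""
    if resource["name"] in INVALID_INTEGRATION_PIPELINES:
        # No API call: record the payload in local state with the marker set,
        # so later syncs see no diff and dependent resources can skip it.
        stored = dict(resource)
        stored[INVALID_MARKER] = True
        local_state[_id] = stored
        return _id, stored
    # A real implementation would POST/PUT to the destination here.
    local_state[_id] = resource
    return _id, resource

state: Dict = {}
sync_pipeline("abc-123", {"name": "cron", "filter": {"query": "source:cron"}}, state)
assert state["abc-123"][INVALID_MARKER] is True
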
19 changes: 18 additions & 1 deletion datadog_sync/model/logs_pipelines_order.py
@@ -75,7 +75,24 @@ async def delete_resource(self, _id: str) -> None:
         self.config.logger.warning("logs_pipeline_order cannot deleted. Removing resource from config only.")
 
     def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
-        return super(LogsPipelinesOrder, self).connect_id(key, r_obj, resource_to_connect)
+        logs_pipelines = self.config.resources["logs_pipelines"].resource_config.destination_resources
+        failed_connections = []
+        ids_to_omit = []
+        for i, _id in enumerate(r_obj[key]):
+            if _id in logs_pipelines:
+                if logs_pipelines[_id].get("__datadog_sync_invalid"):
+                    # Invalid logs integration pipelines cannot be created, so we
+                    # remove them from the final logs pipeline order payload.
+                    ids_to_omit.append(_id)
+                else:
+                    r_obj[key][i] = logs_pipelines[_id]["id"]
+            else:
+                failed_connections.append(_id)
+
+        if ids_to_omit:
+            r_obj[key] = [_id for _id in r_obj[key] if _id not in ids_to_omit]
+
+        return failed_connections
 
     async def get_destination_pipeline_order(self):
         destination_client = self.config.destination_client
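
The rewritten connect_id now does three things: it maps known pipeline IDs to their destination IDs, collects IDs with no destination counterpart as failed connections, and silently omits pipelines flagged __datadog_sync_invalid from the order payload. A simplified, self-contained version of that logic is sketched below; resolve_pipeline_order is a hypothetical name, and unlike the real method it returns a new list rather than mutating r_obj in place.

from typing import Dict, List, Tuple

def resolve_pipeline_order(
    source_ids: List[str], destination_pipelines: Dict[str, Dict]
) -> Tuple[List[str], List[str]]:
    resolved: List[str] = []
    failed: List[str] = []
    for _id in source_ids:
        pipeline = destination_pipelines.get(_id)
        if pipeline is None:
            # No destination counterpart yet: keep the raw ID and report it.
            resolved.append(_id)
            failed.append(_id)
        elif pipeline.get("__datadog_sync_invalid"):
            # Invalid integration pipeline: omit it from the order entirely.
            continue
        else:
            resolved.append(pipeline["id"])
    return resolved, failed

dest = {
    "a": {"id": "dest-a"},
    "b": {"id": "dest-b", "__datadog_sync_invalid": True},
}
print(resolve_pipeline_order(["a", "b", "c"], dest))
# (['dest-a', 'c'], ['c'])
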
7 changes: 3 additions & 4 deletions datadog_sync/utils/resource_utils.py
@@ -52,13 +52,12 @@ def match(self, level):
         level.t1 = deepcopy(level.t1)
         level.t2 = deepcopy(level.t2)
 
-        # If we are at the top level, modify the list to only include the intersections.
+        # If we are at the top level, modify the list to exclude extra pipelines in the destination.
         t1 = set(level.t1["pipeline_ids"])
         t2 = set(level.t2["pipeline_ids"])
-        intersection = t1 & t2
+        d_ignore = t1 - t2
 
-        level.t1["pipeline_ids"] = [_id for _id in level.t1["pipeline_ids"] if _id in intersection]
-        level.t2["pipeline_ids"] = [_id for _id in level.t2["pipeline_ids"] if _id in intersection]
+        level.t1["pipeline_ids"] = [_id for _id in level.t1["pipeline_ids"] if _id not in d_ignore]
         return True
 
     def give_up_diffing(self, level, diff_instance) -> bool:
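
The operator change is subtle but important: instead of diffing only the intersection of the two pipeline_ids lists, the match now prunes just the destination-side extras (t1 - t2). Pipelines missing from the destination, including invalid ones that were never created there, still surface as differences. A small standalone demonstration of that asymmetry follows; prune_destination_extras is a hypothetical helper, not the repo's actual deepdiff operator.

from typing import List

def prune_destination_extras(t1_ids: List[str], t2_ids: List[str]) -> List[str]:
    # IDs present only in the destination (t1) are ignored during the diff.
    d_ignore = set(t1_ids) - set(t2_ids)
    return [_id for _id in t1_ids if _id not in d_ignore]

destination = ["p1", "p2", "extra"]
source = ["p1", "p2", "missing"]
print(prune_destination_extras(destination, source))  # ['p1', 'p2']
# "extra" no longer produces a spurious diff, while "missing" (present in
# the source but absent from the destination) still differs, so the
# pipeline order is correctly flagged for an update.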