diff --git a/datadog_sync/model/logs_pipelines.py b/datadog_sync/model/logs_pipelines.py
index 65d81f65..69da46ab 100644
--- a/datadog_sync/model/logs_pipelines.py
+++ b/datadog_sync/model/logs_pipelines.py
@@ -21,13 +21,14 @@ class LogsPipelines(BaseResource):
     resource_config = ResourceConfig(
         concurrent=False,
         base_path="/api/v1/logs/config/pipelines",
-        excluded_attributes=["id", "type"],
+        excluded_attributes=["id", "type", "__datadog_sync_invalid"],
     )
     # Additional LogsPipelines specific attributes
     destination_integration_pipelines: Dict[str, Dict] = dict()
     logs_intake_subdomain = "http-intake.logs"
     logs_intake_path = "/api/v2/logs"
     logs_intg_pipeline_source_re = r"source:((?P<source>\S+)$|\((?P<sources>\S+) OR.*\))"
+    invalid_integration_pipelines = {"cron", "ufw"}
 
     async def get_resources(self, client: CustomClient) -> List[Dict]:
         resp = await client.get(self.resource_config.base_path)
@@ -61,6 +62,13 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
             return _id, resp
 
         if resource["name"] not in self.destination_integration_pipelines:
+            if resource["name"] in self.invalid_integration_pipelines:
+                # We do not create invalid integration pipelines; instead, we insert
+                # the pipeline into local state with additional metadata indicating
+                # that it is invalid.
+                # Upstream resources will selectively drop references to the resource based on this field.
+                resource["__datadog_sync_invalid"] = True
+                return _id, resource
             # Extract the source from the query
             source = self.extract_source_from_query(resource.get("filter", {}).get("query"))
             if not source:
@@ -100,6 +108,14 @@ async def create_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
         return _id, self.resource_config.destination_resources[_id]
 
     async def update_resource(self, _id: str, resource: Dict) -> Tuple[str, Dict]:
+        current_destination_resource = self.resource_config.destination_resources[_id]
+        if current_destination_resource.get("__datadog_sync_invalid"):
+            # We do not update invalid integration pipelines.
+            # We only update the local state with the new payload to avoid diffs.
+            current_destination_resource.update(resource)
+            current_destination_resource["__datadog_sync_invalid"] = True
+            return _id, current_destination_resource
+
         destination_client = self.config.destination_client
         resp = await destination_client.put(
             self.resource_config.base_path + f"/{self.resource_config.destination_resources[_id]['id']}",
diff --git a/datadog_sync/model/logs_pipelines_order.py b/datadog_sync/model/logs_pipelines_order.py
index 52843932..06ceecca 100644
--- a/datadog_sync/model/logs_pipelines_order.py
+++ b/datadog_sync/model/logs_pipelines_order.py
@@ -75,7 +75,24 @@ async def delete_resource(self, _id: str) -> None:
         self.config.logger.warning("logs_pipeline_order cannot deleted. Removing resource from config only.")
 
     def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
-        return super(LogsPipelinesOrder, self).connect_id(key, r_obj, resource_to_connect)
+        logs_pipelines = self.config.resources["logs_pipelines"].resource_config.destination_resources
+        failed_connections = []
+        ids_to_omit = []
+        for i, _id in enumerate(r_obj[key]):
+            if _id in logs_pipelines:
+                if logs_pipelines[_id].get("__datadog_sync_invalid"):
+                    # Invalid logs integration pipelines cannot be created, so we
+                    # remove them from the final logs pipeline order payload.
+                    ids_to_omit.append(_id)
+                else:
+                    r_obj[key][i] = logs_pipelines[_id]["id"]
+            else:
+                failed_connections.append(_id)
+
+        if ids_to_omit:
+            r_obj[key] = [_id for _id in r_obj[key] if _id not in ids_to_omit]
+
+        return failed_connections
 
     async def get_destination_pipeline_order(self):
         destination_client = self.config.destination_client
diff --git a/datadog_sync/utils/resource_utils.py b/datadog_sync/utils/resource_utils.py
index 8973b5c3..080a7f90 100644
--- a/datadog_sync/utils/resource_utils.py
+++ b/datadog_sync/utils/resource_utils.py
@@ -52,13 +52,12 @@ def match(self, level):
             level.t1 = deepcopy(level.t1)
             level.t2 = deepcopy(level.t2)
 
-            # If we are at the top level, modify the list to only include the intersections.
+            # If we are at the top level, modify the list to exclude extra pipelines in the destination.
             t1 = set(level.t1["pipeline_ids"])
             t2 = set(level.t2["pipeline_ids"])
-            intersection = t1 & t2
+            d_ignore = t1 - t2
 
-            level.t1["pipeline_ids"] = [_id for _id in level.t1["pipeline_ids"] if _id in intersection]
-            level.t2["pipeline_ids"] = [_id for _id in level.t2["pipeline_ids"] if _id in intersection]
+            level.t1["pipeline_ids"] = [_id for _id in level.t1["pipeline_ids"] if _id not in d_ignore]
         return True
 
     def give_up_diffing(self, level, diff_instance) -> bool:
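
For reviewers, here is a minimal standalone sketch of the new `connect_id` behavior. Everything below is illustrative: the IDs, names, and the shape of the `destination_resources` dict are made up for this example; in the real code the state comes from `self.config.resources["logs_pipelines"].resource_config.destination_resources`.

```python
# Illustrative only: fake IDs and state; mirrors the loop added in connect_id.
destination_resources = {
    "111": {"id": "abc", "name": "nginx"},                    # synced pipeline
    "222": {"name": "cron", "__datadog_sync_invalid": True},  # invalid integration pipeline
}
r_obj = {"pipeline_ids": ["111", "222", "333"]}

failed_connections, ids_to_omit = [], []
for i, _id in enumerate(r_obj["pipeline_ids"]):
    if _id in destination_resources:
        if destination_resources[_id].get("__datadog_sync_invalid"):
            ids_to_omit.append(_id)  # drop invalid pipelines from the order payload
        else:
            # swap the source ID for the corresponding destination ID
            r_obj["pipeline_ids"][i] = destination_resources[_id]["id"]
    else:
        failed_connections.append(_id)  # not synced yet; reported to the caller

r_obj["pipeline_ids"] = [p for p in r_obj["pipeline_ids"] if p not in ids_to_omit]

assert r_obj["pipeline_ids"] == ["abc", "333"]  # "222" omitted; "333" left for retry
assert failed_connections == ["333"]
```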
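
And a sketch of the `resource_utils.py` comparator change, using deepdiff's custom-operator hooks (`match` / `give_up_diffing`). The class name and wiring below are hypothetical stand-ins for the operator in the repo; the point is only that pipelines present solely in the destination no longer produce a diff:

```python
from copy import deepcopy

from deepdiff import DeepDiff
from deepdiff.operator import BaseOperator


class PipelineOrderComparator(BaseOperator):  # hypothetical stand-in
    def match(self, level) -> bool:
        # Only handle dicts carrying a pipeline order; skip every other level.
        if not (isinstance(level.t1, dict) and isinstance(level.t2, dict)):
            return False
        if "pipeline_ids" not in level.t1 or "pipeline_ids" not in level.t2:
            return False
        level.t1 = deepcopy(level.t1)
        level.t2 = deepcopy(level.t2)
        # Destination-only pipelines (t1 - t2) are filtered out instead of diffed.
        d_ignore = set(level.t1["pipeline_ids"]) - set(level.t2["pipeline_ids"])
        level.t1["pipeline_ids"] = [_id for _id in level.t1["pipeline_ids"] if _id not in d_ignore]
        return True

    def give_up_diffing(self, level, diff_instance) -> bool:
        return False  # keep diffing the (now filtered) payloads


destination = {"pipeline_ids": ["a", "dest-only", "b"]}
source = {"pipeline_ids": ["a", "b"]}
print(DeepDiff(destination, source, custom_operators=[PipelineOrderComparator()]))
# {} -- the extra destination pipeline is no longer flagged as a difference
```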