diff --git a/integrations/gitlab/CHANGELOG.md b/integrations/gitlab/CHANGELOG.md index 5222f3fd8..2ea4f9748 100644 --- a/integrations/gitlab/CHANGELOG.md +++ b/integrations/gitlab/CHANGELOG.md @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +0.1.124 (2024-09-24) +==================== + +### Improvements + +- Added more logs and implemented the webhook creation in async (0.1.124) + + 0.1.123 (2024-09-22) ==================== diff --git a/integrations/gitlab/gitlab_integration/events/event_handler.py b/integrations/gitlab/gitlab_integration/events/event_handler.py index ce2664f3a..81689dacd 100644 --- a/integrations/gitlab/gitlab_integration/events/event_handler.py +++ b/integrations/gitlab/gitlab_integration/events/event_handler.py @@ -69,17 +69,14 @@ def on(self, events: list[str], observer: Observer) -> None: self._observers[event].append(observer) async def _notify(self, event_id: str, body: dict[str, Any]) -> None: - observers = asyncio.gather( - *( - observer(event_id, body) - for observer in self._observers.get(event_id, []) - ) - ) - - if not observers: + observers_list = self._observers.get(event_id, []) + if not observers_list: logger.info( f"event: {event_id} has no matching handler. the handlers available are for events: {self._observers.keys()}" ) + return + + await asyncio.gather(*(observer(event_id, body) for observer in observers_list)) class SystemEventHandler(BaseEventHandler): diff --git a/integrations/gitlab/gitlab_integration/events/setup.py b/integrations/gitlab/gitlab_integration/events/setup.py index 069d36647..53cae4e79 100644 --- a/integrations/gitlab/gitlab_integration/events/setup.py +++ b/integrations/gitlab/gitlab_integration/events/setup.py @@ -115,7 +115,7 @@ def validate_hooks_override_config( validate_groups_hooks_events(groups_paths) -def setup_listeners(gitlab_service: GitlabService, webhook_id: str) -> None: +def setup_listeners(gitlab_service: GitlabService, group_id: str) -> None: handlers = [ PushHook(gitlab_service), MergeRequest(gitlab_service), @@ -127,9 +127,9 @@ def setup_listeners(gitlab_service: GitlabService, webhook_id: str) -> None: ] for handler in handlers: logger.info( - f"Setting up listeners for webhook {webhook_id} for group mapping {gitlab_service.group_mapping}" + f"Setting up listeners {handler.events} for group {group_id} for group mapping {gitlab_service.group_mapping}" ) - event_ids = [f"{event_name}:{webhook_id}" for event_name in handler.events] + event_ids = [f"{event_name}:{group_id}" for event_name in handler.events] event_handler.on(event_ids, handler.on_hook) @@ -144,13 +144,14 @@ def setup_system_listeners(gitlab_clients: list[GitlabService]) -> None: ProjectFiles, ] for handler in handlers: + logger.info(f"Setting up system listeners {handler.system_events}") system_event_handler.on(handler) for gitlab_service in gitlab_clients: system_event_handler.add_client(gitlab_service) -def create_webhooks_by_client( +async def create_webhooks_by_client( gitlab_host: str, app_host: str, token: str, @@ -160,16 +161,16 @@ def create_webhooks_by_client( gitlab_client = Gitlab(gitlab_host, token) gitlab_service = GitlabService(gitlab_client, app_host, group_mapping) - groups_for_webhooks = gitlab_service.get_filtered_groups_for_webhooks( + groups_for_webhooks = await gitlab_service.get_filtered_groups_for_webhooks( list(groups_hooks_events_override.keys()) if groups_hooks_events_override else None ) - webhooks_ids: list[str] = [] + groups_ids_with_webhooks: list[str] = [] for group in groups_for_webhooks: - webhook_id = gitlab_service.create_webhook( + group_id = await gitlab_service.create_webhook( group, ( groups_hooks_events_override.get( @@ -180,13 +181,13 @@ def create_webhooks_by_client( ), ) - if webhook_id: - webhooks_ids.append(webhook_id) + if group_id: + groups_ids_with_webhooks.append(group_id) - return gitlab_service, webhooks_ids + return gitlab_service, groups_ids_with_webhooks -def setup_application( +async def setup_application( token_mapping: dict[str, list[str]], gitlab_host: str, app_host: str, @@ -196,6 +197,7 @@ def setup_application( validate_token_mapping(token_mapping) if use_system_hook: + logger.info("Using system hook") validate_use_system_hook(token_mapping) token, group_mapping = list(token_mapping.items())[0] gitlab_client = Gitlab(gitlab_host, token) @@ -203,38 +205,44 @@ def setup_application( setup_system_listeners([gitlab_service]) else: + logger.info("Using group hooks") validate_hooks_override_config( token_mapping, token_group_override_hooks_mapping ) - client_to_webhooks: list[tuple[GitlabService, list[str]]] = [] + client_to_group_ids_with_webhooks: list[tuple[GitlabService, list[str]]] = [] for token, group_mapping in token_mapping.items(): - if not token_group_override_hooks_mapping: - client_to_webhooks.append( - create_webhooks_by_client( - gitlab_host, - app_host, - token, - None, - group_mapping, - ) - ) - else: - groups = token_group_override_hooks_mapping.tokens.get( - token, WebhookTokenConfig(groups=[]) - ).groups - if groups: - client_to_webhooks.append( - create_webhooks_by_client( + try: + if not token_group_override_hooks_mapping: + client_to_group_ids_with_webhooks.append( + await create_webhooks_by_client( gitlab_host, app_host, token, - groups, + None, group_mapping, ) ) + else: + groups = token_group_override_hooks_mapping.tokens.get( + token, WebhookTokenConfig(groups=[]) + ).groups + if groups: + client_to_group_ids_with_webhooks.append( + await create_webhooks_by_client( + gitlab_host, + app_host, + token, + groups, + group_mapping, + ) + ) + except Exception as e: + logger.exception( + f"Failed to create webhooks for group mapping {group_mapping}, error: {e}" + ) - for client, webhook_ids in client_to_webhooks: - for webhook_id in webhook_ids: - setup_listeners(client, webhook_id) + for client, group_ids in client_to_group_ids_with_webhooks: + for group_id in group_ids: + setup_listeners(client, group_id) diff --git a/integrations/gitlab/gitlab_integration/gitlab_service.py b/integrations/gitlab/gitlab_integration/gitlab_service.py index 0da6542ba..05e0657db 100644 --- a/integrations/gitlab/gitlab_integration/gitlab_service.py +++ b/integrations/gitlab/gitlab_integration/gitlab_service.py @@ -19,6 +19,7 @@ ProjectFile, ProjectPipeline, ProjectPipelineJob, + Hook, ) from gitlab_integration.core.async_fetcher import AsyncFetcher from gitlab_integration.core.entities import generate_entity_from_port_yaml @@ -63,22 +64,34 @@ def __init__( GITLAB_SEARCH_RATE_LIMIT * 0.95, 60 ) - def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None: + async def get_group_hooks(self, group: RESTObject) -> AsyncIterator[List[Hook]]: + async for hooks_batch in AsyncFetcher.fetch_batch(group.hooks.list): + hooks = typing.cast(List[Hook], hooks_batch) + yield hooks + + async def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None: webhook_url = f"{self.app_host}/integration/hook/{group.get_id()}" - for hook in group.hooks.list(iterator=True): - if hook.url == webhook_url: - return hook + logger.info( + f"Getting webhook for group {group.get_id()} with url {webhook_url}" + ) + async for hook_batch in self.get_group_hooks(group): + for hook in hook_batch: + if hook.url == webhook_url: + logger.info( + f"Found webhook for group {group.get_id()} with id {hook.id} and url {hook.url}" + ) + return hook return None - def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None: + async def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None: logger.info(f"Deleting webhook with id {hook_id} in group {group.get_id()}") try: - group.hooks.delete(hook_id) + await AsyncFetcher.fetch_single(group.hooks.delete, hook_id) logger.info(f"Deleted webhook for {group.get_id()}") except Exception as e: logger.error(f"Failed to delete webhook for {group.get_id()} error={e}") - def _create_group_webhook( + async def _create_group_webhook( self, group: RESTObject, events: list[str] | None ) -> None: webhook_events = { @@ -87,20 +100,23 @@ def _create_group_webhook( } logger.info( - f"Creating webhook for {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}" + f"Creating webhook for group {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}" ) try: - resp = group.hooks.create( + resp = await AsyncFetcher.fetch_single( + group.hooks.create, { "url": f"{self.app_host}/integration/hook/{group.get_id()}", **webhook_events, - } + }, ) logger.info( - f"Created webhook for {group.get_id()}, id={resp.id}, url={resp.url}" + f"Created webhook for group {group.get_id()}, webhook id={resp.id}, url={resp.url}" ) except Exception as e: - logger.error(f"Failed to create webhook for {group.get_id()} error={e}") + logger.exception( + f"Failed to create webhook for group {group.get_id()} error={e}" + ) def _get_changed_files_between_commits( self, project_id: int, head: str @@ -253,14 +269,27 @@ def should_process_project( return True return project.name in repos - def get_root_groups(self) -> List[Group]: - groups = self.gitlab_client.groups.list(iterator=True) + async def get_root_groups(self) -> List[Group]: + groups: list[RESTObject] = [] + async for groups_batch in AsyncFetcher.fetch_batch( + self.gitlab_client.groups.list, retry_transient_errors=True + ): + groups_batch = typing.cast(List[RESTObject], groups_batch) + groups.extend(groups_batch) + return typing.cast( List[Group], [group for group in groups if group.parent_id is None] ) - def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]: - groups = self.gitlab_client.groups.list(get_all=True) + async def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]: + groups: list[RESTObject] = [] + + async for groups_batch in AsyncFetcher.fetch_batch( + self.gitlab_client.groups.list, retry_transient_errors=True + ): + groups_batch = typing.cast(List[RESTObject], groups_batch) + groups.extend(groups_batch) + return typing.cast( List[Group], [ @@ -270,7 +299,7 @@ def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]: ], ) - def get_filtered_groups_for_webhooks( + async def get_filtered_groups_for_webhooks( self, groups_hooks_override_list: list[str] | None, ) -> List[Group]: @@ -278,9 +307,9 @@ def get_filtered_groups_for_webhooks( if groups_hooks_override_list is not None: if groups_hooks_override_list: logger.info( - "Getting all the specified groups in the mapping for a token to create their webhooks" + f"Getting all the specified groups in the mapping for a token to create their webhooks for: {groups_hooks_override_list}" ) - groups_for_webhooks = self.filter_groups_by_paths( + groups_for_webhooks = await self.filter_groups_by_paths( groups_hooks_override_list ) @@ -302,7 +331,7 @@ def get_filtered_groups_for_webhooks( ) else: logger.info("Getting all the root groups to create their webhooks") - root_groups = self.get_root_groups() + root_groups = await self.get_root_groups() groups_for_webhooks = [ group for group in root_groups @@ -316,16 +345,18 @@ def get_filtered_groups_for_webhooks( return groups_for_webhooks - def create_webhook(self, group: Group, events: list[str] | None) -> str | None: + async def create_webhook( + self, group: Group, events: list[str] | None + ) -> str | None: logger.info(f"Creating webhook for the group: {group.attributes['full_path']}") - webhook_id = None group_id = group.get_id() if group_id is None: logger.info(f"Group {group.attributes['full_path']} has no id. skipping...") + return None else: - hook = self._get_webhook_for_group(group) + hook = await self._get_webhook_for_group(group) if hook: logger.info(f"Webhook already exists for group {group.get_id()}") @@ -333,14 +364,13 @@ def create_webhook(self, group: Group, events: list[str] | None) -> str | None: logger.info( f"Webhook exists for group {group.get_id()} but is disabled, deleting and re-creating..." ) - self._delete_group_webhook(group, hook.id) - self._create_group_webhook(group, events) + await self._delete_group_webhook(group, hook.id) + await self._create_group_webhook(group, events) logger.info(f"Webhook re-created for group {group.get_id()}") else: - self._create_group_webhook(group, events) - webhook_id = str(group_id) + await self._create_group_webhook(group, events) - return webhook_id + return str(group_id) def create_system_hook(self) -> None: logger.info("Checking if system hook already exists") diff --git a/integrations/gitlab/gitlab_integration/ocean.py b/integrations/gitlab/gitlab_integration/ocean.py index 12e5e9949..8ed72c113 100644 --- a/integrations/gitlab/gitlab_integration/ocean.py +++ b/integrations/gitlab/gitlab_integration/ocean.py @@ -27,6 +27,16 @@ PROJECT_RESYNC_BATCH_SIZE = 10 +async def start_processors() -> None: + """Helper function to start the event processors.""" + try: + logger.info("Starting event processors") + await event_handler.start_event_processor() + await system_event_handler.start_event_processor() + except Exception as e: + logger.exception(f"Failed to start event processors: {e}") + + @ocean.router.post("/hook/{group_id}") async def handle_webhook_request(group_id: str, request: Request) -> dict[str, Any]: event_id = f"{request.headers.get('X-Gitlab-Event')}:{group_id}" @@ -79,8 +89,14 @@ async def on_start() -> None: if not integration_config.get("app_host"): logger.warning( - f"No app host provided, skipping webhook creation. {NO_WEBHOOK_WARNING}" + f"No app host provided, skipping webhook creation. {NO_WEBHOOK_WARNING}. Starting the event processors" ) + try: + await start_processors() + except Exception as e: + logger.exception( + f"Failed to start event processors: {e}. {NO_WEBHOOK_WARNING}" + ) return token_webhook_mapping: WebhookMappingConfig | None = None @@ -91,21 +107,19 @@ async def on_start() -> None: ) try: - setup_application( + await setup_application( integration_config["token_mapping"], integration_config["gitlab_host"], integration_config["app_host"], integration_config["use_system_hook"], token_webhook_mapping, ) - - await event_handler.start_event_processor() - await system_event_handler.start_event_processor() except Exception as e: - logger.warning( - f"Failed to setup webhook: {e}. {NO_WEBHOOK_WARNING}", - stack_info=True, - ) + logger.exception(f"Failed to setup webhook: {e}. {NO_WEBHOOK_WARNING}") + try: + await start_processors() # Ensure event processors are started regardless of webhook setup + except Exception as e: + logger.exception(f"Failed to start event processors: {e}. {NO_WEBHOOK_WARNING}") @ocean.on_resync(ObjectKind.GROUP) diff --git a/integrations/gitlab/pyproject.toml b/integrations/gitlab/pyproject.toml index d919a377d..7d0ca910f 100644 --- a/integrations/gitlab/pyproject.toml +++ b/integrations/gitlab/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gitlab" -version = "0.1.123" +version = "0.1.124" description = "Gitlab integration for Port using Port-Ocean Framework" authors = ["Yair Siman-Tov "] diff --git a/integrations/gitlab/tests/gitlab_integration/test_gitlab_service_webhook.py b/integrations/gitlab/tests/gitlab_integration/test_gitlab_service_webhook.py index 354caadfb..f3656de00 100644 --- a/integrations/gitlab/tests/gitlab_integration/test_gitlab_service_webhook.py +++ b/integrations/gitlab/tests/gitlab_integration/test_gitlab_service_webhook.py @@ -1,150 +1,107 @@ -from unittest.mock import MagicMock -from typing import Any +from unittest.mock import MagicMock, AsyncMock from gitlab_integration.gitlab_service import GitlabService +import pytest -def test_get_webhook_for_group_found(mocked_gitlab_service: GitlabService) -> None: +@pytest.mark.asyncio +async def test_delete_group_webhook_success( + mocked_gitlab_service: GitlabService, +) -> None: # Arrange mock_group = MagicMock() - mock_group.get_id.return_value = 456 - mock_webhook_url = "http://example.com/integration/hook/456" + mock_group.get_id.return_value = 123 + mock_group.hooks.delete = AsyncMock() # Mock successful deletion + + # Mock the group hooks.list method to return a webhook mock_hook = MagicMock() - mock_hook.url = mock_webhook_url - mock_hook.id = 984 - mock_group.hooks.list.return_value = [mock_hook] + mock_hook.url = "http://example.com/integration/hook/456" + mock_hook.id = 17 # Act - result = mocked_gitlab_service._get_webhook_for_group(mock_group) + await mocked_gitlab_service._delete_group_webhook(mock_group, mock_hook.id) # Assert - assert result == mock_hook - mock_group.hooks.list.assert_called_once_with(iterator=True) + mock_group.hooks.delete.assert_called_once_with(mock_hook.id) -def test_get_webhook_for_group_not_found(mocked_gitlab_service: GitlabService) -> None: +@pytest.mark.asyncio +async def test_delete_group_webhook_failure( + mocked_gitlab_service: GitlabService, +) -> None: # Arrange mock_group = MagicMock() - mock_group.get_id.return_value = 789 - mock_hook = MagicMock() - mock_hook.url = "http://example.com/other/hook" - mock_group.hooks.list.return_value = [mock_hook] + mock_group.get_id.return_value = 123 + mock_group.hooks.delete = AsyncMock(side_effect=Exception("Delete failed")) + mock_hook = MagicMock() + mock_hook.url = "http://example.com/integration/hook/456" + mock_hook.id = 17 # Act - result = mocked_gitlab_service._get_webhook_for_group(mock_group) + await mocked_gitlab_service._delete_group_webhook(mock_group, mock_hook.id) # Assert - assert result is None - mock_group.hooks.list.assert_called_once_with(iterator=True) + mock_group.hooks.delete.assert_called_once_with(mock_hook.id) -def test_create_webhook_when_webhook_exists_but_disabled( - mocked_gitlab_service: GitlabService, monkeypatch: Any -): +@pytest.mark.asyncio +async def test_create_group_webhook_success( + mocked_gitlab_service: GitlabService, +) -> None: # Arrange mock_group = MagicMock() - mock_group.get_id.return_value = 456 - mock_group.attributes = {"full_path": "group2"} - - # Mock the group hooks.list method to return an existing disabled webhook - mock_hook = MagicMock() - mock_hook.url = "http://example.com/integration/hook/456" # Updated URL for clarity - mock_hook.alert_status = "disabled" - mock_hook.id = 456 - mock_group.hooks.list.return_value = [mock_hook] - - # Mock the methods for deleting and creating webhooks - mock_delete_webhook = MagicMock() - monkeypatch.setattr( - mocked_gitlab_service, "_delete_group_webhook", mock_delete_webhook - ) - mock_create_webhook = MagicMock() - monkeypatch.setattr( - mocked_gitlab_service, "_create_group_webhook", mock_create_webhook + mock_group.get_id.return_value = 123 + mock_group.hooks.create = AsyncMock( + return_value=MagicMock(id=789, url="http://example.com/hook/123") ) # Act - webhook_id = mocked_gitlab_service.create_webhook( - mock_group, events=["push", "merge_request"] + await mocked_gitlab_service._create_group_webhook( + mock_group, ["push_events", "merge_requests_events"] ) # Assert - assert webhook_id == "456" - mock_delete_webhook.assert_called_once_with( - mock_group, mock_hook.id - ) # Ensure delete method is called - mock_create_webhook.assert_called_once_with( - mock_group, ["push", "merge_request"] - ) # Ensure create method is called with correct arguments - - -def test_create_webhook_when_webhook_exists_and_enabled( - mocked_gitlab_service: GitlabService, monkeypatch: Any -): - # Arrange - mock_group = MagicMock() - mock_group.get_id.return_value = 789 - mock_group.attributes = {"full_path": "group3"} - - # Mock the group hooks.list method to return an existing enabled webhook - mock_hook = MagicMock() - mock_hook.url = "http://example.com/integration/hook/789" - mock_hook.alert_status = "executable" - mock_hook.id = 789 - mock_group.hooks.list.return_value = [mock_hook] - - # Mock the method for creating webhooks - mock_create_webhook = MagicMock() - monkeypatch.setattr( - mocked_gitlab_service, "_create_group_webhook", mock_create_webhook - ) - - # Act - webhook_id = mocked_gitlab_service.create_webhook( - mock_group, events=["push", "merge_request"] + mock_group.hooks.create.assert_called_once_with( + { + "url": "http://example.com/integration/hook/123", + "push_events": True, + "merge_requests_events": True, + "issues_events": False, + "job_events": False, + "pipeline_events": False, + "releases_events": False, + "tag_push_events": False, + "subgroup_events": False, + "confidential_issues_events": False, + } ) - # Assert - assert webhook_id == "789" - mock_create_webhook.assert_not_called() # Ensure no new webhook is created - -def test_create_webhook_when_no_webhook_exists( - mocked_gitlab_service: GitlabService, monkeypatch: Any -): +@pytest.mark.asyncio +async def test_create_group_webhook_failure( + mocked_gitlab_service: GitlabService, +) -> None: # Arrange mock_group = MagicMock() mock_group.get_id.return_value = 123 - mock_group.attributes = {"full_path": "group1"} - - # Mock the group hooks.list method to return no webhook - mock_group.hooks.list.return_value = [] + mock_group.hooks.create = AsyncMock(side_effect=Exception("Create failed")) # Act - webhook_id = mocked_gitlab_service.create_webhook( - mock_group, events=["push", "merge_request"] + await mocked_gitlab_service._create_group_webhook( + mock_group, ["push_events", "merge_requests_events"] ) # Assert - assert webhook_id == "123" - mock_group.hooks.create.assert_called_once() # A new webhook should be created - - -def test_delete_webhook(mocked_gitlab_service: GitlabService, monkeypatch: Any): - # Arrange - mock_group = MagicMock() - mock_group.get_id.return_value = 456 - mock_group.attributes = {"full_path": "group2"} - - # Mock the group hooks.list method to return a webhook - mock_hook = MagicMock() - mock_hook.url = "http://example.com/integration/hook/456" - mock_hook.id = 17 - mock_group.hooks.list.return_value = [mock_hook] - - # Act - mocked_gitlab_service._delete_group_webhook(mock_group, mock_hook.id) - - # Assert - mock_group.hooks.delete.assert_called_once_with( - mock_hook.id - ) # Ensure the webhook is deleted + mock_group.hooks.create.assert_called_once_with( + { + "url": "http://example.com/integration/hook/123", + "push_events": True, + "merge_requests_events": True, + "issues_events": False, + "job_events": False, + "pipeline_events": False, + "releases_events": False, + "tag_push_events": False, + "subgroup_events": False, + "confidential_issues_events": False, + } + )