Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][Gitlab] - Introduce Pagination and Run Code in Async #1047

Merged
merged 9 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integrations/gitlab/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

0.1.124 (2024-09-24)
====================

### Improvements

- Added more logs and made webhook creation asynchronous (0.1.124)


0.1.123 (2024-09-22)
====================

Expand Down
19 changes: 8 additions & 11 deletions integrations/gitlab/gitlab_integration/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

class BaseEventHandler(ABC):
def __init__(self) -> None:
self.webhook_tasks_queue: Queue[tuple[EventContext, str, dict[str, Any]]] = (
Queue()
)
self.webhook_tasks_queue: Queue[
tuple[EventContext, str, dict[str, Any]]
] = Queue()

async def _start_event_processor(self) -> None:
logger.info(f"Started {self.__class__.__name__} worker")
Expand Down Expand Up @@ -69,17 +69,14 @@ def on(self, events: list[str], observer: Observer) -> None:
self._observers[event].append(observer)

async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
observers = asyncio.gather(
*(
observer(event_id, body)
for observer in self._observers.get(event_id, [])
)
)

if not observers:
observers_list = self._observers.get(event_id, [])
if not observers_list:
logger.info(
f"event: {event_id} has no matching handler. the handlers available are for events: {self._observers.keys()}"
)
return

await asyncio.gather(*(observer(event_id, body) for observer in observers_list))


class SystemEventHandler(BaseEventHandler):
Expand Down
82 changes: 45 additions & 37 deletions integrations/gitlab/gitlab_integration/events/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,17 @@ def validate_hooks_override_config(
validate_hooks_tokens_are_in_token_mapping(
token_mapping, token_group_override_hooks_mapping
)
groups_paths: dict[str, WebhookGroupConfig] = (
extract_all_groups_from_token_group_override_mapping(
token_group_override_hooks_mapping
)
groups_paths: dict[
str, WebhookGroupConfig
] = extract_all_groups_from_token_group_override_mapping(
token_group_override_hooks_mapping
)

validate_unique_groups_paths(groups_paths)
validate_groups_hooks_events(groups_paths)


def setup_listeners(gitlab_service: GitlabService, webhook_id: str) -> None:
def setup_listeners(gitlab_service: GitlabService, group_id: str) -> None:
handlers = [
PushHook(gitlab_service),
MergeRequest(gitlab_service),
Expand All @@ -127,9 +127,9 @@ def setup_listeners(gitlab_service: GitlabService, webhook_id: str) -> None:
]
for handler in handlers:
logger.info(
f"Setting up listeners for webhook {webhook_id} for group mapping {gitlab_service.group_mapping}"
f"Setting up listeners {handler.events} for group {group_id} for group mapping {gitlab_service.group_mapping}"
)
event_ids = [f"{event_name}:{webhook_id}" for event_name in handler.events]
event_ids = [f"{event_name}:{group_id}" for event_name in handler.events]
event_handler.on(event_ids, handler.on_hook)


Expand All @@ -144,13 +144,14 @@ def setup_system_listeners(gitlab_clients: list[GitlabService]) -> None:
ProjectFiles,
]
for handler in handlers:
logger.info(f"Setting up system listeners {handler.system_events}")
system_event_handler.on(handler)

for gitlab_service in gitlab_clients:
system_event_handler.add_client(gitlab_service)


def create_webhooks_by_client(
async def create_webhooks_by_client(
gitlab_host: str,
app_host: str,
token: str,
Expand All @@ -160,16 +161,16 @@ def create_webhooks_by_client(
gitlab_client = Gitlab(gitlab_host, token)
gitlab_service = GitlabService(gitlab_client, app_host, group_mapping)

groups_for_webhooks = gitlab_service.get_filtered_groups_for_webhooks(
groups_for_webhooks = await gitlab_service.get_filtered_groups_for_webhooks(
list(groups_hooks_events_override.keys())
if groups_hooks_events_override
else None
)

webhooks_ids: list[str] = []
groups_ids_with_webhooks: list[str] = []

for group in groups_for_webhooks:
webhook_id = gitlab_service.create_webhook(
group_id = await gitlab_service.create_webhook(
group,
(
groups_hooks_events_override.get(
Expand All @@ -180,13 +181,13 @@ def create_webhooks_by_client(
),
)

if webhook_id:
webhooks_ids.append(webhook_id)
if group_id:
groups_ids_with_webhooks.append(group_id)

return gitlab_service, webhooks_ids
return gitlab_service, groups_ids_with_webhooks


def setup_application(
async def setup_application(
token_mapping: dict[str, list[str]],
gitlab_host: str,
app_host: str,
Expand All @@ -196,45 +197,52 @@ def setup_application(
validate_token_mapping(token_mapping)

if use_system_hook:
logger.info("Using system hook")
validate_use_system_hook(token_mapping)
token, group_mapping = list(token_mapping.items())[0]
gitlab_client = Gitlab(gitlab_host, token)
gitlab_service = GitlabService(gitlab_client, app_host, group_mapping)
setup_system_listeners([gitlab_service])

else:
logger.info("Using group hooks")
validate_hooks_override_config(
token_mapping, token_group_override_hooks_mapping
)

client_to_webhooks: list[tuple[GitlabService, list[str]]] = []
client_to_group_ids_with_webhooks: list[tuple[GitlabService, list[str]]] = []

for token, group_mapping in token_mapping.items():
if not token_group_override_hooks_mapping:
client_to_webhooks.append(
create_webhooks_by_client(
gitlab_host,
app_host,
token,
None,
group_mapping,
)
)
else:
groups = token_group_override_hooks_mapping.tokens.get(
token, WebhookTokenConfig(groups=[])
).groups
if groups:
client_to_webhooks.append(
create_webhooks_by_client(
try:
if not token_group_override_hooks_mapping:
client_to_group_ids_with_webhooks.append(
await create_webhooks_by_client(
gitlab_host,
app_host,
token,
groups,
None,
group_mapping,
)
)
else:
groups = token_group_override_hooks_mapping.tokens.get(
token, WebhookTokenConfig(groups=[])
).groups
if groups:
client_to_group_ids_with_webhooks.append(
await create_webhooks_by_client(
gitlab_host,
app_host,
token,
groups,
group_mapping,
)
)
except Exception as e:
logger.exception(
f"Failed to create webhooks for group mapping {group_mapping}, error: {e}"
)

for client, webhook_ids in client_to_webhooks:
for webhook_id in webhook_ids:
setup_listeners(client, webhook_id)
for client, group_ids in client_to_group_ids_with_webhooks:
for group_id in group_ids:
setup_listeners(client, group_id)
86 changes: 58 additions & 28 deletions integrations/gitlab/gitlab_integration/gitlab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ProjectFile,
ProjectPipeline,
ProjectPipelineJob,
Hook,
)
from gitlab_integration.core.async_fetcher import AsyncFetcher
from gitlab_integration.core.entities import generate_entity_from_port_yaml
Expand Down Expand Up @@ -63,22 +64,34 @@ def __init__(
GITLAB_SEARCH_RATE_LIMIT * 0.95, 60
)

def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None:
async def get_group_hooks(self, group: RESTObject) -> AsyncIterator[List[Hook]]:
    """Yield the webhooks configured on *group*, one page at a time.

    Wraps the blocking ``group.hooks.list`` call via ``AsyncFetcher.fetch_batch``
    so pagination happens without blocking the event loop.

    Args:
        group: The GitLab group whose hooks should be listed.

    Yields:
        Batches (pages) of ``Hook`` objects for the group.
    """
    async for hooks_batch in AsyncFetcher.fetch_batch(group.hooks.list):
        # fetch_batch yields untyped batches; narrow to List[Hook] for callers.
        hooks = typing.cast(List[Hook], hooks_batch)
        yield hooks

async def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None:
webhook_url = f"{self.app_host}/integration/hook/{group.get_id()}"
for hook in group.hooks.list(iterator=True):
if hook.url == webhook_url:
return hook
logger.info(
f"Getting webhook for group {group.get_id()} with url {webhook_url}"
)
async for hook_batch in self.get_group_hooks(group):
for hook in hook_batch:
if hook.url == webhook_url:
logger.info(
f"Found webhook for group {group.get_id()} with id {hook.id} and url {hook.url}"
)
return hook
return None

def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None:
async def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None:
logger.info(f"Deleting webhook with id {hook_id} in group {group.get_id()}")
try:
group.hooks.delete(hook_id)
await AsyncFetcher.fetch_single(group.hooks.delete, hook_id)
logger.info(f"Deleted webhook for {group.get_id()}")
except Exception as e:
logger.error(f"Failed to delete webhook for {group.get_id()} error={e}")

def _create_group_webhook(
async def _create_group_webhook(
self, group: RESTObject, events: list[str] | None
) -> None:
webhook_events = {
Expand All @@ -87,20 +100,23 @@ def _create_group_webhook(
}

logger.info(
f"Creating webhook for {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}"
f"Creating webhook for group {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}"
)
try:
resp = group.hooks.create(
resp = await AsyncFetcher.fetch_single(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like maybe fetch_single isn't the appropriate name for it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I felt the same, but I was also worried about duplication, since the implementation would look the same — only the name of the method would change.

group.hooks.create,
{
"url": f"{self.app_host}/integration/hook/{group.get_id()}",
**webhook_events,
}
},
)
logger.info(
f"Created webhook for {group.get_id()}, id={resp.id}, url={resp.url}"
f"Created webhook for group {group.get_id()}, webhook id={resp.id}, url={resp.url}"
)
except Exception as e:
logger.error(f"Failed to create webhook for {group.get_id()} error={e}")
logger.exception(
f"Failed to create webhook for group {group.get_id()} error={e}"
)

def _get_changed_files_between_commits(
self, project_id: int, head: str
Expand Down Expand Up @@ -253,14 +269,27 @@ def should_process_project(
return True
return project.name in repos

def get_root_groups(self) -> List[Group]:
groups = self.gitlab_client.groups.list(iterator=True)
async def get_root_groups(self) -> List[Group]:
    """Return all top-level (root) groups visible to this GitLab client.

    Pages through every group asynchronously via ``AsyncFetcher.fetch_batch``
    (with transient-error retries), then keeps only groups with no parent,
    i.e. root groups.

    Returns:
        All groups whose ``parent_id`` is ``None``.
    """
    groups: list[RESTObject] = []
    async for groups_batch in AsyncFetcher.fetch_batch(
        self.gitlab_client.groups.list, retry_transient_errors=True
    ):
        # Each batch is one page of results; accumulate them all before filtering.
        groups_batch = typing.cast(List[RESTObject], groups_batch)
        groups.extend(groups_batch)

    # A root group is one without a parent group.
    return typing.cast(
        List[Group], [group for group in groups if group.parent_id is None]
    )

def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]:
groups = self.gitlab_client.groups.list(get_all=True)
async def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]:
groups: list[RESTObject] = []

async for groups_batch in AsyncFetcher.fetch_batch(
self.gitlab_client.groups.list, retry_transient_errors=True
):
groups_batch = typing.cast(List[RESTObject], groups_batch)
groups.extend(groups_batch)

return typing.cast(
List[Group],
[
Expand All @@ -270,17 +299,17 @@ def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]:
],
)

def get_filtered_groups_for_webhooks(
async def get_filtered_groups_for_webhooks(
self,
groups_hooks_override_list: list[str] | None,
) -> List[Group]:
groups_for_webhooks = []
if groups_hooks_override_list is not None:
if groups_hooks_override_list:
logger.info(
"Getting all the specified groups in the mapping for a token to create their webhooks"
f"Getting all the specified groups in the mapping for a token to create their webhooks for: {groups_hooks_override_list}"
)
groups_for_webhooks = self.filter_groups_by_paths(
groups_for_webhooks = await self.filter_groups_by_paths(
groups_hooks_override_list
)

Expand All @@ -302,7 +331,7 @@ def get_filtered_groups_for_webhooks(
)
else:
logger.info("Getting all the root groups to create their webhooks")
root_groups = self.get_root_groups()
root_groups = await self.get_root_groups()
groups_for_webhooks = [
group
for group in root_groups
Expand All @@ -316,31 +345,32 @@ def get_filtered_groups_for_webhooks(

return groups_for_webhooks

def create_webhook(self, group: Group, events: list[str] | None) -> str | None:
async def create_webhook(
self, group: Group, events: list[str] | None
) -> str | None:
logger.info(f"Creating webhook for the group: {group.attributes['full_path']}")

webhook_id = None
group_id = group.get_id()

if group_id is None:
logger.info(f"Group {group.attributes['full_path']} has no id. skipping...")
return None
else:
hook = self._get_webhook_for_group(group)
hook = await self._get_webhook_for_group(group)
if hook:
logger.info(f"Webhook already exists for group {group.get_id()}")

if hook.alert_status == "disabled":
logger.info(
f"Webhook exists for group {group.get_id()} but is disabled, deleting and re-creating..."
)
self._delete_group_webhook(group, hook.id)
self._create_group_webhook(group, events)
await self._delete_group_webhook(group, hook.id)
await self._create_group_webhook(group, events)
logger.info(f"Webhook re-created for group {group.get_id()}")
else:
self._create_group_webhook(group, events)
webhook_id = str(group_id)
await self._create_group_webhook(group, events)

return webhook_id
return str(group_id)

def create_system_hook(self) -> None:
logger.info("Checking if system hook already exists")
Expand Down
Loading
Loading