-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Integration][Gitlab] - Introduce Pagination and Run Code in Async #1047
Changes from 3 commits
cf750d8
a64c561
91de5b9
e779b70
53df017
95b2c0e
bc7c1b8
8ceed91
111a2fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,22 +63,25 @@ def __init__( | |
GITLAB_SEARCH_RATE_LIMIT * 0.95, 60 | ||
) | ||
|
||
def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None: | ||
async def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None: | ||
webhook_url = f"{self.app_host}/integration/hook/{group.get_id()}" | ||
for hook in group.hooks.list(iterator=True): | ||
if hook.url == webhook_url: | ||
return hook | ||
async for hook_batch in AsyncFetcher.fetch_batch( | ||
group.hooks.list | ||
): | ||
for hook in hook_batch: | ||
if hook.url == webhook_url: | ||
return hook | ||
return None | ||
|
||
def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None: | ||
async def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None: | ||
logger.info(f"Deleting webhook with id {hook_id} in group {group.get_id()}") | ||
try: | ||
group.hooks.delete(hook_id) | ||
await AsyncFetcher.fetch_single(group.hooks.delete, hook_id) | ||
logger.info(f"Deleted webhook for {group.get_id()}") | ||
except Exception as e: | ||
logger.error(f"Failed to delete webhook for {group.get_id()} error={e}") | ||
|
||
def _create_group_webhook( | ||
async def _create_group_webhook( | ||
self, group: RESTObject, events: list[str] | None | ||
) -> None: | ||
webhook_events = { | ||
|
@@ -90,11 +93,12 @@ def _create_group_webhook( | |
f"Creating webhook for {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}" | ||
) | ||
try: | ||
resp = group.hooks.create( | ||
resp = await AsyncFetcher.fetch_single( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I felt the same but I was also worried about duplication since the implementation will look the same, only the name of the method will change |
||
group.hooks.create, | ||
{ | ||
"url": f"{self.app_host}/integration/hook/{group.get_id()}", | ||
**webhook_events, | ||
} | ||
}, | ||
) | ||
logger.info( | ||
f"Created webhook for {group.get_id()}, id={resp.id}, url={resp.url}" | ||
|
@@ -253,14 +257,24 @@ def should_process_project( | |
return True | ||
return project.name in repos | ||
|
||
def get_root_groups(self) -> List[Group]: | ||
groups = self.gitlab_client.groups.list(iterator=True) | ||
async def get_root_groups(self) -> List[Group]: | ||
groups = [] | ||
async for groups_batch in AsyncFetcher.fetch_batch( | ||
self.gitlab_client.groups.list | ||
): | ||
groups.extend(groups_batch) | ||
|
||
return typing.cast( | ||
List[Group], [group for group in groups if group.parent_id is None] | ||
) | ||
|
||
def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]: | ||
groups = self.gitlab_client.groups.list(get_all=True) | ||
async def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]: | ||
groups = [] | ||
async for groups_batch in AsyncFetcher.fetch_batch( | ||
self.gitlab_client.groups.list | ||
): | ||
groups.extend(groups_batch) | ||
|
||
return typing.cast( | ||
List[Group], | ||
[ | ||
|
@@ -270,17 +284,17 @@ def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]: | |
], | ||
) | ||
|
||
def get_filtered_groups_for_webhooks( | ||
async def get_filtered_groups_for_webhooks( | ||
self, | ||
groups_hooks_override_list: list[str] | None, | ||
) -> List[Group]: | ||
groups_for_webhooks = [] | ||
if groups_hooks_override_list is not None: | ||
if groups_hooks_override_list: | ||
logger.info( | ||
"Getting all the specified groups in the mapping for a token to create their webhooks" | ||
f"Getting all the specified groups in the mapping for a token to create their webhooks for: {groups_hooks_override_list}" | ||
) | ||
groups_for_webhooks = self.filter_groups_by_paths( | ||
groups_for_webhooks = await self.filter_groups_by_paths( | ||
groups_hooks_override_list | ||
) | ||
|
||
|
@@ -302,7 +316,7 @@ def get_filtered_groups_for_webhooks( | |
) | ||
else: | ||
logger.info("Getting all the root groups to create their webhooks") | ||
root_groups = self.get_root_groups() | ||
root_groups = await self.get_root_groups() | ||
groups_for_webhooks = [ | ||
group | ||
for group in root_groups | ||
|
@@ -316,7 +330,9 @@ def get_filtered_groups_for_webhooks( | |
|
||
return groups_for_webhooks | ||
|
||
def create_webhook(self, group: Group, events: list[str] | None) -> str | None: | ||
async def create_webhook( | ||
self, group: Group, events: list[str] | None | ||
) -> str | None: | ||
logger.info(f"Creating webhook for the group: {group.attributes['full_path']}") | ||
|
||
webhook_id = None | ||
|
@@ -325,19 +341,19 @@ def create_webhook(self, group: Group, events: list[str] | None) -> str | None: | |
if group_id is None: | ||
logger.info(f"Group {group.attributes['full_path']} has no id. skipping...") | ||
else: | ||
hook = self._get_webhook_for_group(group) | ||
hook = await self._get_webhook_for_group(group) | ||
if hook: | ||
logger.info(f"Webhook already exists for group {group.get_id()}") | ||
|
||
if hook.alert_status == "disabled": | ||
logger.info( | ||
f"Webhook exists for group {group.get_id()} but is disabled, deleting and re-creating..." | ||
) | ||
self._delete_group_webhook(group, hook.id) | ||
self._create_group_webhook(group, events) | ||
await self._delete_group_webhook(group, hook.id) | ||
await self._create_group_webhook(group, events) | ||
logger.info(f"Webhook re-created for group {group.get_id()}") | ||
else: | ||
self._create_group_webhook(group, events) | ||
await self._create_group_webhook(group, events) | ||
webhook_id = str(group_id) | ||
|
||
return webhook_id | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,7 +91,7 @@ async def on_start() -> None: | |
) | ||
|
||
try: | ||
setup_application( | ||
await setup_application( | ||
integration_config["token_mapping"], | ||
integration_config["gitlab_host"], | ||
integration_config["app_host"], | ||
|
@@ -102,10 +102,7 @@ async def on_start() -> None: | |
await event_handler.start_event_processor() | ||
await system_event_handler.start_event_processor() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets run them either way if |
||
except Exception as e: | ||
logger.warning( | ||
f"Failed to setup webhook: {e}. {NO_WEBHOOK_WARNING}", | ||
stack_info=True, | ||
) | ||
logger.exception(f"Failed to setup webhook: {e}. {NO_WEBHOOK_WARNING}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to make sure that it actually prints the stacktrace, I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
|
||
@ocean.on_resync(ObjectKind.GROUP) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets create a method in the client for it