Skip to content

Commit

Permalink
Set executor per cluster for k8s API
Browse files Browse the repository at this point in the history
  • Loading branch information
LeaveMyYard committed Jun 22, 2023
1 parent d925fb1 commit 38d7406
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions robusta_krr/core/integrations/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
from robusta_krr.utils.configurable import Configurable


# This executor will be running requests to Kubernetes API
executor = ThreadPoolExecutor(6)


class ClusterLoader(Configurable):
def __init__(self, cluster: Optional[str], *args, **kwargs):
super().__init__(*args, **kwargs)

self.cluster = cluster
# This executor will be running requests to Kubernetes API
self.executor = ThreadPoolExecutor(6)
self.api_client = (
config.new_client_from_config(context=cluster, config_file=self.config.kubeconfig)
if cluster is not None
Expand Down Expand Up @@ -103,7 +101,7 @@ async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1Statefu

loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
executor,
self.executor,
lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector),
)
return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
Expand All @@ -125,7 +123,7 @@ async def _list_deployments(self) -> list[K8sObjectData]:
self.debug(f"Listing deployments in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1DeploymentList = await loop.run_in_executor(
executor, lambda: self.apps.list_deployment_for_all_namespaces(watch=False)
self.executor, lambda: self.apps.list_deployment_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")

Expand All @@ -141,7 +139,7 @@ async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1StatefulSetList = await loop.run_in_executor(
executor, lambda: self.apps.list_stateful_set_for_all_namespaces(watch=False)
self.executor, lambda: self.apps.list_stateful_set_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")

Expand All @@ -157,7 +155,7 @@ async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1DaemonSetList = await loop.run_in_executor(
executor, lambda: self.apps.list_daemon_set_for_all_namespaces(watch=False)
self.executor, lambda: self.apps.list_daemon_set_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")

Expand All @@ -173,7 +171,7 @@ async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1JobList = await loop.run_in_executor(
executor, lambda: self.batch.list_job_for_all_namespaces(watch=False)
self.executor, lambda: self.batch.list_job_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")

Expand All @@ -191,7 +189,7 @@ async def _list_pods(self) -> list[K8sObjectData]:
self.debug(f"Listing pods in {self.cluster}")
loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
executor, lambda: self.apps.list_pod_for_all_namespaces(watch=False)
self.executor, lambda: self.apps.list_pod_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")

Expand Down

0 comments on commit 38d7406

Please sign in to comment.