[MAIN-553] Create explicit executors to limit threadcount
LeaveMyYard committed Jun 22, 2023
1 parent 56e32ff commit 055226c
Showing 7 changed files with 73 additions and 20 deletions.
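The change follows one pattern throughout: blocking SDK calls that previously went through asyncio.to_thread (and therefore through the event loop's default executor, whose size on current CPython is min(32, os.cpu_count() + 4) and so varies per machine) are now dispatched with loop.run_in_executor onto explicitly sized ThreadPoolExecutor instances, keeping the total number of worker threads bounded. A minimal, self-contained sketch of the pattern; blocking_call is a hypothetical stand-in for any blocking client request:

import asyncio
from concurrent.futures import ThreadPoolExecutor

# Explicit pool with a fixed worker count. asyncio.to_thread() would instead
# use the loop's default executor, whose size depends on the host's CPU count.
executor = ThreadPoolExecutor(max_workers=12)

def blocking_call(i: int) -> int:
    # Hypothetical stand-in for a blocking request (e.g. a Kubernetes list call).
    return i * 2

async def main() -> None:
    loop = asyncio.get_running_loop()
    # However many coroutines are gathered, at most 12 worker threads run at once.
    results = await asyncio.gather(
        *(loop.run_in_executor(executor, blocking_call, i) for i in range(100))
    )
    print(len(results))

asyncio.run(main())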
39 changes: 31 additions & 8 deletions robusta_krr/core/integrations/kubernetes.py
@@ -1,5 +1,7 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import itertools
import os
from typing import Optional, Union

from kubernetes import client, config # type: ignore
@@ -21,6 +23,10 @@
from robusta_krr.utils.configurable import Configurable


# This executor will be running requests to Kubernetes API
executor = ThreadPoolExecutor(12)


class ClusterLoader(Configurable):
def __init__(self, cluster: Optional[str], *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -95,8 +101,10 @@ async def __list_pods(self, resource: Union[V1Deployment, V1DaemonSet, V1Statefu
if selector is None:
return []

ret: V1PodList = await asyncio.to_thread(
self.core.list_namespaced_pod, namespace=resource.metadata.namespace, label_selector=selector
loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
executor,
lambda: self.core.list_namespaced_pod(namespace=resource.metadata.namespace, label_selector=selector),
)
return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]

@@ -115,7 +123,10 @@ async def __build_obj(

async def _list_deployments(self) -> list[K8sObjectData]:
self.debug(f"Listing deployments in {self.cluster}")
ret: V1DeploymentList = await asyncio.to_thread(self.apps.list_deployment_for_all_namespaces, watch=False)
loop = asyncio.get_running_loop()
ret: V1DeploymentList = await loop.run_in_executor(
executor, lambda: self.apps.list_deployment_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} deployments in {self.cluster}")

return await asyncio.gather(
@@ -128,7 +139,10 @@ async def _list_deployments(self) -> list[K8sObjectData]:

async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
ret: V1StatefulSetList = await asyncio.to_thread(self.apps.list_stateful_set_for_all_namespaces, watch=False)
loop = asyncio.get_running_loop()
ret: V1StatefulSetList = await loop.run_in_executor(
executor, lambda: self.apps.list_stateful_set_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} statefulsets in {self.cluster}")

return await asyncio.gather(
@@ -141,7 +155,10 @@ async def _list_all_statefulsets(self) -> list[K8sObjectData]:

async def _list_all_daemon_set(self) -> list[K8sObjectData]:
self.debug(f"Listing daemonsets in {self.cluster}")
ret: V1DaemonSetList = await asyncio.to_thread(self.apps.list_daemon_set_for_all_namespaces, watch=False)
loop = asyncio.get_running_loop()
ret: V1DaemonSetList = await loop.run_in_executor(
executor, lambda: self.apps.list_daemon_set_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} daemonsets in {self.cluster}")

return await asyncio.gather(
@@ -154,7 +171,10 @@ async def _list_all_daemon_set(self) -> list[K8sObjectData]:

async def _list_all_jobs(self) -> list[K8sObjectData]:
self.debug(f"Listing jobs in {self.cluster}")
ret: V1JobList = await asyncio.to_thread(self.batch.list_job_for_all_namespaces, watch=False)
loop = asyncio.get_running_loop()
ret: V1JobList = await loop.run_in_executor(
executor, lambda: self.batch.list_job_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} jobs in {self.cluster}")

return await asyncio.gather(
@@ -169,7 +189,10 @@ async def _list_pods(self) -> list[K8sObjectData]:
"""For future use, not supported yet."""

self.debug(f"Listing pods in {self.cluster}")
ret: V1PodList = await asyncio.to_thread(self.apps.list_pod_for_all_namespaces, watch=False)
loop = asyncio.get_running_loop()
ret: V1PodList = await loop.run_in_executor(
executor, lambda: self.apps.list_pod_for_all_namespaces(watch=False)
)
self.debug(f"Found {len(ret.items)} pods in {self.cluster}")

return await asyncio.gather(
@@ -190,7 +213,7 @@ async def list_clusters(self) -> Optional[list[str]]:
return None

try:
contexts, current_context = await asyncio.to_thread(config.list_kube_config_contexts)
contexts, current_context = config.list_kube_config_contexts()
except config.ConfigException:
if self.config.clusters is not None and self.config.clusters != "*":
self.warning("Could not load context from kubeconfig.")
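In kubernetes.py the pool is module level, so every ClusterLoader shares the same 12 workers and the listings fanned out with asyncio.gather cannot exceed that cap. Note also why the calls are wrapped in lambdas: loop.run_in_executor forwards only positional arguments, so keyword-style client calls need a wrapper. A small sketch under that assumption, with list_pods as a hypothetical stand-in for core.list_namespaced_pod and functools.partial as an equivalent alternative to the lambdas:

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=12)

def list_pods(namespace: str, label_selector: str) -> list[str]:
    # Hypothetical stand-in for the blocking Kubernetes client call.
    return [f"{namespace}/pod[{label_selector}]"]

async def main() -> None:
    loop = asyncio.get_running_loop()
    # run_in_executor passes positional args only, so keyword calls are wrapped;
    # functools.partial does the same job as the lambdas in the diff above.
    pods = await loop.run_in_executor(
        executor,
        functools.partial(list_pods, namespace="default", label_selector="app=web"),
    )
    print(pods)

asyncio.run(main())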
4 changes: 4 additions & 0 deletions robusta_krr/core/integrations/prometheus/loader.py
@@ -2,6 +2,8 @@
import datetime
from typing import Optional, no_type_check

from concurrent.futures import ThreadPoolExecutor

from kubernetes import config as k8s_config
from kubernetes.client.api_client import ApiClient

@@ -40,6 +42,8 @@ def __init__(

super().__init__(config=config)

self.executor = ThreadPoolExecutor(8)

self.api_client = (
k8s_config.new_client_from_config(config_file=self.config.kubeconfig, context=cluster)
if cluster is not None
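loader.py takes the per-instance route instead: each MetricsLoader owns a ThreadPoolExecutor(8), which it later hands to the metric loaders it creates (see gather_data further down), so Prometheus traffic for one cluster is capped at eight concurrent requests. A rough sketch of that ownership, using hypothetical class names:

from concurrent.futures import ThreadPoolExecutor

class FakeMetricLoader:
    # Hypothetical consumer: it reuses whatever pool it is given.
    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self.executor = executor

class FakeMetricsLoader:
    def __init__(self) -> None:
        # One bounded pool per loader instance.
        self.executor = ThreadPoolExecutor(max_workers=8)

    def make_metric_loader(self) -> FakeMetricLoader:
        # Dependents share the owner's pool rather than spawning their own.
        return FakeMetricLoader(self.executor)

print(FakeMetricsLoader().make_metric_loader().executor)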
27 changes: 19 additions & 8 deletions robusta_krr/core/integrations/prometheus/metrics/base_metric.py
@@ -2,8 +2,9 @@

import abc
import asyncio
from concurrent.futures import ThreadPoolExecutor
import datetime
from typing import TYPE_CHECKING, Callable, TypeVar
from typing import TYPE_CHECKING, Callable, Optional, TypeVar

import numpy as np

@@ -26,10 +27,17 @@ class BaseMetricLoader(Configurable, abc.ABC):
Metric loaders are used to load metrics from a specified source (like Prometheus in this case).
"""

def __init__(self, config: Config, prometheus: CustomPrometheusConnect) -> None:
def __init__(
self,
config: Config,
prometheus: CustomPrometheusConnect,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(config)
self.prometheus = prometheus

self.executor = executor

def get_prometheus_cluster_label(self) -> str:
"""
Generates the cluster label for querying a centralized Prometheus
@@ -79,12 +87,15 @@ async def query_prometheus(self, metric: Metric) -> list[dict]:
list[dict]: A list of dictionary where each dictionary represents metrics for a pod.
"""

return await asyncio.to_thread(
self.prometheus.custom_query_range,
query=metric.query,
start_time=metric.start_time,
end_time=metric.end_time,
step=metric.step,
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor,
lambda: self.prometheus.custom_query_range(
query=metric.query,
start_time=metric.start_time,
end_time=metric.end_time,
step=metric.step,
),
)

async def load_data(
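In base_metric.py the executor parameter is Optional and defaults to None, which is safe because loop.run_in_executor(None, fn) simply falls back to the event loop's default executor; callers that care about the thread cap pass an explicit pool. A small sketch of both paths:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

async def query(executor: Optional[ThreadPoolExecutor]) -> int:
    loop = asyncio.get_running_loop()
    # With executor=None, run_in_executor uses the loop's default pool,
    # so the parameter can stay genuinely optional.
    return await loop.run_in_executor(executor, sum, [1, 2, 3])

print(asyncio.run(query(None)))                   # default executor
print(asyncio.run(query(ThreadPoolExecutor(4))))  # explicit, bounded pool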
@@ -1,4 +1,5 @@
import abc
from concurrent.futures import ThreadPoolExecutor
import datetime
from typing import List, Optional

@@ -25,10 +26,12 @@ def __init__(
config: Config,
api_client: Optional[ApiClient] = None,
cluster: Optional[str] = None,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(config=config)
self.api_client = api_client
self.cluster = cluster or "default"
self.executor = executor

@abc.abstractmethod
def check_connection(self):
@@ -1,6 +1,6 @@
import asyncio
import datetime
from typing import List, Optional, no_type_check
from typing import Any, List, Optional, no_type_check

import requests
from kubernetes.client import ApiClient
@@ -140,7 +140,8 @@ def check_connection(self):
) from e

async def query(self, query: str) -> dict:
return await asyncio.to_thread(self.prometheus.custom_query, query=query)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self.executor, lambda: self.prometheus.custom_query(query=query))

def validate_cluster_name(self):
cluster_label = self.config.prometheus_cluster_label
@@ -173,11 +174,14 @@ async def gather_data(
ResourceHistoryData: The gathered resource history data.
"""
self.debug(f"Gathering data for {object} and {resource}")
import threading

print(threading.active_count())

await self.add_historic_pods(object, period)

MetricLoaderType = BaseMetricLoader.get_by_resource(resource)
metric_loader = MetricLoaderType(self.config, self.prometheus)
metric_loader = MetricLoaderType(self.config, self.prometheus, self.executor)
return await metric_loader.load_data(object, period, step, self.name())

async def add_historic_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
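The Prometheus metrics service now routes custom_query and the per-resource metric loaders through the same self.executor handed down from MetricsLoader. The threading.active_count() print in gather_data gives a quick read on whether the cap holds; a self-contained version of that check, assuming a 12-worker pool, might look like this:

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=12)

def slow_query() -> int:
    # Hypothetical stand-in for a blocking custom_query_range call.
    time.sleep(0.2)
    return threading.active_count()

async def main() -> None:
    loop = asyncio.get_running_loop()
    counts = await asyncio.gather(
        *(loop.run_in_executor(executor, slow_query) for _ in range(50))
    )
    # Fifty concurrent awaits, but the pool never adds more than 12 worker threads.
    print(max(counts))

asyncio.run(main())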
8 changes: 7 additions & 1 deletion robusta_krr/core/runner.py
@@ -2,6 +2,8 @@
import math
from typing import Optional, Union

from concurrent.futures import ThreadPoolExecutor

from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
from robusta_krr.core.integrations.kubernetes import KubernetesLoader
from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound
@@ -24,6 +26,9 @@ def __init__(self, config: Config) -> None:
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = self.config.create_strategy()

# This executor will be running calculations for recommendations
self._executor = ThreadPoolExecutor(12)

def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[MetricsLoader]:
if cluster not in self._metrics_service_loaders:
try:
@@ -116,7 +121,8 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> tupl

# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calculations will not block the GIL
result = await asyncio.to_thread(self._strategy.run, data, object)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self._executor, self._strategy.run, data, object)
return self._format_result(result), metrics

async def _gather_objects_recommendations(
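runner.py keeps its own 12-worker pool for the strategy calculations, which are CPU bound rather than I/O bound; threads still help there because numpy releases the GIL for much of its heavy array work. Since strategy.run takes positional arguments, it goes to run_in_executor directly with no lambda. A sketch with run_strategy as a hypothetical stand-in:

import asyncio
from concurrent.futures import ThreadPoolExecutor

import numpy as np

executor = ThreadPoolExecutor(max_workers=12)

def run_strategy(samples: np.ndarray) -> float:
    # Hypothetical stand-in for strategy.run(); numpy releases the GIL for most
    # heavy array routines, so several of these can overlap across threads.
    return float(np.percentile(samples, 99))

async def recommend() -> float:
    loop = asyncio.get_running_loop()
    # Positional arguments go straight through, so no wrapper is needed here.
    return await loop.run_in_executor(executor, run_strategy, np.random.rand(1_000_000))

print(asyncio.run(recommend()))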
2 changes: 2 additions & 0 deletions robusta_krr/main.py
@@ -6,6 +6,7 @@
from typing import List, Literal, Optional, Union
from uuid import UUID


import typer
import urllib3

@@ -129,6 +130,7 @@ def {func_name}(
other_args={strategy_args},
)
runner = Runner(config)
asyncio.run(runner.run())
"""
)
