Skip to content

Commit

Permalink
[INTERNAL] Refactor how we proxy errors #136
Browse files Browse the repository at this point in the history
- Refactor our concurrent.futures code into a parent class + generator
- Add proxy_error method to convert requests error into HttpResponse
  • Loading branch information
kfdm authored Mar 12, 2019
2 parents f5416ce + 97d1626 commit e0f5a68
Showing 1 changed file with 71 additions and 101 deletions.
172 changes: 71 additions & 101 deletions promgen/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,24 @@
logger = logging.getLogger(__name__)


def proxy_error(response):
"""
Return a wrapped proxy error
Taking a request.response object as input, return it slightly modified
with an extra header for debugging so that we can see where the request
failed
"""
r = HttpResponse(
response.content,
content_type=response.headers["content-type"],
status=response.status_code,
)
r.setdefault("X-PROMGEN-PROXY", response.url)
return r


class PrometheusProxy(View):
# Map Django request headers to our sub-request headers
proxy_headers = {"HTTP_REFERER": "Referer"}

@property
Expand All @@ -35,6 +51,20 @@ def headers(self):
if k in self.request.META
}

def proxy(self, request):
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
for host in models.Shard.objects.filter(proxy=True):
futures.append(
executor.submit(
util.get,
urljoin(host.url, request.get_full_path_info()),
headers=self.headers,
)
)
for future in concurrent.futures.as_completed(futures):
yield future


class ProxyGraph(TemplateView):
template_name = "promgen/graph.html"
Expand All @@ -51,95 +81,51 @@ def get_context_data(self, **kwargs):
class ProxyLabel(PrometheusProxy):
def get(self, request, label):
data = set()
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
for host in models.Shard.objects.filter(proxy=True):
futures.append(
executor.submit(
util.get,
"{}/api/v1/label/{}/values".format(host.url, label),
headers=self.headers,
)
)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
# Need to try to decode the json BEFORE we raise_for_status
# so that we can pass back the error message from Prometheus
_json = result.json()
result.raise_for_status()
logger.debug("Appending data from %s", result.request.url)
data.update(_json["data"])
except HTTPError:
logger.warning("Error with response")
_json["promgen_proxy_request"] = result.request.url
return JsonResponse(_json, status=result.status_code)
for future in self.proxy(request):
try:
result = future.result()
result.raise_for_status()
_json = result.json()
logger.debug("Appending data from %s", result.url)
data.update(_json["data"])
except HTTPError:
logger.warning("Error with response")
return proxy_error(result)

return JsonResponse({"status": "success", "data": sorted(data)})


class ProxySeries(PrometheusProxy):
def get(self, request):
data = []
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
for host in models.Shard.objects.filter(proxy=True):
futures.append(
executor.submit(
util.get,
"{}/api/v1/series?{}".format(
host.url, request.META["QUERY_STRING"]
),
headers=self.headers,
)
)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
# Need to try to decode the json BEFORE we raise_for_status
# so that we can pass back the error message from Prometheus
_json = result.json()
result.raise_for_status()
logger.debug("Appending data from %s", result.request.url)
data += _json["data"]
except HTTPError:
logger.warning("Error with response")
_json["promgen_proxy_request"] = result.request.url
return JsonResponse(_json, status=result.status_code)
for future in self.proxy(request):
try:
result = future.result()
result.raise_for_status()
_json = result.json()
logger.debug("Appending data from %s", result.url)
data += _json["data"]
except HTTPError:
logger.warning("Error with response")
return proxy_error(result)

return JsonResponse({"status": "success", "data": data})


class ProxyQueryRange(PrometheusProxy):
def get(self, request):
data = []
futures = []
resultType = None
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
for host in models.Shard.objects.filter(proxy=True):
futures.append(
executor.submit(
util.get,
"{}/api/v1/query_range?{}".format(
host.url, request.META["QUERY_STRING"]
),
headers=self.headers,
)
)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
# Need to try to decode the json BEFORE we raise_for_status
# so that we can pass back the error message from Prometheus
_json = result.json()
result.raise_for_status()
logger.debug("Appending data from %s", result.request.url)
data += _json["data"]["result"]
resultType = _json["data"]["resultType"]
except HTTPError:
logger.warning("Error with response")
_json["promgen_proxy_request"] = result.request.url
return JsonResponse(_json, status=result.status_code)
for future in self.proxy(request):
try:
result = future.result()
result.raise_for_status()
_json = result.json()
logger.debug("Appending data from %s", result.url)
data += _json["data"]["result"]
resultType = _json["data"]["resultType"]
except HTTPError:
return proxy_error(result)

return JsonResponse(
{"status": "success", "data": {"resultType": resultType, "result": data}}
Expand All @@ -149,33 +135,17 @@ def get(self, request):
class ProxyQuery(PrometheusProxy):
def get(self, request):
data = []
futures = []
resultType = None
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
for host in models.Shard.objects.filter(proxy=True):
futures.append(
executor.submit(
util.get,
"{}/api/v1/query?{}".format(
host.url, request.META["QUERY_STRING"]
),
headers=self.headers,
)
)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
# Need to try to decode the json BEFORE we raise_for_status
# so that we can pass back the error message from Prometheus
_json = result.json()
result.raise_for_status()
logger.debug("Appending data from %s", result.request.url)
data += _json["data"]["result"]
resultType = _json["data"]["resultType"]
except HTTPError:
logger.warning("Error with response")
_json["promgen_proxy_request"] = result.request.url
return JsonResponse(_json, status=result.status_code)
for future in self.proxy(request):
try:
result = future.result()
result.raise_for_status()
_json = result.json()
logger.debug("Appending data from %s", result.url)
data += _json["data"]["result"]
resultType = _json["data"]["resultType"]
except HTTPError:
return proxy_error(result)

return JsonResponse(
{"status": "success", "data": {"resultType": resultType, "result": data}}
Expand Down

0 comments on commit e0f5a68

Please sign in to comment.