Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Part II: Split DuneClient Class #72

Merged
merged 12 commits into from
Sep 8, 2023
Empty file added dune_client/api/__init__.py
Empty file.
115 changes: 115 additions & 0 deletions dune_client/api/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
""""
Basic Dune Client Class responsible for refreshing Dune Queries
Framework built on Dune's API Documentation
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
"""
from __future__ import annotations

import logging.config
import os
from json import JSONDecodeError
from typing import Dict, Optional, Any

import requests
from requests import Response


# pylint: disable=too-few-public-methods
class BaseDuneClient:
"""
A Base Client for Dune which sets up default values
and provides some convenient functions to use in other clients
"""

BASE_URL = "https://api.dune.com"
DEFAULT_TIMEOUT = 10
bh2smith marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self, api_key: str, client_version: str = "v1", performance: str = "medium"
):
self.token = api_key
self.client_version = client_version
self.performance = performance
self.logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s")

@classmethod
def from_env(cls) -> BaseDuneClient:
"""
Constructor allowing user to instantiate a client from environment variable
without having to import dotenv or os manually
We use `DUNE_API_KEY` as the environment variable that holds the API key.
"""
return cls(os.environ["DUNE_API_KEY"])

@property
def api_version(self) -> str:
msf marked this conversation as resolved.
Show resolved Hide resolved
"""Returns client version string"""
return f"/api/{self.client_version}"

def default_headers(self) -> Dict[str, str]:
"""Return default headers containing Dune Api token"""
return {"x-dune-api-key": self.token}
msf marked this conversation as resolved.
Show resolved Hide resolved


class BaseRouter(BaseDuneClient):
"""Extending the Base Client with elementary api routing"""

def _handle_response(self, response: Response) -> Any:
"""Generic response handler utilized by all Dune API routes"""
try:
# Some responses can be decoded and converted to DuneErrors
response_json = response.json()
self.logger.debug(f"received response {response_json}")
return response_json
except JSONDecodeError as err:
# Others can't. Only raise HTTP error for not decodable errors
response.raise_for_status()
raise ValueError("Unreachable since previous line raises") from err

def _route_url(self, route: str) -> str:
return f"{self.BASE_URL}{self.api_version}{route}"

def _get(
self,
route: str,
params: Optional[Any] = None,
raw: bool = False,
bh2smith marked this conversation as resolved.
Show resolved Hide resolved
) -> Any:
"""Generic interface for the GET method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"GET received input url={url}")
bh2smith marked this conversation as resolved.
Show resolved Hide resolved
response = requests.get(
url=url,
headers=self.default_headers(),
timeout=self.DEFAULT_TIMEOUT,
params=params,
)
if raw:
return response
return self._handle_response(response)

def _post(self, route: str, params: Optional[Any] = None) -> Any:
"""Generic interface for the POST method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"POST received input url={url}, params={params}")
response = requests.post(
url=url,
json=params,
headers=self.default_headers(),
timeout=self.DEFAULT_TIMEOUT,
)
return self._handle_response(response)

def _patch(self, route: str, params: Any) -> Any:
bh2smith marked this conversation as resolved.
Show resolved Hide resolved
"""Generic interface for the PATCH method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"PATCH received input url={url}, params={params}")
response = requests.request(
method="PATCH",
url=url,
json=params,
headers={"x-dune-api-key": self.token},
timeout=self.DEFAULT_TIMEOUT,
)
return self._handle_response(response)
bh2smith marked this conversation as resolved.
Show resolved Hide resolved
121 changes: 121 additions & 0 deletions dune_client/api/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""
Implementation of all Dune API query execution and get results routes.

Further Documentation:
execution: https://dune.com/docs/api/api-reference/execute-queries/
get results: https://dune.com/docs/api/api-reference/get-results/
"""
from io import BytesIO
from typing import Optional

from deprecated import deprecated

from dune_client.api.base import BaseRouter
from dune_client.models import (
ExecutionResponse,
ExecutionStatusResponse,
ResultsResponse,
ExecutionResultCSV,
DuneError,
)
from dune_client.query import QueryBase


class ExecutionAPI(BaseRouter):
"""
Query execution and result fetching functions.
"""

def execute_query(
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
params["performance"] = performance or self.performance

self.logger.info(
f"executing {query.query_id} on {performance or self.performance} cluster"
)
response_json = self._post(
route=f"/query/{query.query_id}/execute",
params=params,
)
try:
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err

def cancel_execution(self, job_id: str) -> bool:
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
response_json = self._post(
route=f"/execution/{job_id}/cancel",
params=None,
)
try:
# No need to make a dataclass for this since it's just a boolean.
success: bool = response_json["success"]
return success
except KeyError as err:
raise DuneError(response_json, "CancellationResponse", err) from err

def get_execution_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

def get_execution_results(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)

this API only returns the raw data in CSV format, it is faster & lighterweight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))

#######################
# Deprecated Functions:
#######################
@deprecated(version="1.2.1", reason="Please use execute_query")
def execute(
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
return self.execute_query(query, performance)

@deprecated(version="1.2.1", reason="Please use get_execution_status")
def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
return self.get_execution_status(job_id)

@deprecated(version="1.2.1", reason="Please use get_execution_results")
def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
return self.get_execution_results(job_id)

@deprecated(version="1.2.1", reason="Please use get_execution_results_csv")
def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)

this API only returns the raw data in CSV format, it is faster & lighterweight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
return self.get_execution_results_csv(job_id)
Loading