Skip to content

Commit

Permalink
Fixed failing fetch of new token when expired
Browse files Browse the repository at this point in the history
  • Loading branch information
peterhpo committed Oct 11, 2024
1 parent 602d8dc commit 150d4ba
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions utleggs-og-kortskjema-automatisering/nettskjema_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,60 @@
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

def obtain_token():
load_dotenv()
token_url = "https://authorization.nettskjema.no/oauth2/token"
client_id = os.getenv('API_CLIENT_ID')
client_secret = os.getenv('API_SECRET')

if not client_id or not client_secret:
raise ValueError("API_CLIENT_ID or API_SECRET not found in environment variables")

data = {
'grant_type': 'client_credentials',
}

response = requests.post(token_url, data=data, auth=HTTPBasicAuth(client_id, client_secret))
response.raise_for_status()
return response.json()
try:
response = requests.post(token_url, data=data, auth=HTTPBasicAuth(client_id, client_secret))
response.raise_for_status()
token_data = response.json()
if 'access_token' not in token_data or 'expires_in' not in token_data:
raise ValueError("Invalid token response")

token_data['expires_at'] = datetime.datetime.now().timestamp() + token_data['expires_in']
return token_data
except requests.RequestException as e:
raise SystemExit(e)

def save_token(token_data):
token_data['expires_at'] = (datetime.datetime.utcnow() + datetime.timedelta(seconds=token_data['expires_in'])).timestamp()
with open('token.json', 'w') as f:
json.dump(token_data, f)

def load_token():
try:
with open('token.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
token_data = json.load(f)
return token_data
except (FileNotFoundError, json.JSONDecodeError):
return None

def check_and_refresh_token():
token_data = load_token()

if not token_data:
print("Token not found, obtaining a new one.")
return obtain_and_save_new_token()

now = datetime.datetime.utcnow().timestamp()
if now >= token_data['expires_at']:
now = datetime.datetime.now().timestamp()
expires_at = token_data.get('expires_at', 0)

if now >= expires_at:
print("Token expired. Obtaining a new token...")
return obtain_and_save_new_token()
else:
print("Token valid.")
return token_data

def obtain_and_save_new_token():
Expand All @@ -50,39 +67,39 @@ def obtain_and_save_new_token():
return token_data

def parse_xndjson(xndjson_str):
"""Parses an x-ndjson string into a list of dictionaries."""
if not xndjson_str.strip():
return []

lines = xndjson_str.strip().split("\n")

parsed_lines = []
for line in lines:
try:
parsed_lines.append(json.loads(line))
except json.JSONDecodeError:
print(f"Error decoding line: {line}")

return parsed_lines

def api_request(url, method='GET', data=None, params=None, timeout=300):
token_data = check_and_refresh_token()
headers = {"Authorization": f"Bearer {token_data['access_token']}"}

response = requests.request(method, url, headers=headers, json=data, params=params, timeout=timeout)
response.raise_for_status()
try:
response = requests.request(method, url, headers=headers, json=data, params=params, timeout=timeout)
response.raise_for_status()
except requests.RequestException as e:
raise SystemExit(f"API request failed: {e}")

content_type = response.headers.get('Content-Type', '')
if 'application/json' in content_type:
return response.json()
elif 'application/x-ndjson' in content_type:
return parse_xndjson(response.text)
else:
# For binary files like PDFs and images, return the raw response
return response

# Endpoint-specific functions
# (a few of these are not tested and probably don't work, but the ones used in this file are know to work)
# Endpoint-specific functions
def get_form_info(form_id):
url = f"https://api.nettskjema.no/v3/form/{form_id}/info"
return api_request(url)
Expand Down

0 comments on commit 150d4ba

Please sign in to comment.