Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TokenAgent implemented. Fix to agent inheritance #993

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/python-beta/src/bailo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

__version__ = "0.2.3"
__version__ = "0.3.0"

from bailo.core.agent import Agent
from bailo.core.agent import Agent, PkiAgent, TokenAgent
from bailo.core.client import Client
from bailo.core.enums import ModelVisibility, Role, SchemaKind
from bailo.helper.access_request import AccessRequest
Expand Down
47 changes: 41 additions & 6 deletions lib/python-beta/src/bailo/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
from json import JSONDecodeError

import requests
from requests.auth import HTTPBasicAuth
from bailo.core.exceptions import BailoException, ResponseException


class Agent:
def __request(self, method, *args, **kwargs):
if "timeout" not in kwargs:
kwargs["timeout"] = 5
if "verify" not in kwargs:
kwargs["verify"] = True

res = requests.request(method, *args, verify=True, **kwargs)
res = requests.request(method, *args, **kwargs)

# Check response for a valid range
if res.status_code < 400:
Expand Down Expand Up @@ -62,16 +65,48 @@ def __init__(
self.auth = auth

def get(self, *args, **kwargs):
return self.get(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)
return super().get(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)

def post(self, *args, **kwargs):
return self.post(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)
return super().post(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)

def put(self, *args, **kwargs):
return self.put(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)
return super().put(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)

def patch(self, *args, **kwargs):
return self.patch(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)
return super().patch(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)

def delete(self, *args, **kwargs):
return self.delete(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)
return super().delete(*args, cert=(self.cert, self.key), verify=self.auth, **kwargs)


class TokenAgent(Agent):
def __init__(
self,
access_key: str,
secret_key: str,
):
"""
Initiates an agent for API token authentication.

:param access_key: Access key
:param secret_key: Secret key
"""
self.access_key = access_key
self.secret_key = secret_key
self.basic = HTTPBasicAuth(access_key, secret_key)

def get(self, *args, **kwargs):
return super().get(*args, auth=self.basic, **kwargs)

def post(self, *args, **kwargs):
return super().post(*args, auth=self.basic, **kwargs)

def put(self, *args, **kwargs):
return super().put(*args, auth=self.basic, **kwargs)

def patch(self, *args, **kwargs):
return super().patch(*args, auth=self.basic, **kwargs)

def delete(self, *args, **kwargs):
return super().delete(*args, auth=self.basic, **kwargs)