-
Notifications
You must be signed in to change notification settings - Fork 9
/
schemas.py
133 lines (106 loc) · 4.5 KB
/
schemas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from typing import List, Optional
from app.exceptions import TrustRegistryException
from shared.constants import TRUST_REGISTRY_URL
from shared.log_config import get_logger
from shared.models.trustregistry import Schema
from shared.util.rich_async_client import RichAsyncClient
# Module-level logger shared by the trust registry schema helpers in this file.
logger = get_logger(__name__)
async def register_schema(schema_id: str) -> None:
    """Add a schema to the trust registry.

    Posts the schema id to the registry's schemas endpoint and treats any
    error response as a failure.

    Args:
        schema_id (str): id of the schema to register

    Raises:
        TrustRegistryException: If the trust registry answers with an error
            status; carries the response text and status code.
    """
    log = logger.bind(body={"schema_id": schema_id})
    log.info("Registering schema on trust registry")

    # Status errors are inspected manually below instead of being raised
    # by the client.
    async with RichAsyncClient(raise_status_error=False) as client:
        endpoint = f"{TRUST_REGISTRY_URL}/registry/schemas"
        payload = {"schema_id": schema_id}
        response = await client.post(endpoint, json=payload)

    if not response.is_error:
        log.info("Successfully registered schema on trust registry.")
        return

    log.error(
        "Error registering schema. Got status code {} with message `{}`.",
        response.status_code,
        response.text,
    )
    raise TrustRegistryException(
        f"Error registering schema `{schema_id}`. Error: `{response.text}`.",
        response.status_code,
    )
async def fetch_schemas() -> List[Schema]:
    """Fetch every schema listed by the trust registry.

    Returns:
        List[Schema]: each item of the registry's JSON response, validated
            as a Schema.

    Raises:
        TrustRegistryException: If the trust registry answers with an error
            status; carries the response text and status code.
    """
    logger.info("Fetching all schemas from trust registry")

    # Status errors are inspected manually below instead of being raised
    # by the client.
    async with RichAsyncClient(raise_status_error=False) as client:
        response = await client.get(f"{TRUST_REGISTRY_URL}/registry/schemas")

    if response.is_error:
        logger.error(
            "Error fetching schemas. Got status code {} with message `{}`.",
            response.status_code,
            response.text,
        )
        raise TrustRegistryException(
            f"Unable to fetch schemas: `{response.text}`.", response.status_code
        )

    schemas = list(map(Schema.model_validate, response.json()))
    logger.info("Successfully fetched schemas from trust registry.")
    return schemas
async def get_schema_by_id(schema_id: str) -> Optional[Schema]:
    """Retrieve a single schema from the trust registry

    Args:
        schema_id (str): the id of the schema to look up

    Raises:
        TrustRegistryException: If the trust registry responds with an error
            status other than 404.

    Returns:
        The schema, or None if the trust registry responds with 404
    """
    bound_logger = logger.bind(body={"schema_id": schema_id})
    bound_logger.info("Fetching schema from trust registry")
    # Status errors are inspected manually below instead of being raised
    # by the client, so that a 404 can be mapped to None.
    async with RichAsyncClient(raise_status_error=False) as client:
        schema_response = await client.get(
            f"{TRUST_REGISTRY_URL}/registry/schemas/{schema_id}"
        )

    # A 404 is an expected outcome: report "no such schema" as None
    # rather than raising.
    if schema_response.status_code == 404:
        bound_logger.info("Bad request: Schema not found.")
        return None

    if schema_response.is_error:
        # Log through the bound logger so the schema_id context is attached,
        # matching the other helpers in this module.
        bound_logger.error(
            "Error fetching schema. Got status code {} with message `{}`.",
            schema_response.status_code,
            schema_response.text,
        )
        raise TrustRegistryException(
            f"Unable to fetch schema: `{schema_response.text}`.",
            schema_response.status_code,
        )

    result = Schema.model_validate(schema_response.json())
    bound_logger.info("Successfully fetched schema from trust registry.")
    return result
async def remove_schema_by_id(schema_id: str) -> None:
    """Delete a schema from the trust registry by its id.

    Args:
        schema_id (str): identifier of the schema to remove

    Raises:
        TrustRegistryException: If the trust registry answers with an error
            status; carries the response text and status code.
    """
    log = logger.bind(body={"schema_id": schema_id})
    log.info("Removing schema from trust registry")

    # Status errors are inspected manually below instead of being raised
    # by the client.
    async with RichAsyncClient(raise_status_error=False) as client:
        endpoint = f"{TRUST_REGISTRY_URL}/registry/schemas/{schema_id}"
        response = await client.delete(endpoint)

    if not response.is_error:
        log.info("Successfully removed schema from trust registry.")
        return

    log.error(
        "Error removing schema. Got status code {} with message `{}`.",
        response.status_code,
        response.text,
    )
    raise TrustRegistryException(
        f"Error removing schema from trust registry: `{response.text}`.",
        response.status_code,
    )