-
Notifications
You must be signed in to change notification settings - Fork 8
/
main.py
210 lines (168 loc) · 6.35 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import io
import os
import traceback
from contextlib import asynccontextmanager
import pydantic
import yaml
from aries_cloudcontroller import ApiException
from fastapi import FastAPI, Request, Response
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse
from scalar_fastapi import get_scalar_api_reference
from app.exceptions import CloudApiException
from app.routes import (
connections,
definitions,
issuer,
jsonld,
messaging,
oob,
sse,
trust_registry,
verifier,
webhooks,
websocket_endpoint,
)
from app.routes.admin import tenants
from app.routes.wallet import credentials as wallet_credentials
from app.routes.wallet import dids as wallet_dids
from app.routes.wallet import jws as wallet_jws
from app.routes.wallet import sd_jws as wallet_sd_jws
from app.services.event_handling.websocket_manager import WebsocketManager
from app.util.extract_validation_error import extract_validation_error_msg
from shared.constants import PROJECT_VERSION
from shared.exceptions import CloudApiValueError
from shared.log_config import get_logger
# Deployment settings read once from the environment at import time.
# OPENAPI_NAME is used as the application title, ROLE selects which route
# modules are mounted, and ROOT_PATH is passed to FastAPI as `root_path`.
OPENAPI_NAME = os.environ.get("OPENAPI_NAME", "OpenAPI")
ROLE = os.environ.get("ROLE", "*")
ROOT_PATH = os.environ.get("ROOT_PATH", "")
# API description used for the governance, tenant, tenant-admin and "*" roles
# (see `cloud_api_description`). It documents the WebSocket endpoints in
# addition to the HTTP API. The text is passed verbatim to FastAPI as the
# application description, so edits here change the published docs.
cloud_api_docs_description = """
Welcome to the Aries CloudAPI Python project.
In addition to the traditional HTTP-based endpoints described below, we also offer WebSocket endpoints for
real-time interfacing with webhook events.
WebSocket endpoints are authenticated. This means that only users with valid authentication tokens can establish
a WebSocket connection, and they can only subscribe to their own wallet's events. However, Admin users have the
ability to subscribe by topic, or to any wallet in their group.
Our WebSocket endpoints are as follows:
1. `/v1/ws/`: This endpoint allows admins to receive all webhook events for their group.
2. `/v1/ws/{wallet_id}`: This endpoint allows admins (or authenticated users holding this wallet) to receive webhook
events for a specific wallet ID.
3. `/v1/ws/{wallet_id}/{topic}`: Similar to above, but subscribing to a specific topic.
4. `/v1/ws/topic/{topic}`: This endpoint allows admins to receive all webhook events on a specific topic (e.g.
`connections`, `credentials`, `proofs`, `endorsements`).
For authentication, the WebSocket headers should include `x-api-key`: `<your key>`.
Please refer to our API documentation for more details about our authentication mechanism, as well as for information
about the available topics.
"""
# Minimal API description used for every other role (see `cloud_api_description`).
default_docs_description = """
Welcome to the Aries CloudAPI Python project.
"""
# Module-level logger obtained from the shared logging configuration helper.
logger = get_logger(__name__)
# `prod` defaults to true; only an explicit non-"true" value (compared
# case-insensitively) in the `prod` environment variable turns it off.
prod = os.getenv("prod", "true").upper() == "TRUE"
# Debug mode is simply the inverse of production mode.
debug = not prod
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Manage application startup and shutdown for the FastAPI app.

    No startup work is performed. On shutdown, the shutdown is logged and
    `WebsocketManager.disconnect_all()` is awaited.

    Args:
        _: The FastAPI application instance (unused).
    """
    # Nothing to prepare on startup: hand control to the application right away.
    yield

    # Execution resumes here once the application is shutting down.
    logger.info("Calling WebsocketManager shutdown")
    await WebsocketManager.disconnect_all()
# Route modules grouped by the role that serves them (see `routes_for_role`).
# The event-delivery modules are shared by the tenant and tenant-admin groups.
webhook_routes = [webhooks, sse, websocket_endpoint]

trust_registry_routes = [trust_registry]

tenant_admin_routes = [tenants, *webhook_routes]

tenant_routes = [
    connections,
    definitions,
    issuer,
    jsonld,
    messaging,
    oob,
    verifier,
    wallet_credentials,
    wallet_dids,
    wallet_jws,
    wallet_sd_jws,
    *webhook_routes,
]
def routes_for_role(role: str) -> list:
    """Return the route modules that should be mounted for ``role``.

    Args:
        role: Deployment role. Recognised values are ``"governance"``,
            ``"tenant"``, ``"tenant-admin"``, ``"public"`` and ``"*"``.

    Returns:
        A list of route modules. For ``"*"`` this is the union of all route
        groups with duplicates removed, in first-seen order (tenant-admin
        routes, then tenant routes, then trust-registry routes). Unknown
        roles yield an empty list.
    """
    if role in ("governance", "tenant"):
        return tenant_routes
    if role == "tenant-admin":
        return tenant_admin_routes
    if role == "public":
        return trust_registry_routes
    if role == "*":
        combined = tenant_admin_routes + tenant_routes + trust_registry_routes
        # The webhook modules appear in more than one group, so de-duplicate.
        # dict.fromkeys keeps insertion order, unlike set(), so routers are
        # registered (and documented) in a stable, definition-based order and
        # the return value matches the declared `list` type.
        return list(dict.fromkeys(combined))
    return []
def cloud_api_description(role: str) -> str:
    """Pick the OpenAPI description text for the given role.

    Args:
        role: Deployment role name.

    Returns:
        ``cloud_api_docs_description`` for the governance, tenant,
        tenant-admin and ``"*"`` roles; ``default_docs_description`` for
        anything else.
    """
    gets_full_description = role in ("governance", "tenant", "tenant-admin", "*")
    return (
        cloud_api_docs_description
        if gets_full_description
        else default_docs_description
    )
def create_app() -> FastAPI:
    """Build the FastAPI application for the configured ``ROLE``.

    The built-in docs pages are disabled (``docs_url`` and ``redoc_url`` are
    ``None``). The router of every route module selected by
    ``routes_for_role(ROLE)`` is then included.

    Returns:
        The configured FastAPI application.
    """
    fastapi_app = FastAPI(
        root_path=ROOT_PATH,
        title=OPENAPI_NAME,
        version=PROJECT_VERSION,
        description=cloud_api_description(ROLE),
        lifespan=lifespan,
        debug=debug,
        redoc_url=None,
        docs_url=None,
    )

    # Routers show up in the OpenAPI docs in the order they are included here.
    for route_module in routes_for_role(ROLE):
        fastapi_app.include_router(route_module.router)

    return fastapi_app
# Module-level application instance; the route decorators and the exception
# handler below are registered on this object.
app = create_app()
# Use Scalar instead of Swagger
@app.get("/docs", include_in_schema=False)
async def scalar_html():
    """Serve the Scalar API reference at ``/docs``.

    The OpenAPI document URL is built by joining ``ROOT_PATH`` with the
    application's ``openapi_url`` (leading slash removed). With an empty
    ``ROOT_PATH`` the result is a relative URL.

    Returns:
        Whatever ``get_scalar_api_reference`` produces for the given OpenAPI
        URL, title and servers.
    """
    # Imported locally so this block does not depend on a new module-level import.
    import posixpath

    # URLs always use forward slashes. os.path.join would use the host OS
    # separator (a backslash on Windows), so join with posixpath instead.
    openapi_url = posixpath.join(ROOT_PATH, app.openapi_url.lstrip("/"))
    return get_scalar_api_reference(
        openapi_url=openapi_url,
        title=app.title,
        servers=app.servers,
    )
# additional yaml version of openapi.json
@app.get("/openapi.yaml", include_in_schema=False)
def read_openapi_yaml() -> Response:
    """Return the application's OpenAPI schema serialised as YAML.

    Returns:
        A ``text/yaml`` response containing the schema, with keys kept in
        their original order and unicode characters emitted unescaped.
    """
    logger.debug("GET request received: OpenAPI yaml endpoint")

    schema = app.openapi()
    # With no stream argument, yaml.dump returns the serialised document as a string.
    yaml_text = yaml.dump(schema, allow_unicode=True, sort_keys=False)

    logger.debug("Returning OpenAPI yaml text.")
    return Response(content=yaml_text, media_type="text/yaml")
@app.exception_handler(Exception)
async def universal_exception_handler(_: Request, exception: Exception) -> JSONResponse:
    """Translate an exception into a JSON error response.

    The exception type decides the status code and ``detail`` field:

    * ``CloudApiException``: its own ``detail`` and ``status_code``.
    * ``CloudApiValueError``: its ``detail`` with status 422.
    * ``pydantic.ValidationError``: extracted validation message with status 422.
    * ``ApiException``: its ``reason`` and ``status``.
    * ``HTTPException``: its ``detail``, ``status_code`` and ``headers``.
    * anything else: status 500 with a generic detail plus ``str(exception)``.

    When ``debug`` is enabled, a ``traceback`` field is added to every body.

    Args:
        _: The incoming request (unused).
        exception: The exception to convert.

    Returns:
        The JSON error response.
    """
    if debug:
        # Format the traceback carried by the exception itself instead of
        # relying on traceback.format_exc(), which reads the ambient
        # "currently handled" exception and yields "NoneType: None" when this
        # handler runs outside an active `except` block.
        formatted = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )
        stacktrace = {"traceback": formatted}
    else:
        stacktrace = {}

    # NOTE(review): CloudApiException is tested before HTTPException;
    # presumably it is a subclass and needs the more specific branch - confirm.
    if isinstance(exception, CloudApiException):
        return JSONResponse(
            content={"detail": exception.detail, **stacktrace},
            status_code=exception.status_code,
        )

    if isinstance(exception, CloudApiValueError):
        return JSONResponse(
            {"detail": exception.detail, **stacktrace},
            status_code=422,
        )

    if isinstance(exception, pydantic.ValidationError):
        return JSONResponse(
            {"detail": extract_validation_error_msg(exception), **stacktrace},
            status_code=422,
        )

    if isinstance(exception, ApiException):
        return JSONResponse(
            {"detail": exception.reason, **stacktrace},
            status_code=exception.status,
        )

    if isinstance(exception, HTTPException):
        return JSONResponse(
            {"detail": exception.detail, **stacktrace},
            status_code=exception.status_code,
            headers=exception.headers,
        )

    # Fallback for any exception type not recognised above.
    return JSONResponse(
        {"detail": "Internal server error", "exception": str(exception), **stacktrace},
        status_code=500,
    )