-
Notifications
You must be signed in to change notification settings - Fork 0
/
application.py
85 lines (59 loc) · 2.4 KB
/
application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import logging
from contextlib import asynccontextmanager
from os import environ
from typing import AsyncGenerator
from apscheduler.schedulers.background import BackgroundScheduler
from autogen.io.websockets import IOWebsockets
from dotenv import load_dotenv
from fastapi import FastAPI
from captn.captn_agents.application import on_connect
from captn.captn_agents.backend.teams._weekly_analysis_team import execute_weekly_analysis
from captn.observability import PrometheusMiddleware, metrics, setting_otlp
load_dotenv()
import captn.captn_agents # noqa
import google_ads # noqa
import openai_agent # noqa
APP_NAME = environ.get("APP_NAME", "fastapi-app")
OTLP_GRPC_ENDPOINT = environ.get("OTLP_GRPC_ENDPOINT", "http://tempo:4317")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator: # type: ignore
scheduler = BackgroundScheduler()
scheduler.add_job(
execute_weekly_analysis,
"cron",
hour="4",
minute="15",
# day_of_week="wed",
)
scheduler.start()
with IOWebsockets.run_server_in_thread(
on_connect=on_connect,
host="0.0.0.0", # nosec [B104]
port=8080,
) as uri:
print(f"Websocket server started at {uri}.", flush=True)
yield
# yield
app = FastAPI(lifespan=lifespan)
# Setting metrics middleware
app.add_middleware(PrometheusMiddleware)
app.add_route("/metrics", metrics)
app.include_router(openai_agent.router, prefix="/openai", tags=["OpenAI"])
app.include_router(google_ads.router, tags=["Google Ads"])
app.include_router(captn.captn_agents.router, tags=["Captn Agents"])
# Setting OpenTelemetry exporter
setting_otlp(app, APP_NAME, OTLP_GRPC_ENDPOINT)
class EndpointFilter(logging.Filter):
# Uvicorn endpoint access log filter
def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find("GET /metrics") == -1
# Filter out /endpoint
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
if __name__ == "__main__":
import uvicorn
# update uvicorn access logger format
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["access"][
"fmt"
] = "%(asctime)s %(levelname)s [%(name)s] [%(filename)s:%(lineno)d] [trace_id=%(otelTraceID)s span_id=%(otelSpanID)s resource.service.name=%(otelServiceName)s] - %(message)s"
uvicorn.run(app, host="0.0.0.0", port=9000, log_config=log_config) # nosec [B104]