Skip to content

Commit

Permalink
Add AsyncSession support for non-blocking db operations
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Nov 5, 2024
1 parent 1443af2 commit 998ff04
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 423 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ dependencies = [
"pymongo>=4.6.0",
"supabase~=2.6.0",
"certifi>=2023.11.17,<2025.0.0",
"psycopg>=3.1.9",
"psycopg[binary,pool]>=3.1.9",
"fastavro>=1.8.0",
"redis>=5.0.1",
"metaphor-python>=0.1.11",
Expand Down Expand Up @@ -110,6 +110,7 @@ dependencies = [
"langchain-google-community~=2.0.1",
"langchain-elasticsearch>=0.2.0",
"langchain-ollama>=0.2.0",
"aiosqlite>=0.20.0",
]

[project.urls]
Expand Down
50 changes: 29 additions & 21 deletions src/backend/base/langflow/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import inspect
import platform
import socket
Expand Down Expand Up @@ -27,7 +28,7 @@
create_default_folder_if_it_doesnt_exist,
)
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service, get_settings_service, session_scope
from langflow.services.deps import async_session_scope, get_db_service, get_settings_service
from langflow.services.settings.constants import DEFAULT_SUPERUSER
from langflow.services.utils import initialize_services
from langflow.utils.version import fetch_latest_version, get_version_info
Expand Down Expand Up @@ -486,28 +487,35 @@ def api_key(
if not auth_settings.AUTO_LOGIN:
typer.echo("Auto login is disabled. API keys cannot be created through the CLI.")
return
with session_scope() as session:
from langflow.services.database.models.user.model import User

superuser = session.exec(select(User).where(User.username == DEFAULT_SUPERUSER)).first()
if not superuser:
typer.echo("Default superuser not found. This command requires a superuser and AUTO_LOGIN to be enabled.")
return
from langflow.services.database.models.api_key import ApiKey, ApiKeyCreate
from langflow.services.database.models.api_key.crud import (
create_api_key,
delete_api_key,
)

api_key = session.exec(select(ApiKey).where(ApiKey.user_id == superuser.id)).first()
if api_key:
delete_api_key(session, api_key.id)
async def aapi_key():
async with async_session_scope() as session:
from langflow.services.database.models.user.model import User

api_key_create = ApiKeyCreate(name="CLI")
unmasked_api_key = create_api_key(session, api_key_create, user_id=superuser.id)
session.commit()
# Create a banner to display the API key and tell the user it won't be shown again
api_key_banner(unmasked_api_key)
superuser = (await session.exec(select(User).where(User.username == DEFAULT_SUPERUSER))).first()
if not superuser:
typer.echo(
"Default superuser not found. This command requires a superuser and AUTO_LOGIN to be enabled."
)
return None
from langflow.services.database.models.api_key import ApiKey, ApiKeyCreate
from langflow.services.database.models.api_key.crud import (
create_api_key,
delete_api_key,
)

api_key = (await session.exec(select(ApiKey).where(ApiKey.user_id == superuser.id))).first()
if api_key:
await delete_api_key(session, api_key.id)

api_key_create = ApiKeyCreate(name="CLI")
unmasked_api_key = await create_api_key(session, api_key_create, user_id=superuser.id)
await session.commit()
return unmasked_api_key

unmasked_api_key = asyncio.run(aapi_key())
# Create a banner to display the API key and tell the user it won't be shown again
api_key_banner(unmasked_api_key)


def api_key_banner(unmasked_api_key) -> None:
Expand Down
4 changes: 3 additions & 1 deletion src/backend/base/langflow/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from loguru import logger
from sqlalchemy import delete
from sqlmodel import Session
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.graph.graph.base import Graph
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models import User
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
from langflow.services.deps import get_session
from langflow.services.deps import get_async_session, get_session
from langflow.services.store.utils import get_lf_version_from_pypi

if TYPE_CHECKING:
Expand All @@ -31,6 +32,7 @@

CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]
DbSession = Annotated[Session, Depends(get_session)]
AsyncDbSession = Annotated[AsyncSession, Depends(get_async_session)]


def has_api_terms(word: str):
Expand Down
14 changes: 7 additions & 7 deletions src/backend/base/langflow/api/v1/api_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from fastapi import APIRouter, Depends, HTTPException, Response

from langflow.api.utils import CurrentActiveUser, DbSession
from langflow.api.utils import AsyncDbSession, CurrentActiveUser, DbSession
from langflow.api.v1.schemas import ApiKeyCreateRequest, ApiKeysResponse
from langflow.services.auth import utils as auth_utils

Expand All @@ -20,12 +20,12 @@

@router.get("/")
async def get_api_keys_route(
db: DbSession,
db: AsyncDbSession,
current_user: CurrentActiveUser,
) -> ApiKeysResponse:
try:
user_id = current_user.id
keys = get_api_keys(db, user_id)
keys = await get_api_keys(db, user_id)

return ApiKeysResponse(total_count=len(keys), user_id=user_id, api_keys=keys)
except Exception as exc:
Expand All @@ -36,22 +36,22 @@ async def get_api_keys_route(
async def create_api_key_route(
req: ApiKeyCreate,
current_user: CurrentActiveUser,
db: DbSession,
db: AsyncDbSession,
) -> UnmaskedApiKeyRead:
try:
user_id = current_user.id
return create_api_key(db, req, user_id=user_id)
return await create_api_key(db, req, user_id=user_id)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e


@router.delete("/{api_key_id}", dependencies=[Depends(auth_utils.get_current_active_user)])
async def delete_api_key_route(
api_key_id: UUID,
db: DbSession,
db: AsyncDbSession,
):
try:
delete_api_key(db, api_key_id)
await delete_api_key(db, api_key_id)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"detail": "API Key deleted"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,11 @@
"method": "run_model",
"name": "api_run_model",
"required_inputs": [
"expression"
"glean_access_token",
"glean_api_url",
"page_size",
"query",
"request_options"
],
"selected": "Data",
"types": [
Expand All @@ -713,7 +717,11 @@
"method": "build_tool",
"name": "api_build_tool",
"required_inputs": [
"expression"
"glean_access_token",
"glean_api_url",
"page_size",
"query",
"request_options"
],
"selected": "Tool",
"types": [
Expand Down Expand Up @@ -818,10 +826,11 @@
"method": "run_model",
"name": "api_run_model",
"required_inputs": [
"code",
"description",
"global_imports",
"name"
"glean_access_token",
"glean_api_url",
"page_size",
"query",
"request_options"
],
"selected": "Data",
"types": [
Expand All @@ -835,10 +844,11 @@
"method": "build_tool",
"name": "api_build_tool",
"required_inputs": [
"code",
"description",
"global_imports",
"name"
"glean_access_token",
"glean_api_url",
"page_size",
"query",
"request_options"
],
"selected": "Tool",
"types": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,11 +785,6 @@
"display_name": "Text",
"method": "text_response",
"name": "text_output",
"required_inputs": [
"input_value",
"stream",
"system_message"
],
"selected": "Message",
"types": [
"Message"
Expand All @@ -801,17 +796,6 @@
"display_name": "Language Model",
"method": "build_model",
"name": "model_output",
"required_inputs": [
"api_key",
"json_mode",
"max_tokens",
"model_kwargs",
"model_name",
"openai_api_base",
"output_schema",
"seed",
"temperature"
],
"selected": "LanguageModel",
"types": [
"LanguageModel"
Expand Down Expand Up @@ -857,7 +841,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "import operator\nfrom functools import reduce\n\nfrom langchain_openai import ChatOpenAI\nfrom pydantic.v1 import SecretStr\n\nfrom langflow.base.models.model import LCModelComponent\nfrom langflow.base.models.openai_constants import OPENAI_MODEL_NAMES\nfrom langflow.field_typing import LanguageModel\nfrom langflow.field_typing.range_spec import RangeSpec\nfrom langflow.inputs import BoolInput, DictInput, DropdownInput, FloatInput, IntInput, SecretStrInput, StrInput\nfrom langflow.inputs.inputs import HandleInput\n\n\nclass OpenAIModelComponent(LCModelComponent):\n display_name = \"OpenAI\"\n description = \"Generates text using OpenAI LLMs.\"\n icon = \"OpenAI\"\n name = \"OpenAIModel\"\n\n inputs = [\n *LCModelComponent._base_inputs,\n IntInput(\n name=\"max_tokens\",\n display_name=\"Max Tokens\",\n advanced=True,\n info=\"The maximum number of tokens to generate. Set to 0 for unlimited tokens.\",\n range_spec=RangeSpec(min=0, max=128000),\n ),\n DictInput(name=\"model_kwargs\", display_name=\"Model Kwargs\", advanced=True),\n BoolInput(\n name=\"json_mode\",\n display_name=\"JSON Mode\",\n advanced=True,\n info=\"If True, it will output JSON regardless of passing a schema.\",\n ),\n DictInput(\n name=\"output_schema\",\n is_list=True,\n display_name=\"Schema\",\n advanced=True,\n info=\"The schema for the Output of the model. \"\n \"You must pass the word JSON in the prompt. \"\n \"If left blank, JSON mode will be disabled. [DEPRECATED]\",\n ),\n DropdownInput(\n name=\"model_name\",\n display_name=\"Model Name\",\n advanced=False,\n options=OPENAI_MODEL_NAMES,\n value=OPENAI_MODEL_NAMES[0],\n ),\n StrInput(\n name=\"openai_api_base\",\n display_name=\"OpenAI API Base\",\n advanced=True,\n info=\"The base URL of the OpenAI API. \"\n \"Defaults to https://api.openai.com/v1. \"\n \"You can change this to use other APIs like JinaChat, LocalAI and Prem.\",\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"OpenAI API Key\",\n info=\"The OpenAI API Key to use for the OpenAI model.\",\n advanced=False,\n value=\"OPENAI_API_KEY\",\n ),\n FloatInput(name=\"temperature\", display_name=\"Temperature\", value=0.1),\n IntInput(\n name=\"seed\",\n display_name=\"Seed\",\n info=\"The seed controls the reproducibility of the job.\",\n advanced=True,\n value=1,\n ),\n HandleInput(\n name=\"output_parser\",\n display_name=\"Output Parser\",\n info=\"The parser to use to parse the output of the model\",\n advanced=True,\n input_types=[\"OutputParser\"],\n ),\n ]\n\n def build_model(self) -> LanguageModel: # type: ignore[type-var]\n # self.output_schema is a list of dictionaries\n # let's convert it to a dictionary\n output_schema_dict: dict[str, str] = reduce(operator.ior, self.output_schema or {}, {})\n openai_api_key = self.api_key\n temperature = self.temperature\n model_name: str = self.model_name\n max_tokens = self.max_tokens\n model_kwargs = self.model_kwargs or {}\n openai_api_base = self.openai_api_base or \"https://api.openai.com/v1\"\n json_mode = bool(output_schema_dict) or self.json_mode\n seed = self.seed\n\n api_key = SecretStr(openai_api_key).get_secret_value() if openai_api_key else None\n output = ChatOpenAI(\n max_tokens=max_tokens or None,\n model_kwargs=model_kwargs,\n model=model_name,\n base_url=openai_api_base,\n api_key=api_key,\n temperature=temperature if temperature is not None else 0.1,\n seed=seed,\n )\n if json_mode:\n if output_schema_dict:\n output = output.with_structured_output(schema=output_schema_dict, method=\"json_mode\")\n else:\n output = output.bind(response_format={\"type\": \"json_object\"})\n\n return output\n\n def _get_exception_message(self, e: Exception):\n \"\"\"Get a message from an OpenAI exception.\n\n Args:\n e (Exception): The exception to get the message from.\n\n Returns:\n str: The message from the exception.\n \"\"\"\n try:\n from openai import BadRequestError\n except ImportError:\n return None\n if isinstance(e, BadRequestError):\n message = e.body.get(\"message\")\n if message:\n return message\n return None\n"
"value": "import operator\nfrom functools import reduce\n\nfrom langchain_openai import ChatOpenAI\nfrom pydantic.v1 import SecretStr\n\nfrom langflow.base.models.model import LCModelComponent\nfrom langflow.base.models.openai_constants import OPENAI_MODEL_NAMES\nfrom langflow.field_typing import LanguageModel\nfrom langflow.field_typing.range_spec import RangeSpec\nfrom langflow.inputs import BoolInput, DictInput, DropdownInput, FloatInput, IntInput, SecretStrInput, StrInput\nfrom langflow.inputs.inputs import HandleInput\nfrom langflow.io import Output\n\n\nclass OpenAIModelComponent(LCModelComponent):\n display_name = \"OpenAI\"\n description = \"Generates text using OpenAI LLMs.\"\n icon = \"OpenAI\"\n name = \"OpenAIModel\"\n\n inputs = [\n *LCModelComponent._base_inputs,\n IntInput(\n name=\"max_tokens\",\n display_name=\"Max Tokens\",\n advanced=True,\n info=\"The maximum number of tokens to generate. Set to 0 for unlimited tokens.\",\n range_spec=RangeSpec(min=0, max=128000),\n ),\n DictInput(name=\"model_kwargs\", display_name=\"Model Kwargs\", advanced=True),\n BoolInput(\n name=\"json_mode\",\n display_name=\"JSON Mode\",\n advanced=True,\n info=\"If True, it will output JSON regardless of passing a schema.\",\n ),\n DictInput(\n name=\"output_schema\",\n is_list=True,\n display_name=\"Schema\",\n advanced=True,\n info=\"The schema for the Output of the model. \"\n \"You must pass the word JSON in the prompt. \"\n \"If left blank, JSON mode will be disabled. [DEPRECATED]\",\n ),\n DropdownInput(\n name=\"model_name\",\n display_name=\"Model Name\",\n advanced=False,\n options=OPENAI_MODEL_NAMES,\n value=OPENAI_MODEL_NAMES[0],\n ),\n StrInput(\n name=\"openai_api_base\",\n display_name=\"OpenAI API Base\",\n advanced=True,\n info=\"The base URL of the OpenAI API. \"\n \"Defaults to https://api.openai.com/v1. \"\n \"You can change this to use other APIs like JinaChat, LocalAI and Prem.\",\n ),\n SecretStrInput(\n name=\"api_key\",\n display_name=\"OpenAI API Key\",\n info=\"The OpenAI API Key to use for the OpenAI model.\",\n advanced=False,\n value=\"OPENAI_API_KEY\",\n ),\n FloatInput(name=\"temperature\", display_name=\"Temperature\", value=0.1),\n IntInput(\n name=\"seed\",\n display_name=\"Seed\",\n info=\"The seed controls the reproducibility of the job.\",\n advanced=True,\n value=1,\n ),\n HandleInput(\n name=\"output_parser\",\n display_name=\"Output Parser\",\n info=\"The parser to use to parse the output of the model\",\n advanced=True,\n input_types=[\"OutputParser\"],\n ),\n ]\n outputs = [\n Output(display_name=\"Text\", name=\"text_output\", method=\"text_response\"),\n Output(display_name=\"Language Model\", name=\"model_output\", method=\"build_model\"),\n ]\n\n def build_model(self) -> LanguageModel: # type: ignore[type-var]\n # self.output_schema is a list of dictionaries\n # let's convert it to a dictionary\n output_schema_dict: dict[str, str] = reduce(operator.ior, self.output_schema or {}, {})\n openai_api_key = self.api_key\n temperature = self.temperature\n model_name: str = self.model_name\n max_tokens = self.max_tokens\n model_kwargs = self.model_kwargs or {}\n openai_api_base = self.openai_api_base or \"https://api.openai.com/v1\"\n json_mode = bool(output_schema_dict) or self.json_mode\n seed = self.seed\n\n api_key = SecretStr(openai_api_key).get_secret_value() if openai_api_key else None\n output = ChatOpenAI(\n max_tokens=max_tokens or None,\n model_kwargs=model_kwargs,\n model=model_name,\n base_url=openai_api_base,\n api_key=api_key,\n temperature=temperature if temperature is not None else 0.1,\n seed=seed,\n )\n if json_mode:\n if output_schema_dict:\n output = output.with_structured_output(schema=output_schema_dict, method=\"json_mode\")\n else:\n output = output.bind(response_format={\"type\": \"json_object\"})\n\n return output\n\n def _get_exception_message(self, e: Exception):\n \"\"\"Get a message from an OpenAI exception.\n\n Args:\n e (Exception): The exception to get the message from.\n\n Returns:\n str: The message from the exception.\n \"\"\"\n try:\n from openai import BadRequestError\n except ImportError:\n return None\n if isinstance(e, BadRequestError):\n message = e.body.get(\"message\")\n if message:\n return message\n return None\n"
},
"input_value": {
"advanced": false,
Expand Down
Loading

0 comments on commit 998ff04

Please sign in to comment.