Skip to content

Commit

Permalink
feat: Add database cleanup functions for transactions and vertex buil…
Browse files Browse the repository at this point in the history
…ds (#4694)

* feat: Add configuration options for maximum transactions and vertex builds retention

* Add functions to clean up old transactions and vertex builds in the database

- Implement `clean_transactions` to delete transactions exceeding the configured limit.
- Implement `clean_vertex_builds` to delete vertex builds exceeding the configured limit.
- Integrate cleanup functions into the service initialization process.

* Add error handling and logging for cleanup tasks in utils.py

- Wrap transaction and vertex build cleanup operations in try-except blocks.
- Log success and error messages for cleanup operations.
- Roll back the session on exceptions without re-raising, as these are cleanup tasks.
- Adjust service initialization order to ensure proper setup.

* Reorder setup and cleanup tasks in database initialization process

* fix: Update type hints for settings_service in cleanup functions

* Remove execution options in cleanup functions

* Handle specific exceptions during cleanup tasks in utils.py

* Use `col` for column references in delete statements to improve SQL query clarity.
  • Loading branch information
ogabrielluiz authored Nov 19, 2024
1 parent ebe4f34 commit a0acf39
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/backend/base/langflow/services/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class Settings(BaseSettings):
"""The maximum file size for the upload in MB."""
deactivate_tracing: bool = False
"""If set to True, tracing will be deactivated."""
max_transactions_to_keep: int = 3000
"""The maximum number of transactions to keep in the database."""
max_vertex_builds_to_keep: int = 3000
"""The maximum number of vertex builds to keep in the database."""

@field_validator("dev")
@classmethod
Expand Down
84 changes: 81 additions & 3 deletions src/backend/base/langflow/services/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

from loguru import logger
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy import delete
from sqlalchemy import exc as sqlalchemy_exc
from sqlmodel import col, select

from langflow.services.auth.utils import create_super_user, verify_password
from langflow.services.cache.factory import CacheServiceFactory
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
from langflow.services.database.utils import initialize_database
from langflow.services.schema import ServiceType
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD

from .deps import get_db_service, get_service, get_settings_service

if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.services.settings.manager import SettingsService


async def get_or_create_super_user(session: AsyncSession, username, password, is_default):
from langflow.services.database.models.user.model import User
Expand Down Expand Up @@ -157,17 +168,84 @@ def initialize_session_service() -> None:
)


async def clean_transactions(settings_service: SettingsService, session: AsyncSession) -> None:
    """Trim the transactions table down to the configured retention limit.

    Rows are ranked by ``timestamp`` from newest to oldest; every row ranked
    after the first ``max_transactions_to_keep`` is removed with one bulk
    ``DELETE`` statement, and the change is then committed.

    This is a best-effort task: SQLAlchemy errors and asyncio timeouts are
    logged, the session is rolled back, and nothing is re-raised.

    Args:
        settings_service: Provides ``settings.max_transactions_to_keep``.
        session: Async database session used to execute and commit the deletion.

    Returns:
        None
    """
    try:
        keep_count = settings_service.settings.max_transactions_to_keep
        # Ids of every row that falls past the newest `keep_count` entries.
        stale_ids = (
            select(TransactionTable.id)
            .order_by(col(TransactionTable.timestamp).desc())
            .offset(keep_count)
        )
        purge_stmt = delete(TransactionTable).where(col(TransactionTable.id).in_(stale_ids))

        await session.exec(purge_stmt)
        await session.commit()
    except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc:
        logger.error(f"Error cleaning up transactions: {exc!s}")
        await session.rollback()
        # Deliberately swallowed: a failed cleanup must not break the caller.
    else:
        logger.debug("Successfully cleaned up old transactions")


async def clean_vertex_builds(settings_service: SettingsService, session: AsyncSession) -> None:
    """Trim the vertex builds table down to the configured retention limit.

    Rows are ranked by ``timestamp`` from newest to oldest; every row ranked
    after the first ``max_vertex_builds_to_keep`` is removed with one bulk
    ``DELETE`` statement, and the change is then committed.

    This is a best-effort task: SQLAlchemy errors and asyncio timeouts are
    logged, the session is rolled back, and nothing is re-raised.

    Args:
        settings_service: Provides ``settings.max_vertex_builds_to_keep``.
        session: Async database session used to execute and commit the deletion.

    Returns:
        None
    """
    try:
        keep_count = settings_service.settings.max_vertex_builds_to_keep
        # Ids of every row that falls past the newest `keep_count` entries.
        stale_ids = (
            select(VertexBuildTable.id)
            .order_by(col(VertexBuildTable.timestamp).desc())
            .offset(keep_count)
        )
        purge_stmt = delete(VertexBuildTable).where(col(VertexBuildTable.id).in_(stale_ids))

        await session.exec(purge_stmt)
        await session.commit()
    except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc:
        logger.error(f"Error cleaning up vertex builds: {exc!s}")
        await session.rollback()
        # Deliberately swallowed: a failed cleanup must not break the caller.
    else:
        logger.debug("Successfully cleaned up old vertex builds")


async def initialize_services(*, fix_migration: bool = False) -> None:
"""Initialize all the services needed.

Requests the cache service, initializes the database, sets up the superuser,
assigns orphaned flows to the superuser, and finally runs the transaction and
vertex-build cleanup tasks.

Args:
fix_migration: Forwarded unchanged to ``initialize_database``.

Raises:
RuntimeError: If assigning orphaned flows to the superuser fails; the
original exception is attached as the cause.
"""
# Test cache connection
get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory())
# Initialize the database in a separate thread (asyncio.to_thread) rather than calling it directly from this coroutine
await asyncio.to_thread(initialize_database, fix_migration=fix_migration)
# Setup the superuser
async with get_db_service().with_async_session() as session:
# NOTE(review): this call and the two lines below both set up the superuser; this looks like the
# pre-change line of a diff shown next to its replacement -- confirm only one is meant to remain.
await setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), session)
settings_service = get_service(ServiceType.SETTINGS_SERVICE)
await setup_superuser(settings_service, session)
try:
await get_db_service().assign_orphaned_flows_to_superuser()
except Exception as exc:
# Log with traceback, then abort initialization with a RuntimeError chained to the original error
msg = "Error assigning orphaned flows to the superuser"
logger.exception(msg)
raise RuntimeError(msg) from exc
# Cleanup tasks: each one logs and swallows SQLAlchemy/timeout errors itself, so those failures do not stop startup
# NOTE(review): indentation is not visible here -- confirm `session` is still open when these calls run
await clean_transactions(settings_service, session)
await clean_vertex_builds(settings_service, session)

0 comments on commit a0acf39

Please sign in to comment.